diff --git a/core/src/perspectives/PerspectiveProxy.ts b/core/src/perspectives/PerspectiveProxy.ts index 256ff86b9..2aadf4b9f 100644 --- a/core/src/perspectives/PerspectiveProxy.ts +++ b/core/src/perspectives/PerspectiveProxy.ts @@ -64,6 +64,8 @@ export class QuerySubscriptionProxy { #latestResult: AllInstancesResult|null; #disposed: boolean = false; #initialized: Promise; + #initResolve?: (value: boolean) => void; + #initReject?: (reason?: any) => void; #initTimeoutId?: NodeJS.Timeout; #query: string; @@ -78,43 +80,91 @@ export class QuerySubscriptionProxy { this.#client = client; this.#callbacks = new Set(); this.#latestResult = null; + + // Create the promise once and store its resolve/reject + this.#initialized = new Promise((resolve, reject) => { + this.#initResolve = resolve; + this.#initReject = reject; + }); } async subscribe() { - // initialize the query subscription - const result = await this.#client.subscribeQuery(this.#uuid, this.#query); - this.#subscriptionId = result.subscriptionId; - // Subscribe to query updates - this.#initialized = new Promise((resolve, reject) => { - // Add timeout to prevent hanging promises + // Clean up previous subscription attempt if retrying + if (this.#unsubscribe) { + this.#unsubscribe(); + this.#unsubscribe = undefined; + } + + // Clear any existing timeout + if (this.#initTimeoutId) { + clearTimeout(this.#initTimeoutId); + this.#initTimeoutId = undefined; + } + + // Clear any existing keepalive timer to prevent accumulation + if (this.#keepaliveTimer) { + clearTimeout(this.#keepaliveTimer); + this.#keepaliveTimer = undefined; + } + + try { + // Initialize the query subscription + const initialResult = await this.#client.subscribeQuery(this.#uuid, this.#query); + this.#subscriptionId = initialResult.subscriptionId; + + // Process the initial result immediately for fast UX + if (initialResult.result) { + this.#latestResult = initialResult.result; + this.#notifyCallbacks(initialResult.result); + } else { + console.warn('⚠️ No initial result returned from subscribeQuery!'); + } + + // Set up timeout for retry this.#initTimeoutId = setTimeout(() => { - reject(new Error('Subscription initialization timed out after 30 seconds. Resubscribing...')); - this.subscribe(); - }, 30000); // 30 seconds timeout + console.error('Subscription initialization timed out after 30 seconds. Resubscribing...'); + // Recursively retry subscription, catching any errors + this.subscribe().catch(error => { + console.error('Error during subscription retry after timeout:', error); + }); + }, 30000); // Subscribe to query updates this.#unsubscribe = this.#client.subscribeToQueryUpdates( this.#subscriptionId, - (result) => { + (updateResult) => { + // Clear timeout on first message if (this.#initTimeoutId) { clearTimeout(this.#initTimeoutId); this.#initTimeoutId = undefined; } - resolve(true); - - // if the result is one of those repeated initialization results - // and we got a result before, we don't notify the callbacks - // so they don't get confused (we could have gotten another - // more recent result in between) - if(result.isInit && this.#latestResult) { - return + + // Resolve the initialization promise (only resolves once) + if (this.#initResolve) { + this.#initResolve(true); + this.#initResolve = undefined; // Prevent double-resolve + this.#initReject = undefined; } - this.#latestResult = result; - this.#notifyCallbacks(result); + // Skip duplicate init messages + if (updateResult.isInit && this.#latestResult) return; + + this.#latestResult = updateResult; + this.#notifyCallbacks(updateResult); } ); - }); + } catch (error) { + console.error('Error setting up subscription:', error); + + // Reject the promise if this is the first attempt + if (this.#initReject) { + this.#initReject(error); + this.#initResolve = undefined; + this.#initReject = undefined; + } + + throw error; // Re-throw so caller knows it failed + } // Start keepalive loop using platform-agnostic setTimeout const keepaliveLoop = async () => { @@ -126,8 +176,14 @@ export class QuerySubscriptionProxy { console.error('Error in keepalive:', e); // try to reinitialize the subscription console.log('Reinitializing subscription for query:', this.#query); - await this.subscribe(); - console.log('Subscription reinitialized'); + try { + await this.subscribe(); + console.log('Subscription reinitialized'); + } catch (resubscribeError) { + console.error('Error during resubscription from keepalive:', resubscribeError); + // Don't schedule another keepalive on resubscribe failure + return; + } } // Schedule next keepalive if not disposed @@ -152,15 +208,17 @@ export class QuerySubscriptionProxy { return this.#subscriptionId; } - /** Promise that resolves when the subscription has received its first result - * through the subscription channel. This ensures the subscription is fully - * set up before allowing access to results or updates. - * - * The promise will reject if no result is received within 30 seconds. - * - * Note: You typically don't need to await this directly since the subscription - * creation methods (like subscribeInfer) already wait for initialization. - */ +/** Promise that resolves when the subscription has received its first result + * through the subscription channel. This ensures the subscription is fully + * set up before allowing access to results or updates. + * + * If no result is received within 30 seconds, the subscription will automatically + * retry. The promise will remain pending until a subscription message successfully + * arrives, or until a fatal error occurs during subscription setup. + * + * Note: You typically don't need to await this directly since the subscription + * creation methods (like subscribeInfer) already wait for initialization. + */ get initialized(): Promise { return this.#initialized; } diff --git a/rust-executor/src/pubsub.rs b/rust-executor/src/pubsub.rs index 638fe973e..2497c2321 100644 --- a/rust-executor/src/pubsub.rs +++ b/rust-executor/src/pubsub.rs @@ -3,20 +3,20 @@ use crate::graphql::graphql_types::GetValue; use coasys_juniper::{graphql_value, FieldError, FieldResult}; use futures::Stream; use futures::StreamExt; -use log::{error, warn}; +use log::error; use serde::de::DeserializeOwned; use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; -use tokio::sync::watch; +use tokio::sync::broadcast; use tokio::sync::Mutex; -use tokio_stream::wrappers::WatchStream; +use tokio_stream::wrappers::BroadcastStream; type Topic = String; type Message = String; pub struct PubSub { - subscribers: Mutex>>>, + subscribers: Mutex>>, } impl PubSub { @@ -26,46 +26,18 @@ impl PubSub { } } - pub async fn subscribe(&self, topic: &Topic) -> watch::Receiver { - let (tx, rx) = watch::channel("".to_owned()); + pub async fn subscribe(&self, topic: &Topic) -> broadcast::Receiver { let mut subscribers = self.subscribers.lock().await; - subscribers + let sender = subscribers .entry(topic.to_owned()) - .or_insert_with(Vec::new) - .push(tx); - rx - } - - pub async fn remove_dead_subscribers(&self) { - let mut subscribers = self.subscribers.lock().await; - for (_, subscribers_vec) in subscribers.iter_mut() { - let mut i = 0; - while i < subscribers_vec.len() { - if subscribers_vec[i].is_closed() { - warn!("Found closed subscriber, removing..."); - subscribers_vec.remove(i); - } else { - i += 1; - } - } - } + .or_insert_with(|| broadcast::channel(100).0); // 100 message buffer + sender.subscribe() } pub async fn publish(&self, topic: &Topic, message: &Message) { - let mut subscribers = self.subscribers.lock().await; - - if let Some(subscribers_vec) = subscribers.get_mut(topic) { - let mut i = 0; - while i < subscribers_vec.len() { - let send_res = subscribers_vec[i].send(message.to_owned()); - if send_res.is_err() { - warn!("Failed to send message to subscriber: {:?} on topic: {:?}, with subscribers, len: {:?}", send_res, topic, subscribers_vec.len()); - warn!("Removing subscriber from topic: {:?}", topic); - subscribers_vec.remove(i); - } else { - i += 1; - } - } + let subscribers = self.subscribers.lock().await; + if let Some(sender) = subscribers.get(topic) { + let _ = sender.send(message.clone()); // Ignore if no receivers } } } @@ -77,37 +49,42 @@ pub(crate) async fn subscribe_and_process< topic: Topic, filter: Option, ) -> Pin> + Send>> { - //debug!("Subscribing to topic: {}", topic); - pubsub.remove_dead_subscribers().await; let receiver = pubsub.subscribe(&topic).await; - let receiver_stream = WatchStream::from_changes(receiver); + let receiver_stream = BroadcastStream::new(receiver); - let mapped_stream = receiver_stream.filter_map(move |msg| { - match serde_json::from_str::(&msg) { - Ok(data) => { - if let Some(filter) = &filter { - if &data - .get_filter() - .expect("Could not get filter on T where we expected to filter") - != filter - { - return futures::future::ready(None); + let mapped_stream = receiver_stream.filter_map(move |result| { + match result { + Ok(msg) => match serde_json::from_str::(&msg) { + Ok(data) => { + if let Some(filter) = &filter { + if &data + .get_filter() + .expect("Could not get filter on T where we expected to filter") + != filter + { + return futures::future::ready(None); + } } + let value = data.get_value(); + futures::future::ready(Some(Ok(value))) } - let value = data.get_value(); // Get the underlying value using the GetValue trait - futures::future::ready(Some(Ok(value))) - } - Err(e) => { - let type_name = std::any::type_name::(); - error!("Failed to deserialize pubsub message: {:?}", e); - error!("Type: {}", type_name); - error!("Message: {:?}", msg); + Err(e) => { + let type_name = std::any::type_name::(); + error!("Failed to deserialize pubsub message: {:?}", e); + error!("Type: {}", type_name); + error!("Message: {:?}", msg); - let field_error = FieldError::new( - e, - graphql_value!({ "type": "INTERNAL_ERROR_COULD_NOT_SERIALIZE" }), - ); - futures::future::ready(Some(Err(field_error))) + let field_error = FieldError::new( + e, + graphql_value!({ "type": "INTERNAL_ERROR_COULD_NOT_SERIALIZE" }), + ); + futures::future::ready(Some(Err(field_error))) + } + }, + Err(e) => { + error!("Broadcast stream error: {:?}", e); + // Skip lagged messages + futures::future::ready(None) } } });