Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 90 additions & 32 deletions core/src/perspectives/PerspectiveProxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ export class QuerySubscriptionProxy {
#latestResult: AllInstancesResult|null;
#disposed: boolean = false;
#initialized: Promise<boolean>;
#initResolve?: (value: boolean) => void;
#initReject?: (reason?: any) => void;
#initTimeoutId?: NodeJS.Timeout;
#query: string;

Expand All @@ -78,43 +80,91 @@ export class QuerySubscriptionProxy {
this.#client = client;
this.#callbacks = new Set();
this.#latestResult = null;

// Create the promise once and store its resolve/reject
this.#initialized = new Promise<boolean>((resolve, reject) => {
this.#initResolve = resolve;
this.#initReject = reject;
});
}

async subscribe() {
// initialize the query subscription
const result = await this.#client.subscribeQuery(this.#uuid, this.#query);
this.#subscriptionId = result.subscriptionId;
// Subscribe to query updates
this.#initialized = new Promise<boolean>((resolve, reject) => {
// Add timeout to prevent hanging promises
// Clean up previous subscription attempt if retrying
if (this.#unsubscribe) {
this.#unsubscribe();
this.#unsubscribe = undefined;
}

// Clear any existing timeout
if (this.#initTimeoutId) {
clearTimeout(this.#initTimeoutId);
this.#initTimeoutId = undefined;
}

// Clear any existing keepalive timer to prevent accumulation
if (this.#keepaliveTimer) {
clearTimeout(this.#keepaliveTimer);
this.#keepaliveTimer = undefined;
}

try {
// Initialize the query subscription
const initialResult = await this.#client.subscribeQuery(this.#uuid, this.#query);
this.#subscriptionId = initialResult.subscriptionId;

// Process the initial result immediately for fast UX
if (initialResult.result) {
this.#latestResult = initialResult.result;
this.#notifyCallbacks(initialResult.result);
} else {
console.warn('⚠️ No initial result returned from subscribeQuery!');
}

// Set up timeout for retry
this.#initTimeoutId = setTimeout(() => {
reject(new Error('Subscription initialization timed out after 30 seconds. Resubscribing...'));
this.subscribe();
}, 30000); // 30 seconds timeout
console.error('Subscription initialization timed out after 30 seconds. Resubscribing...');
// Recursively retry subscription, catching any errors
this.subscribe().catch(error => {
console.error('Error during subscription retry after timeout:', error);
});
}, 30000);

// Subscribe to query updates
this.#unsubscribe = this.#client.subscribeToQueryUpdates(
this.#subscriptionId,
(result) => {
(updateResult) => {
// Clear timeout on first message
if (this.#initTimeoutId) {
clearTimeout(this.#initTimeoutId);
this.#initTimeoutId = undefined;
}
resolve(true);

// if the result is one of those repeated initialization results
// and we got a result before, we don't notify the callbacks
// so they don't get confused (we could have gotten another
// more recent result in between)
if(result.isInit && this.#latestResult) {
return

// Resolve the initialization promise (only resolves once)
if (this.#initResolve) {
this.#initResolve(true);
this.#initResolve = undefined; // Prevent double-resolve
this.#initReject = undefined;
}

this.#latestResult = result;
this.#notifyCallbacks(result);
// Skip duplicate init messages
if (updateResult.isInit && this.#latestResult) return;

this.#latestResult = updateResult;
this.#notifyCallbacks(updateResult);
}
);
});
} catch (error) {
console.error('Error setting up subscription:', error);

// Reject the promise if this is the first attempt
if (this.#initReject) {
this.#initReject(error);
this.#initResolve = undefined;
this.#initReject = undefined;
}

throw error; // Re-throw so caller knows it failed
}

// Start keepalive loop using platform-agnostic setTimeout
const keepaliveLoop = async () => {
Expand All @@ -126,8 +176,14 @@ export class QuerySubscriptionProxy {
console.error('Error in keepalive:', e);
// try to reinitialize the subscription
console.log('Reinitializing subscription for query:', this.#query);
await this.subscribe();
console.log('Subscription reinitialized');
try {
await this.subscribe();
console.log('Subscription reinitialized');
} catch (resubscribeError) {
console.error('Error during resubscription from keepalive:', resubscribeError);
// Don't schedule another keepalive on resubscribe failure
return;
}
}

// Schedule next keepalive if not disposed
Expand All @@ -152,15 +208,17 @@ export class QuerySubscriptionProxy {
return this.#subscriptionId;
}

/** Promise that resolves when the subscription has received its first result
* through the subscription channel. This ensures the subscription is fully
* set up before allowing access to results or updates.
*
* The promise will reject if no result is received within 30 seconds.
*
* Note: You typically don't need to await this directly since the subscription
* creation methods (like subscribeInfer) already wait for initialization.
*/
/** Promise that resolves when the subscription has received its first result
* through the subscription channel. This ensures the subscription is fully
* set up before allowing access to results or updates.
*
* If no result is received within 30 seconds, the subscription will automatically
* retry. The promise will remain pending until a subscription message successfully
* arrives, or until a fatal error occurs during subscription setup.
*
* Note: You typically don't need to await this directly since the subscription
* creation methods (like subscribeInfer) already wait for initialization.
*/
get initialized(): Promise<boolean> {
return this.#initialized;
}
Expand Down
107 changes: 42 additions & 65 deletions rust-executor/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,20 @@ use crate::graphql::graphql_types::GetValue;
use coasys_juniper::{graphql_value, FieldError, FieldResult};
use futures::Stream;
use futures::StreamExt;
use log::{error, warn};
use log::error;
use serde::de::DeserializeOwned;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::watch;
use tokio::sync::broadcast;
use tokio::sync::Mutex;
use tokio_stream::wrappers::WatchStream;
use tokio_stream::wrappers::BroadcastStream;

type Topic = String;
type Message = String;

pub struct PubSub {
subscribers: Mutex<HashMap<Topic, Vec<watch::Sender<Message>>>>,
subscribers: Mutex<HashMap<Topic, broadcast::Sender<Message>>>,
}

impl PubSub {
Expand All @@ -26,46 +26,18 @@ impl PubSub {
}
}

pub async fn subscribe(&self, topic: &Topic) -> watch::Receiver<Message> {
let (tx, rx) = watch::channel("".to_owned());
pub async fn subscribe(&self, topic: &Topic) -> broadcast::Receiver<Message> {
let mut subscribers = self.subscribers.lock().await;
subscribers
let sender = subscribers
.entry(topic.to_owned())
.or_insert_with(Vec::new)
.push(tx);
rx
}

pub async fn remove_dead_subscribers(&self) {
let mut subscribers = self.subscribers.lock().await;
for (_, subscribers_vec) in subscribers.iter_mut() {
let mut i = 0;
while i < subscribers_vec.len() {
if subscribers_vec[i].is_closed() {
warn!("Found closed subscriber, removing...");
subscribers_vec.remove(i);
} else {
i += 1;
}
}
}
.or_insert_with(|| broadcast::channel(100).0); // 100 message buffer
sender.subscribe()
}

pub async fn publish(&self, topic: &Topic, message: &Message) {
let mut subscribers = self.subscribers.lock().await;

if let Some(subscribers_vec) = subscribers.get_mut(topic) {
let mut i = 0;
while i < subscribers_vec.len() {
let send_res = subscribers_vec[i].send(message.to_owned());
if send_res.is_err() {
warn!("Failed to send message to subscriber: {:?} on topic: {:?}, with subscribers, len: {:?}", send_res, topic, subscribers_vec.len());
warn!("Removing subscriber from topic: {:?}", topic);
subscribers_vec.remove(i);
} else {
i += 1;
}
}
let subscribers = self.subscribers.lock().await;
if let Some(sender) = subscribers.get(topic) {
let _ = sender.send(message.clone()); // Ignore if no receivers
}
}
}
Expand All @@ -77,37 +49,42 @@ pub(crate) async fn subscribe_and_process<
topic: Topic,
filter: Option<String>,
) -> Pin<Box<dyn Stream<Item = FieldResult<T::Value>> + Send>> {
//debug!("Subscribing to topic: {}", topic);
pubsub.remove_dead_subscribers().await;
let receiver = pubsub.subscribe(&topic).await;
let receiver_stream = WatchStream::from_changes(receiver);
let receiver_stream = BroadcastStream::new(receiver);

let mapped_stream = receiver_stream.filter_map(move |msg| {
match serde_json::from_str::<T>(&msg) {
Ok(data) => {
if let Some(filter) = &filter {
if &data
.get_filter()
.expect("Could not get filter on T where we expected to filter")
!= filter
{
return futures::future::ready(None);
let mapped_stream = receiver_stream.filter_map(move |result| {
match result {
Ok(msg) => match serde_json::from_str::<T>(&msg) {
Ok(data) => {
if let Some(filter) = &filter {
if &data
.get_filter()
.expect("Could not get filter on T where we expected to filter")
!= filter
{
return futures::future::ready(None);
}
}
let value = data.get_value();
futures::future::ready(Some(Ok(value)))
}
let value = data.get_value(); // Get the underlying value using the GetValue trait
futures::future::ready(Some(Ok(value)))
}
Err(e) => {
let type_name = std::any::type_name::<T>();
error!("Failed to deserialize pubsub message: {:?}", e);
error!("Type: {}", type_name);
error!("Message: {:?}", msg);
Err(e) => {
let type_name = std::any::type_name::<T>();
error!("Failed to deserialize pubsub message: {:?}", e);
error!("Type: {}", type_name);
error!("Message: {:?}", msg);

let field_error = FieldError::new(
e,
graphql_value!({ "type": "INTERNAL_ERROR_COULD_NOT_SERIALIZE" }),
);
futures::future::ready(Some(Err(field_error)))
let field_error = FieldError::new(
e,
graphql_value!({ "type": "INTERNAL_ERROR_COULD_NOT_SERIALIZE" }),
);
futures::future::ready(Some(Err(field_error)))
}
},
Err(e) => {
error!("Broadcast stream error: {:?}", e);
// Skip lagged messages
futures::future::ready(None)
}
}
});
Expand Down