Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 42 additions & 4 deletions core/src/model/Ad4mModel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1028,18 +1028,30 @@ export class ModelQueryBuilder<T extends Ad4mModel> {

/**
* Subscribes to the query and receives updates when results change.
* Also returns initial results immediately.
*
* This method:
* 1. Creates and initializes a Prolog query subscription
* 2. Sets up the callback to process future updates
* 3. Returns the initial results immediately
*
* The subscription is guaranteed to be ready when this method returns,
* as it waits for the initialization process to complete.
*
* @param callback - Function to call with updated results
* @returns Initial results array
*
* @example
* ```typescript
* await Recipe.query(perspective)
* // Subscribe to cooking recipes
* const initialRecipes = await Recipe.query(perspective)
* .where({ status: "cooking" })
* .subscribe(recipes => {
* // This callback will be called with future updates
* console.log("Currently cooking:", recipes);
* });
*
* // initialRecipes contains the current state
* console.log("Initial recipes:", initialRecipes);
* ```
*/
async subscribe(callback: (results: T[]) => void): Promise<T[]> {
Expand Down Expand Up @@ -1081,16 +1093,29 @@ export class ModelQueryBuilder<T extends Ad4mModel> {
/**
* Subscribes to count updates for matching entities.
*
* This method:
* 1. Creates and initializes a Prolog query subscription for the count
* 2. Sets up the callback to process future count updates
* 3. Returns the initial count immediately
*
* The subscription is guaranteed to be ready when this method returns,
* as it waits for the initialization process to complete.
*
* @param callback - Function to call with updated count
* @returns Initial count
*
* @example
* ```typescript
* await Recipe.query(perspective)
* // Subscribe to active recipe count
* const initialCount = await Recipe.query(perspective)
* .where({ status: "active" })
* .countSubscribe(count => {
* // This callback will be called when the count changes
* console.log("Active items:", count);
* });
*
* // initialCount contains the current count
* console.log("Initial count:", initialCount);
* ```
*/
async countSubscribe(callback: (count: number) => void): Promise<number> {
Expand Down Expand Up @@ -1132,18 +1157,31 @@ export class ModelQueryBuilder<T extends Ad4mModel> {
/**
* Subscribes to paginated results updates.
*
* This method:
* 1. Creates and initializes a Prolog query subscription for the paginated results
* 2. Sets up the callback to process future page updates
* 3. Returns the initial page immediately
*
* The subscription is guaranteed to be ready when this method returns,
* as it waits for the initialization process to complete.
*
* @param pageSize - Number of items per page
* @param pageNumber - Which page to retrieve (1-based)
* @param callback - Function to call with updated pagination results
* @returns Initial pagination results
*
* @example
* ```typescript
* await Recipe.query(perspective)
* // Subscribe to first page of main recipes
* const initialPage = await Recipe.query(perspective)
* .where({ category: "Main" })
* .paginateSubscribe(10, 1, page => {
* // This callback will be called when the page content changes
* console.log("Updated page:", page.results);
* });
*
* // initialPage contains the current page state
* console.log("Initial page:", initialPage.results);
* ```
*/
async paginateSubscribe(
Expand Down
84 changes: 69 additions & 15 deletions core/src/perspectives/PerspectiveProxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,23 @@ interface Unsubscribable {
* - Managing callbacks for result updates
* - Subscribing to query updates via GraphQL subscriptions
* - Maintaining the latest query result
* - Ensuring subscription is fully initialized before allowing access
*
* The subscription will remain active as long as keepalive signals are sent.
* Make sure to call dispose() when you're done with the subscription to clean up
* resources and stop keepalive signals.
*
* The subscription goes through an initialization process where it waits for the first
* result to come through the subscription channel. You can await the `initialized`
* promise to ensure the subscription is ready. The initialization will timeout after
* 30 seconds if no result is received.
*
* Example usage:
* ```typescript
* const subscription = await perspective.subscribeInfer("my_query(X)");
* console.log("Initial result:", subscription.result);
* // At this point the subscription is already initialized since subscribeInfer waits
*
* // Set up callback for updates
* // Set up callback for future updates
* const removeCallback = subscription.onResult(result => {
* console.log("New result:", result);
* });
Expand All @@ -56,6 +62,8 @@ export class QuerySubscriptionProxy {
#unsubscribe?: () => void;
#latestResult: AllInstancesResult;
#disposed: boolean = false;
#initialized: Promise<boolean>;
#initTimeoutId?: NodeJS.Timeout;

/** Creates a new query subscription
* @param uuid - The UUID of the perspective
Expand All @@ -70,17 +78,26 @@ export class QuerySubscriptionProxy {
this.#callbacks = new Set();
this.#latestResult = initialResult;

// Call all callbacks with initial result
this.#notifyCallbacks(initialResult);

// Subscribe to query updates
this.#unsubscribe = this.#client.subscribeToQueryUpdates(
this.#subscriptionId,
(result) => {
this.#latestResult = result;
this.#notifyCallbacks(result);
}
);
this.#initialized = new Promise<boolean>((resolve, reject) => {
// Add timeout to prevent hanging promises
this.#initTimeoutId = setTimeout(() => {
reject(new Error('Subscription initialization timed out after 30 seconds'));
}, 30000); // 30 seconds timeout

// Subscribe to query updates
this.#unsubscribe = this.#client.subscribeToQueryUpdates(
this.#subscriptionId,
(result) => {
if (this.#initTimeoutId) {
clearTimeout(this.#initTimeoutId);
this.#initTimeoutId = undefined;
}
resolve(true);
this.#latestResult = result;
this.#notifyCallbacks(result);
}
);
});

// Start keepalive loop using platform-agnostic setTimeout
const keepaliveLoop = async () => {
Expand All @@ -102,6 +119,19 @@ export class QuerySubscriptionProxy {
this.#keepaliveTimer = setTimeout(keepaliveLoop, 30000) as unknown as number;
}

/** Promise that resolves when the subscription has received its first result
* through the subscription channel. This ensures the subscription is fully
* set up before allowing access to results or updates.
*
* The promise will reject if no result is received within 30 seconds.
*
* Note: You typically don't need to await this directly since the subscription
* creation methods (like subscribeInfer) already wait for initialization.
*/
get initialized(): Promise<boolean> {
return this.#initialized;
}

/** Get the latest query result
*
* This returns the most recent result from the query, which could be either:
Expand Down Expand Up @@ -155,6 +185,7 @@ export class QuerySubscriptionProxy {
* 1. Stops the keepalive timer
* 2. Unsubscribes from GraphQL subscription updates
* 3. Clears all registered callbacks
* 4. Cleans up any pending initialization timeout
*
* After calling this method, the subscription is no longer active and
* will not receive any more updates. The instance should be discarded.
Expand All @@ -165,6 +196,10 @@ export class QuerySubscriptionProxy {
if (this.#unsubscribe) {
this.#unsubscribe();
}
if (this.#initTimeoutId) {
clearTimeout(this.#initTimeoutId);
this.#initTimeoutId = undefined;
}
this.#callbacks.clear();
}
}
Expand Down Expand Up @@ -1061,8 +1096,17 @@ export class PerspectiveProxy {
/**
* Creates a subscription for a Prolog query that updates in real-time.
*
* This method:
* 1. Creates the subscription on the Rust side
* 2. Sets up the subscription callback
* 3. Waits for the initial result to come through the subscription channel
* 4. Returns a fully initialized QuerySubscriptionProxy
*
* The returned subscription is guaranteed to be ready to receive updates,
* as this method waits for the initialization process to complete.
*
* @param query - Prolog query string
* @returns QuerySubscriptionProxy instance
* @returns Initialized QuerySubscriptionProxy instance
*
* @example
* ```typescript
Expand All @@ -1072,19 +1116,29 @@ export class PerspectiveProxy {
* property_getter("Todo", Todo, "state", "active")
* `);
*
* // Subscription is already initialized here
* console.log("Initial result:", subscription.result);
*
* // Set up callback for future updates
* subscription.onResult((todos) => {
* console.log("Active todos:", todos);
* });
* ```
*/
async subscribeInfer(query: string): Promise<QuerySubscriptionProxy> {
// Start the subscription on the Rust side first to get the real subscription ID
const result = await this.#client.subscribeQuery(this.uuid, query);
return new QuerySubscriptionProxy(
const subscriptionProxy = new QuerySubscriptionProxy(
this.uuid,
result.subscriptionId,
result.result,
this.#client
);

// Wait for the initial result
await subscriptionProxy.initialized;

return subscriptionProxy;
}

}
21 changes: 21 additions & 0 deletions rust-executor/src/perspectives/perspective_instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2041,6 +2041,27 @@ impl PerspectiveInstance {
.lock()
.await
.insert(subscription_id.clone(), subscribed_query);

// Send initial result after a short delay
let uuid = self.persisted.lock().await.uuid.clone();
let subscription_id_clone = subscription_id.clone();
let result_string_clone = result_string.clone();
tokio::spawn(async move {
sleep(Duration::from_millis(100)).await;
let filter = PerspectiveQuerySubscriptionFilter {
uuid,
subscription_id: subscription_id_clone,
result: result_string_clone,
};
get_global_pubsub()
.await
.publish(
&PERSPECTIVE_QUERY_SUBSCRIPTION_TOPIC.to_string(),
&serde_json::to_string(&filter).unwrap(),
)
.await;
});

Ok((subscription_id, result_string))
}

Expand Down
4 changes: 2 additions & 2 deletions rust-executor/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::graphql::graphql_types::GetValue;
use coasys_juniper::{graphql_value, FieldError, FieldResult};
use futures::Stream;
use futures::StreamExt;
use log::{debug, error, warn};
use log::{error, warn};
use serde::de::DeserializeOwned;
use std::collections::HashMap;
use std::pin::Pin;
Expand Down Expand Up @@ -77,7 +77,7 @@ pub(crate) async fn subscribe_and_process<
topic: Topic,
filter: Option<String>,
) -> Pin<Box<dyn Stream<Item = FieldResult<T::Value>> + Send>> {
debug!("Subscribing to topic: {}", topic);
//debug!("Subscribing to topic: {}", topic);
pubsub.remove_dead_subscribers().await;
let receiver = pubsub.subscribe(&topic).await;
let receiver_stream = WatchStream::from_changes(receiver);
Expand Down
35 changes: 19 additions & 16 deletions tests/js/tests/prolog-and-literals.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1747,20 +1747,31 @@ describe("Prolog + Literals", () => {

expect(initialResult.results.length).to.equal(3);
expect(initialResult.totalCount).to.equal(10);
// Reset lastResult to verify we get an update
lastResult = null;

// Add a new recipe and verify subscription updates
const newRecipe = new Recipe(perspective!);
newRecipe.name = "Recipe 11";
await newRecipe.save();

let tries = 0;
while (!lastResult) {
await sleep(500);
tries++;
if (tries > 20) {
throw new Error("Timeout waiting for subscription to update");
}


// Wait for subscription update with a timeout
const maxTries = 50;
const sleepMs = 100;
const timeout = maxTries * sleepMs;

for (let i = 0; i < maxTries; i++) {
if (lastResult) break;
await sleep(sleepMs);
console.log("Waiting for subscription update - try:", i + 1);
}

if (!lastResult) {
throw new Error(`Subscription did not update after ${timeout}ms`);
}

expect(lastResult.totalCount).to.equal(11);

// Clean up
Expand Down Expand Up @@ -2117,10 +2128,8 @@ describe("Prolog + Literals", () => {
const recipe = new BatchRecipe(perspective!);
recipe.name = "Pasta";
recipe.ingredients = ["pasta", "sauce", "cheese"];
console.log("recipe: ", recipe)
console.log("first save")
await recipe.save(batchId);
console.log("saved")


const note = new BatchNote(perspective!);
note.title = "Recipe Notes";
Expand All @@ -2135,9 +2144,7 @@ describe("Prolog + Literals", () => {
expect(notesBeforeCommit.length).to.equal(0);

// Commit batch
console.log("committing batch")
const result = await perspective!.commitBatch(batchId);
console.log("done committed batch")
expect(result.additions.length).to.be.greaterThan(0);
expect(result.removals.length).to.equal(0);

Expand All @@ -2152,11 +2159,8 @@ describe("Prolog + Literals", () => {
expect(notesAfterCommit[0].title).to.equal("Recipe Notes");
expect(notesAfterCommit[0].content).to.equal("Make sure to use fresh ingredients");

console.log("creation batch done")

// Test updating models in batch
const updateBatchId = await perspective!.createBatch();
console.log("update batch created")
recipe.ingredients.push("garlic");
await recipe.update(updateBatchId);

Expand All @@ -2172,7 +2176,6 @@ describe("Prolog + Literals", () => {

// Commit update batch
const updateResult = await perspective!.commitBatch(updateBatchId);
console.log("update batch done")
expect(updateResult.additions.length).to.be.greaterThan(0);

// Verify models are updated
Expand Down