Subscription fixes #634

Merged
lucksus merged 8 commits into dev from subscription-fixes on Nov 27, 2025

jhweir (Contributor) commented Nov 25, 2025

Problem

Perspective query subscriptions were losing messages (especially when sent in quick succession) and sometimes timed out after 30 seconds without receiving any of the 7 init messages. Because subscriptions are now used frequently in Flux, portions of the UI would often fail to load or take up to 30 seconds to appear.

Root Cause

The pubsub system was using Tokio watch channels, which are designed for state synchronization, not message delivery. Watch channels store only the most recent value, so when multiple messages are published before receivers can consume them, earlier messages are overwritten and lost. Additionally, we were using WatchStream::from_changes(), which only updates subscribers when the content of the new message is different from the previous message, so multiple messages with the same data (like the 7 init messages) were de-duplicated.

Example of the data being overwritten:

t=100ms:  Publish 'message 1' → watch channel value = 'message 1'
t=500ms:  Publish 'message 2' → watch channel value = 'message 2' (message 1 LOST!)
t=501ms:  Receiver reads → only gets 'message 2'

Example of the data being de-duplicated:

t=100ms:  Publish message 'a' → watch channel value = a (subscribers notified of change and consume the message)
t=500ms:  Publish message 'a' → watch channel value = a (content identical so subscribers not notified of new message)
t=1000ms:  Publish message 'a' → watch channel value = a (content identical so subscribers not notified of new message)

An attempt to fix this by switching from WatchStream::from_changes() to WatchStream::new().skip(1) solved the de-duplication but didn't fix the root problem because both are built on watch channels with the same fundamental coalescing behavior.
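
The overwrite behavior described above can be modeled in a few lines of TypeScript. This is a simplified sketch, not Tokio's implementation: `LatestValueSlot`, `publish`, and `createReader` are hypothetical names that only imitate a channel holding a single most-recent value.

```typescript
// Hypothetical model of a "latest value only" channel (watch-like semantics).
// It is NOT Tokio's implementation; it only illustrates coalescing.
class LatestValueSlot<T> {
  private value: T | undefined;
  private version = 0; // bumped on every publish

  publish(value: T): void {
    this.value = value; // overwrites whatever was stored before
    this.version += 1;
  }

  // Each reader remembers the last version it has seen.
  createReader(): () => T | undefined {
    let seenVersion = this.version;
    return () => {
      if (seenVersion === this.version) return undefined; // nothing new
      seenVersion = this.version;
      return this.value; // only the most recent value is available
    };
  }
}

const slot = new LatestValueSlot<string>();
const read = slot.createReader();

slot.publish("message 1");
slot.publish("message 2"); // published before the reader had a chance to run

const received: string[] = [];
for (let next = read(); next !== undefined; next = read()) {
  received.push(next);
}
console.log(received); // "message 1" never reaches the reader
```

However the reader is wrapped, a single slot can only ever hand out the newest value, which is why changing the stream wrapper alone could not recover the earlier messages.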

Solution

Replaced watch channels with broadcast channels throughout the pubsub system.

Key Changes in pubsub.rs:

  • Changed subscribers from HashMap<Topic, Vec<watch::Sender<Message>>> to HashMap<Topic, broadcast::Sender<Message>>
  • Added 100-message buffer capacity: broadcast::channel(100)
  • Switched from WatchStream to BroadcastStream
  • Removed .skip(1) workaround (no longer needed)
  • Removed remove_dead_subscribers() (broadcast channels handle cleanup automatically)

Why Broadcast Channels:

  • Message buffering: Maintains a queue of up to 100 messages
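  • No coalescing: Messages are preserved until consumed, provided a receiver does not fall more than 100 messages behind (a lagging receiver skips the oldest messages)
  • Multiple receivers: Each subscriber gets its own receiver, which sees every message sent after it subscribed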
  • Proper semantics: Broadcast channels are the correct Tokio primitive for pub/sub patterns
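
The buffering properties in this list can be illustrated with a small TypeScript model. It is a sketch under stated assumptions rather than Tokio's implementation: `BroadcastBuffer`, `RecvResult`, and the capacity of 3 (instead of 100) are all hypothetical and chosen to keep the example short.

```typescript
// Hypothetical model of a bounded broadcast buffer with per-receiver cursors.
// It is NOT Tokio's implementation; names and return shapes are illustrative.
type RecvResult<T> =
  | { kind: "message"; value: T }
  | { kind: "lagged"; skipped: number }
  | { kind: "empty" };

class BroadcastBuffer<T> {
  private readonly capacity: number;
  private buffer: T[] = [];
  private firstIndex = 0; // absolute index of buffer[0]

  constructor(capacity: number) {
    this.capacity = capacity;
  }

  publish(value: T): void {
    this.buffer.push(value);
    if (this.buffer.length > this.capacity) {
      this.buffer.shift(); // the oldest message is dropped when full
      this.firstIndex += 1;
    }
  }

  // A new receiver only sees messages published after it subscribed.
  subscribe(): () => RecvResult<T> {
    let cursor = this.firstIndex + this.buffer.length;
    return () => {
      if (cursor < this.firstIndex) {
        const skipped = this.firstIndex - cursor;
        cursor = this.firstIndex; // jump to the oldest retained message
        return { kind: "lagged", skipped };
      }
      const offset = cursor - this.firstIndex;
      if (offset >= this.buffer.length) return { kind: "empty" };
      cursor += 1;
      return { kind: "message", value: this.buffer[offset] as T };
    };
  }
}

const channel = new BroadcastBuffer<string>(3);
const recv = channel.subscribe();

// A burst that fits in the buffer is delivered in full, duplicates included.
["init", "init", "init"].forEach((m) => channel.publish(m));
const burst = [recv(), recv(), recv()];

// A burst larger than the buffer makes a slow receiver lag.
["a", "b", "c", "d", "e"].forEach((m) => channel.publish(m));
const afterOverflow = recv();
console.log(burst, afterOverflow);
```

In this model the three identical init messages all arrive, while the receiver that fell behind by more than the capacity is told how many messages it skipped and then continues from the oldest retained one.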

Additional updates

  • The first result from #client.subscribeQuery in the subscribe() function in PerspectiveProxy.ts is now returned immediately (~5 ms instead of 100-500 ms) to improve load times. We no longer wait for the first subscription message before returning it.
  • When no results come back from a subscription within 30 seconds, we now log the error instead of throwing it, so the resubscribe can proceed. Previously the thrown error prevented resubscribing.
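
The log-and-retry behavior in the second bullet might look roughly like the sketch below. It is not the code from PerspectiveProxy.ts: `RetryingSubscription`, `startSubscription`, and the method names are hypothetical, and the timeout handler is called by hand so the example does not wait 30 seconds.

```typescript
// Hypothetical sketch: on init timeout, log and resubscribe instead of throwing.
// `startSubscription` stands in for the real subscribe call and is an assumption.
class RetryingSubscription {
  attempts = 0;
  private readonly startSubscription: () => void;
  private readonly timeoutMs: number;
  private initTimer: ReturnType<typeof setTimeout> | undefined;

  constructor(startSubscription: () => void, timeoutMs: number) {
    this.startSubscription = startSubscription;
    this.timeoutMs = timeoutMs;
  }

  subscribe(): void {
    // Clear the timer of any previous attempt so only one is pending.
    if (this.initTimer !== undefined) clearTimeout(this.initTimer);
    this.attempts += 1;
    this.startSubscription();
    this.initTimer = setTimeout(() => this.onInitTimeout(), this.timeoutMs);
  }

  // Called when no result arrived in time: log and retry rather than throw.
  onInitTimeout(): void {
    console.error(`No result after ${this.timeoutMs} ms, resubscribing...`);
    this.subscribe();
  }

  // Called on the first message: the pending timeout is no longer needed.
  onFirstMessage(): void {
    if (this.initTimer !== undefined) clearTimeout(this.initTimer);
    this.initTimer = undefined;
  }
}

let started = 0;
const sub = new RetryingSubscription(() => { started += 1; }, 30000);
sub.subscribe();
sub.onInitTimeout(); // simulate the 30 s timer firing without waiting
sub.onFirstMessage(); // a message arrives on the second attempt
console.log(sub.attempts, started);
```

Because the timeout path only logs and calls subscribe() again, nothing propagates out of the timer callback that could stop the retry.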

Impact

  • Reliability: Query subscriptions no longer drop or coalesce messages, provided a subscriber does not fall more than 100 messages behind
  • Performance: Initial results appear at least 100ms faster (direct GraphQL response vs subscription channel)
  • User Experience: Eliminated 30-second timeouts and blank UI states in Flux
  • Code Quality: Using the correct async primitive for the pub/sub use case

Summary by CodeRabbit

  • Bug Fixes

    • More reliable subscription initialization: robust retry on timeouts, single-resolution init promise, suppression of duplicate init notifications, immediate application of initial results when present, and clearer warnings/errors on setup failures and keepalive errors.
  • Chores

    • Simplified message distribution and streaming backend by switching to broadcast-style channels, improving streaming reliability and performance.


Note

Replace watch-based pubsub with broadcast channels and make query subscription init/keepalive more robust with immediate initial results, retries, and safer timers.

  • Backend (Rust rust-executor/src/pubsub.rs):
    • Replace watch with broadcast channels: HashMap<Topic, broadcast::Sender<Message>>, per-subscriber receivers, 100-message buffer.
    • Use BroadcastStream and handle lag/errors; simplify publish(); remove dead-subscriber cleanup.
    • Trim logging to errors only.
  • Frontend (TS core/src/perspectives/PerspectiveProxy.ts):
    • Query subscription init reworked: single stored promise resolvers, cleanup of prior attempts/timeouts/keepalives.
    • Process initial result from subscribeQuery() immediately; resolve on first update; skip duplicate init messages.
    • Init timeout now triggers auto-resubscribe instead of rejection; improved error handling.
    • Keepalive loop: guarded resubscribe with try/catch to avoid runaway timers.

Written by Cursor Bugbot for commit 751f08a.

…criptions so resubscribing works & doesn't get blocked by the error
…unction before subscriptions established for optimal UX (result var naming also updated for clarity)
… and message buffer to avoid lost messages in subscriptions (automatically handles dead subscriber removal)
coderabbitai bot (Contributor) commented Nov 25, 2025

Walkthrough

PerspectiveProxy: added private init-control fields and moved init-promise creation to constructor; subscribe() now clears prior state, applies any immediate result, starts a 30s init timeout with automatic retry, resolves init once on first update and suppresses duplicate init notifications. Rust PubSub: switched per-topic storage to a single broadcast::Sender (buffer=100), updated subscribe/publish and stream processing to broadcast semantics.

Changes

| Cohort / File(s) | Summary |
|---|---|
| TypeScript Proxy Subscription: core/src/perspectives/PerspectiveProxy.ts | Adds private #initResolve/#initReject, moves #initialized creation into constructor, rewrites subscribe() to clear previous subscription/timers, apply initial result if present, set a 30s init timeout that logs and retries by calling subscribe(), subscribe to updates and resolve init promise once on first meaningful update, suppress duplicate init notifications, and add guarded error/keepalive handling. |
| Rust Pub/Sub Channel Migration: rust-executor/src/pubsub.rs | Replaces per-topic Vec<watch::Sender<Message>> with broadcast::Sender<Message> per topic; adds PubSub::new(), changes subscribe() to return broadcast::Receiver<Message> and create broadcast channels (buffer=100) on first subscribe, simplifies publish() via broadcast::send(), removes dead-subscriber cleanup, and adapts stream processing from WatchStream to BroadcastStream with handling for lagged and deserialization errors. |

Sequence Diagram(s)

sequenceDiagram
    participant Sub as Subscriber
    participant Proxy as PerspectiveProxy
    participant Handler as UpdateHandler

    rect rgba(220,235,255,0.6)
    Note over Proxy: subscribe() startup
    Sub->>Proxy: subscribe()
    Proxy->>Proxy: clear prior subscription, timers
    Proxy->>Proxy: create `#initialized` promise via constructor fields
    Proxy->>Proxy: call subscribeQuery() -> maybe initial result
    end

    alt initial result present
        Proxy->>Proxy: set `#latestResult`, notify callbacks (non-init)
    else no immediate result
        Proxy->>Proxy: start 30s init timeout (retry on expiry)
    end

    Proxy->>Handler: subscribe to updates stream
    Handler->>Proxy: first update (init/regular)
    Proxy->>Proxy: clear init timeout, resolve `#initialized` once
    alt duplicate init (latestResult exists)
        Proxy->>Proxy: skip notifying callbacks
    else
        Proxy->>Sub: notify callbacks with update
    end

    alt timeout (no update)
        Proxy->>Proxy: log error, retry subscribe()
    end
sequenceDiagram
    participant Pub as Publisher
    participant PubSub as PubSub
    participant Sub as Subscriber

    rect rgba(235,245,230,0.6)
    Note over PubSub: per-topic broadcast model
    Sub->>PubSub: subscribe(topic)
    PubSub->>PubSub: create/get broadcast::Sender (buffer=100)
    PubSub-->>Sub: broadcast::Receiver
    end

    Pub->>PubSub: publish(topic, message)
    PubSub->>PubSub: broadcast::Sender::send(message)
    PubSub->>Sub: receivers get messages (lagged messages may be skipped)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

  • Inspect rust-executor/src/pubsub.rs for correct broadcast semantics: channel creation, buffer sizing, handling of lagged messages, error paths from broadcast::Receiver, and deserialization/error mapping in the stream.
  • Review core/src/perspectives/PerspectiveProxy.ts init lifecycle: #initResolve/#initReject usage, #initialized promise behavior, retry recursion (ensure no uncontrolled infinite loop), timeout/keepalive clearing, and correct suppression of duplicate init notifications.

Poem

🐰 I twitched my whiskers at the queue,
A hundred buffers hummed anew,
The Proxy waits, then greets the light—
If silence reigns, it tries the night.
Hop, retry, and forward true.

Pre-merge checks and finishing touches

❌ Failed checks (1 warning, 1 inconclusive)
| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. | You can run @coderabbitai generate docstrings to improve docstring coverage. |
| Title check | ❓ Inconclusive | The title 'Subscription fixes' is vague and generic. While it refers to actual changes in the PR (fixing subscription issues in both TypeScript and Rust), it does not convey meaningful details about what subscriptions are being fixed or the nature of the improvements. | Consider a more specific title that describes the primary fix, such as 'Replace watch channels with broadcast channels in pubsub' or 'Fix message loss in perspective subscriptions by using broadcast channels'. |
✅ Passed checks (1 passed)
| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |

coderabbitai bot (Contributor) left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
core/src/perspectives/PerspectiveProxy.ts (1)

162-170: Stale docstring: promise no longer rejects on timeout.

Line 166 states the promise will reject after 30 seconds, but the current implementation logs an error and resubscribes instead. Update the documentation to reflect the new retry behavior.

-     * The promise will reject if no result is received within 30 seconds.
+     * If no result is received within 30 seconds, the subscription will
+     * automatically attempt to resubscribe.
🧹 Nitpick comments (1)
rust-executor/src/pubsub.rs (1)

59-67: Consider returning an error instead of panicking on unexpected filter state.

The .expect() on line 62 will panic if get_filter() returns None when a filter is expected. While this indicates a programming error, panicking in a stream context may be harder to debug than returning a FieldError.

                     if let Some(filter) = &filter {
-                        if &data
-                            .get_filter()
-                            .expect("Could not get filter on T where we expected to filter")
-                            != filter
-                        {
+                        let data_filter = match data.get_filter() {
+                            Some(f) => f,
+                            None => {
+                                error!("get_filter() returned None when filter was expected");
+                                return futures::future::ready(None);
+                            }
+                        };
+                        if &data_filter != filter {
                             return futures::future::ready(None);
                         }
                     }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c7dad90 and aa1704e.

📒 Files selected for processing (2)
  • core/src/perspectives/PerspectiveProxy.ts (1 hunks)
  • rust-executor/src/pubsub.rs (3 hunks)
🔇 Additional comments (5)
core/src/perspectives/PerspectiveProxy.ts (2)

84-94: LGTM - Immediate initial result processing improves UX.

Processing and dispatching the initial result immediately (before waiting for subscription confirmation) aligns with the PR objectives for faster initial load times.


114-121: Good guard against duplicate initial results.

The early return when updateResult.isInit && this.#latestResult correctly prevents callbacks from receiving stale initialization data when a more recent result was already processed immediately in lines 89-91.
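
The guard described in this comment can be sketched as follows. The shapes here are assumptions made for illustration: `UpdateMessage`, `ResultTracker`, and the string-typed results are hypothetical, and only the `isInit` check against an existing latest result mirrors what the review describes.

```typescript
// Hypothetical sketch of the duplicate-init guard; all shapes are assumptions.
interface UpdateMessage {
  isInit: boolean;
  result: string;
}

class ResultTracker {
  latestResult: string | undefined;
  notified: string[] = []; // stands in for invoking subscriber callbacks

  // Applied as soon as the initial query response is available.
  applyImmediate(result: string): void {
    this.latestResult = result;
    this.notified.push(result);
  }

  handleUpdate(update: UpdateMessage): void {
    // An init message that arrives after a result is already stored is a duplicate.
    if (update.isInit && this.latestResult !== undefined) return;
    this.latestResult = update.result;
    this.notified.push(update.result);
  }
}

const tracker = new ResultTracker();
tracker.applyImmediate("r0"); // immediate result from the initial query
tracker.handleUpdate({ isInit: true, result: "r0" }); // duplicate init, skipped
tracker.handleUpdate({ isInit: false, result: "r1" }); // regular update, delivered
console.log(tracker.notified);
```

Regular updates are never filtered by this check, so only init messages that repeat an already-applied result are dropped.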

rust-executor/src/pubsub.rs (3)

29-35: LGTM - Broadcast channel with 100-message buffer.

The implementation correctly creates a new broadcast channel on first subscription and returns receivers via sender.subscribe(). The 100-message buffer provides adequate headroom for bursts as described in the PR objectives.


37-42: LGTM - Standard pub/sub semantics.

Ignoring send errors when no receivers exist is correct behavior for a pub/sub system.
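
As a rough illustration of the two comments above (a channel created lazily on first subscribe, and a publish that is not an error when nobody is listening), here is a minimal in-memory model in TypeScript. `TopicRegistry` and its callback-based listeners are hypothetical and do not reflect the Rust code in pubsub.rs.

```typescript
// Hypothetical in-memory model of a per-topic pub/sub registry.
type Listener = (message: string) => void;

class TopicRegistry {
  private topics = new Map<string, Set<Listener>>();

  // Returns a function that removes the listener again.
  subscribe(topic: string, listener: Listener): () => void {
    let listeners = this.topics.get(topic);
    if (listeners === undefined) {
      listeners = new Set<Listener>(); // created lazily on first subscribe
      this.topics.set(topic, listeners);
    }
    listeners.add(listener);
    const registered = listeners;
    return () => {
      registered.delete(listener);
    };
  }

  // Returns how many listeners received the message; zero is not an error.
  publish(topic: string, message: string): number {
    const listeners = this.topics.get(topic);
    if (listeners === undefined) return 0;
    listeners.forEach((listener) => listener(message));
    return listeners.size;
  }
}

const registry = new TopicRegistry();
const seen: string[] = [];

const beforeSubscribe = registry.publish("links", "dropped"); // no listeners yet
const unsubscribe = registry.subscribe("links", (m) => seen.push(m));
const whileSubscribed = registry.publish("links", "delivered");
unsubscribe();
const afterUnsubscribe = registry.publish("links", "dropped again");
console.log(beforeSubscribe, whileSubscribed, afterUnsubscribe, seen);
```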


84-88: Lagged message handling is acceptable given the buffer size.

Skipping lagged messages with an error log is reasonable. With the 100-message buffer, lag should only occur under extreme conditions. If this becomes an issue in production, consider monitoring the error logs for "Broadcast stream error" occurrences.

coderabbitai bot (Contributor) left a comment

Actionable comments posted: 1

📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between aa1704e and 1e47856.

📒 Files selected for processing (1)
  • core/src/perspectives/PerspectiveProxy.ts (3 hunks)
🔇 Additional comments (3)
core/src/perspectives/PerspectiveProxy.ts (3)

83-88: Good fix: Promise created once and resolve/reject stored for reuse.

This addresses the previous concern about recursive resubscription replacing the promise. The promise is now created once in the constructor, and the resolve/reject functions are stored as instance variables, ensuring that retries can resolve the same promise without leaving callers hanging.
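
A minimal sketch of this pattern, assuming a boolean-valued promise: `InitGate`, `markInitialized`, `fail`, and `resolveCount` are hypothetical names, and only the idea of creating the promise once in the constructor and storing its resolve/reject functions comes from the review.

```typescript
// Hypothetical sketch of an init promise created once and settled at most once.
class InitGate {
  readonly initialized: Promise<boolean>;
  resolveCount = 0; // exposed only so the example can show single resolution
  private initResolve: ((value: boolean) => void) | undefined;
  private initReject: ((reason: Error) => void) | undefined;

  constructor() {
    // Created once here, so retries never replace the promise callers hold.
    this.initialized = new Promise<boolean>((resolve, reject) => {
      this.initResolve = resolve;
      this.initReject = reject;
    });
  }

  // Safe to call from any retry attempt; only the first call has an effect.
  markInitialized(): void {
    if (this.initResolve === undefined) return;
    this.initResolve(true);
    this.resolveCount += 1;
    this.clearResolvers();
  }

  // For fatal setup errors; a no-op once the promise has been settled.
  fail(error: Error): void {
    if (this.initReject === undefined) return;
    this.initReject(error);
    this.clearResolvers();
  }

  private clearResolvers(): void {
    this.initResolve = undefined;
    this.initReject = undefined;
  }
}

const gate = new InitGate();
const heldByCaller = gate.initialized; // obtained before any retry happens
gate.initialized.then(() => console.log("initialized"));

gate.markInitialized(); // first message, possibly from a retried subscription
gate.markInitialized(); // later messages do not resolve again
console.log(gate.resolveCount, heldByCaller === gate.initialized);
```

Dropping the stored functions after the first settle is what makes later calls harmless and lets the closures be garbage collected.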


91-103: Good cleanup: Previous subscription and timeout properly disposed.

The cleanup logic (lines 93-96 and 99-102) properly disposes of previous subscription attempts before retrying, preventing resource leaks. This addresses the concern from the previous review about leaked subscriptions.


127-146: LGTM: Promise resolution and message handling logic is sound.

The implementation correctly:

  • Clears the timeout on first message (lines 129-132)
  • Resolves the promise once and prevents double-resolution (lines 135-139)
  • Skips duplicate init messages when a result already exists (line 142)
  • Processes and notifies callbacks appropriately (lines 144-145)

The design waits for the first subscription message rather than just the initial result, which ensures the subscription channel is actively working.

coderabbitai bot (Contributor) left a comment

Actionable comments posted: 1

🧹 Nitpick comments (1)
core/src/perspectives/PerspectiveProxy.ts (1)

91-97: Verify backend subscription lifecycle on retries to avoid leaking server-side subscriptions

On retries, subscribe() currently:

  • Calls this.#unsubscribe() to stop the Apollo subscription client-side.
  • Immediately calls subscribeQuery() again and overwrites #subscriptionId with the new one.

The only place that calls disposeQuerySubscription() is dispose(), and it uses the current #subscriptionId. If subscribeQuery() allocates a new backend subscription ID each time, retries will leave the previous IDs undisposed on the Rust side until process restart.

Please verify how PerspectiveClient.subscribeQuery and disposeQuerySubscription are implemented:

  • If subscribeQuery is idempotent and reuses the same server-side subscription record, you’re fine.
  • If it creates a new record per call, consider disposing the old #subscriptionId before overwriting it, or teaching the backend to reuse/replace existing subscriptions for the same (uuid, query).

For example, on retry:

-        if (this.#unsubscribe) {
-            this.#unsubscribe();
-            this.#unsubscribe = undefined;
-        }
+        if (this.#unsubscribe) {
+            this.#unsubscribe();
+            this.#unsubscribe = undefined;
+        }
+        // Optionally also clean up the previous backend subscription before creating a new one
+        if (this.#subscriptionId) {
+            this.#client.disposeQuerySubscription(this.#uuid, this.#subscriptionId)
+                .catch(e => console.error('Error disposing previous query subscription during retry:', e));
+        }

Also applies to: 284-287

📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between 1e47856 and 500cbd7.

📒 Files selected for processing (1)
  • core/src/perspectives/PerspectiveProxy.ts (3 hunks)
🔇 Additional comments (1)
core/src/perspectives/PerspectiveProxy.ts (1)

67-88: Init promise wiring across retries looks solid

Storing #initResolve/#initReject and creating #initialized once in the constructor cleanly fixes the previous “promise replaced on retry” issue. Clearing the resolvers on first resolve/reject matches the updated initialized JSDoc (resolve once on first message or reject on fatal setup error) and avoids leaking closures. No changes needed here.

Also applies to: 199-209

lucksus merged commit 85cdb51 into dev on Nov 27, 2025
4 checks passed