Conversation
…criptions so resubscribing works & doesn't get blocked by the error
…unction before subscriptions established for optimal UX (result var naming also updated for clarity)
… and message buffer to avoid lost messages in subscriptions (automatically handles dead subscriber removal)
WalkthroughPerspectiveProxy: added private init-control fields and moved init-promise creation to constructor; subscribe() now clears prior state, applies any immediate result, starts a 30s init timeout with automatic retry, resolves init once on first update and suppresses duplicate init notifications. Rust PubSub: switched per-topic storage to a single Changes
Sequence Diagram(s)sequenceDiagram
participant Sub as Subscriber
participant Proxy as PerspectiveProxy
participant Handler as UpdateHandler
rect rgba(220,235,255,0.6)
Note over Proxy: subscribe() startup
Sub->>Proxy: subscribe()
Proxy->>Proxy: clear prior subscription, timers
Proxy->>Proxy: create `#initialized` promise via constructor fields
Proxy->>Proxy: call subscribeQuery() -> maybe initial result
end
alt initial result present
Proxy->>Proxy: set `#latestResult`, notify callbacks (non-init)
else no immediate result
Proxy->>Proxy: start 30s init timeout (retry on expiry)
end
Proxy->>Handler: subscribe to updates stream
Handler->>Proxy: first update (init/regular)
Proxy->>Proxy: clear init timeout, resolve `#initialized` once
alt duplicate init (latestResult exists)
Proxy->>Proxy: skip notifying callbacks
else
Proxy->>Sub: notify callbacks with update
end
alt timeout (no update)
Proxy->>Proxy: log error, retry subscribe()
end
sequenceDiagram
participant Pub as Publisher
participant PubSub as PubSub
participant Sub as Subscriber
rect rgba(235,245,230,0.6)
Note over PubSub: per-topic broadcast model
Sub->>PubSub: subscribe(topic)
PubSub->>PubSub: create/get broadcast::Sender (buffer=100)
PubSub-->>Sub: broadcast::Receiver
end
Pub->>PubSub: publish(topic, message)
PubSub->>PubSub: broadcast::Sender::send(message)
PubSub->>Sub: receivers get messages (lagged messages may be skipped)
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes
Possibly related PRs
Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
core/src/perspectives/PerspectiveProxy.ts (1)
162-170: Stale docstring: promise no longer rejects on timeout.Line 166 states the promise will reject after 30 seconds, but the current implementation logs an error and resubscribes instead. Update the documentation to reflect the new retry behavior.
* The promise will reject if no result is received within 30 seconds. + * If no result is received within 30 seconds, the subscription will + * automatically attempt to resubscribe.
🧹 Nitpick comments (1)
rust-executor/src/pubsub.rs (1)
59-67: Consider returning an error instead of panicking on unexpected filter state.The
.expect()on line 62 will panic ifget_filter()returnsNonewhen a filter is expected. While this indicates a programming error, panicking in a stream context may be harder to debug than returning aFieldError.if let Some(filter) = &filter { - if &data - .get_filter() - .expect("Could not get filter on T where we expected to filter") - != filter - { + let data_filter = match data.get_filter() { + Some(f) => f, + None => { + error!("get_filter() returned None when filter was expected"); + return futures::future::ready(None); + } + }; + if &data_filter != filter { return futures::future::ready(None); } }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
core/src/perspectives/PerspectiveProxy.ts(1 hunks)rust-executor/src/pubsub.rs(3 hunks)
🔇 Additional comments (5)
core/src/perspectives/PerspectiveProxy.ts (2)
84-94: LGTM - Immediate initial result processing improves UX.Processing and dispatching the initial result immediately (before waiting for subscription confirmation) aligns with the PR objectives for faster initial load times.
114-121: Good guard against duplicate initial results.The early return when
updateResult.isInit && this.#latestResultcorrectly prevents callbacks from receiving stale initialization data when a more recent result was already processed immediately in lines 89-91.rust-executor/src/pubsub.rs (3)
29-35: LGTM - Broadcast channel with 100-message buffer.The implementation correctly creates a new broadcast channel on first subscription and returns receivers via
sender.subscribe(). The 100-message buffer provides adequate headroom for bursts as described in the PR objectives.
37-42: LGTM - Standard pub/sub semantics.Ignoring send errors when no receivers exist is correct behavior for a pub/sub system.
84-88: Lagged message handling is acceptable given the buffer size.Skipping lagged messages with an error log is reasonable. With the 100-message buffer, lag should only occur under extreme conditions. If this becomes an issue in production, consider monitoring the error logs for
"Broadcast stream error"occurrences.
There was a problem hiding this comment.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
core/src/perspectives/PerspectiveProxy.ts(3 hunks)
🔇 Additional comments (3)
core/src/perspectives/PerspectiveProxy.ts (3)
83-88: Good fix: Promise created once and resolve/reject stored for reuse.This addresses the previous concern about recursive resubscription replacing the promise. The promise is now created once in the constructor, and the resolve/reject functions are stored as instance variables, ensuring that retries can resolve the same promise without leaving callers hanging.
91-103: Good cleanup: Previous subscription and timeout properly disposed.The cleanup logic (lines 93-96 and 99-102) properly disposes of previous subscription attempts before retrying, preventing resource leaks. This addresses the concern from the previous review about leaked subscriptions.
127-146: LGTM: Promise resolution and message handling logic is sound.The implementation correctly:
- Clears the timeout on first message (lines 129-132)
- Resolves the promise once and prevents double-resolution (lines 135-139)
- Skips duplicate init messages when a result already exists (line 142)
- Processes and notifies callbacks appropriately (lines 144-145)
The design waits for the first subscription message rather than just the initial result, which ensures the subscription channel is actively working.
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
core/src/perspectives/PerspectiveProxy.ts (1)
91-97: Verify backend subscription lifecycle on retries to avoid leaking server-side subscriptionsOn retries,
subscribe()currently:
- Calls
this.#unsubscribe()to stop the Apollo subscription client-side.- Immediately calls
subscribeQuery()again and overwrites#subscriptionIdwith the new one.The only place that calls
disposeQuerySubscription()isdispose(), and it uses the current#subscriptionId. IfsubscribeQuery()allocates a new backend subscription ID each time, retries will leave the previous IDs undisposed on the Rust side until process restart.Please verify how
PerspectiveClient.subscribeQueryanddisposeQuerySubscriptionare implemented:
- If
subscribeQueryis idempotent and reuses the same server-side subscription record, you’re fine.- If it creates a new record per call, consider disposing the old
#subscriptionIdbefore overwriting it, or teaching the backend to reuse/replace existing subscriptions for the same(uuid, query).For example, on retry:
- if (this.#unsubscribe) { - this.#unsubscribe(); - this.#unsubscribe = undefined; - } + if (this.#unsubscribe) { + this.#unsubscribe(); + this.#unsubscribe = undefined; + } + // Optionally also clean up the previous backend subscription before creating a new one + if (this.#subscriptionId) { + this.#client.disposeQuerySubscription(this.#uuid, this.#subscriptionId) + .catch(e => console.error('Error disposing previous query subscription during retry:', e)); + }Also applies to: 284-287
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
core/src/perspectives/PerspectiveProxy.ts(3 hunks)
🔇 Additional comments (1)
core/src/perspectives/PerspectiveProxy.ts (1)
67-88: Init promise wiring across retries looks solidStoring
#initResolve/#initRejectand creating#initializedonce in the constructor cleanly fixes the previous “promise replaced on retry” issue. Clearing the resolvers on first resolve/reject matches the updatedinitializedJSDoc (resolve once on first message or reject on fatal setup error) and avoids leaking closures. No changes needed here.Also applies to: 199-209
Problem
Perspective query subscriptions were losing messages (especially when sent in quick succession) and sometimes even timing out after 30 seconds without receiving any of the 7 init messages. As subscriptions are now used frequently in Flux this meant portions of the UI would often not load or take up to 30 seconds to appear.
Root Cause
The pubsub system was using Tokio watch channels, which are designed for state synchronization, not message delivery. Watch channels only store the most recent value so when multiple messages are published before receivers can consume them, earlier messages are overwritten and lost. Additionally we were using
WatchStream::from_changes()which only updates subscribers when the content of the new message is different from the previous message, resulting in multiple messages with the same data (like the 7 init messages) being de-duplicated.Example of the data being overwritten:
Example of the data being de-duplicated:
An attempt to fix this by switching from
WatchStream::from_changes()toWatchStream::new().skip(1)solved the de-duplication but didn't fix the root problem because both are built on watch channels with the same fundamental coalescing behavior.Solution
Replaced watch channels with broadcast channels throughout the pubsub system.
Key Changes in
pubsub.rs:Why Broadcast Channels:
Additional updates
#client.subscribeQueryin thesubscribe()function inperspectiveProxy.tsis now returned immediately (~ 5ms instead of 100-500ms) to improve load times. We no longer wait for the first responses in the subscription.Impact
Summary by CodeRabbit
Bug Fixes
Chores
✏️ Tip: You can customize this high-level summary in your review settings.
Note
Replace watch-based pubsub with broadcast channels and make query subscription init/keepalive more robust with immediate initial results, retries, and safer timers.
rust-executor/src/pubsub.rs):watchwithbroadcastchannels:HashMap<Topic, broadcast::Sender<Message>>, per-subscriber receivers, 100-message buffer.BroadcastStreamand handle lag/errors; simplifypublish(); remove dead-subscriber cleanup.core/src/perspectives/PerspectiveProxy.ts):subscribeQuery()immediately; resolve on first update; skip duplicate init messages.Written by Cursor Bugbot for commit 751f08a. This will update automatically on new commits. Configure here.