fix: complete Subscribe locally when contract is cached #2327
Conversation
Remove the `&& candidates.is_empty()` condition that was preventing local subscription completion when remote peers existed. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
/retest
The token bucket was using saturating_sub for tokens, so when concurrent
tasks reserved tokens simultaneously, they all saw the same zero balance
and calculated the same wait time. This caused them to wait concurrently
instead of sequentially, effectively bypassing rate limiting.
Fix: Change tokens from usize to isize to allow negative values ("debt").
When multiple tasks reserve tokens concurrently:
1. Task A: tokens=0, reserves 1000 → tokens=-1000, wait=1ms
2. Task B: tokens=-1000, reserves 1000 → tokens=-2000, wait=2ms
3. Tasks now wait sequentially as intended
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
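The numbered scenario above can be sketched as a minimal debt-tracking bucket. This is an illustrative model, not the actual `token_bucket.rs` code: the struct, field names, and millisecond-based refill are assumptions chosen to make the arithmetic visible.

```rust
use std::time::Duration;

// Minimal sketch of the debt-tracking idea. Tokens are isize so that
// concurrent reservations can drive the balance negative ("debt");
// each caller's wait time grows with the accumulated deficit, so
// callers pace out sequentially instead of all waiting 1 ms together.
struct Bucket {
    tokens: isize,        // may go negative (debt)
    capacity: isize,      // cap when refilling
    refill_per_ms: isize, // tokens regained per millisecond
}

impl Bucket {
    // Reserve `n` tokens and return how long the caller must wait
    // for the refill rate to pay off its share of the debt.
    fn reserve(&mut self, n: isize) -> Duration {
        self.tokens -= n; // may go negative: this is the debt
        if self.tokens >= 0 {
            Duration::ZERO
        } else {
            let deficit = -self.tokens;
            Duration::from_millis((deficit / self.refill_per_ms).max(1) as u64)
        }
    }

    // Refill from elapsed time; debt must be paid off before the
    // balance turns positive again, and the balance is capped.
    fn refill(&mut self, elapsed_ms: isize) {
        let earned = elapsed_ms * self.refill_per_ms;
        self.tokens = (self.tokens + earned).min(self.capacity);
    }
}

fn main() {
    let mut b = Bucket { tokens: 0, capacity: 10_000, refill_per_ms: 1_000 };
    let w1 = b.reserve(1000); // tokens: 0 -> -1000, wait = 1 ms
    let w2 = b.reserve(1000); // tokens: -1000 -> -2000, wait = 2 ms
    assert!(w2 > w1);         // later reservations wait longer
    b.refill(5);              // 5 ms elapsed: debt paid, balance rebuilt
    println!("w1={w1:?} w2={w2:?} tokens={}", b.tokens);
}
```

With a `saturating_sub` on an unsigned counter, both reservations would have seen `tokens == 0` and computed the same wait; the signed balance is what makes the second caller wait longer.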
@claude Please do a skeptical review of this PR, both at a high level (is this the right solution, does it fix the bug?) and in detail.
Claude finished @sanity's task in 3m 5s

Skeptical PR Review

I've completed a thorough review of this PR. Here are my findings:

High-Level Assessment

✅ The fix is correct and solves the stated problem

The bug description is accurate: when a local peer has a contract cached and receives a Subscribe request, it was incorrectly forwarding to the network instead of completing locally. The fix properly addresses this.
Pull request overview
This pull request contains two distinct fixes bundled together:
- **Subscribe Operation Fix**: Changes the Subscribe operation to complete locally when a contract is cached, rather than forwarding to remote peers. This resolves an issue where subscriptions incorrectly forwarded to peers that may not have the contract, causing spurious subscription failures.
- **Token Bucket Debt Tracking**: Modifies the token bucket rate limiter to track negative token values ("debt") when concurrent reservations occur, improving rate limiting accuracy under concurrent load.
Key Changes
- Modified Subscribe logic to skip network forwarding when contract is available locally
- Changed token bucket's token tracking from `usize` to `isize` to support negative values
- Updated token bucket's refill and reservation logic to properly handle debt accumulation
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| crates/core/src/operations/subscribe.rs | Simplified local subscription condition by removing the candidates.is_empty() check - now completes locally whenever contract is cached |
| crates/core/src/transport/token_bucket.rs | Changed token type from usize to isize to track debt from concurrent reservations, updated arithmetic operations and test expectations accordingly |
```diff
- self.tokens = (self.tokens + new_tokens_whole).min(self.capacity);
+ // Add tokens, capped at capacity. Note: tokens may be negative (debt),
+ // so we add to it and cap at capacity.
+ let capacity_isize = self.capacity as isize;
```
The conversion from capacity to isize can truncate if capacity exceeds isize::MAX. Consider adding validation or documenting the constraint.
```diff
- let capacity_isize = self.capacity as isize;
+ let capacity_isize = isize::try_from(self.capacity).unwrap_or(isize::MAX);
```
```diff
- state.tokens = state.tokens.saturating_sub(bytes);
+ // Deduct tokens - may go negative to track "debt" from concurrent reservations.
+ // This ensures subsequent reservations see the accumulated deficit and wait longer.
+ state.tokens -= bytes_isize;
```
The subtraction state.tokens -= bytes_isize can potentially overflow if state.tokens is already a large negative value and bytes_isize is large. Consider using checked arithmetic (e.g., checked_sub) to detect overflow conditions and handle them gracefully, or document the assumption that this scenario won't occur in practice.
```diff
- state.tokens -= bytes_isize;
+ state.tokens = state.tokens.saturating_sub(bytes_isize);
```
```rust
// Add tokens, capped at capacity. Note: tokens may be negative (debt),
// so we add to it and cap at capacity.
let capacity_isize = self.capacity as isize;
self.tokens = (self.tokens + new_tokens_whole).min(capacity_isize);
```
The addition self.tokens + new_tokens_whole can potentially overflow if both values are near isize::MAX. Consider using checked arithmetic (e.g., checked_add) to detect overflow conditions and handle them gracefully.
```diff
- self.tokens = (self.tokens + new_tokens_whole).min(capacity_isize);
+ let added_tokens = self.tokens.saturating_add(new_tokens_whole);
+ self.tokens = added_tokens.min(capacity_isize);
```
```rust
let mut state = self.state.lock();
state.refill();

let bytes_isize = bytes as isize;
```
The conversion from usize to isize using as can truncate values on platforms where usize can be larger than isize::MAX. Consider validating that bytes doesn't exceed isize::MAX before converting, or documenting this constraint. For example, if bytes is larger than isize::MAX (2^63-1 on 64-bit systems), the conversion will produce incorrect negative values.
```diff
  state: Mutex::new(BucketState {
      capacity,
-     tokens: capacity, // Start full
+     tokens: capacity as isize, // Start full
```
Similar to the bytes conversion, converting capacity to isize can truncate if capacity exceeds isize::MAX. Consider adding validation in the new() method or documenting the constraint that capacity must not exceed isize::MAX.
```diff
- tokens: capacity as isize, // Start full
+ tokens: isize::try_from(capacity)
+     .expect("TokenBucket capacity must not exceed isize::MAX"), // Start full
```
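The pattern Copilot is suggesting across these comments can be shown in isolation. This is a hedged sketch with illustrative helper names (not functions from the PR): clamp the `usize` -> `isize` conversion instead of truncating with `as`, and use saturating arithmetic so extreme values cannot wrap.

```rust
// Clamp instead of truncate: `usize::MAX as isize` would wrap to -1,
// while try_from lets us detect the overflow and pick a safe ceiling.
fn to_isize_clamped(v: usize) -> isize {
    isize::try_from(v).unwrap_or(isize::MAX)
}

// Deduct bytes from the (possibly already negative) token balance
// without any possibility of signed overflow.
fn deduct(tokens: isize, bytes: usize) -> isize {
    tokens.saturating_sub(to_isize_clamped(bytes))
}

fn main() {
    assert_eq!(to_isize_clamped(usize::MAX), isize::MAX);   // clamped, not -1
    assert_eq!(deduct(0, 1000), -1000);                     // normal debt
    assert_eq!(deduct(isize::MIN, usize::MAX), isize::MIN); // saturates, no panic
    println!("ok");
}
```

Whether this matters in practice depends on whether capacities and byte counts near `isize::MAX` are reachable, which is the point the maintainer disputes below.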
Response to Review

Thanks for the thorough skeptical review. Let me address the key architectural questions:

Q1: Does a node automatically receive updates for contracts it's seeding?

Yes. Looking at the code flow:
So the flow is:
The two subscription types (network vs local) are intentionally separate (see architecture note).

Q2: Manual testing?

This was discovered and tested during River E2E testing. The bug manifests as:
With the fix, Subscribe completes locally since the contract is already cached.

Q3: Integration test?

Agreed, this would be valuable. The current tests pass because all nodes have the contract before Subscribe is called. A test specifically covering "PUT then immediate Subscribe when other peers don't have contract yet" would catch this regression. However, I'd suggest that's a follow-up issue rather than blocking this fix, since:
Re: Token Bucket Changes

Yes, these are a separate fix.

[AI-assisted - Claude]
Response to Copilot's Inline Comments

Copilot raised 5 concerns about overflow/truncation in the token bucket changes. Here's why they're not actionable:
Verifies that Subscribe completes locally when contract is cached, even if remote peers exist that don't have the contract yet. This catches the class of bugs where local operations are unnecessarily forwarded to the network. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
`test_ping_blocked_peers` was flaky in the merge queue because the task-per-tx subscribe driver dispatched its initial Request through `OpCtx::send_and_await`, which routes via `op_execution_sender` and loops the message back to the local event loop as an `InboundMessage`. When the contract was already cached locally (every client node had just finished a GET), `process_message` short-circuited at "Subscribe completed (originator has contract locally)" and synthesized a success reply without ever sending the Request to the gateway. The gateway therefore never registered the node as a downstream subscriber, so subsequent UPDATE broadcasts never reached it.

The asymmetric failure pattern (one client node receives all updates, the other only its own) is explained by `auto_subscribe_on_get_response` falling back to the legacy `request_subscribe` path on GET completion: one client wins the race and registers via the legacy path; the other loses and is left depending on the broken explicit Subscribe.

Fix:

- Add `OpCtx::send_to_and_await(target_addr, msg)`. The op-execution channel payload now carries `Option<SocketAddr>`. When set, `handle_op_execution` emits `OutboundMessageWithTarget` instead of `InboundMessage`, dispatching the message to the named peer over the wire while still registering the reply callback in `pending_op_results`.
- Have `op_ctx_task` call `send_to_and_await(current_target_addr, ...)` for client-initiated subscribe Requests. The task already selected `current_target_addr` via `prepare_initial_request`; previously that address was being dropped on the floor.
- On `Subscribed` reply, mirror the legacy Response handler's local side effects (`ring.subscribe`, `complete_subscription_request`, `interest_manager.add_local_client`) inside `op_ctx_task`. The task-per-tx reply forwarding bypass in `node.rs` skips `handle_op_request` for terminal Responses, so without these calls the local interest manager never learns about the subscription.
- Preserve the existing `send_and_await(msg)` (target=None) variant for non-routed call sites and tests.

The local-originator branch in `process_message` is preserved intentionally: it remains correct for true standalone/sole-holder nodes (PR #2327, #2004). The fix prevents the network-target path from entering it. Wire format is unchanged: this is purely an internal dispatch fix.

Regression test: `send_to_and_await_forwards_target_address` pins the channel-level invariant (target propagates through the op execution channel). The end-to-end behavior is covered by `test_ping_blocked_peers`, which previously hung for ~280 s and now passes in ~12 s.

Also includes unrelated cleanup:

- `sqlite.rs`: replace `let _ =` on a sqlx Result with `drop(...)` to satisfy clippy's `let_underscore_must_use` (newer clippy than CI).

Closes #3838

[AI-assisted - Claude]
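The dispatch change in the commit above can be reduced to a small sketch. These are illustrative stand-in types, not the actual freenet-core definitions: the point is only that the op-execution channel payload carries an `Option<SocketAddr>`, and the handler picks the outbound variant when a target is present.

```rust
use std::net::SocketAddr;

// Illustrative stand-ins for the event-loop messages. In the real code,
// InboundMessage loops the message back to the local event loop, while
// OutboundMessageWithTarget sends it to the named peer over the wire.
#[derive(Debug)]
enum Event {
    InboundMessage(String),
    OutboundMessageWithTarget(SocketAddr, String),
}

// Sketch of the handle_op_execution decision: the channel payload now
// carries an optional target address; Some(addr) means "route to that
// peer", None preserves the legacy local loop-back behavior.
fn handle_op_execution(msg: String, target: Option<SocketAddr>) -> Event {
    match target {
        Some(addr) => Event::OutboundMessageWithTarget(addr, msg), // send_to_and_await path
        None => Event::InboundMessage(msg),                        // legacy send_and_await path
    }
}

fn main() {
    let gateway: SocketAddr = "127.0.0.1:31337".parse().unwrap();
    // With a target set, the Subscribe request actually leaves the node
    // instead of being short-circuited by the local cache check.
    match handle_op_execution("Subscribe".into(), Some(gateway)) {
        Event::OutboundMessageWithTarget(addr, _) => assert_eq!(addr, gateway),
        other => panic!("expected wire dispatch, got {other:?}"),
    }
}
```

This mirrors what `send_to_and_await_forwards_target_address` pins down at the channel level: a supplied target must survive the trip through the op execution channel.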
Problem
When a peer has a contract cached locally and a WebSocket client calls Subscribe, the operation incorrectly forwards to a remote peer instead of completing locally. The remote peer may not have the contract and returns `subscribed: false`, causing the client to think the subscription failed.

This manifests in River when the remote peer responds with `subscribed: false`.

Root Cause
The condition at `subscribe.rs:153-156` required both that the contract be cached locally and that `candidates.is_empty()`. When remote peers existed, it skipped local completion and forwarded to the network, even though the local cache already guarantees the client will receive updates.
This Solution
Remove the `&& candidates.is_empty()` condition. If the contract is cached locally, the client subscription should succeed immediately; network subscription is already handled separately when the contract was stored.

Why CI Tests Pass

Tests use networks where all relevant nodes have the contract before Subscribe is called, so network responses return `subscribed: true`. The bug only manifests when forwarding to a peer that doesn't have the contract yet.

Fixes
Closes #2326
[AI-assisted - Claude]