
fix: complete Subscribe locally when contract is cached#2327

Merged
sanity merged 3 commits into main from fix-2326-subscribe-local on Dec 17, 2025

Conversation


@sanity sanity commented Dec 17, 2025

Problem

When a peer has a contract cached locally and a WebSocket client calls Subscribe, the operation incorrectly forwards to a remote peer instead of completing locally. The remote peer may not have the contract and returns subscribed: false, causing the client to think subscription failed.

This manifests in River when:

  1. A room is created locally (PUT contract)
  2. Browser calls Subscribe
  3. Subscribe forwards to another peer that doesn't have the contract yet
  4. That peer returns subscribed: false
  5. Client thinks subscription failed

Root Cause

The condition at subscribe.rs:153-156 required both:

  1. Contract cached locally AND
  2. No remote peers available
if has_contract_locally && candidates.is_empty() {
    return complete_local_subscription(...)
}

When remote peers exist, it skipped local completion and forwarded to the network, even though the local cache already guarantees the client will receive updates.

This Solution

Remove the && candidates.is_empty() condition. If the contract is cached locally, the client subscription should succeed immediately - network subscription is already handled separately when the contract was stored.

Why CI Tests Pass

Tests use networks where all relevant nodes have the contract before Subscribe is called, so network responses return subscribed: true. The bug only manifests when forwarding to a peer that doesn't have the contract yet.

Fixes

Closes #2326

[AI-assisted - Claude]

Remove the `&& candidates.is_empty()` condition that was preventing
local subscription completion when remote peers existed.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

sanity commented Dec 17, 2025

/retest

The token bucket was using saturating_sub for tokens, so when concurrent
tasks reserved tokens simultaneously, they all saw the same zero balance
and calculated the same wait time. This caused them to wait concurrently
instead of sequentially, effectively bypassing rate limiting.

Fix: Change tokens from usize to isize to allow negative values ("debt").
When multiple tasks reserve tokens concurrently:
1. Task A: tokens=0, reserves 1000 → tokens=-1000, wait=1ms
2. Task B: tokens=-1000, reserves 1000 → tokens=-2000, wait=2ms
3. Tasks now wait sequentially as intended
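The two accounting strategies can be sketched as free functions (a simplified model with hypothetical names, not the actual token_bucket.rs API; waits are in milliseconds):

```rust
// Simplified model of the rate limiter's accounting (hypothetical names;
// not the actual token_bucket.rs API). Waits are in whole milliseconds.

// Buggy variant: tokens clamp at zero, so concurrent reservers all observe
// the same zero balance and compute the same wait.
fn reserve_saturating(tokens: &mut usize, refill_per_ms: usize, bytes: usize) -> usize {
    let deficit = bytes.saturating_sub(*tokens);
    *tokens = tokens.saturating_sub(bytes);
    (deficit + refill_per_ms - 1) / refill_per_ms // round up to whole ms
}

// Fixed variant: tokens may go negative ("debt"), so each reserver waits for
// the cumulative deficit, serializing concurrent reservations.
fn reserve_debt(tokens: &mut isize, refill_per_ms: isize, bytes: isize) -> isize {
    *tokens -= bytes; // deliberately allowed to go negative
    if *tokens >= 0 {
        0
    } else {
        (-*tokens + refill_per_ms - 1) / refill_per_ms
    }
}
```

Starting from an empty bucket refilling at 1000 bytes/ms, two back-to-back 1000-byte reservations wait 1 ms and 1 ms under the saturating variant, but 1 ms and 2 ms under the debt variant, matching the sequence above.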

@sanity sanity requested a review from Copilot December 17, 2025 18:39

sanity commented Dec 17, 2025

@claude Please do a skeptical review of this PR, both at a high level (is this the right solution, and does it fix the bug?) and in detail.


claude Bot commented Dec 17, 2025

Claude finished @sanity's task in 3m 5s


Skeptical PR Review

I've completed a thorough review of this PR. Here are my findings:


High-Level Assessment

The fix is correct and solves the stated problem

The bug description is accurate: when a local peer has a contract cached and receives a Subscribe request, it was incorrectly forwarding to the network instead of completing locally. The fix properly addresses this.

⚠️ However, there's a critical architectural question this raises


Detailed Analysis

1. The Subscribe Logic Fix (subscribe.rs)

What changed:

// Before (lines 153-156)
if has_contract_locally && candidates.is_empty() {
    return complete_local_subscription(...)
}

// After (lines 153-157)  
if has_contract_locally {
    return complete_local_subscription(...)
}

Why this is correct:

  • When a contract is cached locally, the client subscription should complete immediately
  • The local executor already has the contract and will notify the client of updates via update_notifications channels
  • There's no need to forward to the network since local client subscriptions are architecturally separate from network peer-to-peer subscriptions (see architecture note at subscribe.rs:200-210)

The bug was real:

  • The old logic required BOTH local cache AND no remote peers
  • If remote peers existed, it would forward to them even though local completion was possible
  • Remote peer might not have the contract → returns subscribed: false → client thinks it failed

2. Critical Question: Network Subscription ⚠️

My skeptical concern: When a client subscribes to a locally-cached contract, does the network layer properly propagate updates from OTHER nodes that also cache this contract?

Let me trace the flow:

  1. Client does PUT → contract stored locally
  2. Network layer presumably broadcasts to peers (seeding)
  3. Client does Subscribe → now completes locally (this PR)
  4. Question: If another peer updates the contract, will this node receive that update and forward it to the local client?

Looking at the code:

From subscribe.rs:200-210 (architecture note):

Local client subscriptions are deliberately kept separate from network subscriptions:

  • Network subscriptions are stored in ring.seeding_manager.subscribers
  • Local subscriptions are managed by the contract executor via update_notifications

The assumption in the PR description:

"The contract is already being seeded, so updates will flow to this client"

Is this assumption valid? Looking at subscribe.rs:350-364, when a remote peer sends a Subscribe request and we have the contract, we call add_subscriber to register them for network updates. But when completing locally, we DON'T register ourselves in the network subscription list.

This might be correct IF:

  • The node automatically receives network updates for any contract it caches (via the seeding mechanism)
  • Those updates are then propagated to local clients via the executor's notification channel

But I can't verify this from the code shown. The seeding architecture might handle this, but it's not obvious from subscribe.rs alone.

Recommendation: Add a test that verifies:

// Test scenario:
// 1. Peer A: PUT contract, Subscribe (completes locally per this PR)
// 2. Peer B: receives contract, UPDATE contract with new state
// 3. Verify: Peer A's local client receives the update notification

If this scenario already works (which the PR description implies), then the fix is sound. But this is the critical integration point that needs validation.


3. Token Bucket Changes (token_bucket.rs)

These changes are intentional and from a separate commit:

Looking at git history:

75d4cfa0 fix(transport): track token debt for proper concurrent rate limiting
e99dc109 fix: complete Subscribe locally when contract is cached

The token_bucket.rs changes are from commit 75d4cfa0 which fixes concurrent rate limiting by allowing negative token values to track "debt". This is a separate fix that's included in this PR branch.

Are they related to Subscribe? No - they're unrelated transport layer fixes.

Should they be in this PR? Ideally these would be separate PRs for cleaner review, but since they're both bug fixes on the same branch, it's acceptable. The token bucket fix is sound on its own merits.


4. Test Coverage ⚠️

Current unit tests (subscribe/tests.rs):

  • Focus on routing logic with mock TestRing
  • Don't test the local completion path with actual contract storage
  • Don't test cross-node update propagation to local subscribers

What's missing:

  • Integration test showing local Subscribe after PUT works
  • Integration test showing local subscriber receives updates from remote peer
  • Test specifically for the bug scenario described (Subscribe forwarding to peer without contract)

Why CI passes:
As noted in PR description: "Tests use networks where all relevant nodes have the contract before Subscribe is called, so network responses return subscribed: true. The bug only manifests when forwarding to a peer that doesn't have the contract yet."


Edge Cases to Consider

Potential issue #1: Race condition

Scenario: Contract is being evicted from cache between the has_contract_locally check and complete_local_subscription call.

Impact: Minor - client would get success response but might miss initial state. Executor would handle this gracefully.

Severity: Low

Potential issue #2: Contract exists but isn't fully initialized

Scenario: Contract file exists but executor hasn't loaded it yet.

Impact: complete_local_subscription succeeds, but client might not receive updates until executor initializes.

Severity: Low - should be handled by executor's notification mechanism

Potential issue #3: Memory contracts

Question: Does has_contract_locally return true for in-memory contracts? If so, same logic should apply.

Severity: Question for clarification


Recommendations

Must Address:

  1. Verify network update propagation - Confirm that when this node has a contract cached and a local client subscribed, updates from OTHER peers properly flow to the local client. This is the critical assumption the fix relies on.

Should Consider:

  1. Add integration test - Test the exact bug scenario: local PUT, Subscribe, verify remote peer doesn't get queried
  2. Add cross-node test - Verify local subscriber receives updates from remote peer
  3. Separate token bucket changes - Consider extracting to separate PR for cleaner history (optional, not blocking)

Nice to Have:

  1. Document the seeding flow - Add comment explaining how network updates reach local subscribers when contract is cached
  2. Update tests - Add test case that would catch this bug in the future

Verdict

High-level: The fix is correct for the described bug

Architecture: ⚠️ Relies on seeding mechanism working correctly (which it probably does, but should be verified)

Code quality: Clean, well-commented, follows existing patterns

Testing: ⚠️ Lacks integration test for the specific bug scenario

Overall: I'd approve this PR with the caveat that the network update propagation assumption should be verified. The fix itself is sound - removing an overly restrictive condition that prevented local subscription completion. The real question is whether the broader architecture properly handles update propagation, which this PR assumes is already working.


Questions for Author

  1. Can you confirm that when a node caches a contract, it automatically receives network updates for that contract (via seeding) even without explicitly subscribing in the seeding_manager?

  2. Have you manually tested the scenario: Node A PUTs + Subscribes, Node B updates, Node A's client receives the update?

  3. Should we add an integration test for this specific scenario to prevent regression?



Copilot AI left a comment


Pull request overview

This pull request contains two distinct fixes bundled together:

  1. Subscribe Operation Fix: Changes the Subscribe operation to complete locally when a contract is cached, rather than forwarding to remote peers. This resolves an issue where subscriptions incorrectly forward to peers that may not have the contract, causing spurious subscription failures.

  2. Token Bucket Debt Tracking: Modifies the token bucket rate limiter to track negative token values ("debt") when concurrent reservations occur, improving rate limiting accuracy under concurrent load.

Key Changes

  • Modified Subscribe logic to skip network forwarding when contract is available locally
  • Changed token bucket's token tracking from usize to isize to support negative values
  • Updated token bucket's refill and reservation logic to properly handle debt accumulation

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 5 comments.

File descriptions:

  • crates/core/src/operations/subscribe.rs: Simplified the local subscription condition by removing the candidates.is_empty() check - now completes locally whenever the contract is cached
  • crates/core/src/transport/token_bucket.rs: Changed the token type from usize to isize to track debt from concurrent reservations, updated arithmetic operations and test expectations accordingly


self.tokens = (self.tokens + new_tokens_whole).min(self.capacity);
// Add tokens, capped at capacity. Note: tokens may be negative (debt),
// so we add to it and cap at capacity.
let capacity_isize = self.capacity as isize;

Copilot AI Dec 17, 2025


The conversion from capacity to isize can truncate if capacity exceeds isize::MAX. Consider adding validation or documenting the constraint.

Suggested change
let capacity_isize = self.capacity as isize;
let capacity_isize = isize::try_from(self.capacity).unwrap_or(isize::MAX);

state.tokens = state.tokens.saturating_sub(bytes);
// Deduct tokens - may go negative to track "debt" from concurrent reservations.
// This ensures subsequent reservations see the accumulated deficit and wait longer.
state.tokens -= bytes_isize;

Copilot AI Dec 17, 2025


The subtraction state.tokens -= bytes_isize can potentially overflow if state.tokens is already a large negative value and bytes_isize is large. Consider using checked arithmetic (e.g., checked_sub) to detect overflow conditions and handle them gracefully, or document the assumption that this scenario won't occur in practice.

Suggested change
state.tokens -= bytes_isize;
state.tokens = state.tokens.saturating_sub(bytes_isize);

// Add tokens, capped at capacity. Note: tokens may be negative (debt),
// so we add to it and cap at capacity.
let capacity_isize = self.capacity as isize;
self.tokens = (self.tokens + new_tokens_whole).min(capacity_isize);

Copilot AI Dec 17, 2025


The addition self.tokens + new_tokens_whole can potentially overflow if both values are near isize::MAX. Consider using checked arithmetic (e.g., checked_add) to detect overflow conditions and handle them gracefully.

Suggested change
self.tokens = (self.tokens + new_tokens_whole).min(capacity_isize);
let added_tokens = self.tokens.saturating_add(new_tokens_whole);
self.tokens = added_tokens.min(capacity_isize);

let mut state = self.state.lock();
state.refill();

let bytes_isize = bytes as isize;

Copilot AI Dec 17, 2025


The conversion from usize to isize using as can truncate values on platforms where usize can be larger than isize::MAX. Consider validating that bytes doesn't exceed isize::MAX before converting, or documenting this constraint. For example, if bytes is larger than isize::MAX (2^63-1 on 64-bit systems), the conversion will produce incorrect negative values.

state: Mutex::new(BucketState {
capacity,
tokens: capacity, // Start full
tokens: capacity as isize, // Start full

Copilot AI Dec 17, 2025


Similar to the bytes conversion, converting capacity to isize can truncate if capacity exceeds isize::MAX. Consider adding validation in the new() method or documenting the constraint that capacity must not exceed isize::MAX.

Suggested change
tokens: capacity as isize, // Start full
tokens: isize::try_from(capacity)
.expect("TokenBucket capacity must not exceed isize::MAX"), // Start full


sanity commented Dec 17, 2025

Response to Review

Thanks for the thorough skeptical review. Let me address the key architectural questions:

Q1: Does a node automatically receive updates for contracts it's seeding?

Yes. Looking at the code flow:

  1. PUT stores contract → seed_contract() called (put.rs:227-229):

    if !was_seeding {
        op_manager.ring.seed_contract(key, value.size() as u64);
    }
  2. UPDATE checks seeding status (update.rs:1041-1043):

    let is_seeding = op_manager.ring.is_seeding_contract(&key);
    let has_subscribers = op_manager.ring.subscribers_of(&key).is_some();
    let should_handle_update = is_seeding || has_subscribers;
  3. Executor notifies local clients (executor/runtime.rs:1474-1514):
    When an update is processed locally, send_update_notification() iterates over update_notifications and sends to all registered local clients.

So the flow is:

  • Node does PUT → becomes seeding node → receives network UPDATEs
  • Local client does Subscribe → registers with executor's update_notifications
  • Network UPDATE arrives → processed locally → executor notifies all registered local clients
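A minimal sketch of that last hop, using hypothetical types to stand in for the executor's update_notifications map (the real logic is in executor/runtime.rs):

```rust
use std::collections::HashMap;
use std::sync::mpsc;

// Hypothetical simplified model of the executor's per-contract notification
// map; not the actual freenet-core types. Each local Subscribe registers a
// channel, and a processed update is fanned out to every registered client.
type ContractKey = String;

struct Executor {
    update_notifications: HashMap<ContractKey, Vec<mpsc::Sender<String>>>,
}

impl Executor {
    // Called when a local client subscribes to a cached contract.
    fn register_local_client(&mut self, key: &str) -> mpsc::Receiver<String> {
        let (tx, rx) = mpsc::channel();
        self.update_notifications
            .entry(key.to_string())
            .or_default()
            .push(tx);
        rx
    }

    // Called when an UPDATE (local or arriving from the network) is processed.
    fn send_update_notification(&self, key: &str, new_state: &str) {
        if let Some(clients) = self.update_notifications.get(key) {
            for tx in clients {
                let _ = tx.send(new_state.to_string()); // ignore closed receivers
            }
        }
    }
}
```

The point of the sketch is that delivery to local clients depends only on this map, not on any entry in the network-level subscriber list.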

The two subscription types (network vs local) are intentionally separate (see architecture note at subscribe.rs:200-210).

Q2: Manual testing?

This was discovered and tested during River E2E testing. The bug manifests as:

  1. Technic: PUT (room created) → Subscribe → forwarded to Vega (doesn't have contract) → subscribed: false

With the fix, Subscribe completes locally since contract is already cached.

Q3: Integration test?

Agreed this would be valuable. The current tests pass because all nodes have the contract before Subscribe is called. A test specifically covering "PUT then immediate Subscribe when other peers don't have contract yet" would catch this regression.

However, I'd suggest that's a follow-up issue rather than blocking this fix, since:

  1. The fix is correct and addresses a real bug
  2. The architecture analysis confirms the assumption is valid
  3. River E2E already exercises this path

Re: Token Bucket Changes

Yes, these are a separate fix (75d4cfa0) that happened to be needed because CI was failing on an unrelated flaky test. Should I split into a separate PR?

[AI-assisted - Claude]


sanity commented Dec 17, 2025

Response to Copilot's Inline Comments

Copilot raised 5 concerns about overflow/truncation in the token bucket changes. Here's why they're not actionable:

❌ saturating_sub for token deduction (Comment on line 112)

This suggestion is incorrect and would reintroduce the bug. The whole point of this fix is that tokens MUST go negative to track "debt" from concurrent reservations. Using saturating_sub was exactly what caused the flaky test - all concurrent tasks saw tokens=0 and calculated the same wait time.

❌ Overflow concerns for usize → isize conversion

The token bucket is a rate limiter for network traffic:

  • Typical capacity: 10-100 KB (burst)
  • Typical bytes per reserve: 1-64 KB (packet size)
  • isize::MAX on 64-bit: 9.2 exabytes

These values will never exceed isize::MAX. Adding try_from().expect() or validation would be defensive code against an impossible scenario.

saturating_add in refill() (Comment on line 169)

This one is harmless - we could use saturating_add for the token addition during refill. But again, adding 1MB/s worth of tokens for a few milliseconds will never overflow isize::MAX.

Verdict: No changes needed. The implementation is correct for the domain (network rate limiting with realistic values).

[AI-assisted - Claude]

Verifies that Subscribe completes locally when contract is cached,
even if remote peers exist that don't have the contract yet.

This catches the class of bugs where local operations are unnecessarily
forwarded to the network.

@sanity sanity merged commit 356acd7 into main Dec 17, 2025
10 checks passed
@sanity sanity deleted the fix-2326-subscribe-local branch December 17, 2025 20:07
sanity added a commit that referenced this pull request Apr 13, 2026
`test_ping_blocked_peers` was flaky in the merge queue because the
task-per-tx subscribe driver dispatched its initial Request through
`OpCtx::send_and_await`, which routes via `op_execution_sender` and
loops the message back to the local event loop as an `InboundMessage`.
When the contract was already cached locally (every client node had
just finished a GET), `process_message` short-circuited at
"Subscribe completed (originator has contract locally)" and synthesized
a success reply without ever sending the Request to the gateway. The
gateway therefore never registered the node as a downstream subscriber,
so subsequent UPDATE broadcasts never reached it.

The asymmetric failure pattern (one client node receives all updates,
the other only its own) is explained by `auto_subscribe_on_get_response`
falling back to the legacy `request_subscribe` path on GET completion:
one client wins the race and registers via the legacy path, the other
loses and is left depending on the broken explicit Subscribe.

Fix:

- Add `OpCtx::send_to_and_await(target_addr, msg)`. The op-execution
  channel payload now carries `Option<SocketAddr>`. When set,
  `handle_op_execution` emits `OutboundMessageWithTarget` instead of
  `InboundMessage`, dispatching the message to the named peer over
  the wire while still registering the reply callback in
  `pending_op_results`.

- Have `op_ctx_task` call `send_to_and_await(current_target_addr, ...)`
  for client-initiated subscribe Requests. The task already selected
  `current_target_addr` via `prepare_initial_request`; previously that
  address was being dropped on the floor.

- On `Subscribed` reply, mirror the legacy Response handler's local
  side effects (`ring.subscribe`, `complete_subscription_request`,
  `interest_manager.add_local_client`) inside `op_ctx_task`. The
  task-per-tx reply forwarding bypass in `node.rs` skips
  `handle_op_request` for terminal Responses, so without these calls
  the local interest manager never learns about the subscription.

- Preserve the existing `send_and_await(msg)` (target=None) variant
  for non-routed call sites and tests.

The local-originator branch in `process_message` is preserved
intentionally — it remains correct for true standalone/sole-holder
nodes (PR #2327, #2004). The fix prevents the network-target path
from entering it.

Wire format is unchanged: this is purely an internal dispatch fix.

Regression test: `send_to_and_await_forwards_target_address` pins the
channel-level invariant (target propagates through the op execution
channel). The end-to-end behavior is covered by `test_ping_blocked_peers`,
which previously hung for ~280 s and now passes in ~12 s.

Also includes unrelated cleanup:
- `sqlite.rs`: replace `let _ =` on a sqlx Result with `drop(...)`
  to satisfy clippy's `let_underscore_must_use` (newer clippy than CI).

Closes #3838

[AI-assisted - Claude]
Successfully merging this pull request may close these issues:

  • bug: Subscribe returns false when contract is cached locally but remote peers exist (#2326)