diff --git a/crates/core/src/operations/subscribe.rs b/crates/core/src/operations/subscribe.rs
index 6b988031db..309ea3030d 100644
--- a/crates/core/src/operations/subscribe.rs
+++ b/crates/core/src/operations/subscribe.rs
@@ -1648,8 +1648,75 @@ impl Operation for SubscribeOp {
                         "subscribe: processing Subscribed response"
                     );

-                    // Register our subscription locally. Relay nodes also register
-                    // downstream subscribers (see relay path below) to propagate updates.
+                    if let Some(requester_addr) = self.requester_addr {
+                        // Relay path. We are forwarding a Subscribed response from
+                        // an upstream fulfilling peer back to a downstream requester.
+                        //
+                        // Critically, we do NOT call `ring.subscribe()`,
+                        // `record_subscription()`, `announce_contract_hosted()`, or
+                        // register ourselves as having an upstream. Relays are not
+                        // subscribers in their own right; they only mediate updates
+                        // between the upstream fulfilling node and the downstream
+                        // requester. Doing any of the above on a relay would:
+                        // - Install a lease in `active_subscriptions`, which the
+                        //   renewal cycle (ring.rs::contracts_needing_renewal path 1)
+                        //   would then refresh every ~2 minutes indefinitely. Each
+                        //   renewal routes through fresh relays, which themselves
+                        //   then subscribe — a positive feedback loop that inflates
+                        //   subscription trees and UPDATE fan-out with peers that
+                        //   have no genuine interest in the contract. This mirrors
+                        //   the #3763 subscription-storm incident.
+                        // - Broadcast an "I host this" announcement populating
+                        //   neighbors' proximity caches with the relay, further
+                        //   amplifying UPDATE broadcast targets.
+                        // - Corrupt `subscription_backoff` by recording a bogus
+                        //   "successful subscription" for a request the relay
+                        //   never actually made.
+                        //
+                        // The GET-piggyback relay path already follows this model;
+                        // see `operations.rs::setup_subscription_forwarding_at_relay`,
+                        // which documents the same constraint. The explicit
+                        // SUBSCRIBE path was historically asymmetric and is brought
+                        // into line here.
+                        //
+                        // UPDATE propagation back through the relay is driven by
+                        // `register_downstream_subscriber` below (which registers
+                        // the requester in both the hosting manager's downstream
+                        // subscriber list and the interest manager). The upstream
+                        // fulfilling node already registered *this relay* as its
+                        // downstream subscriber when it processed our forwarded
+                        // Request, so UPDATEs will reach us and we'll fan them out
+                        // to the registered downstream.
+                        register_downstream_subscriber(
+                            op_manager,
+                            key,
+                            requester_addr,
+                            self.requester_pub_key.as_ref(),
+                            None,
+                            msg_id,
+                            " (relay registration on Response)",
+                        )
+                        .await;
+
+                        tracing::debug!(tx = %msg_id, %key, requester = %requester_addr, "Forwarding Subscribed response to requester");
+                        // Note: ResponseSent telemetry is emitted by from_outbound_msg()
+                        return Ok(OperationResult::SendAndComplete {
+                            msg: NetMessage::from(SubscribeMsg::Response {
+                                id: *msg_id,
+                                instance_id: *instance_id,
+                                result: SubscribeMsgResult::Subscribed { key: *key },
+                            }),
+                            next_hop: Some(requester_addr),
+                            stream_data: None,
+                        });
+                    }
+
+                    // Originator path. We initiated this subscribe (client or
+                    // renewal cycle) and the upstream peer confirmed. Install the
+                    // local lease, fetch the contract if needed, and record the
+                    // upstream as our interest target for future Unsubscribes.
+
+                    // Register our subscription locally.
                     op_manager.ring.subscribe(*key);
                     op_manager.ring.complete_subscription_request(key, true);

@@ -1666,8 +1733,9 @@ impl Operation for SubscribeOp {
                     crate::node::network_status::record_subscription(format!("{key}"));

                     // Fetch contract if we don't have it.
-                    // This is non-fatal - if it fails, we still continue with forwarding/completing
-                    // the subscription. The contract will eventually arrive via UPDATE broadcasts.
+                    // This is non-fatal - if it fails, we still complete the
+                    // subscription. The contract will eventually arrive via
+                    // UPDATE broadcasts.
                     if let Err(e) = fetch_contract_if_missing(op_manager, *key.id()).await {
                         tracing::debug!(
                             tx = %msg_id,
@@ -1700,88 +1768,50 @@ impl Operation for SubscribeOp {
                         }
                     }

-                    // Forward response to requester or complete
-                    if let Some(requester_addr) = self.requester_addr {
-                        // We're a relay node — register the upstream requester as a
-                        // downstream subscriber so update broadcasts propagate back
-                        // through us. The requester_addr is the direct connection we
-                        // received the original Request from, so the registration is
-                        // always deliverable. This creates the subscription relay tree:
-                        // fulfilling_node → relay(us) → requester
-                        // Pass None for source_addr fallback because here source_addr
-                        // is the fulfilling node (Response sender), NOT the requester.
-                        // The primary lookup uses requester_pub_key (resolved at init).
-                        register_downstream_subscriber(
-                            op_manager,
-                            key,
-                            requester_addr,
-                            self.requester_pub_key.as_ref(),
-                            None,
-                            msg_id,
-                            " (relay registration on Response)",
-                        )
-                        .await;
-
-                        tracing::debug!(tx = %msg_id, %key, requester = %requester_addr, "Forwarding Subscribed response to requester");
-                        // Note: ResponseSent telemetry is emitted by from_outbound_msg()
-                        Ok(OperationResult::SendAndComplete {
-                            msg: NetMessage::from(SubscribeMsg::Response {
-                                id: *msg_id,
-                                instance_id: *instance_id,
-                                result: SubscribeMsgResult::Subscribed { key: *key },
-                            }),
-                            next_hop: Some(requester_addr),
-                            stream_data: None,
-                        })
-                    } else {
-                        // We're the originator - return completed state for handle_op_result
-                        tracing::info!(tx = %msg_id, contract = %key, phase = "complete", "Subscribe completed (originator)");
-
-                        // Register local interest so that ChangeInterests from peers
-                        // get properly processed. Without this, when other nodes broadcast
-                        // ChangeInterests for contracts they seed, the has_local_interest()
-                        // check in the ChangeInterests handler fails, preventing peer
-                        // interest registration and breaking update propagation.
-                        if !self.is_renewal {
-                            let became_interested =
-                                op_manager.interest_manager.add_local_client(key);
-                            if became_interested {
-                                super::broadcast_change_interests(
-                                    op_manager,
-                                    vec![*key],
-                                    vec![],
-                                )
-                                .await;
-                            }
-                        }
-
-                        // Emit telemetry for successful subscription
-                        let own_loc = op_manager.ring.connection_manager.own_location();
-                        if let Some(event) = NetEventLog::subscribe_success(
-                            msg_id,
-                            &op_manager.ring,
-                            *key,
-                            own_loc,
-                            None, // hop_count not tracked in subscribe
-                        ) {
-                            op_manager.ring.register_events(Either::Left(event)).await;
+                    tracing::info!(tx = %msg_id, contract = %key, phase = "complete", "Subscribe completed (originator)");
+
+                    // Register local interest so that ChangeInterests from peers
+                    // get properly processed. Without this, when other nodes broadcast
+                    // ChangeInterests for contracts they seed, the has_local_interest()
+                    // check in the ChangeInterests handler fails, preventing peer
+                    // interest registration and breaking update propagation.
+                    if !self.is_renewal {
+                        let became_interested =
+                            op_manager.interest_manager.add_local_client(key);
+                        if became_interested {
+                            super::broadcast_change_interests(
+                                op_manager,
+                                vec![*key],
+                                vec![],
+                            )
+                            .await;
+                        }
+                    }
-
+                    // Emit telemetry for successful subscription
+                    let own_loc = op_manager.ring.connection_manager.own_location();
+                    if let Some(event) = NetEventLog::subscribe_success(
+                        msg_id,
+                        &op_manager.ring,
+                        *key,
+                        own_loc,
+                        None, // hop_count not tracked in subscribe
+                    ) {
+                        op_manager.ring.register_events(Either::Left(event)).await;
                     }
+
+                    Ok(OperationResult::ContinueOp(OpEnum::Subscribe(
+                        SubscribeOp {
+                            id,
+                            state: SubscribeState::Completed(CompletedData { key: *key }),
+                            requester_addr: None,
+                            requester_pub_key: None,
+                            is_renewal: self.is_renewal,
+                            stats: self.stats,
+                            ack_received: false,
+                            speculative_paths: 0,
+                        },
+                    )))
                 }
                 SubscribeMsgResult::NotFound => {
                     tracing::debug!(
diff --git a/crates/core/src/ring/hosting.rs b/crates/core/src/ring/hosting.rs
index bc5e9ee157..3c1e749df5 100644
--- a/crates/core/src/ring/hosting.rs
+++ b/crates/core/src/ring/hosting.rs
@@ -924,6 +924,20 @@ impl HostingManager {
         let mut snapshot = TopologySnapshot::new(peer_addr, location);
         let now = self.time_source.now();

+        // Record the raw set of keys that are in `active_subscriptions` right
+        // now. This is used by regression tests to detect whether a peer
+        // installed a subscription lease — e.g. the relay-pollution bug fixed
+        // alongside this field where every forwarder on a SUBSCRIBE response
+        // path was unconditionally adding itself to active_subscriptions,
+        // causing feedback-loop renewal. Must be populated BEFORE the merged
+        // `contracts` map below, which hides active_subscriptions entries
+        // behind hosting cache presence when both exist.
+        for entry in self.active_subscriptions.iter() {
+            if *entry.value() > now {
+                snapshot.active_subscription_keys.insert(*entry.key().id());
+            }
+        }
+
         // Add all hosted contracts
         // Collect and sort for deterministic iteration order
         let hosting_cache = self.hosting_cache.read();
@@ -1565,6 +1579,67 @@ mod tests {
     // The exclusion test (test_contracts_needing_renewal_excludes_hosted_only)
     // covers the correct behavior.

+    /// Regression: a node that merely relays a SUBSCRIBE response for some
+    /// other peer must NOT end up with the contract in its own
+    /// `active_subscriptions`, and consequently must NOT appear in
+    /// `contracts_needing_renewal()`.
+    ///
+    /// Before the fix to `operations::subscribe::SubscribeMsgResult::Subscribed`,
+    /// every relay on a SUBSCRIBE response path called `ring.subscribe(*key)`
+    /// unconditionally. That installed a lease in `active_subscriptions`,
+    /// which `contracts_needing_renewal()` path #1 would then pick up every
+    /// ~2 minutes and spawn a fresh subscribe for — routing through new
+    /// relays that *also* installed leases, compounding with each cycle.
+    /// The feedback loop shows up as the 85+ phantom contracts observed on
+    /// the `technic` peer's local dashboard (see commit message).
+    ///
+    /// This test models the post-fix relay state as "contract has a
+    /// downstream subscriber registered, but no `subscribe()` lease", which
+    /// is what the SUBSCRIBE Response relay branch now does. The assertion
+    /// is that such a relay does not get recruited into the renewal cycle.
+    #[test]
+    fn test_relay_downstream_only_not_in_renewal() {
+        let manager = HostingManager::new();
+        let contract = make_contract_key(77);
+        let downstream = make_peer_key(42);
+
+        // Relay state: we've accepted a downstream subscriber for the
+        // contract, but we have not called `subscribe()` on our own behalf
+        // (we're just forwarding Updates for someone else) and we have no
+        // local client expressing interest.
+        assert!(manager.add_downstream_subscriber(&contract, downstream.clone()));
+
+        // Invariant 1: we did not install a self-subscription lease.
+        assert!(
+            !manager.is_subscribed(&contract),
+            "Relay must not have an active subscription lease just from \
+             registering a downstream subscriber"
+        );
+        assert!(
+            manager.get_subscribed_contracts().is_empty(),
+            "active_subscriptions must be empty on a pure-relay peer"
+        );
+
+        // Invariant 2: the contract is not in the renewal set. This is the
+        // load-bearing property: if the relay were in `active_subscriptions`,
+        // `contracts_needing_renewal()` path #1 (expiring active leases)
+        // would pick it up and spawn a new subscribe, recruiting more
+        // relays. Pure downstream registration must NOT trigger renewal.
+        let needs_renewal = manager.contracts_needing_renewal();
+        assert!(
+            !needs_renewal.contains(&contract),
+            "Pure-relay peer must not appear in contracts_needing_renewal \
+             (relay-subscription feedback loop regression, see \
+             subscribe.rs::SubscribeMsgResult::Subscribed)"
+        );
+
+        // Invariant 3: downstream registration still works as intended —
+        // the relay holds the downstream peer so UPDATE broadcasts can be
+        // forwarded. This is the *correct* mechanism for a relay to receive
+        // and propagate updates, without inflating subscription trees.
+        assert!(manager.has_downstream_subscribers(&contract));
+    }
+
     // Superseded: startup revalidation window removed in #3546 to prevent
     // subscription accumulation storms. Hosted-only contracts are no longer
     // proactively renewed at startup. Replaced by test_hosted_contracts_not_renewed_at_scale.
diff --git a/crates/core/src/ring/topology_registry.rs b/crates/core/src/ring/topology_registry.rs
index 9f26316e25..52c356dbbd 100644
--- a/crates/core/src/ring/topology_registry.rs
+++ b/crates/core/src/ring/topology_registry.rs
@@ -76,6 +76,14 @@ pub struct TopologySnapshot {
     pub location: f64,
     /// Subscriptions per contract
     pub contracts: HashMap,
+    /// Keys present in this peer's `HostingManager::active_subscriptions` map.
+    ///
+    /// Unlike the `contracts` map (which is merged with the hosting cache and
+    /// may hide active-subscription presence behind `is_hosting=true`), this
+    /// field is a direct projection of `active_subscriptions` with no merge.
+    /// Tests that need to detect whether a peer installed a subscription
+    /// lease — as opposed to merely caching the contract — should read this.
+    pub active_subscription_keys: HashSet<ContractInstanceId>,
     /// Timestamp when this snapshot was taken (for staleness detection)
     pub timestamp_nanos: u64,
 }
@@ -87,6 +95,7 @@ impl TopologySnapshot {
             peer_addr,
             location,
             contracts: HashMap::new(),
+            active_subscription_keys: HashSet::new(),
             timestamp_nanos: 0,
         }
     }
diff --git a/crates/core/tests/simulation_integration.rs b/crates/core/tests/simulation_integration.rs
index 3e49c65a6a..23dfecbb15 100644
--- a/crates/core/tests/simulation_integration.rs
+++ b/crates/core/tests/simulation_integration.rs
@@ -2344,6 +2344,170 @@ fn test_subscribe_forwarding_ack_relay() {
     );
 }

+/// Soundness check for the fix that moved `ring.subscribe()` /
+/// `record_subscription()` / `announce_contract_hosted()` out of the shared
+/// prefix of `SubscribeMsgResult::Subscribed` and into the originator-only
+/// branch.
+///
+/// The *direct* regression is covered by the unit test
+/// `ring::hosting::tests::test_relay_downstream_only_not_in_renewal`, which
+/// asserts at the HostingManager level that a pure-relay state (downstream
+/// subscriber registered, no self-lease) does not get recruited into
+/// `contracts_needing_renewal()`. That unit test is where the regression is
+/// pinned down.
+///
+/// This simulation test verifies the broader property that subscribes still
+/// complete successfully under the refactored handler and that the number of
+/// active-subscription-lease holders stays within the expected bound under a
+/// multi-peer subscribe scenario — any regression that re-introduced
+/// relay-side `ring.subscribe` calls on nodes running the LEGACY
+/// process_message path would cause `subscribed_peers` to grow beyond the
+/// expected count of genuine originators.
+///
+/// Note on turmoil vs production: in the turmoil simulation runner every
+/// peer shares one process, which means `pending_op_results` is effectively
+/// global. `node::try_forward_task_per_tx_reply` on an intermediate peer
+/// can therefore find the originator's task entry and short-circuit the
+/// Response before it reaches the legacy relay handler. In production each
+/// peer is an independent process with its own `pending_op_results`, so
+/// relays fall through to the legacy handler and hit the bug. This is why
+/// the *primary* regression test for the bug is the in-process hosting
+/// manager unit test rather than this simulation test.
+#[test_log::test]
+fn test_relay_does_not_pollute_active_subscriptions() {
+    use freenet::dev_tool::{NodeLabel, ScheduledOperation, SimOperation};
+
+    const SEED: u64 = 0x5CB6_F37C_0002;
+    const NETWORK_NAME: &str = "subscribe-relay-pollution";
+
+    GlobalTestMetrics::reset();
+    setup_deterministic_state(SEED);
+    let rt = create_runtime();
+
+    // Sparse topology: more peers than max_connections so the ring can't
+    // be fully connected and subscribe requests MUST route through at least
+    // one relay to reach the peer that holds the contract.
+    let sim = rt.block_on(async {
+        SimNetwork::new(
+            NETWORK_NAME,
+            2,  // 2 gateways
+            18, // 18 regular nodes (20 total peers)
+            10, // ring_max_htl
+            3,  // rnd_if_htl_above
+            3,  // max_connections — tight ring, non-full topology
+            2,  // min_connections
+            SEED,
+        )
+        .await
+    });
+
+    let contract = SimOperation::create_test_contract(0xAD);
+    let contract_id = *contract.key().id();
+    let initial_state = SimOperation::create_test_state(1);
+
+    // Gateway PUTs (with subscribe). Multiple distant nodes then subscribe
+    // in parallel so at least some Subscribe requests must route through
+    // relay hops before reaching a peer that holds the contract.
+    let operations = vec![
+        ScheduledOperation::new(
+            NodeLabel::gateway(NETWORK_NAME, 0),
+            SimOperation::Put {
+                contract: contract.clone(),
+                state: initial_state,
+                subscribe: true,
+            },
+        ),
+        ScheduledOperation::new(
+            NodeLabel::node(NETWORK_NAME, 15),
+            SimOperation::Subscribe { contract_id },
+        ),
+        ScheduledOperation::new(
+            NodeLabel::node(NETWORK_NAME, 16),
+            SimOperation::Subscribe { contract_id },
+        ),
+        ScheduledOperation::new(
+            NodeLabel::node(NETWORK_NAME, 17),
+            SimOperation::Subscribe { contract_id },
+        ),
+        ScheduledOperation::new(
+            NodeLabel::node(NETWORK_NAME, 18),
+            SimOperation::Subscribe { contract_id },
+        ),
+    ];
+
+    let result = sim.run_controlled_simulation(
+        SEED,
+        operations,
+        Duration::from_secs(120),
+        Duration::from_secs(30),
+    );
+
+    assert!(
+        result.turmoil_result.is_ok(),
+        "Simulation failed: {:?}",
+        result.turmoil_result.err()
+    );
+
+    // run_controlled_simulation snapshots every peer's HostingManager state
+    // at shutdown and returns them on the result. Use those directly rather
+    // than the global registry (which is cleared on SimNetwork drop).
+    let snapshots = &result.topology_snapshots;
+    assert!(
+        !snapshots.is_empty(),
+        "No topology snapshots captured — background snapshot task didn't run"
+    );
+
+    // Count peers whose `HostingManager::active_subscriptions` contains the
+    // contract. Only genuine originators of a subscription should appear here:
+    //
+    // * The PUT-with-subscribe originator (gateway 0).
+    // * Each of nodes 15..=18 that called Subscribe explicitly.
+    //
+    // That's at most 5 peers. Before the fix, the
+    // `SubscribeMsgResult::Subscribed` handler would ALSO add every relay
+    // that forwards a Subscribed response to the set — crucially, when four
+    // simultaneous subscribers route through each other, the relays recruited
+    // by the bug may overlap with legitimate subscribers, but additional
+    // relay-only peers still accumulate. After the fix the bound collapses
+    // to the 5 genuine originators.
+    let mut subscribed_peers: Vec<_> = snapshots
+        .iter()
+        .filter(|snap| snap.active_subscription_keys.contains(&contract_id))
+        .map(|snap| snap.peer_addr)
+        .collect();
+    subscribed_peers.sort();
+
+    tracing::info!(
+        subscribed_peer_count = subscribed_peers.len(),
+        ?subscribed_peers,
+        total_snapshots = snapshots.len(),
+        "Peers with contract in active_subscriptions"
+    );
+
+    // Five nodes legitimately install a lease in `active_subscriptions`:
+    // the PUT-with-subscribe gateway and the four explicit Subscribe
+    // originators (nodes 15..=18). Anything more means a peer that did
+    // not originate a subscribe — a pure relay — ended up with the
+    // contract in its active_subscriptions, which is the bug.
+    assert!(
+        subscribed_peers.len() <= 5,
+        "Relay pollution regression: {} peer(s) have the contract in \
+         `active_subscriptions`, expected at most 5 (gateway + 4 subscribing \
+         nodes). Polluted peers: {:?}. \
+         See subscribe.rs::SubscribeMsgResult::Subscribed — `ring.subscribe()` \
+         must only run on the originator branch, not on relays.",
+        subscribed_peers.len(),
+        subscribed_peers,
+    );
+
+    // Sanity: at least some peer must have actually subscribed.
+    assert!(
+        !subscribed_peers.is_empty(),
+        "No peers have the contract in active_subscriptions — \
+         subscribes didn't complete, test is degenerate"
+    );
+}
+
 // =============================================================================
 // CRDT Emulation Mode Test (PR #2763 Bug Reproduction)
 // =============================================================================