Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 113 additions & 83 deletions crates/core/src/operations/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1648,8 +1648,75 @@ impl Operation for SubscribeOp {
"subscribe: processing Subscribed response"
);

// Register our subscription locally. Relay nodes also register
// downstream subscribers (see relay path below) to propagate updates.
if let Some(requester_addr) = self.requester_addr {
// Relay path. We are forwarding a Subscribed response from
// an upstream fulfilling peer back to a downstream requester.
//
// Critically, we do NOT call `ring.subscribe()`,
// `record_subscription()`, `announce_contract_hosted()`, or
// register ourselves as having an upstream. Relays are not
// subscribers in their own right; they only mediate updates
// between the upstream fulfilling node and the downstream
// requester. Doing any of the above on a relay would:
// - Install a lease in `active_subscriptions`, which the
// renewal cycle (ring.rs::contracts_needing_renewal path 1)
// would then refresh every ~2 minutes indefinitely. Each
// renewal routes through fresh relays, which themselves
// then subscribe — a positive feedback loop that inflates
// subscription trees and UPDATE fan-out with peers that
// have no genuine interest in the contract. This mirrors
// the #3763 subscription-storm incident.
// - Broadcast a "I host this" announcement populating
// neighbors' proximity caches with the relay, further
// amplifying UPDATE broadcast targets.
// - Corrupt `subscription_backoff` by recording a bogus
// "successful subscription" for a request the relay
// never actually made.
//
// The GET-piggyback relay path already follows this model;
// see `operations.rs::setup_subscription_forwarding_at_relay`
// which documents the same constraint. The explicit
// SUBSCRIBE path was asymmetric historically and is brought
// into line here.
//
// UPDATE propagation back through the relay is driven by
// `register_downstream_subscriber` below (which registers
// the requester in both the hosting manager's downstream
// subscriber list and the interest manager). The upstream
// fulfilling node already registered *this relay* as its
// downstream subscriber when it processed our forwarded
// Request, so UPDATEs will reach us and we'll fan them out
// to the registered downstream.
register_downstream_subscriber(
op_manager,
key,
requester_addr,
self.requester_pub_key.as_ref(),
None,
msg_id,
" (relay registration on Response)",
)
.await;

tracing::debug!(tx = %msg_id, %key, requester = %requester_addr, "Forwarding Subscribed response to requester");
// Note: ResponseSent telemetry is emitted by from_outbound_msg()
return Ok(OperationResult::SendAndComplete {
msg: NetMessage::from(SubscribeMsg::Response {
id: *msg_id,
instance_id: *instance_id,
result: SubscribeMsgResult::Subscribed { key: *key },
}),
next_hop: Some(requester_addr),
stream_data: None,
});
}

// Originator path. We initiated this subscribe (client or
// renewal cycle) and the upstream peer confirmed. Install the
// local lease, fetch the contract if needed, and record the
// upstream as our interest target for future Unsubscribes.

// Register our subscription locally.
op_manager.ring.subscribe(*key);
op_manager.ring.complete_subscription_request(key, true);

Expand All @@ -1666,8 +1733,9 @@ impl Operation for SubscribeOp {
crate::node::network_status::record_subscription(format!("{key}"));

// Fetch contract if we don't have it.
// This is non-fatal - if it fails, we still continue with forwarding/completing
// the subscription. The contract will eventually arrive via UPDATE broadcasts.
// This is non-fatal - if it fails, we still complete the
// subscription. The contract will eventually arrive via
// UPDATE broadcasts.
if let Err(e) = fetch_contract_if_missing(op_manager, *key.id()).await {
tracing::debug!(
tx = %msg_id,
Expand Down Expand Up @@ -1700,88 +1768,50 @@ impl Operation for SubscribeOp {
}
}

// Forward response to requester or complete
if let Some(requester_addr) = self.requester_addr {
// We're a relay node — register the upstream requester as a
// downstream subscriber so update broadcasts propagate back
// through us. The requester_addr is the direct connection we
// received the original Request from, so the registration is
// always deliverable. This creates the subscription relay tree:
// fulfilling_node → relay(us) → requester
// Pass None for source_addr fallback because here source_addr
// is the fulfilling node (Response sender), NOT the requester.
// The primary lookup uses requester_pub_key (resolved at init).
register_downstream_subscriber(
op_manager,
key,
requester_addr,
self.requester_pub_key.as_ref(),
None,
msg_id,
" (relay registration on Response)",
)
.await;

tracing::debug!(tx = %msg_id, %key, requester = %requester_addr, "Forwarding Subscribed response to requester");
// Note: ResponseSent telemetry is emitted by from_outbound_msg()
Ok(OperationResult::SendAndComplete {
msg: NetMessage::from(SubscribeMsg::Response {
id: *msg_id,
instance_id: *instance_id,
result: SubscribeMsgResult::Subscribed { key: *key },
}),
next_hop: Some(requester_addr),
stream_data: None,
})
} else {
// We're the originator - return completed state for handle_op_result
tracing::info!(tx = %msg_id, contract = %key, phase = "complete", "Subscribe completed (originator)");

// Register local interest so that ChangeInterests from peers
// get properly processed. Without this, when other nodes broadcast
// ChangeInterests for contracts they seed, the has_local_interest()
// check in the ChangeInterests handler fails, preventing peer
// interest registration and breaking update propagation.
if !self.is_renewal {
let became_interested =
op_manager.interest_manager.add_local_client(key);
if became_interested {
super::broadcast_change_interests(
op_manager,
vec![*key],
vec![],
)
.await;
}
}

// Emit telemetry for successful subscription
let own_loc = op_manager.ring.connection_manager.own_location();
if let Some(event) = NetEventLog::subscribe_success(
msg_id,
&op_manager.ring,
*key,
own_loc,
None, // hop_count not tracked in subscribe
) {
op_manager.ring.register_events(Either::Left(event)).await;
tracing::info!(tx = %msg_id, contract = %key, phase = "complete", "Subscribe completed (originator)");

// Register local interest so that ChangeInterests from peers
// get properly processed. Without this, when other nodes broadcast
// ChangeInterests for contracts they seed, the has_local_interest()
// check in the ChangeInterests handler fails, preventing peer
// interest registration and breaking update propagation.
if !self.is_renewal {
let became_interested =
op_manager.interest_manager.add_local_client(key);
if became_interested {
super::broadcast_change_interests(
op_manager,
vec![*key],
vec![],
)
.await;
}
}

Ok(OperationResult::ContinueOp(OpEnum::Subscribe(
SubscribeOp {
id,
state: SubscribeState::Completed(CompletedData {
key: *key,
}),
requester_addr: None,
requester_pub_key: None,
is_renewal: self.is_renewal,
stats: self.stats,
ack_received: false,
speculative_paths: 0,
},
)))
// Emit telemetry for successful subscription
let own_loc = op_manager.ring.connection_manager.own_location();
if let Some(event) = NetEventLog::subscribe_success(
msg_id,
&op_manager.ring,
*key,
own_loc,
None, // hop_count not tracked in subscribe
) {
op_manager.ring.register_events(Either::Left(event)).await;
}

Ok(OperationResult::ContinueOp(OpEnum::Subscribe(
SubscribeOp {
id,
state: SubscribeState::Completed(CompletedData { key: *key }),
requester_addr: None,
requester_pub_key: None,
is_renewal: self.is_renewal,
stats: self.stats,
ack_received: false,
speculative_paths: 0,
},
)))
}
SubscribeMsgResult::NotFound => {
tracing::debug!(
Expand Down
75 changes: 75 additions & 0 deletions crates/core/src/ring/hosting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -924,6 +924,20 @@ impl HostingManager {
let mut snapshot = TopologySnapshot::new(peer_addr, location);
let now = self.time_source.now();

// Record the raw set of keys that are in `active_subscriptions` right
// now. This is used by regression tests to detect whether a peer
// installed a subscription lease — e.g. the relay-pollution bug fixed
// alongside this field where every forwarder on a SUBSCRIBE response
// path was unconditionally adding itself to active_subscriptions,
// causing feedback-loop renewal. Must be populated BEFORE the merged
// `contracts` map below, which hides active_subscriptions entries
// behind hosting cache presence when both exist.
for entry in self.active_subscriptions.iter() {
if *entry.value() > now {
snapshot.active_subscription_keys.insert(*entry.key().id());
}
}

// Add all hosted contracts
// Collect and sort for deterministic iteration order
let hosting_cache = self.hosting_cache.read();
Expand Down Expand Up @@ -1565,6 +1579,67 @@ mod tests {
// The exclusion test (test_contracts_needing_renewal_excludes_hosted_only)
// covers the correct behavior.

/// Regression guard for the relay-subscription feedback loop.
///
/// A peer that merely forwards a SUBSCRIBE response for some other node
/// must NOT end up holding the contract in its own
/// `active_subscriptions`, and consequently must NOT be reported by
/// `contracts_needing_renewal()`.
///
/// Historically (before the fix to
/// `operations::subscribe::SubscribeMsgResult::Subscribed`), every relay
/// on a SUBSCRIBE response path invoked `ring.subscribe(*key)`
/// unconditionally. That planted a lease in `active_subscriptions`,
/// which renewal path #1 of `contracts_needing_renewal()` then refreshed
/// roughly every two minutes — spawning fresh subscribes that routed
/// through new relays, each of which installed its own lease. The
/// compounding loop manifested as the 85+ phantom contracts seen on the
/// `technic` peer's local dashboard (see commit message).
///
/// Here we model the post-fix relay state: the contract has a
/// downstream subscriber registered but no `subscribe()` lease of its
/// own — exactly what the SUBSCRIBE Response relay branch now produces.
/// We assert that such a relay never gets pulled into the renewal cycle.
#[test]
fn test_relay_downstream_only_not_in_renewal() {
    let relay = HostingManager::new();
    let key = make_contract_key(77);
    let subscriber = make_peer_key(42);

    // Relay state: a downstream subscriber was accepted for this
    // contract, but `subscribe()` was never called on our own behalf
    // (we only forward Updates for someone else) and no local client
    // has expressed interest.
    assert!(relay.add_downstream_subscriber(&key, subscriber.clone()));

    // Invariant 1: registering a downstream did not plant a
    // self-subscription lease.
    assert!(
        !relay.is_subscribed(&key),
        "Relay must not have an active subscription lease just from \
        registering a downstream subscriber"
    );
    assert!(
        relay.get_subscribed_contracts().is_empty(),
        "active_subscriptions must be empty on a pure-relay peer"
    );

    // Invariant 2 (the load-bearing one): the contract must be absent
    // from the renewal set. Were the relay present in
    // `active_subscriptions`, renewal path #1 (expiring active leases)
    // would pick it up, spawn a new subscribe, and recruit yet more
    // relays. Pure downstream registration must never trigger renewal.
    let pending = relay.contracts_needing_renewal();
    assert!(
        !pending.contains(&key),
        "Pure-relay peer must not appear in contracts_needing_renewal \
        (relay-subscription feedback loop regression, see \
        subscribe.rs::SubscribeMsgResult::Subscribed)"
    );

    // Invariant 3: the intended mechanism still functions — the relay
    // retains the downstream peer so UPDATE broadcasts can be forwarded
    // onward. This is the correct way for a relay to receive and
    // propagate updates without inflating subscription trees.
    assert!(relay.has_downstream_subscribers(&key));
}

// Superseded: startup revalidation window removed in #3546 to prevent
// subscription accumulation storms. Hosted-only contracts are no longer
// proactively renewed at startup. Replaced by test_hosted_contracts_not_renewed_at_scale.
Expand Down
Loading
Loading