diff --git a/crates/floresta-wire/src/p2p_wire/node/chain_selector_ctx.rs b/crates/floresta-wire/src/p2p_wire/node/chain_selector_ctx.rs index d9bddcc3f..d8cde426c 100644 --- a/crates/floresta-wire/src/p2p_wire/node/chain_selector_ctx.rs +++ b/crates/floresta-wire/src/p2p_wire/node/chain_selector_ctx.rs @@ -139,7 +139,10 @@ pub enum PeerCheck { impl NodeContext for ChainSelector { const REQUEST_TIMEOUT: u64 = 60; // Ban peers stalling our IBD - const TRY_NEW_CONNECTION: u64 = 1; // Try creating connections more aggressively + + // Since we don't have any peers when this starts, we use a more aggressive batch + // size to make sure we get our MAX_OUTGOING_CONNECTIONS ASAP + const NEW_CONNECTIONS_BATCH_SIZE: usize = 12; fn get_required_services(&self) -> ServiceFlags { ServiceFlags::NETWORK | service_flags::UTREEXO.into() | service_flags::UTREEXO_FILTER.into() @@ -177,7 +180,7 @@ where if let Err(e) = self.chain.accept_header(*header) { error!("Error while downloading headers from peer={peer} err={e}"); - self.send_to_peer(peer, NodeRequest::Shutdown)?; + self.increase_banscore(peer, self.max_banscore)?; let peer = self.peers.get(&peer).unwrap(); self.common.address_man.update_set_state( @@ -563,7 +566,7 @@ where match liar_state { PeerCheck::OneLying(liar) => { - self.send_to_peer(liar, NodeRequest::Shutdown)?; + self.increase_banscore(liar, self.max_banscore)?; if liar == peer1 { invalid_accs.insert(peer[0].1.clone()); continue; @@ -571,15 +574,16 @@ where invalid_accs.insert(peer[1].1.clone()); } PeerCheck::UnresponsivePeer(dead_peer) => { - self.send_to_peer(dead_peer, NodeRequest::Shutdown)?; + self.increase_banscore(dead_peer, self.max_banscore)?; } PeerCheck::BothUnresponsivePeers => { - self.send_to_peer(peer1, NodeRequest::Shutdown)?; - self.send_to_peer(peer2, NodeRequest::Shutdown)?; + self.increase_banscore(peer1, self.max_banscore)?; + self.increase_banscore(peer2, self.max_banscore)?; } PeerCheck::BothLying => { - self.send_to_peer(peer1, 
NodeRequest::Shutdown)?; - self.send_to_peer(peer2, NodeRequest::Shutdown)?; + self.increase_banscore(peer1, self.max_banscore)?; + self.increase_banscore(peer2, self.max_banscore)?; + invalid_accs.insert(peer[0].1.clone()); invalid_accs.insert(peer[1].1.clone()); } @@ -699,7 +703,7 @@ where peer.1.address_id as usize, AddressState::Banned(ChainSelector::BAN_TIME), ); - self.send_to_peer(peer.0, NodeRequest::Shutdown)?; + self.increase_banscore(peer.0, self.max_banscore)?; } } @@ -838,12 +842,14 @@ where periodic_job!( self.last_connection => self.maybe_open_connection(ServiceFlags::NONE), ChainSelector::TRY_NEW_CONNECTION, + no_log, ); // Open new feeler connection periodically periodic_job!( self.last_feeler => self.open_feeler_connection(), ChainSelector::FEELER_INTERVAL, + no_log, ); if let ChainSelectorState::LookingForForks(start) = self.context.state { diff --git a/crates/floresta-wire/src/p2p_wire/node/conn.rs b/crates/floresta-wire/src/p2p_wire/node/conn.rs index f5350a29e..d5be27315 100644 --- a/crates/floresta-wire/src/p2p_wire/node/conn.rs +++ b/crates/floresta-wire/src/p2p_wire/node/conn.rs @@ -59,6 +59,12 @@ where // === CONNECTION CREATION === pub(crate) fn create_connection(&mut self, kind: ConnectionKind) -> Result<(), WireError> { + // Connection with fixed peers should be marked as `manual`, rather than `regular` + let connection_kind = match (self.fixed_peer.as_ref(), kind) { + (Some(_), ConnectionKind::Regular(_)) => ConnectionKind::Manual, + _ => kind, + }; + let required_services = match kind { ConnectionKind::Regular(services) => services, _ => ServiceFlags::NONE, @@ -71,7 +77,7 @@ where .or_else(|| { self.address_man.get_address_to_connect( required_services, - matches!(kind, ConnectionKind::Feeler), + matches!(connection_kind, ConnectionKind::Feeler), ) }); @@ -110,10 +116,10 @@ where // or if we are connecting to a utreexo peer, since utreexod doesn't support V2 yet. 
let is_fixed = self.fixed_peer.is_some(); let allow_v1 = self.config.allow_v1_fallback - || kind == ConnectionKind::Regular(UTREEXO.into()) + || connection_kind == ConnectionKind::Regular(UTREEXO.into()) || is_fixed; - self.open_connection(kind, peer_id, address, allow_v1)?; + self.open_connection(connection_kind, peer_id, address, allow_v1)?; Ok(()) } @@ -123,7 +129,11 @@ where if self.fixed_peer.is_some() { return Ok(()); } - self.create_connection(ConnectionKind::Feeler)?; + + for _ in 0..T::NEW_CONNECTIONS_BATCH_SIZE { + self.create_connection(ConnectionKind::Feeler)?; + } + Ok(()) } @@ -131,7 +141,7 @@ where /// /// `kind` may or may not be a [`ConnectionKind::Feeler`], a special connection type /// that is used to learn about good peers, but are not kept after handshake - /// (others are [`ConnectionKind::Regular`] and [`ConnectionKind::Extra`]). + /// (others are [`ConnectionKind::Regular`], [`ConnectionKind::Manual`] and [`ConnectionKind::Extra`]). /// /// We will always try to open a V2 connection first. If the `allow_v1_fallback` is set, /// we may retry the connection with the old V1 protocol if the V2 connection fails. @@ -211,6 +221,12 @@ where match kind { ConnectionKind::Feeler => self.last_feeler = Instant::now(), ConnectionKind::Regular(_) => self.last_connection = Instant::now(), + // Note: Creating a manual peer intentionally doesn't prevent us from checking our peers + // again, since we might need to disconnect someone to open up space for more + // utreexo or CBS connections. + // + // Extra connections are also not taken into account here because they will probably be + // short-lived. _ => {} } @@ -569,6 +585,9 @@ where ) -> Result<(), WireError> { // try to connect with manually added peers self.maybe_open_connection_with_added_peers()?; + if self.connected_peers() >= T::MAX_OUTGOING_PEERS { + return Ok(()); + } // If the user passes in a `--connect` cli argument, we only connect with // that particular peer.
@@ -583,7 +602,7 @@ where self.maybe_use_hardcoded_addresses(needs_utreexo); let connection_kind = ConnectionKind::Regular(required_service); - if self.peers.len() < T::MAX_OUTGOING_PEERS { + for _ in 0..T::NEW_CONNECTIONS_BATCH_SIZE { self.create_connection(connection_kind)?; } @@ -617,7 +636,7 @@ where // Finally, open the connection with the node self.open_connection( - ConnectionKind::Regular(ServiceFlags::NONE), + ConnectionKind::Manual, peers_count as usize, address, added_peer.v1_fallback, diff --git a/crates/floresta-wire/src/p2p_wire/node/mod.rs b/crates/floresta-wire/src/p2p_wire/node/mod.rs index 038be04fb..d32981a04 100644 --- a/crates/floresta-wire/src/p2p_wire/node/mod.rs +++ b/crates/floresta-wire/src/p2p_wire/node/mod.rs @@ -132,10 +132,30 @@ pub(crate) enum InflightRequests { } #[derive(Debug, PartialEq, Clone, Copy)] +/// The kind of connection we see this peer as. +/// +/// Core's counterpart: https://github.com/bitcoin/bitcoin/blob/bf9ef4f0433551e850a11c2da8baae0ec6439a99/src/node/connection_types.h#L18 pub enum ConnectionKind { +    /// A feeler connection is a short-lived connection used to check whether this peer is alive. +    /// +    /// After handshake, we ask for addresses and when we receive an answer we just disconnect, +    /// marking this peer as alive in our address manager Feeler, + +    /// A regular peer, used to send requests to and learn about transactions and blocks Regular(ServiceFlags), + +    /// An extra peer specially created if our tip hasn't moved for too long +    /// +    /// If more than [`NodeContext::ASSUME_STALE`] seconds have passed since the last +    /// processed block. We use this to make sure we are not in a partitioned subnet, +    /// unable to learn about new blocks Extra, + +    /// A connection that was manually requested by our user. This type of peer won't be banned for +    /// misbehaving, and won't respect the [`ServiceFlags`] requirements when creating a +    /// connection.
+ Manual, } impl Serialize for ConnectionKind { @@ -147,6 +167,7 @@ impl Serialize for ConnectionKind { ConnectionKind::Feeler => serializer.serialize_str("feeler"), ConnectionKind::Regular(_) => serializer.serialize_str("regular"), ConnectionKind::Extra => serializer.serialize_str("extra"), + ConnectionKind::Manual => serializer.serialize_str("manual"), } } } @@ -394,7 +415,7 @@ macro_rules! periodic_job { ($timer:expr => $what:expr, $interval_secs:path,no_log $(,)?) => {{ if $timer.elapsed() > Duration::from_secs($interval_secs) { - $what; + let _ = $what; $timer = Instant::now(); } }}; diff --git a/crates/floresta-wire/src/p2p_wire/node/peer_man.rs b/crates/floresta-wire/src/p2p_wire/node/peer_man.rs index 2796da117..1eafa6ced 100644 --- a/crates/floresta-wire/src/p2p_wire/node/peer_man.rs +++ b/crates/floresta-wire/src/p2p_wire/node/peer_man.rs @@ -106,7 +106,11 @@ where self.peers .values() .filter(|p| { - p.state == PeerStatus::Ready && matches!(p.kind, ConnectionKind::Regular(_)) + // Connections that will remain open for as long as we are running (and the other + // peer doesn't die) + let long_lived_connection = matches!(p.kind, ConnectionKind::Regular(_)) + || matches!(p.kind, ConnectionKind::Manual); + p.state == PeerStatus::Ready && long_lived_connection }) .count() } @@ -249,6 +253,27 @@ where return Ok(()); } + let good_peers_count = self.connected_peers(); + if good_peers_count > T::MAX_OUTGOING_PEERS { + // Don't allow our node to have more than T::MAX_OUTGOING_PEERS, unless this is a + // manual peer, those can exceed our quota + if version.kind != ConnectionKind::Manual { + debug!( + "Already have {} peers, disconnecting peer to avoid blowing up our max of {}", + good_peers_count, + T::MAX_OUTGOING_PEERS + ); + + self.send_to_peer(peer, NodeRequest::Shutdown)?; + // this prevents logging "peer xx disconnected" + self.peers + .entry(peer) + .and_modify(|peer| peer.state = PeerStatus::Awaiting); + + return Ok(()); + } + } + info!( "New peer id={}
version={} blocks={} services={}", version.id, version.user_agent, version.blocks, version.services @@ -415,7 +440,10 @@ where pub(crate) fn handle_disconnection(&mut self, peer: u32, idx: usize) -> Result<(), WireError> { if let Some(p) = self.peers.remove(&peer) { std::mem::drop(p.channel); - if matches!(p.kind, ConnectionKind::Regular(_)) && p.state == PeerStatus::Ready { + + let long_lived = matches!(p.kind, ConnectionKind::Regular(_)) + || matches!(p.kind, ConnectionKind::Manual); + if long_lived && p.state == PeerStatus::Ready { info!("Peer disconnected: {peer}"); } @@ -474,6 +502,11 @@ where return Ok(()); }; + // Manual connections are exempt from being punished + if matches!(peer.kind, ConnectionKind::Manual) { + return Ok(()); + } + peer.banscore += factor; // This peer is misbehaving too often, ban it @@ -886,7 +919,7 @@ where return Err(WireError::PeerAlreadyExists(addr, port)); } - let kind = ConnectionKind::Regular(ServiceFlags::NONE); + let kind = ConnectionKind::Manual; // Add this address to our address manager for later // assume it has the bare-minimum services, otherwise `push_addresses` will ignore it diff --git a/crates/floresta-wire/src/p2p_wire/node/running_ctx.rs b/crates/floresta-wire/src/p2p_wire/node/running_ctx.rs index 9db7ec6f7..cd2e0f2bd 100644 --- a/crates/floresta-wire/src/p2p_wire/node/running_ctx.rs +++ b/crates/floresta-wire/src/p2p_wire/node/running_ctx.rs @@ -17,7 +17,8 @@ use floresta_chain::pruned_utreexo::UpdatableChainstate; use floresta_chain::ThreadSafeChain; use floresta_common::service_flags; use floresta_common::service_flags::UTREEXO; -use rand::random; +use rand::seq::IteratorRandom; +use rand::thread_rng; use rustreexo::accumulator::stump::Stump; use tokio::time; use tokio::time::MissedTickBehavior; @@ -57,6 +58,7 @@ pub struct RunningNode { impl NodeContext for RunningNode { const REQUEST_TIMEOUT: u64 = 2 * 60; + fn get_required_services(&self) -> ServiceFlags { ServiceFlags::NETWORK | 
service_flags::UTREEXO.into() @@ -137,12 +139,11 @@ where // peer and create a utreexo and CBS connection if !self.has_utreexo_peers() { if self.peer_ids.len() == 10 { - let peer = random::() % self.peer_ids.len(); - let peer = self - .peer_ids - .get(peer) - .expect("we've modulo before, we should have it"); - self.send_to_peer(*peer, NodeRequest::Shutdown)?; + self.peers + .values() + .filter(|peer| matches!(peer.kind, ConnectionKind::Regular(_))) + .choose(&mut thread_rng()) + .and_then(|p| p.channel.send(NodeRequest::Shutdown).ok()); } self.maybe_open_connection(UTREEXO.into())?; @@ -153,12 +154,11 @@ where return Ok(()); } if self.peer_ids.len() == 10 { - let peer = random::() % self.peer_ids.len(); - let peer = self - .peer_ids - .get(peer) - .expect("we've modulo before, we should have it"); - self.send_to_peer(*peer, NodeRequest::Shutdown)?; + self.peers + .values() + .filter(|peer| matches!(peer.kind, ConnectionKind::Regular(_))) + .choose(&mut thread_rng()) + .and_then(|p| p.channel.send(NodeRequest::Shutdown).ok()); } self.maybe_open_connection(ServiceFlags::COMPACT_FILTERS)?; @@ -409,6 +409,7 @@ where periodic_job!( self.last_connection => self.check_connections(), RunningNode::TRY_NEW_CONNECTION, + no_log, ); // Check if some of our peers have timed out a request @@ -418,6 +419,7 @@ where periodic_job!( self.last_feeler => self.open_feeler_connection(), RunningNode::FEELER_INTERVAL, + no_log, ); // The jobs below need a connected peer to work @@ -692,6 +694,7 @@ where let peer_to_disconnect = self .peers .iter() + // Don't disconnect manual connections .filter(|(_, info)| matches!(info.kind, ConnectionKind::Regular(_))) .min_by_key(|(k, _)| self.get_peer_score(**k)) .map(|(peer, _)| *peer); @@ -720,12 +723,7 @@ where (peer, Instant::now()), ); } - - // update the peer info - self.peers.entry(peer).and_modify(|info| { - info.kind = ConnectionKind::Regular(peer_info.services); - }); - } + } PeerMessages::Ready(version) => { debug!( diff --git 
a/crates/floresta-wire/src/p2p_wire/node/sync_ctx.rs b/crates/floresta-wire/src/p2p_wire/node/sync_ctx.rs index 97794f940..11113f7a7 100644 --- a/crates/floresta-wire/src/p2p_wire/node/sync_ctx.rs +++ b/crates/floresta-wire/src/p2p_wire/node/sync_ctx.rs @@ -8,7 +8,7 @@ use floresta_chain::proof_util; use floresta_chain::ThreadSafeChain; use floresta_common::service_flags; use floresta_common::service_flags::UTREEXO; -use rand::seq::SliceRandom; +use rand::seq::IteratorRandom; use rand::thread_rng; use tokio::time; use tokio::time::MissedTickBehavior; @@ -147,19 +147,14 @@ where // // FIXME: We should actually disconnect the slowest non-utreexo peer, to // make sure we can download blocks faster. - let mut non_utreexo_peers = Vec::new(); - for (id, peer) in self.peers.iter() { - if !peer.services.has(UTREEXO.into()) { - non_utreexo_peers.push(*id); - } - } - - let peer_to_disconnect = *non_utreexo_peers + self.peers + .values() + .filter(|peer| { + matches!(peer.kind, ConnectionKind::Regular(_)) + && !peer.services.has(UTREEXO.into()) + }) .choose(&mut thread_rng()) - .expect("infallible: the `if` clause implies we have some non-utreexo peers"); - - info!("Disconnecting non-utreexo peer {peer_to_disconnect} to open up more space for utreexo peers"); - self.send_to_peer(peer_to_disconnect, NodeRequest::Shutdown)?; + .and_then(|p| p.channel.send(NodeRequest::Shutdown).ok()); } if utreexo_peers < 2 { @@ -249,12 +244,14 @@ where periodic_job!( self.last_connection => self.check_connections(), SyncNode::TRY_NEW_CONNECTION, + no_log, ); // Open new feeler connection periodically periodic_job!( self.last_feeler => self.open_feeler_connection(), SyncNode::FEELER_INTERVAL, + no_log, ); try_and_log!(self.check_for_timeout()); diff --git a/crates/floresta-wire/src/p2p_wire/node_context.rs b/crates/floresta-wire/src/p2p_wire/node_context.rs index 100dcd947..224fb5e94 100644 --- a/crates/floresta-wire/src/p2p_wire/node_context.rs +++ 
b/crates/floresta-wire/src/p2p_wire/node_context.rs @@ -44,7 +44,7 @@ pub trait NodeContext { const PEER_DB_DUMP_INTERVAL: u64 = 30; // 30 seconds /// Attempt to open a new connection (if needed) every TRY_NEW_CONNECTION seconds - const TRY_NEW_CONNECTION: u64 = 10; // 10 seconds + const TRY_NEW_CONNECTION: u64 = 2; // 2 seconds /// If ASSUME_STALE seconds passed since our last tip update, treat it as stale const ASSUME_STALE: u64 = 15 * 60; // 15 minutes @@ -70,8 +70,9 @@ pub trait NodeContext { /// How often we send our addresses to our peers const SEND_ADDRESSES_INTERVAL: u64 = 60 * 60; // 1 hour - /// How long should we wait for a peer to respond our connection request - const CONNECTION_TIMEOUT: u64 = 30; // 30 seconds + /// How long should we wait for a peer to respond our connection request. This shouldn't be + /// greater than `TRY_NEW_CONNECTION` in order to clear timed-out requests at the same pace. + const CONNECTION_TIMEOUT: u64 = 2; // 2 seconds /// How many blocks we can ask in the same request const BLOCKS_PER_GETDATA: usize = 5; @@ -82,6 +83,9 @@ pub trait NodeContext { /// How often we perform the main loop maintenance tasks (checking for timeouts, peers, etc.) const MAINTENANCE_TICK: Duration = Duration::from_secs(1); + /// How many connections we try at once + const NEW_CONNECTIONS_BATCH_SIZE: usize = 4; + fn get_required_services(&self) -> ServiceFlags { ServiceFlags::NETWORK } diff --git a/tests/florestad/node-info.py b/tests/florestad/node-info.py index d9d643756..522efc918 100644 --- a/tests/florestad/node-info.py +++ b/tests/florestad/node-info.py @@ -43,7 +43,7 @@ def run_test(self): peer_info = self.florestad.rpc.get_peerinfo() self.assertEqual(peer_info[0]["address"], self.bitcoind.p2p_url) - self.assertEqual(peer_info[0]["kind"], "regular") + self.assertEqual(peer_info[0]["kind"], "manual") self.assertEqual( peer_info[0]["services"], "ServiceFlags(NETWORK|WITNESS|NETWORK_LIMITED|P2P_V2)",