Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions crates/floresta-wire/src/p2p_wire/node/chain_selector_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,10 @@ pub enum PeerCheck {

impl NodeContext for ChainSelector {
const REQUEST_TIMEOUT: u64 = 60; // Ban peers stalling our IBD
const TRY_NEW_CONNECTION: u64 = 1; // Try creating connections more aggressively
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would keep this with 4 peers as batch size

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've increased this to make sure we GC the dead peers before attempting a new round. We remove all pending Connects before trying to add more, in order to keep the inflight connections list small


// Since we don't have any peers when this starts, we use a more aggressive batch
// size to make sure we get our MAX_OUTGOING_CONNECTIONS ASAP
const NEW_CONNECTIONS_BATCH_SIZE: usize = 12;

fn get_required_services(&self) -> ServiceFlags {
ServiceFlags::NETWORK | service_flags::UTREEXO.into() | service_flags::UTREEXO_FILTER.into()
Expand Down Expand Up @@ -177,7 +180,7 @@ where
if let Err(e) = self.chain.accept_header(*header) {
error!("Error while downloading headers from peer={peer} err={e}");

self.send_to_peer(peer, NodeRequest::Shutdown)?;
self.increase_banscore(peer, self.max_banscore)?;

let peer = self.peers.get(&peer).unwrap();
self.common.address_man.update_set_state(
Expand Down Expand Up @@ -563,23 +566,24 @@ where

match liar_state {
PeerCheck::OneLying(liar) => {
self.send_to_peer(liar, NodeRequest::Shutdown)?;
self.increase_banscore(liar, self.max_banscore)?;
if liar == peer1 {
invalid_accs.insert(peer[0].1.clone());
continue;
}
invalid_accs.insert(peer[1].1.clone());
}
PeerCheck::UnresponsivePeer(dead_peer) => {
self.send_to_peer(dead_peer, NodeRequest::Shutdown)?;
self.increase_banscore(dead_peer, self.max_banscore)?;
}
PeerCheck::BothUnresponsivePeers => {
self.send_to_peer(peer1, NodeRequest::Shutdown)?;
self.send_to_peer(peer2, NodeRequest::Shutdown)?;
self.increase_banscore(peer1, self.max_banscore)?;
self.increase_banscore(peer2, self.max_banscore)?;
}
PeerCheck::BothLying => {
self.send_to_peer(peer1, NodeRequest::Shutdown)?;
self.send_to_peer(peer2, NodeRequest::Shutdown)?;
self.increase_banscore(peer1, self.max_banscore)?;
self.increase_banscore(peer2, self.max_banscore)?;

invalid_accs.insert(peer[0].1.clone());
invalid_accs.insert(peer[1].1.clone());
}
Expand Down Expand Up @@ -699,7 +703,7 @@ where
peer.1.address_id as usize,
AddressState::Banned(ChainSelector::BAN_TIME),
);
self.send_to_peer(peer.0, NodeRequest::Shutdown)?;
self.increase_banscore(peer.0, self.max_banscore)?;
}
}

Expand Down Expand Up @@ -838,12 +842,14 @@ where
periodic_job!(
self.last_connection => self.maybe_open_connection(ServiceFlags::NONE),
ChainSelector::TRY_NEW_CONNECTION,
no_log,
);

// Open new feeler connection periodically
periodic_job!(
self.last_feeler => self.open_feeler_connection(),
ChainSelector::FEELER_INTERVAL,
no_log,
);

if let ChainSelectorState::LookingForForks(start) = self.context.state {
Expand Down
33 changes: 26 additions & 7 deletions crates/floresta-wire/src/p2p_wire/node/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ where
// === CONNECTION CREATION ===

pub(crate) fn create_connection(&mut self, kind: ConnectionKind) -> Result<(), WireError> {
// Connections with fixed peers should be marked as `manual`, rather than `regular`
let connection_kind = match (self.fixed_peer.as_ref(), kind) {
(Some(_), ConnectionKind::Regular(_)) => ConnectionKind::Manual,
_ => kind,
};

let required_services = match kind {
ConnectionKind::Regular(services) => services,
_ => ServiceFlags::NONE,
Expand All @@ -71,7 +77,7 @@ where
.or_else(|| {
self.address_man.get_address_to_connect(
required_services,
matches!(kind, ConnectionKind::Feeler),
matches!(connection_kind, ConnectionKind::Feeler),
)
});

Expand Down Expand Up @@ -110,10 +116,10 @@ where
// or if we are connecting to a utreexo peer, since utreexod doesn't support V2 yet.
let is_fixed = self.fixed_peer.is_some();
let allow_v1 = self.config.allow_v1_fallback
|| kind == ConnectionKind::Regular(UTREEXO.into())
|| connection_kind == ConnectionKind::Regular(UTREEXO.into())
|| is_fixed;

self.open_connection(kind, peer_id, address, allow_v1)?;
self.open_connection(connection_kind, peer_id, address, allow_v1)?;

Ok(())
}
Expand All @@ -123,15 +129,19 @@ where
if self.fixed_peer.is_some() {
return Ok(());
}
self.create_connection(ConnectionKind::Feeler)?;

for _ in 0..T::NEW_CONNECTIONS_BATCH_SIZE {
self.create_connection(ConnectionKind::Feeler)?;
}

Ok(())
}

/// Creates a new outgoing connection with `address`.
///
/// `kind` may or may not be a [`ConnectionKind::Feeler`], a special connection type
/// that is used to learn about good peers, but are not kept after handshake
/// (others are [`ConnectionKind::Regular`] and [`ConnectionKind::Extra`]).
/// (others are [`ConnectionKind::Regular`], [`ConnectionKind::Manual`] and [`ConnectionKind::Extra`]).
///
/// We will always try to open a V2 connection first. If the `allow_v1_fallback` is set,
/// we may retry the connection with the old V1 protocol if the V2 connection fails.
Expand Down Expand Up @@ -211,6 +221,12 @@ where
match kind {
ConnectionKind::Feeler => self.last_feeler = Instant::now(),
ConnectionKind::Regular(_) => self.last_connection = Instant::now(),
// Note: Creating a manual peer intentionally doesn't prevent us from checking our peers
// again, since we might need to disconnect someone to open up space for more
// utreexo or CBS connections.
//
// Extra connections are also not taken into account here because they will probably be
// short-lived.
_ => {}
}

Expand Down Expand Up @@ -569,6 +585,9 @@ where
) -> Result<(), WireError> {
// try to connect with manually added peers
self.maybe_open_connection_with_added_peers()?;
if self.connected_peers() >= T::MAX_OUTGOING_PEERS {
return Ok(());
}

// If the user passes in a `--connect` cli argument, we only connect with
// that particular peer.
Expand All @@ -583,7 +602,7 @@ where
self.maybe_use_hardcoded_addresses(needs_utreexo);

let connection_kind = ConnectionKind::Regular(required_service);
if self.peers.len() < T::MAX_OUTGOING_PEERS {
for _ in 0..T::NEW_CONNECTIONS_BATCH_SIZE {
self.create_connection(connection_kind)?;
}
Comment on lines 604 to 607
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had this in a private branch, I think makes sense

        // How many peers we can connect to without exceeding the limit, if any
        let peer_capacity = T::MAX_OUTGOING_PEERS.saturating_sub(self.connected_peers());
        let connection_kind = ConnectionKind::Regular(required_service);

        // Try connecting to at most 4 peers (fewer if max capacity would be reached)
        for _ in 0..peer_capacity.min(MAX_OPEN_CONNECTIONS_BATCH) {
            self.create_connection(connection_kind)?;
        }

Copy link
Member Author

@Davidson-Souza Davidson-Souza Mar 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've tried this, but for the last connections we basically revert to the older logic. I had some runs where I would create 8/9 connections super quick, and then just hang for dozens of seconds.

Using this logic we can get 10 peers for chain selection pretty quickly.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So you exceed the limit on purpose then you disconnect the exceeded peers. Btw the 8/9 fast connections happens to me on master, idk why it takes longer to get the 10th peer.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So you exceed the limit on purpose then you disconnect the exceeded peers.

Yes! It should not get bigger than 20 tho, because we also abort the connection pretty quickly if they don't respond

Btw the 8/9 fast connections happens to me on master, idk why it takes longer to get the 10th peer.

For me as well, I think it has to do with the address manager getting populated by the addresses received from new connections? It's really weird tho. At least for me it doesn't happen on this PR


Expand Down Expand Up @@ -617,7 +636,7 @@ where

// Finally, open the connection with the node
self.open_connection(
ConnectionKind::Regular(ServiceFlags::NONE),
ConnectionKind::Manual,
peers_count as usize,
address,
added_peer.v1_fallback,
Expand Down
23 changes: 22 additions & 1 deletion crates/floresta-wire/src/p2p_wire/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,30 @@ pub(crate) enum InflightRequests {
}

#[derive(Debug, PartialEq, Clone, Copy)]
/// The kind of connection we see this peer as.
///
/// Core's counterpart: https://github.com/bitcoin/bitcoin/blob/bf9ef4f0433551e850a11c2da8baae0ec6439a99/src/node/connection_types.h#L18
pub enum ConnectionKind {
/// A feeler connection is a short-lived connection used to check whether this peer is alive.
///
/// After handshake, we ask for addresses and when we receive an answer we just disconnect,
/// marking this peer as alive in our address manager
Feeler,

/// A regular peer, used to send requests to and learn about transactions and blocks
Regular(ServiceFlags),

/// An extra peer specially created if our tip hasn't moved for too long
///
/// If more than [`NodeContext::ASSUME_STALE`] seconds have passed since the last
/// processed block, we open one of these to make sure we are not in a partitioned subnet,
/// unable to learn about new blocks
Extra,

/// A connection that was manually requested by our user. This type of peer won't be banned
/// for misbehaving, and won't respect the [`ServiceFlags`] requirements when creating a
/// connection.
Manual,
}

impl Serialize for ConnectionKind {
Expand All @@ -147,6 +167,7 @@ impl Serialize for ConnectionKind {
ConnectionKind::Feeler => serializer.serialize_str("feeler"),
ConnectionKind::Regular(_) => serializer.serialize_str("regular"),
ConnectionKind::Extra => serializer.serialize_str("extra"),
ConnectionKind::Manual => serializer.serialize_str("manual"),
}
}
}
Expand Down Expand Up @@ -394,7 +415,7 @@ macro_rules! periodic_job {

($timer:expr => $what:expr, $interval_secs:path,no_log $(,)?) => {{
if $timer.elapsed() > Duration::from_secs($interval_secs) {
$what;
let _ = $what;
$timer = Instant::now();
}
}};
Expand Down
39 changes: 36 additions & 3 deletions crates/floresta-wire/src/p2p_wire/node/peer_man.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,11 @@ where
self.peers
.values()
.filter(|p| {
p.state == PeerStatus::Ready && matches!(p.kind, ConnectionKind::Regular(_))
// Connections that will remain open for as long as we are running (and the other
// peer doesn't die)
let long_lived_connection = matches!(p.kind, ConnectionKind::Regular(_))
|| matches!(p.kind, ConnectionKind::Manual);
p.state == PeerStatus::Ready && long_lived_connection
})
.count()
}
Expand Down Expand Up @@ -249,6 +253,27 @@ where
return Ok(());
}

let good_peers_count = self.connected_peers();
if good_peers_count > T::MAX_OUTGOING_PEERS {
// Don't allow our node to have more than T::MAX_OUTGOING_PEERS, unless this is a
// manual peer, those can exceed our quota
if version.kind != ConnectionKind::Manual {
debug!(
"Already have {} peers, disconnecting peer to avoid blowing up our max of {}",
good_peers_count,
T::MAX_OUTGOING_PEERS
);

self.send_to_peer(peer, NodeRequest::Shutdown)?;
// this prevents logging "peer xx disconnected"
self.peers
.entry(peer)
.and_modify(|peer| peer.state = PeerStatus::Awaiting);

return Ok(());
}
}
Comment on lines +256 to +275
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm I think better to take into account how many peers we could connect to before exceeding the limit BEFORE trying more connections, see next comment

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, to control excess peers we can disconnect slow peers in IBD and random peers once synced... followup PR I have already done privately


info!(
"New peer id={} version={} blocks={} services={}",
version.id, version.user_agent, version.blocks, version.services
Expand Down Expand Up @@ -415,7 +440,10 @@ where
pub(crate) fn handle_disconnection(&mut self, peer: u32, idx: usize) -> Result<(), WireError> {
if let Some(p) = self.peers.remove(&peer) {
std::mem::drop(p.channel);
if matches!(p.kind, ConnectionKind::Regular(_)) && p.state == PeerStatus::Ready {

let long_lived = matches!(p.kind, ConnectionKind::Regular(_))
|| matches!(p.kind, ConnectionKind::Manual);
if long_lived && p.state == PeerStatus::Ready {
info!("Peer disconnected: {peer}");
}

Expand Down Expand Up @@ -474,6 +502,11 @@ where
return Ok(());
};

// Manual connections are exempt from being punished
if matches!(peer.kind, ConnectionKind::Manual) {
return Ok(());
}

peer.banscore += factor;

// This peer is misbehaving too often, ban it
Expand Down Expand Up @@ -886,7 +919,7 @@ where
return Err(WireError::PeerAlreadyExists(addr, port));
}

let kind = ConnectionKind::Regular(ServiceFlags::NONE);
let kind = ConnectionKind::Manual;

// Add this address to our address manager for later
// assume it has the bare-minimum services, otherwise `push_addresses` will ignore it
Expand Down
36 changes: 17 additions & 19 deletions crates/floresta-wire/src/p2p_wire/node/running_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ use floresta_chain::pruned_utreexo::UpdatableChainstate;
use floresta_chain::ThreadSafeChain;
use floresta_common::service_flags;
use floresta_common::service_flags::UTREEXO;
use rand::random;
use rand::seq::IteratorRandom;
use rand::thread_rng;
use rustreexo::accumulator::stump::Stump;
use tokio::time;
use tokio::time::MissedTickBehavior;
Expand Down Expand Up @@ -57,6 +58,7 @@ pub struct RunningNode {

impl NodeContext for RunningNode {
const REQUEST_TIMEOUT: u64 = 2 * 60;

fn get_required_services(&self) -> ServiceFlags {
ServiceFlags::NETWORK
| service_flags::UTREEXO.into()
Expand Down Expand Up @@ -137,12 +139,11 @@ where
// peer and create a utreexo and CBS connection
if !self.has_utreexo_peers() {
if self.peer_ids.len() == 10 {
let peer = random::<usize>() % self.peer_ids.len();
let peer = self
.peer_ids
.get(peer)
.expect("we've modulo before, we should have it");
self.send_to_peer(*peer, NodeRequest::Shutdown)?;
self.peers
.values()
.filter(|peer| matches!(peer.kind, ConnectionKind::Regular(_)))
.choose(&mut thread_rng())
.and_then(|p| p.channel.send(NodeRequest::Shutdown).ok());
}

self.maybe_open_connection(UTREEXO.into())?;
Expand All @@ -153,12 +154,11 @@ where
return Ok(());
}
if self.peer_ids.len() == 10 {
let peer = random::<usize>() % self.peer_ids.len();
let peer = self
.peer_ids
.get(peer)
.expect("we've modulo before, we should have it");
self.send_to_peer(*peer, NodeRequest::Shutdown)?;
self.peers
.values()
.filter(|peer| matches!(peer.kind, ConnectionKind::Regular(_)))
.choose(&mut thread_rng())
.and_then(|p| p.channel.send(NodeRequest::Shutdown).ok());
}

self.maybe_open_connection(ServiceFlags::COMPACT_FILTERS)?;
Expand Down Expand Up @@ -409,6 +409,7 @@ where
periodic_job!(
self.last_connection => self.check_connections(),
RunningNode::TRY_NEW_CONNECTION,
no_log,
);

// Check if some of our peers have timed out a request
Expand All @@ -418,6 +419,7 @@ where
periodic_job!(
self.last_feeler => self.open_feeler_connection(),
RunningNode::FEELER_INTERVAL,
no_log,
);

// The jobs below need a connected peer to work
Expand Down Expand Up @@ -692,6 +694,7 @@ where
let peer_to_disconnect = self
.peers
.iter()
// Don't disconnect manual connections
.filter(|(_, info)| matches!(info.kind, ConnectionKind::Regular(_)))
.min_by_key(|(k, _)| self.get_peer_score(**k))
.map(|(peer, _)| *peer);
Expand Down Expand Up @@ -720,12 +723,7 @@ where
(peer, Instant::now()),
);
}

// update the peer info
self.peers.entry(peer).and_modify(|info| {
info.kind = ConnectionKind::Regular(peer_info.services);
});
}
}

PeerMessages::Ready(version) => {
debug!(
Expand Down
Loading
Loading