-
Notifications
You must be signed in to change notification settings - Fork 85
feat(wire): create connections in batch #865
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
7892cf1
e0027b9
8c13fd3
4ce49a7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -59,6 +59,12 @@ where | |
| // === CONNECTION CREATION === | ||
|
|
||
| pub(crate) fn create_connection(&mut self, kind: ConnectionKind) -> Result<(), WireError> { | ||
| // Connections with fixed peers should be marked as `manual`, rather than `regular` | ||
| let connection_kind = match (self.fixed_peer.as_ref(), kind) { | ||
| (Some(_), ConnectionKind::Regular(_)) => ConnectionKind::Manual, | ||
| _ => kind, | ||
| }; | ||
|
|
||
| let required_services = match kind { | ||
| ConnectionKind::Regular(services) => services, | ||
| _ => ServiceFlags::NONE, | ||
|
|
@@ -71,7 +77,7 @@ where | |
| .or_else(|| { | ||
| self.address_man.get_address_to_connect( | ||
| required_services, | ||
| matches!(kind, ConnectionKind::Feeler), | ||
| matches!(connection_kind, ConnectionKind::Feeler), | ||
| ) | ||
| }); | ||
|
|
||
|
|
@@ -110,10 +116,10 @@ where | |
| // or if we are connecting to a utreexo peer, since utreexod doesn't support V2 yet. | ||
| let is_fixed = self.fixed_peer.is_some(); | ||
| let allow_v1 = self.config.allow_v1_fallback | ||
| || kind == ConnectionKind::Regular(UTREEXO.into()) | ||
| || connection_kind == ConnectionKind::Regular(UTREEXO.into()) | ||
| || is_fixed; | ||
|
|
||
| self.open_connection(kind, peer_id, address, allow_v1)?; | ||
| self.open_connection(connection_kind, peer_id, address, allow_v1)?; | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
@@ -123,15 +129,19 @@ where | |
| if self.fixed_peer.is_some() { | ||
| return Ok(()); | ||
| } | ||
| self.create_connection(ConnectionKind::Feeler)?; | ||
|
|
||
| for _ in 0..T::NEW_CONNECTIONS_BATCH_SIZE { | ||
| self.create_connection(ConnectionKind::Feeler)?; | ||
| } | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| /// Creates a new outgoing connection with `address`. | ||
| /// | ||
| /// `kind` may or may not be a [`ConnectionKind::Feeler`], a special connection type | ||
| /// that is used to learn about good peers, but are not kept after handshake | ||
| /// (others are [`ConnectionKind::Regular`] and [`ConnectionKind::Extra`]). | ||
| /// (others are [`ConnectionKind::Regular`], [`ConnectionKind::Manual`] and [`ConnectionKind::Extra`]). | ||
| /// | ||
| /// We will always try to open a V2 connection first. If the `allow_v1_fallback` is set, | ||
| /// we may retry the connection with the old V1 protocol if the V2 connection fails. | ||
|
|
@@ -211,6 +221,12 @@ where | |
| match kind { | ||
| ConnectionKind::Feeler => self.last_feeler = Instant::now(), | ||
| ConnectionKind::Regular(_) => self.last_connection = Instant::now(), | ||
| // Note: Creating a manual peer intentionally doesn't prevent us from checking our peers | ||
| // again, since we might need to disconnect someone to open up space for more | ||
| // utreexo or CBS connections. | ||
| // | ||
| // Extra connections are also not taken into account here because they will probably be | ||
| // short-lived. | ||
| _ => {} | ||
| } | ||
|
|
||
|
|
@@ -569,6 +585,9 @@ where | |
| ) -> Result<(), WireError> { | ||
| // try to connect with manually added peers | ||
| self.maybe_open_connection_with_added_peers()?; | ||
| if self.connected_peers() >= T::MAX_OUTGOING_PEERS { | ||
| return Ok(()); | ||
| } | ||
|
|
||
| // If the user passes in a `--connect` cli argument, we only connect with | ||
| // that particular peer. | ||
|
|
@@ -583,7 +602,7 @@ where | |
| self.maybe_use_hardcoded_addresses(needs_utreexo); | ||
|
|
||
| let connection_kind = ConnectionKind::Regular(required_service); | ||
| if self.peers.len() < T::MAX_OUTGOING_PEERS { | ||
| for _ in 0..T::NEW_CONNECTIONS_BATCH_SIZE { | ||
| self.create_connection(connection_kind)?; | ||
| } | ||
|
Comment on lines
604
to
607
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I had this in a private branch; I think it makes sense // How many peers we can connect to without exceeding the limit, if any
let peer_capacity = T::MAX_OUTGOING_PEERS.saturating_sub(self.connected_peers());
let connection_kind = ConnectionKind::Regular(required_service);
// Try connecting to at most 4 peers (fewer if max capacity would be reached)
for _ in 0..peer_capacity.min(MAX_OPEN_CONNECTIONS_BATCH) {
self.create_connection(connection_kind)?;
}
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've tried this, but for the last connections we basically revert to the older logic. I had some runs where I would create 8/9 connections super quick, and then just hang for dozens of seconds. Using this logic we can get 10 peers for chain selection pretty quickly.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So you exceed the limit on purpose then you disconnect the exceeded peers. Btw the 8/9 fast connections happens to me on
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yes! It should not get bigger than 20 tho, because we also abort the connection pretty quickly if they don't respond
For me as well, I think it has to do with the address manager getting populated by the addresses received from new connections? It's really weird tho. At least for me it doesn't happen on this PR |
||
|
|
||
|
|
@@ -617,7 +636,7 @@ where | |
|
|
||
| // Finally, open the connection with the node | ||
| self.open_connection( | ||
| ConnectionKind::Regular(ServiceFlags::NONE), | ||
| ConnectionKind::Manual, | ||
| peers_count as usize, | ||
| address, | ||
| added_peer.v1_fallback, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -106,7 +106,11 @@ where | |
| self.peers | ||
| .values() | ||
| .filter(|p| { | ||
| p.state == PeerStatus::Ready && matches!(p.kind, ConnectionKind::Regular(_)) | ||
| // Connections that will remain open for as long as we are running (and the other | ||
| // peer doesn't die) | ||
| let long_lived_connection = matches!(p.kind, ConnectionKind::Regular(_)) | ||
| || matches!(p.kind, ConnectionKind::Manual); | ||
| p.state == PeerStatus::Ready && long_lived_connection | ||
| }) | ||
| .count() | ||
| } | ||
|
|
@@ -249,6 +253,27 @@ where | |
| return Ok(()); | ||
| } | ||
|
|
||
| let good_peers_count = self.connected_peers(); | ||
| if good_peers_count > T::MAX_OUTGOING_PEERS { | ||
| // Don't allow our node to have more than T::MAX_OUTGOING_PEERS, unless this is a | ||
| // manual peer; those can exceed our quota | ||
| if version.kind != ConnectionKind::Manual { | ||
| debug!( | ||
| "Already have {} peers, disconnecting peer to avoid blowing up our max of {}", | ||
| good_peers_count, | ||
| T::MAX_OUTGOING_PEERS | ||
| ); | ||
|
|
||
| self.send_to_peer(peer, NodeRequest::Shutdown)?; | ||
| // this prevents logging "peer xx disconnected" | ||
| self.peers | ||
| .entry(peer) | ||
| .and_modify(|peer| peer.state = PeerStatus::Awaiting); | ||
|
|
||
| return Ok(()); | ||
| } | ||
| } | ||
|
Comment on lines
+256
to
+275
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hm I think better to take into account how many peers we could connect to before exceeding the limit BEFORE trying more connections, see next comment
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, to control excess peers we can disconnect slow peers in IBD and random peers once synced... followup PR I have already done privately |
||
|
|
||
| info!( | ||
| "New peer id={} version={} blocks={} services={}", | ||
| version.id, version.user_agent, version.blocks, version.services | ||
|
|
@@ -415,7 +440,10 @@ where | |
| pub(crate) fn handle_disconnection(&mut self, peer: u32, idx: usize) -> Result<(), WireError> { | ||
| if let Some(p) = self.peers.remove(&peer) { | ||
| std::mem::drop(p.channel); | ||
| if matches!(p.kind, ConnectionKind::Regular(_)) && p.state == PeerStatus::Ready { | ||
|
|
||
| let long_lived = matches!(p.kind, ConnectionKind::Regular(_)) | ||
| || matches!(p.kind, ConnectionKind::Manual); | ||
| if long_lived && p.state == PeerStatus::Ready { | ||
| info!("Peer disconnected: {peer}"); | ||
| } | ||
|
|
||
|
|
@@ -474,6 +502,11 @@ where | |
| return Ok(()); | ||
| }; | ||
|
|
||
| // Manual connections are exempt from being punished | ||
| if matches!(peer.kind, ConnectionKind::Manual) { | ||
| return Ok(()); | ||
| } | ||
|
|
||
| peer.banscore += factor; | ||
|
|
||
| // This peer is misbehaving too often, ban it | ||
|
|
@@ -886,7 +919,7 @@ where | |
| return Err(WireError::PeerAlreadyExists(addr, port)); | ||
| } | ||
|
|
||
| let kind = ConnectionKind::Regular(ServiceFlags::NONE); | ||
| let kind = ConnectionKind::Manual; | ||
|
|
||
| // Add this address to our address manager for later | ||
| // assume it has the bare-minimum services, otherwise `push_addresses` will ignore it | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would keep this with 4 peers as batch size
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've increased this to make sure we GC the dead peers before attempting a new round. We remove all pending
Connects before trying to add more, in order to keep the inflight connections list small