
Commit 540c000

Deepzima and Evanev7 authored and committed
Feat/static peer discovery (exo-explore#1690)
**Enabling peers to be discovered in environments where mDNS is unavailable (SSH sessions, headless servers, Docker).**

## Motivation

Exo discovers peers exclusively via mDNS, which works well on a local network but breaks once you move beyond a single L2 broadcast domain:

- SSH sessions on macOS: TCC blocks mDNS multicast from non-GUI sessions (exo-explore#1488)
- Headless servers/rack machines: exo-explore#1682 ("DGX Spark does not find other nodes")
- Docker Compose: mDNS is often unavailable across container networks; e.g. exo-explore#1462 (E2E test framework) needs an alternative

Related work: exo-explore#1488 (working implementation by @AlexCheema, closed because SSH had a GUI workaround), exo-explore#1023 (Headscale WAN, then closed due to merge conflicts), exo-explore#1656 (discovery cleanup, open).

This PR introduces an optional bootstrap mechanism for peer discovery while leaving the existing mDNS behavior unchanged.

## Changes

Adds two new CLI flags:

- `--bootstrap-peers` (env: `EXO_BOOTSTRAP_PEERS`): comma-separated libp2p multiaddrs to dial on startup and retry periodically
- `--libp2p-port`: fixed TCP port for libp2p to listen on (default: OS-assigned). Required when using bootstrap peers, so other nodes know which port to dial.

8 files:

- `rust/networking/src/discovery.rs`: Store bootstrap addrs, dial in existing retry loop
- `rust/networking/src/swarm.rs`: Thread `bootstrap_peers` parameter to `Behaviour`
- `rust/networking/examples/chatroom.rs`: Updated call site for new `create_swarm` signature
- `rust/networking/tests/bootstrap_peers.rs`: Integration tests
- `rust/exo_pyo3_bindings/src/networking.rs`: Accept `bootstrap_peers` and `listen_port` in PyO3 constructor
- `rust/exo_pyo3_bindings/exo_pyo3_bindings.pyi`: Update type stub
- `src/exo/routing/router.py`: Pass peers to `NetworkingHandle`
- `src/exo/main.py`: `--bootstrap-peers` CLI arg + `EXO_BOOTSTRAP_PEERS` env var

## Why It Works

Bootstrap peers are dialed in the existing retry loop, the same path taken by mDNS-discovered peers. The swarm handles connection, Noise handshake, and gossipsub mesh joining from there. PeerId is intentionally not required in the multiaddr; the Noise handshake discovers it.

Docker Compose example:

```yaml
services:
  exo-1:
    environment:
      EXO_BOOTSTRAP_PEERS: "/ip4/exo-2/tcp/30000"
  exo-2:
    environment:
      EXO_BOOTSTRAP_PEERS: "/ip4/exo-1/tcp/30000"
```

## Test Plan

### Manual Testing

<details>
<summary>Docker Compose config</summary>

```
services:
  exo-node1:
    build:
      context: .
      dockerfile: Dockerfile.bootstrap-test
    container_name: exo-bootstrap-node1
    hostname: exo-node1
    command: ["-q", "--libp2p-port", "30000", "--bootstrap-peers", "/ip4/172.30.20.3/tcp/30000"]
    environment:
      - EXO_LIBP2P_NAMESPACE=bootstrap-test
    ports:
      - "52415:52415"
    networks:
      bootstrap-net:
        ipv4_address: 172.30.20.2
    deploy:
      resources:
        limits:
          memory: 4g
  exo-node2:
    build:
      context: .
      dockerfile: Dockerfile.bootstrap-test
    container_name: exo-bootstrap-node2
    hostname: exo-node2
    command: ["-q", "--libp2p-port", "30000", "--bootstrap-peers", "/ip4/172.30.20.2/tcp/30000"]
    environment:
      - EXO_LIBP2P_NAMESPACE=bootstrap-test
    ports:
      - "52416:52415"
    networks:
      bootstrap-net:
        ipv4_address: 172.30.20.3
    deploy:
      resources:
        limits:
          memory: 4g
networks:
  bootstrap-net:
    driver: bridge
    ipam:
      config:
        - subnet: 172.30.20.0/24
```

</details>

Two containers on a bridge network (`172.30.20.0/24`), fixed IPs, `--libp2p-port 30000`, cross-referencing `--bootstrap-peers`. Both nodes found each other, established a connection, and then ran the election protocol.

### Automated Testing

4 Rust integration tests in `rust/networking/tests/bootstrap_peers.rs` (`cargo test -p networking`):

| Test | What it verifies | Result |
|------|------------------|--------|
| `two_nodes_connect_via_bootstrap_peers` | Node B discovers Node A via bootstrap addr (real TCP connection) | PASS |
| `create_swarm_with_empty_bootstrap_peers` | Backward compatibility: no bootstrap peers works | PASS |
| `create_swarm_ignores_invalid_bootstrap_addrs` | Invalid multiaddrs silently filtered | PASS |
| `create_swarm_with_fixed_port` | `listen_port` parameter works | PASS |

All 4 pass. The connection test takes ~6s.

---------

Signed-off-by: DeepZima <deepzima@outlook.com>
Co-authored-by: Evan <evanev7@gmail.com>
1 parent c51d466 commit 540c000

8 files changed

Lines changed: 189 additions & 16 deletions


rust/exo_pyo3_bindings/exo_pyo3_bindings.pyi

Lines changed: 1 addition & 1 deletion
@@ -42,7 +42,7 @@ class MessageTooLargeError(builtins.Exception):
 
 @typing.final
 class NetworkingHandle:
-    def __new__(cls, identity: Keypair) -> NetworkingHandle: ...
+    def __new__(cls, identity: Keypair, bootstrap_peers: list[builtins.str], listen_port: builtins.int) -> NetworkingHandle: ...
     async def gossipsub_subscribe(self, topic: builtins.str) -> builtins.bool:
         r"""
         Subscribe to a `GossipSub` topic.

rust/exo_pyo3_bindings/src/networking.rs

Lines changed: 9 additions & 2 deletions
@@ -180,7 +180,12 @@ impl PyNetworkingHandle {
     // ---- Lifecycle management methods ----
 
     #[new]
-    fn py_new(identity: Bound<'_, PyKeypair>) -> PyResult<Self> {
+    #[pyo3(signature = (identity, bootstrap_peers, listen_port))]
+    fn py_new(
+        identity: Bound<'_, PyKeypair>,
+        bootstrap_peers: Vec<String>,
+        listen_port: u16,
+    ) -> PyResult<Self> {
         // create communication channels
         let (to_swarm, from_client) = mpsc::channel(MPSC_CHANNEL_SIZE);
 
@@ -189,7 +194,9 @@ impl PyNetworkingHandle {
 
         // create networking swarm (within tokio context!! or it crashes)
         let _guard = pyo3_async_runtimes::tokio::get_runtime().enter();
-        let swarm = create_swarm(identity, from_client).pyerr()?.into_stream();
+        let swarm = create_swarm(identity, from_client, bootstrap_peers, listen_port)
+            .pyerr()?
+            .into_stream();
 
         Ok(Self {
             swarm: Arc::new(Mutex::new(swarm)),

rust/networking/examples/chatroom.rs

Lines changed: 8 additions & 3 deletions
@@ -16,9 +16,14 @@ async fn main() {
     let (to_swarm, from_client) = mpsc::channel(20);
 
     // Configure swarm
-    let mut swarm = swarm::create_swarm(identity::Keypair::generate_ed25519(), from_client)
-        .expect("Swarm creation failed")
-        .into_stream();
+    let mut swarm = swarm::create_swarm(
+        identity::Keypair::generate_ed25519(),
+        from_client,
+        vec![],
+        0,
+    )
+    .expect("Swarm creation failed")
+    .into_stream();
 
     // Create a Gossipsub topic & subscribe
     let (tx, rx) = oneshot::channel();

rust/networking/src/discovery.rs

Lines changed: 9 additions & 1 deletion
@@ -104,6 +104,7 @@ pub struct Behaviour {
     // state-tracking for managed behaviors & mDNS-discovered peers
     managed: managed::Behaviour,
     mdns_discovered: HashMap<PeerId, BTreeSet<Multiaddr>>,
+    bootstrap_peers: Vec<Multiaddr>,
 
     retry_delay: Delay, // retry interval
 
@@ -112,10 +113,11 @@ pub struct Behaviour {
 }
 
 impl Behaviour {
-    pub fn new(keypair: &identity::Keypair) -> io::Result<Self> {
+    pub fn new(keypair: &identity::Keypair, bootstrap_peers: Vec<Multiaddr>) -> io::Result<Self> {
         Ok(Self {
             managed: managed::Behaviour::new(keypair)?,
             mdns_discovered: HashMap::new(),
+            bootstrap_peers,
             retry_delay: Delay::new(RETRY_CONNECT_INTERVAL),
             pending_events: WakerDeque::new(),
         })
@@ -368,6 +370,12 @@ impl NetworkBehaviour for Behaviour {
                     self.dial(p, ma)
                 }
             }
+            // dial bootstrap peers (for environments where mDNS is unavailable)
+            for addr in &self.bootstrap_peers {
+                self.pending_events.push_back(ToSwarm::Dial {
+                    opts: DialOpts::unknown_peer_id().address(addr.clone()).build(),
+                })
+            }
             self.retry_delay.reset(RETRY_CONNECT_INTERVAL) // reset timeout
         }
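
The retry-loop change above can be pictured with a small language-neutral model. The following Python sketch is only an illustration under stated assumptions: the `Discovery` class, its `pending_dials` list, and `on_retry_tick` are hypothetical names standing in for the Rust `Behaviour`, its `pending_events` queue, and the retry branch of the poll loop. It models one thing only: on every retry tick, each mDNS-discovered address is queued with its known peer ID, and each bootstrap address is queued again with no peer ID.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Discovery:
    """Hypothetical stand-in for the discovery behaviour's dial bookkeeping."""

    # Addresses supplied up front (the new bootstrap list).
    bootstrap_peers: list[str]
    # Peer ID -> addresses learned via mDNS.
    mdns_discovered: dict[str, set[str]] = field(default_factory=dict)
    # Queued dial requests as (peer_id or None, address).
    pending_dials: list[tuple[Optional[str], str]] = field(default_factory=list)

    def on_retry_tick(self) -> None:
        """Queue dials for everything we know about; called once per retry interval."""
        for peer_id, addrs in self.mdns_discovered.items():
            for addr in sorted(addrs):
                self.pending_dials.append((peer_id, addr))
        # Bootstrap addresses carry no peer ID; it is learned during the handshake.
        for addr in self.bootstrap_peers:
            self.pending_dials.append((None, addr))


if __name__ == "__main__":
    d = Discovery(bootstrap_peers=["/ip4/172.30.20.3/tcp/30000"])
    d.on_retry_tick()
    print(d.pending_dials)
```

In this model the bootstrap addresses are re-queued on every tick regardless of connection state, which mirrors the unconditional loop in the diff; how the swarm treats a dial to an already-connected peer is outside the scope of the sketch.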

rust/networking/src/swarm.rs

Lines changed: 19 additions & 6 deletions
@@ -142,19 +142,29 @@ fn filter_swarm_event(event: SwarmEvent<BehaviourEvent>) -> Option<FromSwarm> {
     }
 }
 
-/// Create and configure a swarm which listens to all ports on OS
+/// Create and configure a swarm.
+///
+/// - `listen_port`: TCP port to listen on. `0` lets the OS assign one.
+/// - `bootstrap_peers`: multiaddrs to dial for environments without mDNS.
 pub fn create_swarm(
     keypair: identity::Keypair,
     from_client: mpsc::Receiver<ToSwarm>,
+    bootstrap_peers: Vec<String>,
+    listen_port: u16,
 ) -> alias::AnyResult<Swarm> {
+    let parsed_bootstrap_peers: Vec<libp2p::Multiaddr> = bootstrap_peers
+        .iter()
+        .filter(|s| !s.is_empty())
+        .filter_map(|s| s.parse().ok())
+        .collect();
+
     let mut swarm = SwarmBuilder::with_existing_identity(keypair)
         .with_tokio()
         .with_other_transport(tcp_transport)?
-        .with_behaviour(Behaviour::new)?
+        .with_behaviour(|keypair| Behaviour::new(keypair, parsed_bootstrap_peers))?
         .build();
 
-    // Listen on all interfaces and whatever port the OS assigns
-    swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
+    swarm.listen_on(format!("/ip4/0.0.0.0/tcp/{listen_port}").parse()?)?;
     Ok(Swarm { swarm, from_client })
 }
 
@@ -246,9 +256,12 @@ mod behaviour {
     }
 
     impl Behaviour {
-        pub fn new(keypair: &identity::Keypair) -> alias::AnyResult<Self> {
+        pub fn new(
+            keypair: &identity::Keypair,
+            bootstrap_peers: Vec<libp2p::Multiaddr>,
+        ) -> alias::AnyResult<Self> {
             Ok(Self {
-                discovery: discovery::Behaviour::new(keypair)?,
+                discovery: discovery::Behaviour::new(keypair, bootstrap_peers)?,
                 gossipsub: gossipsub_behaviour(keypair),
             })
         }
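
`create_swarm` drops empty strings and anything that fails to parse as a multiaddr, without reporting an error. The Python sketch below imitates that filter so the behaviour can be tried in isolation. It is a deliberately narrow stand-in, not the libp2p parser: the hypothetical `is_ip4_tcp_multiaddr` helper only recognises the `/ip4/<address>/tcp/<port>` shape used in this commit, whereas real multiaddrs cover many more protocols.

```python
import ipaddress


def is_ip4_tcp_multiaddr(addr: str) -> bool:
    """Return True for strings shaped like /ip4/<IPv4 literal>/tcp/<port>.

    Simplified assumption: only this one shape is accepted.
    """
    parts = addr.split("/")
    # "/ip4/1.2.3.4/tcp/30000" splits into ["", "ip4", "1.2.3.4", "tcp", "30000"].
    if len(parts) != 5 or parts[0] != "" or parts[1] != "ip4" or parts[3] != "tcp":
        return False
    try:
        ipaddress.IPv4Address(parts[2])  # must be a literal IPv4 address
    except ValueError:
        return False
    port = parts[4]
    return port.isascii() and port.isdigit() and int(port) <= 65535


def filter_bootstrap_peers(raw: list[str]) -> list[str]:
    """Keep only non-empty, well-formed entries; silently drop the rest."""
    return [a for a in raw if a and is_ip4_tcp_multiaddr(a)]


if __name__ == "__main__":
    peers = ["not-a-valid-multiaddr", "", "/ip4/10.0.0.1/tcp/30000"]
    print(filter_bootstrap_peers(peers))
```

Under this check a hostname inside an `/ip4/` component is rejected, because that component must hold a literal IPv4 address. Since dropped entries produce no error, a caller that wants feedback on typos would need to compare the input and output lists itself.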

rust/networking/tests/bootstrap_peers.rs

Lines changed: 107 additions & 0 deletions
@@ -0,0 +1,107 @@
+use futures_lite::StreamExt;
+use networking::swarm::{FromSwarm, create_swarm};
+use std::time::Duration;
+use tokio::sync::mpsc;
+use tokio::time::timeout;
+
+/// Helper: find a free TCP port.
+fn free_port() -> u16 {
+    let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
+    listener.local_addr().unwrap().port()
+}
+
+/// Two nodes connect via bootstrap peers — no mDNS needed.
+///
+/// Node A listens on a fixed port. Node B bootstraps to A's address.
+/// We verify that B emits `FromSwarm::Discovered` for A's peer ID.
+#[tokio::test]
+async fn two_nodes_connect_via_bootstrap_peers() {
+    let port_a = free_port();
+
+    // Node A: listens on a known port, no bootstrap peers
+    let keypair_a = libp2p::identity::Keypair::generate_ed25519();
+    let peer_id_a = keypair_a.public().to_peer_id();
+    let (_tx_a, rx_a) = mpsc::channel(16);
+    let swarm_a = create_swarm(keypair_a, rx_a, vec![], port_a).expect("create swarm A");
+    let mut stream_a = swarm_a.into_stream();
+
+    // Node B: bootstraps to A's address
+    let keypair_b = libp2p::identity::Keypair::generate_ed25519();
+    let (_tx_b, rx_b) = mpsc::channel(16);
+    let swarm_b = create_swarm(
+        keypair_b,
+        rx_b,
+        vec![format!("/ip4/127.0.0.1/tcp/{port_a}")],
+        0,
+    )
+    .expect("create swarm B");
+    let mut stream_b = swarm_b.into_stream();
+
+    // Wait for B to discover A (connection established)
+    let connected = timeout(Duration::from_secs(10), async {
+        loop {
+            tokio::select! {
+                Some(event) = stream_a.next() => {
+                    // A will also see B connect, but we check from B's perspective
+                    let _ = event;
+                }
+                Some(event) = stream_b.next() => {
+                    if let FromSwarm::Discovered { peer_id } = event {
+                        if peer_id == peer_id_a {
+                            return true;
+                        }
+                    }
+                }
+            }
+        }
+    })
+    .await;
+
+    assert!(
+        connected.is_ok() && connected.unwrap(),
+        "Node B should discover Node A via bootstrap peer"
+    );
+}
+
+/// Empty bootstrap peers should work (backward compatible).
+#[tokio::test]
+async fn create_swarm_with_empty_bootstrap_peers() {
+    let keypair = libp2p::identity::Keypair::generate_ed25519();
+    let (_tx, rx) = mpsc::channel(16);
+    let swarm = create_swarm(keypair, rx, vec![], 0);
+    assert!(
+        swarm.is_ok(),
+        "create_swarm with no bootstrap peers should succeed"
+    );
+}
+
+/// Invalid multiaddr strings are silently filtered out.
+#[tokio::test]
+async fn create_swarm_ignores_invalid_bootstrap_addrs() {
+    let keypair = libp2p::identity::Keypair::generate_ed25519();
+    let (_tx, rx) = mpsc::channel(16);
+    let swarm = create_swarm(
+        keypair,
+        rx,
+        vec![
+            "not-a-valid-multiaddr".to_string(),
+            "".to_string(),
+            "/ip4/10.0.0.1/tcp/30000".to_string(), // valid
+        ],
+        0,
+    );
+    assert!(
+        swarm.is_ok(),
+        "create_swarm should succeed even with invalid bootstrap addrs"
+    );
+}
+
+/// Fixed listen port works correctly.
+#[tokio::test]
+async fn create_swarm_with_fixed_port() {
+    let port = free_port();
+    let keypair = libp2p::identity::Keypair::generate_ed25519();
+    let (_tx, rx) = mpsc::channel(16);
+    let swarm = create_swarm(keypair, rx, vec![], port);
+    assert!(swarm.is_ok(), "create_swarm with fixed port should succeed");
+}

src/exo/main.py

Lines changed: 26 additions & 1 deletion
@@ -57,7 +57,11 @@ async def create(cls, args: "Args") -> Self:
         keypair = get_node_id_keypair()
         node_id = NodeId(keypair.to_node_id())
         session_id = SessionId(master_node_id=node_id, election_clock=0)
-        router = Router.create(keypair)
+        router = Router.create(
+            keypair,
+            bootstrap_peers=args.bootstrap_peers,
+            listen_port=args.libp2p_port,
+        )
         await router.register_topic(topics.GLOBAL_EVENTS)
         await router.register_topic(topics.LOCAL_EVENTS)
         await router.register_topic(topics.COMMANDS)
@@ -493,6 +497,9 @@ def main():
     if args.offline:
         logger.info("Running in OFFLINE mode — no internet checks, local models only")
 
+    if args.bootstrap_peers:
+        logger.info(f"Bootstrap peers: {args.bootstrap_peers}")
+
     if args.no_batch:
         os.environ["EXO_NO_BATCH"] = "1"
         logger.info("Continuous batching disabled (--no-batch)")
@@ -529,6 +536,8 @@ class Args(CamelCaseModel):
     offline: bool = os.getenv("EXO_OFFLINE", "false").lower() == "true"
     no_batch: bool = False
     fast_synch: bool | None = None  # None = auto, True = force on, False = force off
+    bootstrap_peers: list[str] = []
+    libp2p_port: int
 
     @classmethod
     def parse(cls) -> Self:
@@ -586,6 +595,22 @@ def parse(cls) -> Self:
             action="store_true",
             help="Disable continuous batching, use sequential generation",
         )
+        parser.add_argument(
+            "--bootstrap-peers",
+            type=lambda s: [p for p in s.split(",") if p],
+            default=os.getenv("EXO_BOOTSTRAP_PEERS", "").split(",")
+            if os.getenv("EXO_BOOTSTRAP_PEERS")
+            else [],
+            dest="bootstrap_peers",
+            help="Comma-separated libp2p multiaddrs to dial on startup (env: EXO_BOOTSTRAP_PEERS)",
+        )
+        parser.add_argument(
+            "--libp2p-port",
+            type=int,
+            default=0,
+            dest="libp2p_port",
+            help="Fixed TCP port for libp2p to listen on (0 = OS-assigned).",
+        )
         fast_synch_group = parser.add_mutually_exclusive_group()
         fast_synch_group.add_argument(
             "--fast-synch",

src/exo/routing/router.py

Lines changed: 10 additions & 2 deletions
@@ -1,3 +1,4 @@
+from collections.abc import Sequence
 from copy import copy
 from itertools import count
 from math import inf
@@ -102,8 +103,15 @@ async def _send_out(self, item: T):
 
 class Router:
     @classmethod
-    def create(cls, identity: Keypair) -> "Router":
-        return cls(handle=NetworkingHandle(identity))
+    def create(
+        cls,
+        identity: Keypair,
+        bootstrap_peers: Sequence[str] = (),
+        listen_port: int = 0,
+    ) -> "Router":
+        return cls(
+            handle=NetworkingHandle(identity, list(bootstrap_peers), listen_port)
+        )
 
     def __init__(self, handle: NetworkingHandle):
         self.topic_routers: dict[str, TopicRouter[CamelCaseModel]] = {}
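
The `--bootstrap-peers` argument added in `src/exo/main.py` takes its value from two places: the command line, where a `type` lambda drops empty entries, and the `EXO_BOOTSTRAP_PEERS` environment variable, where a plain `split(",")` keeps them. The standalone sketch below shows one way the two paths could share a single splitting helper. It is an illustration only: `split_peers`, `build_parser`, and the injectable `env` mapping are hypothetical names that do not appear in the commit, and stripping whitespace is an extra assumption.

```python
import argparse
import os
from collections.abc import Mapping
from typing import Optional


def split_peers(raw: str) -> list[str]:
    """Split a comma-separated string, dropping empty or whitespace-only entries."""
    return [p.strip() for p in raw.split(",") if p.strip()]


def build_parser(env: Optional[Mapping[str, str]] = None) -> argparse.ArgumentParser:
    """Build a parser with the two flags; `env` is injectable so it can be tested."""
    env = os.environ if env is None else env
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--bootstrap-peers",
        type=split_peers,
        # The default is already a list, so argparse does not re-run `type` on it.
        default=split_peers(env.get("EXO_BOOTSTRAP_PEERS", "")),
        dest="bootstrap_peers",
        help="Comma-separated libp2p multiaddrs (env: EXO_BOOTSTRAP_PEERS)",
    )
    parser.add_argument(
        "--libp2p-port",
        type=int,
        default=0,
        dest="libp2p_port",
        help="Fixed TCP port for libp2p to listen on (0 = OS-assigned)",
    )
    return parser


if __name__ == "__main__":
    args = build_parser().parse_args()
    print(args.bootstrap_peers, args.libp2p_port)
```

With this arrangement a trailing comma in the environment variable yields the same list as a trailing comma on the command line, and an explicit flag still overrides the environment default.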
