Skip to content

Commit 54d6f4e

Browse files
authored
dht: do not err on context deadline (#6581)
1 parent 3a91b39 commit 54d6f4e

2 files changed

Lines changed: 51 additions & 37 deletions

File tree

network/p2p/capabilities.go

Lines changed: 43 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ package p2p
1818

1919
import (
2020
"context"
21+
"errors"
2122
randv1 "math/rand"
22-
"math/rand/v2"
2323
"sync"
2424
"time"
2525

@@ -130,39 +130,15 @@ func (c *CapabilitiesDiscovery) AdvertiseCapabilities(capabilities ...Capability
130130
eb := ebf()
131131

132132
for {
133-
// shuffle capabilities to advertise in random order
134-
// since the DHT's internal advertisement happens concurrently for peers in its routing table
135-
// any peer error does not prevent advertisement of other peers.
136-
// on repeated advertisement, we want to avoid the same order to make sure all capabilities are advertised.
137-
if len(capabilities) > 1 {
138-
rand.Shuffle(len(capabilities), func(i, j int) {
139-
capabilities[i], capabilities[j] = capabilities[j], capabilities[i]
140-
})
141-
}
142133
select {
143134
case <-c.dht.Context().Done():
144135
return
145136
case <-nextExecution:
146-
var err error
147-
advertisementInterval := maxAdvertisementInterval
148-
for _, capa := range capabilities {
149-
ttl, err0 := c.advertise(c.dht.Context(), string(capa))
150-
if err0 != nil {
151-
err = err0
152-
loggerFn := c.log.Errorf
153-
if err0 == kbucket.ErrLookupFailure {
154-
// No peers in a routing table, it is typical for startup and not an error
155-
loggerFn = c.log.Debugf
156-
}
157-
loggerFn("failed to advertise for capability %s: %v", capa, err0)
158-
break
159-
}
160-
if ttl < advertisementInterval {
161-
advertisementInterval = ttl
162-
}
163-
c.log.Infof("advertised capability %s", capa)
137+
advertisementInterval, err := c.advertiseCaps(capabilities)
138+
if c.dht.Context().Err() != nil {
139+
return
164140
}
165-
// If we failed to advertise, retry every according to exp jitter delays until successful
141+
// If any capability failed to advertise, retry according to exp jitter delays
166142
if err != nil {
167143
nextExecution = time.After(eb.Delay())
168144
} else {
@@ -175,6 +151,44 @@ func (c *CapabilitiesDiscovery) AdvertiseCapabilities(capabilities ...Capability
175151
}()
176152
}
177153

154+
// advertiseCaps advertises all capabilities concurrently and returns
155+
// the minimum re-advertisement interval and the first error encountered (if any)
156+
func (c *CapabilitiesDiscovery) advertiseCaps(capabilities []Capability) (time.Duration, error) {
157+
type result struct {
158+
capa Capability
159+
ttl time.Duration
160+
err error
161+
}
162+
results := make(chan result, len(capabilities))
163+
for _, capa := range capabilities {
164+
go func(cap Capability) {
165+
ttl, err := c.advertise(c.dht.Context(), string(cap))
166+
results <- result{capa: cap, ttl: ttl, err: err}
167+
}(capa)
168+
}
169+
170+
var aggErr error
171+
advertisementInterval := maxAdvertisementInterval
172+
for range capabilities {
173+
r := <-results
174+
if r.err != nil {
175+
aggErr = errors.Join(aggErr, r.err)
176+
loggerFn := c.log.Warnf
177+
if r.err == kbucket.ErrLookupFailure {
178+
// No peers in a routing table, it is typical for startup and not an error
179+
loggerFn = c.log.Debugf
180+
}
181+
loggerFn("failed to advertise for capability %s: %v", r.capa, r.err)
182+
continue
183+
}
184+
if r.ttl < advertisementInterval {
185+
advertisementInterval = r.ttl
186+
}
187+
c.log.Infof("advertised capability %s", r.capa)
188+
}
189+
return advertisementInterval, aggErr
190+
}
191+
178192
// Sizer exposes the Size method
179193
type Sizer interface {
180194
Size() int

network/p2pNetwork_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ func TestP2PSubmitTX(t *testing.T) {
108108
)
109109
require.Eventually(t, func() bool {
110110
return netA.hasPeers() && netB.hasPeers() && netC.hasPeers()
111-
}, 5*time.Second, 50*time.Millisecond)
111+
}, 10*time.Second, 50*time.Millisecond)
112112

113113
// for some reason the above check is not enough in race builds on CI
114114
time.Sleep(time.Second) // give time for peers to connect.
@@ -203,7 +203,7 @@ func TestP2PSubmitTXNoGossip(t *testing.T) {
203203

204204
require.Eventually(t, func() bool {
205205
return netA.hasPeers() && netB.hasPeers() && netC.hasPeers()
206-
}, 5*time.Second, 50*time.Millisecond)
206+
}, 10*time.Second, 50*time.Millisecond)
207207

208208
time.Sleep(time.Second) // give time for peers to connect.
209209

@@ -288,7 +288,7 @@ func TestP2PSubmitWS(t *testing.T) {
288288

289289
require.Eventually(t, func() bool {
290290
return netA.hasPeers() && netB.hasPeers() && netC.hasPeers()
291-
}, 5*time.Second, 50*time.Millisecond)
291+
}, 10*time.Second, 50*time.Millisecond)
292292

293293
time.Sleep(time.Second) // give time for peers to connect.
294294

@@ -731,7 +731,7 @@ func TestP2PNetworkDHTCapabilities(t *testing.T) {
731731

732732
require.Eventually(t, func() bool {
733733
return netA.hasPeers() && netB.hasPeers() && netC.hasPeers()
734-
}, 2*time.Second, 50*time.Millisecond)
734+
}, 10*time.Second, 50*time.Millisecond)
735735

736736
t.Logf("peers connected")
737737

@@ -1009,7 +1009,7 @@ func TestP2PRelay(t *testing.T) {
10091009

10101010
require.Eventually(t, func() bool {
10111011
return netA.hasPeers() && netB.hasPeers()
1012-
}, 2*time.Second, 50*time.Millisecond)
1012+
}, 10*time.Second, 50*time.Millisecond)
10131013

10141014
type logMessages struct {
10151015
msgs [][]byte
@@ -1089,7 +1089,7 @@ func TestP2PRelay(t *testing.T) {
10891089
require.Eventually(t, func() bool {
10901090
return netA.hasPeers() && netB.hasPeers() && netC.hasPeers() &&
10911091
netA.hasPeer(netB.service.ID()) && netA.hasPeer(netC.service.ID())
1092-
}, 2*time.Second, 50*time.Millisecond)
1092+
}, 10*time.Second, 50*time.Millisecond)
10931093

10941094
// Wait for gossipsub heartbeat to establish mesh links.
10951095
// ListPeersForTopic returns subscribed peers but mesh links are established
@@ -1367,7 +1367,7 @@ func TestP2PwsStreamHandlerDedup(t *testing.T) {
13671367
// now allow the peer made outgoing connection to handle conn closing initiated by the other side
13681368
require.Eventually(t, func() bool {
13691369
return !netA.hasPeers() && !netB.hasPeers()
1370-
}, 2*time.Second, 50*time.Millisecond)
1370+
}, 10*time.Second, 50*time.Millisecond)
13711371
}
13721372

13731373
// TestP2PEnableGossipService_NodeDisable ensures that a node with EnableGossipService=false
@@ -1421,7 +1421,7 @@ func TestP2PEnableGossipService_NodeDisable(t *testing.T) {
14211421

14221422
require.Eventually(t, func() bool {
14231423
return netA.hasPeers() && netB.hasPeers()
1424-
}, 1*time.Second, 50*time.Millisecond)
1424+
}, 10*time.Second, 50*time.Millisecond)
14251425

14261426
testTag := protocol.AgreementVoteTag
14271427

0 commit comments

Comments
 (0)