Skip to content

Commit 66dcefc

Browse files
authored
Merge pull request #2222 from hashicorp/f-raft-v2
Integrates Consul with "stage one" of HashiCorp Raft library v2.
2 parents fbcb932 + 5586ca3 commit 66dcefc

40 files changed

+3348
-1899
lines changed

consul/acl_test.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -223,8 +223,8 @@ func TestACL_NonAuthority_NotFound(t *testing.T) {
223223
}
224224

225225
testutil.WaitForResult(func() (bool, error) {
226-
p1, _ := s1.raftPeers.Peers()
227-
return len(p1) == 2, errors.New(fmt.Sprintf("%v", p1))
226+
p1, _ := s1.numPeers()
227+
return p1 == 2, errors.New(fmt.Sprintf("%d", p1))
228228
}, func(err error) {
229229
t.Fatalf("should have 2 peers: %v", err)
230230
})
@@ -275,8 +275,8 @@ func TestACL_NonAuthority_Found(t *testing.T) {
275275
}
276276

277277
testutil.WaitForResult(func() (bool, error) {
278-
p1, _ := s1.raftPeers.Peers()
279-
return len(p1) == 2, errors.New(fmt.Sprintf("%v", p1))
278+
p1, _ := s1.numPeers()
279+
return p1 == 2, errors.New(fmt.Sprintf("%d", p1))
280280
}, func(err error) {
281281
t.Fatalf("should have 2 peers: %v", err)
282282
})
@@ -351,8 +351,8 @@ func TestACL_NonAuthority_Management(t *testing.T) {
351351
}
352352

353353
testutil.WaitForResult(func() (bool, error) {
354-
p1, _ := s1.raftPeers.Peers()
355-
return len(p1) == 2, errors.New(fmt.Sprintf("%v", p1))
354+
p1, _ := s1.numPeers()
355+
return p1 == 2, errors.New(fmt.Sprintf("%d", p1))
356356
}, func(err error) {
357357
t.Fatalf("should have 2 peers: %v", err)
358358
})
@@ -408,8 +408,8 @@ func TestACL_DownPolicy_Deny(t *testing.T) {
408408
}
409409

410410
testutil.WaitForResult(func() (bool, error) {
411-
p1, _ := s1.raftPeers.Peers()
412-
return len(p1) == 2, errors.New(fmt.Sprintf("%v", p1))
411+
p1, _ := s1.numPeers()
412+
return p1 == 2, errors.New(fmt.Sprintf("%d", p1))
413413
}, func(err error) {
414414
t.Fatalf("should have 2 peers: %v", err)
415415
})
@@ -482,8 +482,8 @@ func TestACL_DownPolicy_Allow(t *testing.T) {
482482
}
483483

484484
testutil.WaitForResult(func() (bool, error) {
485-
p1, _ := s1.raftPeers.Peers()
486-
return len(p1) == 2, errors.New(fmt.Sprintf("%v", p1))
485+
p1, _ := s1.numPeers()
486+
return p1 == 2, errors.New(fmt.Sprintf("%d", p1))
487487
}, func(err error) {
488488
t.Fatalf("should have 2 peers: %v", err)
489489
})
@@ -558,8 +558,8 @@ func TestACL_DownPolicy_ExtendCache(t *testing.T) {
558558
}
559559

560560
testutil.WaitForResult(func() (bool, error) {
561-
p1, _ := s1.raftPeers.Peers()
562-
return len(p1) == 2, errors.New(fmt.Sprintf("%v", p1))
561+
p1, _ := s1.numPeers()
562+
return p1 == 2, errors.New(fmt.Sprintf("%d", p1))
563563
}, func(err error) {
564564
t.Fatalf("should have 2 peers: %v", err)
565565
})

consul/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,10 @@ func DefaultConfig() *Config {
330330
conf.SerfLANConfig.MemberlistConfig.BindPort = DefaultLANSerfPort
331331
conf.SerfWANConfig.MemberlistConfig.BindPort = DefaultWANSerfPort
332332

333+
// Enable interoperability with unversioned Raft library, and don't
334+
// start using new ID-based features yet.
335+
conf.RaftConfig.ProtocolVersion = 1
336+
333337
// Disable shutdown on removal
334338
conf.RaftConfig.ShutdownOnRemove = false
335339

consul/leader.go

Lines changed: 42 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -543,10 +543,26 @@ func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error {
543543
}
544544
}
545545

546+
// TODO (slackpad) - This will need to be changed once we support node IDs.
547+
addr := (&net.TCPAddr{IP: m.Addr, Port: parts.Port}).String()
548+
549+
// See if it's already in the configuration. It's harmless to re-add it
550+
// but we want to avoid doing that if possible to prevent useless Raft
551+
// log entries.
552+
configFuture := s.raft.GetConfiguration()
553+
if err := configFuture.Error(); err != nil {
554+
s.logger.Printf("[ERR] consul: failed to get raft configuration: %v", err)
555+
return err
556+
}
557+
for _, server := range configFuture.Configuration().Servers {
558+
if server.Address == raft.ServerAddress(addr) {
559+
return nil
560+
}
561+
}
562+
546563
// Attempt to add as a peer
547-
var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: parts.Port}
548-
future := s.raft.AddPeer(addr.String())
549-
if err := future.Error(); err != nil && err != raft.ErrKnownPeer {
564+
addFuture := s.raft.AddPeer(raft.ServerAddress(addr))
565+
if err := addFuture.Error(); err != nil {
550566
s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err)
551567
return err
552568
}
@@ -555,15 +571,31 @@ func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error {
555571

556572
// removeConsulServer is used to try to remove a consul server that has left
557573
func (s *Server) removeConsulServer(m serf.Member, port int) error {
558-
// Attempt to remove as peer
559-
peer := &net.TCPAddr{IP: m.Addr, Port: port}
560-
future := s.raft.RemovePeer(peer.String())
561-
if err := future.Error(); err != nil && err != raft.ErrUnknownPeer {
574+
// TODO (slackpad) - This will need to be changed once we support node IDs.
575+
addr := (&net.TCPAddr{IP: m.Addr, Port: port}).String()
576+
577+
// See if it's already in the configuration. It's harmless to re-remove it
578+
// but we want to avoid doing that if possible to prevent useless Raft
579+
// log entries.
580+
configFuture := s.raft.GetConfiguration()
581+
if err := configFuture.Error(); err != nil {
582+
s.logger.Printf("[ERR] consul: failed to get raft configuration: %v", err)
583+
return err
584+
}
585+
for _, server := range configFuture.Configuration().Servers {
586+
if server.Address == raft.ServerAddress(addr) {
587+
goto REMOVE
588+
}
589+
}
590+
return nil
591+
592+
REMOVE:
593+
// Attempt to remove as a peer.
594+
future := s.raft.RemovePeer(raft.ServerAddress(addr))
595+
if err := future.Error(); err != nil {
562596
s.logger.Printf("[ERR] consul: failed to remove raft peer '%v': %v",
563-
peer, err)
597+
addr, err)
564598
return err
565-
} else if err == nil {
566-
s.logger.Printf("[INFO] consul: removed server '%s' as peer", m.Name)
567599
}
568600
return nil
569601
}

consul/leader_test.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -341,8 +341,8 @@ func TestLeader_LeftServer(t *testing.T) {
341341

342342
for _, s := range servers {
343343
testutil.WaitForResult(func() (bool, error) {
344-
peers, _ := s.raftPeers.Peers()
345-
return len(peers) == 3, nil
344+
peers, _ := s.numPeers()
345+
return peers == 3, nil
346346
}, func(err error) {
347347
t.Fatalf("should have 3 peers")
348348
})
@@ -358,8 +358,8 @@ func TestLeader_LeftServer(t *testing.T) {
358358
}
359359

360360
for _, s := range servers[1:] {
361-
peers, _ := s.raftPeers.Peers()
362-
return len(peers) == 2, errors.New(fmt.Sprintf("%v", peers))
361+
peers, _ := s.numPeers()
362+
return peers == 2, errors.New(fmt.Sprintf("%d", peers))
363363
}
364364

365365
return true, nil
@@ -394,8 +394,8 @@ func TestLeader_LeftLeader(t *testing.T) {
394394

395395
for _, s := range servers {
396396
testutil.WaitForResult(func() (bool, error) {
397-
peers, _ := s.raftPeers.Peers()
398-
return len(peers) == 3, nil
397+
peers, _ := s.numPeers()
398+
return peers == 3, nil
399399
}, func(err error) {
400400
t.Fatalf("should have 3 peers")
401401
})
@@ -423,8 +423,8 @@ func TestLeader_LeftLeader(t *testing.T) {
423423
}
424424
remain = s
425425
testutil.WaitForResult(func() (bool, error) {
426-
peers, _ := s.raftPeers.Peers()
427-
return len(peers) == 2, errors.New(fmt.Sprintf("%v", peers))
426+
peers, _ := s.numPeers()
427+
return peers == 2, errors.New(fmt.Sprintf("%d", peers))
428428
}, func(err error) {
429429
t.Fatalf("should have 2 peers: %v", err)
430430
})
@@ -472,8 +472,8 @@ func TestLeader_MultiBootstrap(t *testing.T) {
472472

473473
// Ensure we don't have multiple raft peers
474474
for _, s := range servers {
475-
peers, _ := s.raftPeers.Peers()
476-
if len(peers) != 1 {
475+
peers, _ := s.numPeers()
476+
if peers != 1 {
477477
t.Fatalf("should only have 1 raft peer!")
478478
}
479479
}
@@ -505,8 +505,8 @@ func TestLeader_TombstoneGC_Reset(t *testing.T) {
505505

506506
for _, s := range servers {
507507
testutil.WaitForResult(func() (bool, error) {
508-
peers, _ := s.raftPeers.Peers()
509-
return len(peers) == 3, nil
508+
peers, _ := s.numPeers()
509+
return peers == 3, nil
510510
}, func(err error) {
511511
t.Fatalf("should have 3 peers")
512512
})

consul/raft_rpc.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"time"
88

99
"github.com/hashicorp/consul/tlsutil"
10+
"github.com/hashicorp/raft"
1011
)
1112

1213
// RaftLayer implements the raft.StreamLayer interface,
@@ -80,8 +81,8 @@ func (l *RaftLayer) Addr() net.Addr {
8081
}
8182

8283
// Dial is used to create a new outgoing connection
83-
func (l *RaftLayer) Dial(address string, timeout time.Duration) (net.Conn, error) {
84-
conn, err := net.DialTimeout("tcp", address, timeout)
84+
func (l *RaftLayer) Dial(address raft.ServerAddress, timeout time.Duration) (net.Conn, error) {
85+
conn, err := net.DialTimeout("tcp", string(address), timeout)
8586
if err != nil {
8687
return nil, err
8788
}

consul/serf.go

Lines changed: 34 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"strings"
66

77
"github.com/hashicorp/consul/consul/agent"
8+
"github.com/hashicorp/raft"
89
"github.com/hashicorp/serf/serf"
910
)
1011

@@ -53,7 +54,7 @@ func (s *Server) lanEventHandler() {
5354
case serf.EventMemberUpdate: // Ignore
5455
case serf.EventQuery: // Ignore
5556
default:
56-
s.logger.Printf("[WARN] consul: unhandled LAN Serf Event: %#v", e)
57+
s.logger.Printf("[WARN] consul: Unhandled LAN Serf Event: %#v", e)
5758
}
5859

5960
case <-s.shutdownCh:
@@ -77,7 +78,7 @@ func (s *Server) wanEventHandler() {
7778
case serf.EventUser:
7879
case serf.EventQuery: // Ignore
7980
default:
80-
s.logger.Printf("[WARN] consul: unhandled WAN Serf Event: %#v", e)
81+
s.logger.Printf("[WARN] consul: Unhandled WAN Serf Event: %#v", e)
8182
}
8283

8384
case <-s.shutdownCh:
@@ -127,7 +128,7 @@ func (s *Server) localEvent(event serf.UserEvent) {
127128
}
128129
case isUserEvent(name):
129130
event.Name = rawUserEventName(name)
130-
s.logger.Printf("[DEBUG] consul: user event: %s", event.Name)
131+
s.logger.Printf("[DEBUG] consul: User event: %s", event.Name)
131132

132133
// Trigger the callback
133134
if s.config.UserEventHandler != nil {
@@ -145,12 +146,12 @@ func (s *Server) lanNodeJoin(me serf.MemberEvent) {
145146
if !ok {
146147
continue
147148
}
148-
s.logger.Printf("[INFO] consul: adding LAN server %s", parts)
149+
s.logger.Printf("[INFO] consul: Adding LAN server %s", parts)
149150

150151
// See if it's configured as part of our DC.
151152
if parts.Datacenter == s.config.Datacenter {
152153
s.localLock.Lock()
153-
s.localConsuls[parts.Addr.String()] = parts
154+
s.localConsuls[raft.ServerAddress(parts.Addr.String())] = parts
154155
s.localLock.Unlock()
155156
}
156157

@@ -166,10 +167,10 @@ func (s *Server) wanNodeJoin(me serf.MemberEvent) {
166167
for _, m := range me.Members {
167168
ok, parts := agent.IsConsulServer(m)
168169
if !ok {
169-
s.logger.Printf("[WARN] consul: non-server in WAN pool: %s", m.Name)
170+
s.logger.Printf("[WARN] consul: Non-server in WAN pool: %s", m.Name)
170171
continue
171172
}
172-
s.logger.Printf("[INFO] consul: adding WAN server %s", parts)
173+
s.logger.Printf("[INFO] consul: Adding WAN server %s", parts)
173174

174175
// Search for this node in our existing remotes.
175176
found := false
@@ -193,20 +194,20 @@ func (s *Server) wanNodeJoin(me serf.MemberEvent) {
193194

194195
// maybeBootstrap is used to handle bootstrapping when a new consul server joins
195196
func (s *Server) maybeBootstrap() {
197+
// Bootstrap can only be done if there are no committed logs, so remove our
198+
// expectations of bootstrapping. This is slightly cheaper than the full
199+
// check that BootstrapCluster will do, so this is a good pre-filter.
196200
index, err := s.raftStore.LastIndex()
197201
if err != nil {
198-
s.logger.Printf("[ERR] consul: failed to read last raft index: %v", err)
202+
s.logger.Printf("[ERR] consul: Failed to read last raft index: %v", err)
199203
return
200204
}
201-
202-
// Bootstrap can only be done if there are no committed logs,
203-
// remove our expectations of bootstrapping
204205
if index != 0 {
205206
s.config.BootstrapExpect = 0
206207
return
207208
}
208209

209-
// Scan for all the known servers
210+
// Scan for all the known servers.
210211
members := s.serfLAN.Members()
211212
addrs := make([]string, 0)
212213
for _, member := range members {
@@ -230,18 +231,30 @@ func (s *Server) maybeBootstrap() {
230231
addrs = append(addrs, addr.String())
231232
}
232233

233-
// Skip if we haven't met the minimum expect count
234+
// Skip if we haven't met the minimum expect count.
234235
if len(addrs) < s.config.BootstrapExpect {
235236
return
236237
}
237238

238-
// Update the peer set
239-
s.logger.Printf("[INFO] consul: Attempting bootstrap with nodes: %v", addrs)
240-
if err := s.raft.SetPeers(addrs).Error(); err != nil {
241-
s.logger.Printf("[ERR] consul: failed to bootstrap peers: %v", err)
239+
// Attempt a live bootstrap!
240+
var configuration raft.Configuration
241+
for _, addr := range addrs {
242+
// TODO (slackpad) - This will need to be updated once we support
243+
// node IDs.
244+
server := raft.Server{
245+
ID: raft.ServerID(addr),
246+
Address: raft.ServerAddress(addr),
247+
}
248+
configuration.Servers = append(configuration.Servers, server)
249+
}
250+
s.logger.Printf("[INFO] consul: Found expected number of peers (%s), attempting to bootstrap cluster...",
251+
strings.Join(addrs, ","))
252+
future := s.raft.BootstrapCluster(configuration)
253+
if err := future.Error(); err != nil {
254+
s.logger.Printf("[ERR] consul: Failed to bootstrap cluster: %v", err)
242255
}
243256

244-
// Bootstrapping complete, don't enter this again
257+
// Bootstrapping complete, don't enter this again.
245258
s.config.BootstrapExpect = 0
246259
}
247260

@@ -252,10 +265,10 @@ func (s *Server) lanNodeFailed(me serf.MemberEvent) {
252265
if !ok {
253266
continue
254267
}
255-
s.logger.Printf("[INFO] consul: removing LAN server %s", parts)
268+
s.logger.Printf("[INFO] consul: Removing LAN server %s", parts)
256269

257270
s.localLock.Lock()
258-
delete(s.localConsuls, parts.Addr.String())
271+
delete(s.localConsuls, raft.ServerAddress(parts.Addr.String()))
259272
s.localLock.Unlock()
260273
}
261274
}
@@ -267,7 +280,7 @@ func (s *Server) wanNodeFailed(me serf.MemberEvent) {
267280
if !ok {
268281
continue
269282
}
270-
s.logger.Printf("[INFO] consul: removing WAN server %s", parts)
283+
s.logger.Printf("[INFO] consul: Removing WAN server %s", parts)
271284

272285
// Remove the server if known
273286
s.remoteLock.Lock()

0 commit comments

Comments
 (0)