Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,7 @@ func (cp *Compactor) Compact(ctx context.Context, opts *brtypes.CompactOptions)
}
}

defer func() {
embeddedEtcd.Server.Stop()
embeddedEtcd.Close()
}()
defer embeddedEtcd.Close()

ep := []string{embeddedEtcd.Clients[0].Addr().String()}

Expand Down
4 changes: 2 additions & 2 deletions pkg/compactor/compactor_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ var _ = SynchronizedBeforeSuite(func() []byte {
resp := &utils.EtcdDataPopulationResponse{}
wg := &sync.WaitGroup{}
wg.Add(1)
go utils.PopulateEtcdWithWaitGroup(populatorCtx, wg, logger, endpoints, resp)
go utils.PopulateEtcdWithWaitGroup(populatorCtx, wg, logger, endpoints, "", "", resp)

// Take snapshots (Full + Delta) of the ETCD database
deltaSnapshotPeriod := time.Second
Expand All @@ -80,7 +80,7 @@ var _ = SynchronizedBeforeSuite(func() []byte {
compressionConfig.Enabled = true
compressionConfig.CompressionPolicy = "gzip"
snapstoreConfig := brtypes.SnapstoreConfig{Container: testSnapshotDir, Provider: "Local"}
err = utils.RunSnapshotter(logger, snapstoreConfig, deltaSnapshotPeriod, endpoints, ctx.Done(), true, compressionConfig)
err = utils.RunSnapshotter(logger, snapstoreConfig, deltaSnapshotPeriod, endpoints, "", "", ctx.Done(), true, compressionConfig)
Expect(err).ShouldNot(HaveOccurred())

// Wait until the populator finishes with populating ETCD
Expand Down
4 changes: 2 additions & 2 deletions pkg/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ var _ = Describe("Running Compactor", func() {
err = restorer.RestoreAndStopEtcd(*restoreOpts, nil)

Expect(err).ShouldNot(HaveOccurred())
err = utils.CheckDataConsistency(testCtx, restoreOpts.Config.DataDir, keyTo, logger)
err = utils.CheckDataConsistency(testCtx, restoreOpts.Config.DataDir, keyTo, "", "", logger)
Expect(err).ShouldNot(HaveOccurred())
})
})
Expand Down Expand Up @@ -269,7 +269,7 @@ var _ = Describe("Running Compactor", func() {
err = restorer.RestoreAndStopEtcd(*restoreOpts, nil)

Expect(err).ShouldNot(HaveOccurred())
err = utils.CheckDataConsistency(testCtx, restoreOpts.Config.DataDir, keyTo, logger)
err = utils.CheckDataConsistency(testCtx, restoreOpts.Config.DataDir, keyTo, "", "", logger)
Expect(err).ShouldNot(HaveOccurred())
})
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/defragmentor/defrag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ var _ = Describe("Defrag", func() {
resp := &utils.EtcdDataPopulationResponse{}
wg := &sync.WaitGroup{}
wg.Add(1)
go utils.PopulateEtcdWithWaitGroup(populatorCtx, wg, logger, endpoints, resp)
go utils.PopulateEtcdWithWaitGroup(populatorCtx, wg, logger, endpoints, "", "", resp)
Expect(resp.Err).ShouldNot(HaveOccurred())

// Wait unitil the populator finishes with populating ETCD
Expand Down
5 changes: 1 addition & 4 deletions pkg/initializer/validator/datavalidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,10 +371,7 @@ func (d *DataValidator) checkFullRevisionConsistency(dataDir string, latestSnaps
d.Logger.Infof("unable to start embedded etcd: %v", err)
return DataDirectoryCorrupt, err
}
defer func() {
e.Server.Stop()
e.Close()
}()
defer e.Close()

clientFactory := etcdutil.NewClientFactory(nil, brtypes.EtcdConnectionConfig{
Endpoints: []string{e.Clients[0].Addr().String()},
Expand Down
4 changes: 2 additions & 2 deletions pkg/initializer/validator/datavalidator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ var _ = Describe("Running Datavalidator", func() {
endpoints := []string{etcd.Clients[0].Addr().String()}
// populate etcd but with lesser data than previous populate call, so that the new db has a lower revision
resp := &utils.EtcdDataPopulationResponse{}
utils.PopulateEtcd(testCtx, logger, endpoints, 0, int(keyTo/2), resp)
utils.PopulateEtcd(testCtx, logger, endpoints, "", "", 0, int(keyTo/2), resp)
Expect(resp.Err).ShouldNot(HaveOccurred())
etcd.Close()

Expand Down Expand Up @@ -270,7 +270,7 @@ var _ = Describe("Running Datavalidator", func() {

resp := &utils.EtcdDataPopulationResponse{}
// populate the etcd with some more keys
utils.PopulateEtcd(testCtx, logger, endpoints, 0, int(keyTo/2), resp)
utils.PopulateEtcd(testCtx, logger, endpoints, "", "", 0, int(keyTo/2), resp)
Expect(resp.Err).ShouldNot(HaveOccurred())

//run the snapshotter
Expand Down
2 changes: 1 addition & 1 deletion pkg/initializer/validator/validator_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ var _ = SynchronizedBeforeSuite(func() []byte {
resp := &utils.EtcdDataPopulationResponse{}
wg := &sync.WaitGroup{}
wg.Add(1)
go utils.PopulateEtcdWithWaitGroup(populatorCtx, wg, logger, endpoints, resp)
go utils.PopulateEtcdWithWaitGroup(populatorCtx, wg, logger, endpoints, "", "", resp)

deltaSnapshotPeriod := 5 * time.Second
ctx := utils.ContextWithWaitGroupFollwedByGracePeriod(populatorCtx, wg, deltaSnapshotPeriod+2*time.Second)
Expand Down
38 changes: 36 additions & 2 deletions pkg/miscellaneous/miscellaneous.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
"go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/pkg/types"
"go.uber.org/zap"
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -155,8 +156,32 @@ func getStructuredBackupList(snapList brtypes.SnapList) []backup {
return backups
}

// EmbeddedEtcd contains a running etcd server and its listeners.
type EmbeddedEtcd struct {
*embed.Etcd

// authWasDisabled tracks if auth was disabled during startup and we need to
// enable it again before closing the server.
authWasDisabled bool
}

// Close closes the etcd embedded server. If auth was disabled during startup,
// it will re-enable auth before closing the server.
func (e *EmbeddedEtcd) Close() {
if e.authWasDisabled {
if err := e.Etcd.Server.AuthStore().AuthEnable(); err != nil {
e.Etcd.GetLogger().Error("failed to enable auth again", zap.Error(err))
}
}

e.Etcd.Server.Stop()
e.Etcd.Close()
}

// StartEmbeddedEtcd starts the embedded etcd server.
func StartEmbeddedEtcd(logger *logrus.Entry, ro *brtypes.RestoreOptions) (*embed.Etcd, error) {
// Etcd auth is automatically disabled if needed: clients do not need to provide any credentials when sending requests.
// If auth was disabled, it's automatically enabled again when server is closed.
func StartEmbeddedEtcd(logger *logrus.Entry, ro *brtypes.RestoreOptions) (*EmbeddedEtcd, error) {
cfg := embed.NewConfig()
cfg.Dir = filepath.Join(ro.Config.DataDir)
DefaultListenPeerURLs := "http://localhost:0"
Expand Down Expand Up @@ -202,7 +227,16 @@ func StartEmbeddedEtcd(logger *logrus.Entry, ro *brtypes.RestoreOptions) (*embed
e.Close()
return nil, fmt.Errorf("server took too long to start")
}
return e, nil

embeddedEtcd := &EmbeddedEtcd{Etcd: e}

if e.Server.AuthStore().IsAuthEnabled() {
// Disable auth and record this action so that we can enable it again later.
embeddedEtcd.authWasDisabled = true
e.Server.AuthStore().AuthDisable()
}

return embeddedEtcd, nil
}

// GetKubernetesClientSetOrError creates and returns a kubernetes clientset or an error if creation fails
Expand Down
4 changes: 2 additions & 2 deletions pkg/snapshot/copier/copier_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,14 @@ var _ = SynchronizedBeforeSuite(func() []byte {
resp := &utils.EtcdDataPopulationResponse{}
wg := &sync.WaitGroup{}
wg.Add(1)
go utils.PopulateEtcdWithWaitGroup(populatorCtx, wg, logger, endpoints, resp)
go utils.PopulateEtcdWithWaitGroup(populatorCtx, wg, logger, endpoints, "", "", resp)

deltaSnapshotPeriod := time.Second
ctx := utils.ContextWithWaitGroupFollwedByGracePeriod(populatorCtx, wg, deltaSnapshotPeriod+2*time.Second)

compressionConfig := compressor.NewCompressorConfig()
snapstoreConfig := brtypes.SnapstoreConfig{Container: snapstoreDir, Provider: "Local"}
err = utils.RunSnapshotter(logger, snapstoreConfig, deltaSnapshotPeriod, endpoints, ctx.Done(), true, compressionConfig)
err = utils.RunSnapshotter(logger, snapstoreConfig, deltaSnapshotPeriod, endpoints, "", "", ctx.Done(), true, compressionConfig)
Expect(err).ShouldNot(HaveOccurred())

keyTo = resp.KeyTo
Expand Down
4 changes: 1 addition & 3 deletions pkg/snapshot/restorer/restorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/sirupsen/logrus"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/snapshot"
"go.etcd.io/etcd/embed"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -68,15 +67,14 @@ func (r *Restorer) RestoreAndStopEtcd(ro brtypes.RestoreOptions, m member.Contro
embeddedEtcd, err := r.Restore(ro, m)
defer func() {
if embeddedEtcd != nil {
embeddedEtcd.Server.Stop()
embeddedEtcd.Close()
}
}()
return err
}

// Restore restores the etcd data directory as per specified restore options but returns the ETCD server that it statrted.
func (r *Restorer) Restore(ro brtypes.RestoreOptions, m member.Control) (*embed.Etcd, error) {
func (r *Restorer) Restore(ro brtypes.RestoreOptions, m member.Control) (*miscellaneous.EmbeddedEtcd, error) {
r.logger.Infof("Creating temporary directory %s for persisting full and delta snapshots locally.", ro.Config.TempSnapshotsDir)
err := os.MkdirAll(ro.Config.TempSnapshotsDir, 0700)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/snapshot/restorer/restorer_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,14 @@ var _ = SynchronizedBeforeSuite(func() []byte {
resp := &utils.EtcdDataPopulationResponse{}
wg := &sync.WaitGroup{}
wg.Add(1)
go utils.PopulateEtcdWithWaitGroup(populatorCtx, wg, logger, endpoints, resp)
go utils.PopulateEtcdWithWaitGroup(populatorCtx, wg, logger, endpoints, "", "", resp)

deltaSnapshotPeriod := time.Second
ctx := utils.ContextWithWaitGroupFollwedByGracePeriod(populatorCtx, wg, deltaSnapshotPeriod+2*time.Second)

compressionConfig := compressor.NewCompressorConfig()
snapstoreConfig := brtypes.SnapstoreConfig{Container: snapstoreDir, Provider: "Local"}
err = utils.RunSnapshotter(logger, snapstoreConfig, deltaSnapshotPeriod, endpoints, ctx.Done(), true, compressionConfig)
err = utils.RunSnapshotter(logger, snapstoreConfig, deltaSnapshotPeriod, endpoints, "", "", ctx.Done(), true, compressionConfig)
Expect(err).ShouldNot(HaveOccurred())

keyTo = resp.KeyTo
Expand Down
Loading