diff --git a/internal/pkg/store/setup.go b/internal/pkg/store/setup.go index 260898db8b..adda40ed7a 100644 --- a/internal/pkg/store/setup.go +++ b/internal/pkg/store/setup.go @@ -42,10 +42,10 @@ func SetupDefault(dataDir string) error { Fdb: definition.FdbConfig{}, } - return Setup(c) + return Setup(c, false) } -func SetupWithConfig(sc *StoreConf) error { +func SetupWithConfig(sc *StoreConf, setupCheckpointDB bool) error { c := definition.Config{ Type: sc.Type, ExtStateType: sc.ExtStateType, @@ -54,15 +54,29 @@ func SetupWithConfig(sc *StoreConf) error { Fdb: sc.FdbConfig, Pebble: sc.PebbleConfig, } - return Setup(c) + return Setup(c, setupCheckpointDB) } -func Setup(config definition.Config) error { +func Setup(config definition.Config, setupCheckpointDB bool) error { s, err := newStores(config, "sqliteKV.db") if err != nil { return err } globalStores = s + if setupCheckpointDB { + s, err = newStores(config, "checkpoint.db") + if err != nil { + return err + } + checkpointStores = s + db, err := checkpointStores.GetKV("init_checkpointdb") + if err != nil { + checkpointStores = nil + } else { + // write sth to ensure checkpoint.db created + db.Set("init", "init") + } + } s, err = newStores(config, "cache.db") if err != nil { return err diff --git a/internal/pkg/store/stores.go b/internal/pkg/store/stores.go index fbc0759fb3..e468d7a96b 100644 --- a/internal/pkg/store/stores.go +++ b/internal/pkg/store/stores.go @@ -31,9 +31,10 @@ var ( storeBuilders = map[string]StoreCreator{ "sqlite": sql.BuildStores, } - globalStores *stores = nil - cacheStores *stores = nil - extStateStores *stores = nil + globalStores *stores = nil + cacheStores *stores = nil + extStateStores *stores = nil + checkpointStores *stores = nil TraceStores sql.Database ) @@ -154,6 +155,9 @@ func GetKV(table string) (kv.KeyValue, error) { } func GetTS(table string) (kv.Tskv, error) { + if checkpointStores != nil { + return checkpointStores.GetTS(table) + } if globalStores == nil { return nil, fmt.Errorf("global stores are not initialized") } @@ -161,6 +165,10 @@ func GetTS(table string) (kv.Tskv, error) { } func DropTS(table string) error { + if checkpointStores != nil { + checkpointStores.DropTS(table) + return nil + } if globalStores == nil { return fmt.Errorf("global stores are not initialized") } diff --git a/internal/server/server.go b/internal/server/server.go index 151059dde5..8a81fd61b9 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -140,10 +140,34 @@ func getStoreConfigByKuiperConfig(c *model.KuiperConf) (*store.StoreConf, error) return sc, nil } +func canSetupCheckpointDB() (setup bool) { + s, err := conf.GetDataLoc() + if err != nil { + return false + } + _, err = os.Stat(filepath.Join(s, "checkpoint.db")) + // checkpoint.db already exists, setup it. + if err == nil { + return true + } + _, err = os.Stat(filepath.Join(s, "sqliteKV.db")) + if err != nil { + // sqliteKV.db not exists, setup checkpoint.db. + if os.IsNotExist(err) { + return true + } + // unexpected error happened + return false + } + // sqliteKV.db exists/ checkpoint.db not exists, don't setup. + return false +} + func StartUp(Version string) { version = Version startTimeStamp = time.Now().Unix() createPaths() + needSetup := canSetupCheckpointDB() conf.SetupEnv() conf.InitConf() if modules.ConfHook != nil { @@ -184,7 +208,7 @@ func StartUp(Version string) { if err != nil { panic(err) } - err = store.SetupWithConfig(sc) + err = store.SetupWithConfig(sc, needSetup) if err != nil { panic(err) }