Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions internal/pkg/store/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ func SetupDefault(dataDir string) error {
Fdb: definition.FdbConfig{},
}

return Setup(c)
return Setup(c, false)
}

func SetupWithConfig(sc *StoreConf) error {
func SetupWithConfig(sc *StoreConf, setupCheckpointDB bool) error {
c := definition.Config{
Type: sc.Type,
ExtStateType: sc.ExtStateType,
Expand All @@ -54,15 +54,29 @@ func SetupWithConfig(sc *StoreConf) error {
Fdb: sc.FdbConfig,
Pebble: sc.PebbleConfig,
}
return Setup(c)
return Setup(c, setupCheckpointDB)
}

func Setup(config definition.Config) error {
func Setup(config definition.Config, setupCheckpointDB bool) error {
s, err := newStores(config, "sqliteKV.db")
if err != nil {
return err
}
globalStores = s
if setupCheckpointDB {
s, err = newStores(config, "checkpoint.db")
if err != nil {
return err
}
checkpointStores = s
db, err := checkpointStores.GetKV("init_checkpointdb")
if err != nil {
checkpointStores = nil
} else {
// write sth to ensure checkpoint.db created
db.Set("init", "init")
}
}
s, err = newStores(config, "cache.db")
if err != nil {
return err
Expand Down
14 changes: 11 additions & 3 deletions internal/pkg/store/stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ var (
storeBuilders = map[string]StoreCreator{
"sqlite": sql.BuildStores,
}
globalStores *stores = nil
cacheStores *stores = nil
extStateStores *stores = nil
globalStores *stores = nil
cacheStores *stores = nil
extStateStores *stores = nil
checkpointStores *stores = nil

TraceStores sql.Database
)
Expand Down Expand Up @@ -154,13 +155,20 @@ func GetKV(table string) (kv.KeyValue, error) {
}

func GetTS(table string) (kv.Tskv, error) {
if checkpointStores != nil {
return checkpointStores.GetTS(table)
}
if globalStores == nil {
return nil, fmt.Errorf("global stores are not initialized")
}
return globalStores.GetTS(table)
}

func DropTS(table string) error {
if checkpointStores != nil {
checkpointStores.DropTS(table)
return nil
}
if globalStores == nil {
return fmt.Errorf("global stores are not initialized")
}
Expand Down
26 changes: 25 additions & 1 deletion internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,34 @@ func getStoreConfigByKuiperConfig(c *model.KuiperConf) (*store.StoreConf, error)
return sc, nil
}

func canSetupCheckpointDB() (setup bool) {
s, err := conf.GetDataLoc()
if err != nil {
return false
}
_, err = os.Stat(filepath.Join(s, "checkpoint.db"))
// checkpoint.db already exists, setup it.
if err == nil {
return true
}
_, err = os.Stat(filepath.Join(s, "sqliteKV.db"))
if err != nil {
// sqliteKV.db not exists, setup checkpoint.db.
if os.IsNotExist(err) {
return true
}
// unexpected error happened
return false
}
// sqliteKV.db exists/ checkpoint.db not exists, don't setup.
return false
}

func StartUp(Version string) {
version = Version
startTimeStamp = time.Now().Unix()
createPaths()
needSetup := canSetupCheckpointDB()
conf.SetupEnv()
conf.InitConf()
if modules.ConfHook != nil {
Expand Down Expand Up @@ -184,7 +208,7 @@ func StartUp(Version string) {
if err != nil {
panic(err)
}
err = store.SetupWithConfig(sc)
err = store.SetupWithConfig(sc, needSetup)
if err != nil {
panic(err)
}
Expand Down