Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
journal_mode: [delete, wal]
journal_mode: [delete, persist, wal]
steps:
- uses: actions/checkout@v3

Expand Down Expand Up @@ -66,7 +66,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
journal_mode: [delete, wal]
journal_mode: [delete, persist, wal]
steps:
- uses: actions/checkout@v3

Expand Down
15 changes: 11 additions & 4 deletions cmd/litefs/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,14 +144,21 @@ func TestSingleNode_DatabaseChecksumMismatch(t *testing.T) {
t.Fatal(err)
}

if testingutil.IsWALMode() {
if err := m0.Run(context.Background()); err == nil || err.Error() != `cannot open store: open databases: open database("db"): verify database file: database checksum (a9e884061ea4e488) does not match latest LTX checksum (fa337f5ece449f39)` {
switch mode := testingutil.JournalMode(); mode {
case "delete":
if err := m0.Run(context.Background()); err == nil || err.Error() != `cannot open store: open databases: open database("db"): verify database file: database checksum (9d81a60d39fb4760) does not match latest LTX checksum (ce5a5d55e91b3cd1)` {
t.Fatalf("unexpected error: %s", err)
}
} else {
if err := m0.Run(context.Background()); err == nil || err.Error() != `cannot open store: open databases: open database("db"): verify database file: database checksum (9d81a60d39fb4760) does not match latest LTX checksum (ce5a5d55e91b3cd1)` {
case "persist":
if err := m0.Run(context.Background()); err == nil || err.Error() != `cannot open store: open databases: open database("db"): verify database file: database checksum (ff2d4d6d60fd80cd) does not match latest LTX checksum (ce5a5d55e91b3cd1)` {
t.Fatalf("unexpected error: %s", err)
}
case "wal":
if err := m0.Run(context.Background()); err == nil || err.Error() != `cannot open store: open databases: open database("db"): verify database file: database checksum (a9e884061ea4e488) does not match latest LTX checksum (fa337f5ece449f39)` {
t.Fatalf("unexpected error: %s", err)
}
default:
t.Fatalf("invalid journal mode: %q", mode)
}
}

Expand Down
39 changes: 36 additions & 3 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -815,11 +815,29 @@ func (db *DB) WriteJournal(f *os.File, data []byte, offset int64) error {
if !db.store.IsPrimary() {
return ErrReadOnlyReplica
}

// Assume this is a PERSIST commit if the initial header bytes are cleared.
if offset == 0 && len(data) == SQLITE_DATABASE_SIZE_OFFSET && isByteSliceZero(data) {
if err := db.CommitJournal(JournalModePersist); err != nil {
return fmt.Errorf("commit journal (PERSIST): %w", err)
}
}

_, err := f.WriteAt(data, offset)
dbJournalWriteCountMetricVec.WithLabelValues(db.name).Inc()
return err
}

// isByteSliceZero returns true if b only contains NULL bytes.
func isByteSliceZero(b []byte) bool {
for _, v := range b {
if v != 0 {
return false
}
}
return true
}

// CommitJournal deletes the journal file which commits or rolls back the transaction.
func (db *DB) CommitJournal(mode JournalMode) error {
db.mu.Lock()
Expand Down Expand Up @@ -1050,7 +1068,19 @@ func (db *DB) invalidateJournal(mode JournalMode) error {
}

case JournalModePersist:
return fmt.Errorf("journal mode not implemented: PERSIST")
f, err := os.OpenFile(db.JournalPath(), os.O_RDWR, 0666)
if err != nil && !os.IsNotExist(err) {
return fmt.Errorf("open journal: %w", err)
}
defer func() { _ = f.Close() }()

if _, err := f.Write(make([]byte, SQLITE_JOURNAL_HEADER_SIZE)); err != nil {
return fmt.Errorf("clear journal header: %w", err)
} else if err := f.Sync(); err != nil {
return fmt.Errorf("sync journal: %w", err)
} else if err := f.Close(); err != nil {
return fmt.Errorf("close journal: %w", err)
}

default:
return fmt.Errorf("invalid journal: %q", mode)
Expand Down Expand Up @@ -1604,12 +1634,15 @@ func TrimName(name string) string {
const (
SQLITE_DATABASE_HEADER_STRING = "SQLite format 3\x00"

// Location of the database size, in pages, in the main database file.
SQLITE_DATABASE_SIZE_OFFSET = 28

/// Magic header string that identifies a SQLite journal header.
/// https://www.sqlite.org/fileformat.html#the_rollback_journal
SQLITE_JOURNAL_HEADER_STRING = "\xd9\xd5\x05\xf9\x20\xa1\x63\xd7"

// Location of the database size, in pages, in the main database file.
SQLITE_DATABASE_SIZE_OFFSET = 28
// Size of the journal header, in bytes.
SQLITE_JOURNAL_HEADER_SIZE = 28
)

// LockType represents a SQLite lock type.
Expand Down
26 changes: 19 additions & 7 deletions fuse/file_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,10 +253,6 @@ func TestFileSystem_MultipleJournalSegments(t *testing.T) {
}

func TestFileSystem_ReadOnly(t *testing.T) {
if testingutil.IsWALMode() {
t.Skip("SQLITE_READONLY not yet supported in WAL mode")
}

dir := t.TempDir()
fs := newOpenFileSystem(t, dir, litefs.NewStaticLeaser(true, "localhost", "http://localhost:20202"))
dsn := filepath.Join(fs.Path(), "db")
Expand All @@ -280,10 +276,26 @@ func TestFileSystem_ReadOnly(t *testing.T) {

// Attempt to write to read-only database.
db = testingutil.OpenSQLDB(t, dsn)
var e sqlite3.Error
if _, err := db.Exec(`INSERT INTO t VALUES (100)`); !errors.As(err, &e) || e.Code != sqlite3.ErrReadonly {
t.Fatalf("unexpected error: %s", err)
_, err := db.Exec(`INSERT INTO t VALUES (100)`)

switch mode := testingutil.JournalMode(); mode {
case "delete":
var e sqlite3.Error
if !errors.As(err, &e) || e.Code != sqlite3.ErrReadonly {
t.Fatalf("unexpected error: %s", err)
}
case "persist":
if err == nil || err.Error() != `disk I/O error: permission denied` {
t.Fatalf("unexpected error: %s", err)
}
case "wal":
if err == nil || err.Error() != `disk I/O error` {
t.Fatalf("unexpected error: %s", err)
}
default:
t.Fatalf("invalid journal mode: %q", mode)
}

if err := db.Close(); err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion fuse/journal_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (h *JournalHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *f
func (h *JournalHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error {
if err := h.node.db.WriteJournal(h.file, req.Data, req.Offset); err != nil {
log.Printf("fuse: write(): journal error: %s", err)
return err
return ToError(err)
}
resp.Size = len(req.Data)
return nil
Expand Down