Skip to content

Commit c26bdc6

Browse files
authored
[ENH] Add log sealing to the go service. (#4554)
## Description of changes This PR does the following: - Adds a migration to add the `is_sealed` column to the collection table in the log database. - Updates the go/ README.md to mention the correct atlas command to run. - Updates the go store queries to recognize the new field. - Add a new SealLog query to unconditionally seal a log. - Propagate the `is_sealed` column from table to `IsSealed` field in the protobuf message. - Add a test that push logs will fail when sealed. - Add support to the RustLogService for the new IDL. - Translate `is_sealed` responses into explicit errors in the log client. ## Test plan CI, as usual. ## Documentation Changes N/A
1 parent 7202f83 commit c26bdc6

File tree

15 files changed

+104
-19
lines changed

15 files changed

+104
-19
lines changed

go/README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,12 @@ ALSO, ensure you have copied the `/include` directory of the release to `../incl
3535
Then, to install the plugins, run the `go install` commands from the `Dockerfile`. The exact commands are not here because we would be duplicating where versions live if we did. The `Dockerfile` is the source of truth for the versions.
3636

3737
Once those are all installed, you can run `make build` to build the project and most importantly, the generated protobuf files which your IDE will complain about until they are generated.
38+
39+
## Schema Migrations
40+
41+
From the directory with the migrations/ and schema/ directories, you can generate a new schema by
42+
changing the files in schema directly and running this command:
43+
44+
```
45+
atlas migrate diff --dir file://migrations --to file://schema --dev-url 'docker://postgres/15/dev?search_path=public'
46+
```

go/pkg/log/repository/log.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ type LogRepository struct {
2323
sysDb sysdb.ISysDB
2424
}
2525

26-
func (r *LogRepository) InsertRecords(ctx context.Context, collectionId string, records [][]byte) (insertCount int64, err error) {
26+
func (r *LogRepository) InsertRecords(ctx context.Context, collectionId string, records [][]byte) (insertCount int64, isSealed bool, err error) {
2727
var tx pgx.Tx
2828
tx, err = r.conn.BeginTx(ctx, pgx.TxOptions{})
2929
if err != nil {
@@ -66,6 +66,13 @@ func (r *LogRepository) InsertRecords(ctx context.Context, collectionId string,
6666
return
6767
}
6868
}
69+
if collection.IsSealed {
70+
insertCount = 0
71+
isSealed = true
72+
err = nil
73+
return
74+
}
75+
isSealed = false
6976
params := make([]log.InsertRecordParams, len(records))
7077
for i, record := range records {
7178
offset := collection.RecordEnumerationOffsetPosition + int64(i) + 1

go/pkg/log/repository/log_test.go

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,15 @@ func (suite *LogTestSuite) TestGarbageCollection() {
4545
collectionID2 := types.NewUniqueID()
4646

4747
// Add records to collection 1
48-
count, err := suite.lr.InsertRecords(ctx, collectionID1.String(), [][]byte{{1, 2, 3}})
48+
count, isSealed, err := suite.lr.InsertRecords(ctx, collectionID1.String(), [][]byte{{1, 2, 3}})
4949
assert.NoError(suite.t, err, "Failed to insert records")
50+
assert.False(suite.t, isSealed, count, "Log sealed")
5051
assert.Equal(suite.t, int64(1), count, "Failed to insert records")
5152

5253
// Add records to collection 2
53-
count, err = suite.lr.InsertRecords(ctx, collectionID2.String(), [][]byte{{1, 2, 3}})
54+
count, isSealed, err = suite.lr.InsertRecords(ctx, collectionID2.String(), [][]byte{{1, 2, 3}})
5455
assert.NoError(suite.t, err, "Failed to insert records")
56+
assert.False(suite.t, isSealed, count, "Log sealed")
5557
assert.Equal(suite.t, int64(1), count, "Failed to insert records")
5658

5759
// Add collection 1 to sysdb
@@ -72,8 +74,9 @@ func (suite *LogTestSuite) TestGarbageCollection() {
7274
assert.Equal(suite.t, 0, len(records), "Failed to run garbage collection")
7375

7476
// Add records to collection 2, expect offset to reset
75-
count, err = suite.lr.InsertRecords(ctx, collectionID2.String(), [][]byte{{4, 5, 6}})
77+
count, isSealed, err = suite.lr.InsertRecords(ctx, collectionID2.String(), [][]byte{{4, 5, 6}})
7678
assert.NoError(suite.t, err, "Failed to insert records")
79+
assert.False(suite.t, isSealed, count, "Log sealed")
7780
assert.Equal(suite.t, int64(1), count, "Failed to insert records")
7881
records, err = suite.lr.PullRecords(ctx, collectionID2.String(), 1, 1, time.Now().UnixNano())
7982
assert.NoError(suite.t, err, "Failed to pull records")
@@ -110,6 +113,24 @@ func (suite *LogTestSuite) TestUniqueConstraintPushLogs() {
110113
}
111114
}
112115

116+
func (suite *LogTestSuite) TestSealedLogWontPush() {
117+
ctx := context.Background()
118+
collectionId := types.NewUniqueID()
119+
params := log.InsertCollectionParams {
120+
ID: collectionId.String(),
121+
RecordEnumerationOffsetPosition: 1,
122+
RecordCompactionOffsetPosition: 0,
123+
}
124+
_, err := suite.lr.queries.InsertCollection(ctx, params)
125+
assert.NoError(suite.t, err, "Initializing log should not fail.")
126+
_, err = suite.lr.queries.SealLog(ctx, collectionId.String())
127+
assert.NoError(suite.t, err, "Sealing log should not fail.")
128+
var isSealed bool
129+
_, isSealed, err = suite.lr.InsertRecords(ctx, collectionId.String(), [][]byte{{1,2,3}})
130+
assert.NoError(suite.t, err, "Failed to push logs")
131+
assert.True(suite.t, isSealed, "Did not report was sealed")
132+
}
133+
113134
func TestLogTestSuite(t *testing.T) {
114135
testSuite := new(LogTestSuite)
115136
testSuite.t = t

go/pkg/log/server/server.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,14 @@ func (s *logServer) PushLogs(ctx context.Context, req *logservicepb.PushLogsRequ
3636
recordsContent = append(recordsContent, data)
3737
}
3838
var recordCount int64
39-
recordCount, err = s.lr.InsertRecords(ctx, collectionID.String(), recordsContent)
39+
var isSealed bool
40+
recordCount, isSealed, err = s.lr.InsertRecords(ctx, collectionID.String(), recordsContent)
4041
if err != nil {
4142
return
4243
}
4344
res = &logservicepb.PushLogsResponse{
4445
RecordCount: int32(recordCount),
46+
LogIsSealed: isSealed,
4547
}
4648
return
4749
}

go/pkg/log/store/db/copyfrom.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

go/pkg/log/store/db/db.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

go/pkg/log/store/db/models.go

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

go/pkg/log/store/db/queries.sql.go

Lines changed: 31 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- Modify "collection" table
2+
ALTER TABLE "collection" ADD COLUMN "is_sealed" boolean NOT NULL DEFAULT false;
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
1-
h1:kG+ejV1DS3youx+m5SNNFYabJeDqfYTdSQHbJtR2/eU=
1+
h1:Ha4eNnpWJQC0flnfRZKnpw06LblDCh5ocPykLIy5koE=
22
20240404181827_initial.sql h1:xnoD1FcXImqQPJOvaDbTOwTGPLtCP3RibetuaaZeATI=
3+
20250515150704.sql h1:ySONZilpdnd0BHQhsVAoSj5ud7/3gpI03VuwwWqZqrE=

0 commit comments

Comments
 (0)