Skip to content

Commit ea4a59e

Browse files
committed
write logs to database
1 parent dcf621b commit ea4a59e

File tree

18 files changed

+681
-107
lines changed

18 files changed

+681
-107
lines changed

pkg/coordinator/coordinator.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,8 @@ type testDescriptorEntry struct {
6969
func NewCoordinator(config *Config, log logrus.FieldLogger, metricsPort int) *Coordinator {
7070
return &Coordinator{
7171
log: logger.NewLogger(&logger.ScopeOptions{
72-
Parent: log,
73-
HistorySize: 5000,
72+
Parent: log,
73+
BufferSize: 5000,
7474
}),
7575
Config: config,
7676
metricsPort: metricsPort,
@@ -215,7 +215,7 @@ func (c *Coordinator) Logger() logrus.FieldLogger {
215215
return c.log.GetLogger()
216216
}
217217

218-
func (c *Coordinator) LogScope() *logger.LogScope {
218+
func (c *Coordinator) LogReader() logger.LogReader {
219219
return c.log
220220
}
221221

@@ -619,7 +619,6 @@ runLoop:
619619
c.testRegistryMutex.Unlock()
620620

621621
if nextTest != nil {
622-
623622
// run next test
624623
testFunc := func(nextTest types.TestRunner) {
625624
defer func() {

pkg/coordinator/db/schema/pgsql/20240913135112_init.sql

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,18 +53,20 @@ CREATE TABLE IF NOT EXISTS public."task_states"
5353
"task_config" TEXT NOT NULL,
5454
"task_status" TEXT NOT NULL,
5555
"task_result" INTEGER NOT NULL,
56+
"task_error" TEXT NOT NULL,
5657
CONSTRAINT "task_states_pkey" PRIMARY KEY ("run_id", "task_id")
5758
);
5859

5960
CREATE TABLE IF NOT EXISTS public."task_logs"
6061
(
6162
"run_id" INTEGER NOT NULL,
6263
"task_id" INTEGER NOT NULL,
64+
"log_idx" INTEGER NOT NULL,
6365
"log_time" BIGINT NOT NULL,
64-
"log_level" VARCHAR(16) NOT NULL,
66+
"log_level" SMALLINT NOT NULL,
6567
"log_fields" TEXT NOT NULL,
6668
"log_message" TEXT NOT NULL,
67-
CONSTRAINT "task_logs_pkey" PRIMARY KEY ("run_id", "task_id", "log_time")
69+
CONSTRAINT "task_logs_pkey" PRIMARY KEY ("run_id", "task_id", "log_idx")
6870
);
6971

7072
-- +goose StatementEnd

pkg/coordinator/db/schema/sqlite/20240913135112_init.sql

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,18 +53,20 @@ CREATE TABLE IF NOT EXISTS "task_states"
5353
"task_config" TEXT NOT NULL,
5454
"task_status" TEXT NOT NULL,
5555
"task_result" INTEGER NOT NULL,
56+
"task_error" TEXT NOT NULL,
5657
CONSTRAINT "task_states_pkey" PRIMARY KEY ("run_id", "task_id")
5758
);
5859

5960
CREATE TABLE IF NOT EXISTS "task_logs"
6061
(
6162
"run_id" INTEGER NOT NULL,
6263
"task_id" INTEGER NOT NULL,
64+
"log_idx" INTEGER NOT NULL,
6365
"log_time" INTEGER NOT NULL,
64-
"log_level" TEXT NOT NULL,
66+
"log_level" INTEGER NOT NULL,
6567
"log_fields" TEXT NOT NULL,
6668
"log_message" TEXT NOT NULL,
67-
CONSTRAINT "task_logs_pkey" PRIMARY KEY ("run_id", "task_id", "log_time")
69+
CONSTRAINT "task_logs_pkey" PRIMARY KEY ("run_id", "task_id", "log_idx")
6870
);
6971

7072
-- +goose StatementEnd

pkg/coordinator/db/task_logs.go

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@ import "github.com/jmoiron/sqlx"
55
type TaskLog struct {
66
RunID int `db:"run_id"`
77
TaskID int `db:"task_id"`
8+
LogIndex int `db:"log_idx"`
89
LogTime int64 `db:"log_time"`
9-
LogLevel string `db:"log_level"`
10+
LogLevel int `db:"log_level"`
1011
LogFields string `db:"log_fields"`
1112
LogMessage string `db:"log_message"`
1213
}
@@ -15,21 +16,52 @@ func (db *Database) InsertTaskLog(tx *sqlx.Tx, log *TaskLog) error {
1516
_, err := tx.Exec(db.EngineQuery(map[EngineType]string{
1617
EnginePgsql: `
1718
INSERT INTO task_logs (
18-
run_id, task_id, log_time, log_level, log_fields, log_message
19-
) VALUES ($1, $2, $3, $4, $5, $6)
19+
run_id, task_id, log_idx, log_time, log_level, log_fields, log_message
20+
) VALUES ($1, $2, $3, $4, $5, $6, $7)
2021
ON CONFLICT (run_id, task_id, log_time) DO UPDATE SET
22+
log_time = excluded.log_time,
2123
log_level = excluded.log_level,
2224
log_fields = excluded.log_fields,
2325
log_message = excluded.log_message`,
2426
EngineSqlite: `
2527
INSERT OR REPLACE INTO task_logs (
26-
run_id, task_id, log_time, log_level, log_fields, log_message
27-
) VALUES ($1, $2, $3, $4, $5, $6)`,
28+
run_id, task_id, log_idx, log_time, log_level, log_fields, log_message
29+
) VALUES ($1, $2, $3, $4, $5, $6, $7)`,
2830
}),
29-
log.RunID, log.TaskID, log.LogTime, log.LogLevel, log.LogFields, log.LogMessage)
31+
log.RunID, log.TaskID, log.LogIndex, log.LogTime, log.LogLevel, log.LogFields, log.LogMessage)
3032
if err != nil {
3133
return err
3234
}
3335

3436
return nil
3537
}
38+
39+
func (db *Database) GetTaskLogs(runID, taskID, fromIdx, limit int) ([]*TaskLog, error) {
40+
var logs []*TaskLog
41+
42+
err := db.reader.Select(&logs, `
43+
SELECT * FROM task_logs
44+
WHERE run_id = $1 AND task_id = $2 AND log_idx >= $3
45+
ORDER BY log_idx ASC
46+
LIMIT $4`,
47+
runID, taskID, fromIdx, limit)
48+
if err != nil {
49+
return nil, err
50+
}
51+
52+
return logs, nil
53+
}
54+
55+
func (db *Database) GetLastLogIndex(runID, taskID int) (int, error) {
56+
var logIdx int
57+
58+
err := db.reader.Get(&logIdx, `
59+
SELECT COALESCE(MAX(log_idx), 0) FROM task_logs
60+
WHERE run_id = $1 AND task_id = $2`,
61+
runID, taskID)
62+
if err != nil {
63+
return 0, err
64+
}
65+
66+
return logIdx, nil
67+
}

pkg/coordinator/db/task_states.go

Lines changed: 76 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
package db
22

3-
import "github.com/jmoiron/sqlx"
3+
import (
4+
"fmt"
5+
"strings"
6+
7+
"github.com/jmoiron/sqlx"
8+
)
49

510
/*
611
CREATE TABLE IF NOT EXISTS public."task_states"
@@ -42,15 +47,16 @@ type TaskState struct {
4247
TaskConfig string `db:"task_config"`
4348
TaskStatus string `db:"task_status"`
4449
TaskResult int `db:"task_result"`
50+
TaskError string `db:"task_error"`
4551
}
4652

4753
func (db *Database) InsertTaskState(tx *sqlx.Tx, state *TaskState) error {
4854
_, err := tx.Exec(db.EngineQuery(map[EngineType]string{
4955
EnginePgsql: `
5056
INSERT INTO task_states (
5157
run_id, task_id, parent_task, name, title, timeout, ifcond, is_cleanup, is_started, is_running, is_skipped, is_timeout,
52-
start_time, stop_time, task_config, task_status, task_result
53-
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17)
58+
start_time, stop_time, task_config, task_status, task_result, task_error
59+
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18)
5460
ON CONFLICT (run_id, task_id) DO UPDATE SET
5561
parent_task = excluded.parent_task,
5662
name = excluded.name,
@@ -66,16 +72,79 @@ func (db *Database) InsertTaskState(tx *sqlx.Tx, state *TaskState) error {
6672
stop_time = excluded.stop_time,
6773
task_config = excluded.task_config,
6874
task_status = excluded.task_status,
69-
task_result = excluded.task_result`,
75+
task_result = excluded.task_result,
76+
task_error = excluded.task_error`,
7077
EngineSqlite: `
7178
INSERT OR REPLACE INTO task_states (
7279
run_id, task_id, parent_task, name, title, timeout, ifcond, is_cleanup, is_started, is_running, is_skipped, is_timeout,
73-
start_time, stop_time, task_config, task_status, task_result
74-
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17)`,
80+
start_time, stop_time, task_config, task_status, task_result, task_error
81+
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18)`,
7582
}),
7683
state.RunID, state.TaskID, state.ParentTask, state.Name, state.Title, state.Timeout, state.IfCond, state.IsCleanup, state.IsStarted, state.IsRunning,
7784
state.IsSkipped, state.IsTimeout, state.StartTime, state.StopTime, state.TaskConfig, state.TaskStatus,
78-
state.TaskResult)
85+
state.TaskResult, state.TaskError)
86+
if err != nil {
87+
return err
88+
}
89+
90+
return nil
91+
}
92+
93+
func (db *Database) UpdateTaskState(tx *sqlx.Tx, state *TaskState, updateFields []string) error {
94+
var sql strings.Builder
95+
96+
args := []any{}
97+
98+
fmt.Fprint(&sql, `UPDATE task_states SET `)
99+
100+
for i, field := range updateFields {
101+
if i > 0 {
102+
fmt.Fprint(&sql, `, `)
103+
}
104+
105+
switch field {
106+
case "title":
107+
fmt.Fprintf(&sql, `title = $%v`, len(args)+1)
108+
args = append(args, state.Title)
109+
case "is_started":
110+
fmt.Fprintf(&sql, `is_started = $%v`, len(args)+1)
111+
args = append(args, state.IsStarted)
112+
case "is_running":
113+
fmt.Fprintf(&sql, `is_running = $%v`, len(args)+1)
114+
args = append(args, state.IsRunning)
115+
case "is_skipped":
116+
fmt.Fprintf(&sql, `is_skipped = $%v`, len(args)+1)
117+
args = append(args, state.IsSkipped)
118+
case "is_timeout":
119+
fmt.Fprintf(&sql, `is_timeout = $%v`, len(args)+1)
120+
args = append(args, state.IsTimeout)
121+
case "start_time":
122+
fmt.Fprintf(&sql, `start_time = $%v`, len(args)+1)
123+
args = append(args, state.StartTime)
124+
case "stop_time":
125+
fmt.Fprintf(&sql, `stop_time = $%v`, len(args)+1)
126+
args = append(args, state.StopTime)
127+
case "task_config":
128+
fmt.Fprintf(&sql, `task_config = $%v`, len(args)+1)
129+
args = append(args, state.TaskConfig)
130+
case "task_status":
131+
fmt.Fprintf(&sql, `task_status = $%v`, len(args)+1)
132+
args = append(args, state.TaskStatus)
133+
case "task_result":
134+
fmt.Fprintf(&sql, `task_result = $%v`, len(args)+1)
135+
args = append(args, state.TaskResult)
136+
case "task_error":
137+
fmt.Fprintf(&sql, `task_error = $%v`, len(args)+1)
138+
args = append(args, state.TaskError)
139+
default:
140+
return fmt.Errorf("unknown field %q", field)
141+
}
142+
}
143+
144+
fmt.Fprintf(&sql, ` WHERE run_id = $%v AND task_id = $%v`, len(args)+1, len(args)+2)
145+
args = append(args, state.RunID, state.TaskID)
146+
147+
_, err := tx.Exec(sql.String(), args...)
79148
if err != nil {
80149
return err
81150
}

pkg/coordinator/logger/dbreader.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package logger
2+
3+
import "github.com/ethpandaops/assertoor/pkg/coordinator/db"
4+
5+
type LogDBReader struct {
6+
database *db.Database
7+
testRunID uint64
8+
taskID uint64
9+
lastIdx *int
10+
}
11+
12+
func NewLogDBReader(database *db.Database, testRunID, taskID uint64) *LogDBReader {
13+
return &LogDBReader{
14+
database: database,
15+
testRunID: testRunID,
16+
taskID: taskID,
17+
}
18+
}
19+
20+
func (ls *LogDBReader) GetLogEntryCount() int {
21+
if ls.lastIdx == nil {
22+
lastIdx, err := ls.database.GetLastLogIndex(int(ls.testRunID), int(ls.taskID))
23+
if err != nil {
24+
return 0
25+
}
26+
27+
ls.lastIdx = &lastIdx
28+
}
29+
30+
if ls.lastIdx == nil {
31+
return 0
32+
}
33+
34+
return *ls.lastIdx
35+
}
36+
37+
func (ls *LogDBReader) GetLogEntries(from, limit int) []*db.TaskLog {
38+
dbEntries, err := ls.database.GetTaskLogs(int(ls.testRunID), int(ls.taskID), from, limit)
39+
if err != nil {
40+
return nil
41+
}
42+
43+
return dbEntries
44+
}

0 commit comments

Comments
 (0)