Skip to content

Commit f8b2eab

Browse files
committed
allow parallel test execution via skip_queue flag
1 parent 8df241a commit f8b2eab

File tree

8 files changed

+70
-32
lines changed

8 files changed

+70
-32
lines changed

pkg/coordinator/coordinator.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,10 @@ func (c *Coordinator) Run(ctx context.Context) error {
191191
// start per epoch GC routine
192192
go c.runEpochGC(ctx)
193193

194-
// run tests
194+
// start off queue test execution loop
195+
go c.runner.RunOffQueueTestExecutionLoop(ctx)
196+
197+
// run test execution loop for queued tests
195198
c.runner.RunTestExecutionLoop(ctx, c.Config.Coordinator.MaxConcurrentTests)
196199

197200
return nil
@@ -289,8 +292,8 @@ func (c *Coordinator) DeleteTestRun(runID uint64) error {
289292
return err
290293
}
291294

292-
func (c *Coordinator) ScheduleTest(descriptor types.TestDescriptor, configOverrides map[string]any, allowDuplicate bool) (types.TestRunner, error) {
293-
return c.runner.ScheduleTest(descriptor, configOverrides, allowDuplicate)
295+
func (c *Coordinator) ScheduleTest(descriptor types.TestDescriptor, configOverrides map[string]any, allowDuplicate bool, skipQueue bool) (types.TestRunner, error) {
296+
return c.runner.ScheduleTest(descriptor, configOverrides, allowDuplicate, skipQueue)
294297
}
295298

296299
func (c *Coordinator) startMetrics() error {

pkg/coordinator/testrunner.go

Lines changed: 42 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,24 @@ type TestRunner struct {
1616
coordinator types.Coordinator
1717

1818
runIDCounter uint64
19-
lastExecutedRunID uint64
2019
testSchedulerMutex sync.Mutex
2120

22-
testRunMap map[uint64]types.Test
23-
testQueue []types.TestRunner
24-
testRegistryMutex sync.RWMutex
25-
testNotificationChan chan bool
21+
testRunMap map[uint64]types.Test
22+
testQueue []types.TestRunner
23+
testRegistryMutex sync.RWMutex
24+
queueNotificationChan chan bool
25+
offQueueNotificationChan chan types.TestRunner
2626
}
2727

2828
func NewTestRunner(coordinator types.Coordinator, lastRunID uint64) *TestRunner {
2929
return &TestRunner{
30-
coordinator: coordinator,
31-
runIDCounter: lastRunID,
32-
lastExecutedRunID: lastRunID,
30+
coordinator: coordinator,
31+
runIDCounter: lastRunID,
3332

34-
testRunMap: map[uint64]types.Test{},
35-
testQueue: []types.TestRunner{},
36-
testNotificationChan: make(chan bool, 1),
33+
testRunMap: map[uint64]types.Test{},
34+
testQueue: []types.TestRunner{},
35+
queueNotificationChan: make(chan bool, 1),
36+
offQueueNotificationChan: make(chan types.TestRunner, 10),
3737
}
3838
}
3939

@@ -76,25 +76,29 @@ func (c *TestRunner) RemoveTestFromQueue(runID uint64) bool {
7676
return false
7777
}
7878

79-
func (c *TestRunner) ScheduleTest(descriptor types.TestDescriptor, configOverrides map[string]any, allowDuplicate bool) (types.TestRunner, error) {
79+
func (c *TestRunner) ScheduleTest(descriptor types.TestDescriptor, configOverrides map[string]any, allowDuplicate bool, skipQueue bool) (types.TestRunner, error) {
8080
if descriptor.Err() != nil {
8181
return nil, fmt.Errorf("cannot create test from failed test descriptor: %w", descriptor.Err())
8282
}
8383

84-
testRef, err := c.createTestRun(descriptor, configOverrides, allowDuplicate)
84+
testRef, err := c.createTestRun(descriptor, configOverrides, allowDuplicate, skipQueue)
8585
if err != nil {
8686
return nil, err
8787
}
8888

89-
select {
90-
case c.testNotificationChan <- true:
91-
default:
89+
if skipQueue {
90+
c.offQueueNotificationChan <- testRef
91+
} else {
92+
select {
93+
case c.queueNotificationChan <- true:
94+
default:
95+
}
9296
}
9397

9498
return testRef, nil
9599
}
96100

97-
func (c *TestRunner) createTestRun(descriptor types.TestDescriptor, configOverrides map[string]any, allowDuplicate bool) (types.TestRunner, error) {
101+
func (c *TestRunner) createTestRun(descriptor types.TestDescriptor, configOverrides map[string]any, allowDuplicate bool, skipQueue bool) (types.TestRunner, error) {
98102
c.testSchedulerMutex.Lock()
99103
defer c.testSchedulerMutex.Unlock()
100104

@@ -115,7 +119,9 @@ func (c *TestRunner) createTestRun(descriptor types.TestDescriptor, configOverri
115119
}
116120

117121
c.testRegistryMutex.Lock()
118-
c.testQueue = append(c.testQueue, testRef)
122+
if !skipQueue {
123+
c.testQueue = append(c.testQueue, testRef)
124+
}
119125
c.testRunMap[runID] = testRef
120126
c.testRegistryMutex.Unlock()
121127

@@ -164,7 +170,7 @@ runLoop:
164170
select {
165171
case <-ctx.Done():
166172
break runLoop
167-
case <-c.testNotificationChan:
173+
case <-c.queueNotificationChan:
168174
case <-time.After(60 * time.Second):
169175
}
170176
}
@@ -173,9 +179,18 @@ runLoop:
173179
waitGroup.Wait()
174180
}
175181

176-
func (c *TestRunner) runTest(ctx context.Context, testRef types.TestRunner) {
177-
c.lastExecutedRunID = testRef.RunID()
182+
func (c *TestRunner) RunOffQueueTestExecutionLoop(ctx context.Context) {
183+
for {
184+
select {
185+
case <-ctx.Done():
186+
return
187+
case testRef := <-c.offQueueNotificationChan:
188+
go c.runTest(ctx, testRef)
189+
}
190+
}
191+
}
178192

193+
func (c *TestRunner) runTest(ctx context.Context, testRef types.TestRunner) {
179194
if err := testRef.Validate(); err != nil {
180195
testRef.Logger().Errorf("test validation failed: %v", err)
181196
return
@@ -195,9 +210,10 @@ func (c *TestRunner) RunTestScheduler(ctx context.Context) {
195210

196211
// startup scheduler
197212
for _, testDescr := range c.getStartupTests() {
198-
_, err := c.ScheduleTest(testDescr, nil, false)
213+
testConfig := testDescr.Config()
214+
_, err := c.ScheduleTest(testDescr, nil, false, testConfig.Schedule.SkipQueue)
199215
if err != nil {
200-
c.coordinator.Logger().Errorf("could not schedule startup test execution for %v (%v): %v", testDescr.ID(), testDescr.Config().Name, err)
216+
c.coordinator.Logger().Errorf("could not schedule startup test execution for %v (%v): %v", testDescr.ID(), testConfig.Name, err)
201217
}
202218
}
203219

@@ -217,9 +233,10 @@ func (c *TestRunner) RunTestScheduler(ctx context.Context) {
217233
}
218234

219235
for _, testDescr := range c.getCronTests(cronTime) {
220-
_, err := c.ScheduleTest(testDescr, nil, false)
236+
testConfig := testDescr.Config()
237+
_, err := c.ScheduleTest(testDescr, nil, false, testConfig.Schedule.SkipQueue)
221238
if err != nil {
222-
c.coordinator.Logger().Errorf("could not schedule cron test execution for %v (%v): %v", testDescr.ID(), testDescr.Config().Name, err)
239+
c.coordinator.Logger().Errorf("could not schedule cron test execution for %v (%v): %v", testDescr.ID(), testConfig.Name, err)
223240
}
224241
}
225242
}

pkg/coordinator/types/coordinator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ type Coordinator interface {
2424
GetTestByRunID(runID uint64) Test
2525
GetTestQueue() []Test
2626
GetTestHistory(testID string, firstRunID uint64, offset uint64, limit uint64) ([]Test, int)
27-
ScheduleTest(descriptor TestDescriptor, configOverrides map[string]any, allowDuplicate bool) (TestRunner, error)
27+
ScheduleTest(descriptor TestDescriptor, configOverrides map[string]any, allowDuplicate bool, skipQueue bool) (TestRunner, error)
2828
DeleteTestRun(runID uint64) error
2929
}
3030

pkg/coordinator/types/test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,9 @@ type ExternalTestConfig struct {
6161
}
6262

6363
type TestSchedule struct {
64-
Startup bool `yaml:"startup" json:"startup"`
65-
Cron []string `yaml:"cron" json:"cron"`
64+
Startup bool `yaml:"startup" json:"startup"`
65+
Cron []string `yaml:"cron" json:"cron"`
66+
SkipQueue bool `yaml:"skipQueue" json:"skipQueue"`
6667
}
6768

6869
type TestDescriptor interface {

pkg/coordinator/web/api/docs/docs.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1104,6 +1104,9 @@ const docTemplate = `{
11041104
"type": "object",
11051105
"additionalProperties": {}
11061106
},
1107+
"skip_queue": {
1108+
"type": "boolean"
1109+
},
11071110
"test_id": {
11081111
"type": "string"
11091112
}
@@ -1272,6 +1275,9 @@ const docTemplate = `{
12721275
"type": "string"
12731276
}
12741277
},
1278+
"skipQueue": {
1279+
"type": "boolean"
1280+
},
12751281
"startup": {
12761282
"type": "boolean"
12771283
}

pkg/coordinator/web/api/docs/swagger.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1096,6 +1096,9 @@
10961096
"type": "object",
10971097
"additionalProperties": {}
10981098
},
1099+
"skip_queue": {
1100+
"type": "boolean"
1101+
},
10991102
"test_id": {
11001103
"type": "string"
11011104
}
@@ -1264,6 +1267,9 @@
12641267
"type": "string"
12651268
}
12661269
},
1270+
"skipQueue": {
1271+
"type": "boolean"
1272+
},
12671273
"startup": {
12681274
"type": "boolean"
12691275
}

pkg/coordinator/web/api/docs/swagger.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,8 @@ definitions:
222222
config:
223223
additionalProperties: {}
224224
type: object
225+
skip_queue:
226+
type: boolean
225227
test_id:
226228
type: string
227229
type: object
@@ -333,6 +335,8 @@ definitions:
333335
items:
334336
type: string
335337
type: array
338+
skipQueue:
339+
type: boolean
336340
startup:
337341
type: boolean
338342
type: object

pkg/coordinator/web/api/post_test_run_api.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ type PostTestRunsScheduleRequest struct {
1313
TestID string `json:"test_id"`
1414
Config map[string]any `json:"config"`
1515
AllowDuplicate bool `json:"allow_duplicate"`
16+
SkipQueue bool `json:"skip_queue"`
1617
}
1718

1819
type PostTestRunsScheduleResponse struct {
@@ -77,7 +78,7 @@ func (ah *APIHandler) PostTestRunsSchedule(w http.ResponseWriter, r *http.Reques
7778
}
7879

7980
// create test run
80-
testInstance, err := ah.coordinator.ScheduleTest(testDescriptor, req.Config, req.AllowDuplicate)
81+
testInstance, err := ah.coordinator.ScheduleTest(testDescriptor, req.Config, req.AllowDuplicate, req.SkipQueue)
8182
if err != nil {
8283
ah.sendErrorResponse(w, r.URL.String(), fmt.Sprintf("failed creating test: %v", err), http.StatusInternalServerError)
8384
return

0 commit comments

Comments
 (0)