Skip to content

Commit bbba133

Browse files
m25nJoe GeeGlenn Oppegardwarshawd
committed
Convert Source into an interface
Co-authored-by: Joe Gee <[email protected]> Co-authored-by: Glenn Oppegard <[email protected]> Co-authored-by: Devon Warshaw <[email protected]> Signed-off-by: Matthew Conger-Eldeen <[email protected]>
1 parent 2884c27 commit bbba133

File tree

10 files changed

+298
-174
lines changed

10 files changed

+298
-174
lines changed

src/vizier/services/query_broker/script_runner/BUILD.bazel

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ go_library(
2323
"cloud_source.go",
2424
"config_map_source.go",
2525
"script_runner.go",
26-
"script_source.go",
26+
"source.go",
27+
"sources.go",
2728
],
2829
importpath = "px.dev/pixie/src/vizier/services/query_broker/script_runner",
2930
visibility = ["//visibility:public"],

src/vizier/services/query_broker/script_runner/cloud_source.go

Lines changed: 51 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -37,28 +37,57 @@ import (
3737
"px.dev/pixie/src/vizier/services/metadata/metadatapb"
3838
)
3939

40-
// CloudSource constructs a [Source] that will pull cron scripts from the control plane
41-
func CloudSource(nc *nats.Conn, csClient metadatapb.CronScriptStoreServiceClient, signingKey string) Source {
42-
return func(baseCtx context.Context, updateCb func(*cvmsgspb.CronScriptUpdate)) (map[string]*cvmsgspb.CronScript, func(), error) {
43-
ctx := metadata.AppendToOutgoingContext(baseCtx,
44-
"authorization", fmt.Sprintf("bearer %s", cronScriptStoreToken(signingKey)),
45-
)
46-
sub, err := nc.Subscribe(CronScriptUpdatesChannel, natsUpdater(ctx, nc, csClient, updateCb))
47-
if err != nil {
48-
return nil, nil, err
49-
}
50-
unsubscribe := func() {
51-
if err := sub.Unsubscribe(); err != nil {
52-
log.WithError(err).Error("could not unsubscribe from cloud cron script updates")
53-
}
54-
}
55-
initialScripts, err := fetchInitialScripts(ctx, nc, csClient)
56-
if err != nil {
57-
unsubscribe()
58-
return nil, nil, err
40+
// CloudSource is a Source that pulls cron scripts form the cloud
41+
type CloudSource struct {
42+
scripts map[string]*cvmsgspb.CronScript
43+
stop func()
44+
client metadatapb.CronScriptStoreServiceClient
45+
nc *nats.Conn
46+
signingKey string
47+
}
48+
49+
// NewCloudSource constructs a Source that will pull cron scripts from the cloud
50+
func NewCloudSource(nc *nats.Conn, csClient metadatapb.CronScriptStoreServiceClient, signingKey string) *CloudSource {
51+
return &CloudSource{
52+
client: csClient,
53+
nc: nc,
54+
signingKey: signingKey,
55+
}
56+
}
57+
58+
// Start subscribes to updates from the cloud on the CronScriptUpdatesChannel and sends resulting updates on updatesCh
59+
func (source *CloudSource) Start(baseCtx context.Context, updatesCh chan<- *cvmsgspb.CronScriptUpdate) error {
60+
ctx := metadata.AppendToOutgoingContext(baseCtx,
61+
"authorization", fmt.Sprintf("bearer %s", cronScriptStoreToken(source.signingKey)),
62+
)
63+
sub, err := source.nc.Subscribe(CronScriptUpdatesChannel, natsUpdater(ctx, source.nc, source.client, updatesCh))
64+
if err != nil {
65+
return err
66+
}
67+
unsubscribe := func() {
68+
if err := sub.Unsubscribe(); err != nil {
69+
log.WithError(err).Error("could not unsubscribe from cloud cron script updates")
5970
}
60-
return initialScripts, unsubscribe, nil
6171
}
72+
initialScripts, err := fetchInitialScripts(ctx, source.nc, source.client)
73+
if err != nil {
74+
unsubscribe()
75+
return err
76+
}
77+
78+
source.scripts = initialScripts
79+
source.stop = unsubscribe
80+
return nil
81+
}
82+
83+
// GetInitialScripts returns the initial set of scripts that all updates will be based on
84+
func (source *CloudSource) GetInitialScripts() map[string]*cvmsgspb.CronScript {
85+
return source.scripts
86+
}
87+
88+
// Stop stops further updates from being sent
89+
func (source *CloudSource) Stop() {
90+
source.stop()
6291
}
6392

6493
func cronScriptStoreToken(signingKey string) string {
@@ -67,15 +96,15 @@ func cronScriptStoreToken(signingKey string) string {
6796
return token
6897
}
6998

70-
func natsUpdater(ctx context.Context, nc *nats.Conn, csClient metadatapb.CronScriptStoreServiceClient, updateCb func(*cvmsgspb.CronScriptUpdate)) func(msg *nats.Msg) {
99+
func natsUpdater(ctx context.Context, nc *nats.Conn, csClient metadatapb.CronScriptStoreServiceClient, updatesCh chan<- *cvmsgspb.CronScriptUpdate) func(msg *nats.Msg) {
71100
return func(msg *nats.Msg) {
72101
var cronScriptUpdate cvmsgspb.CronScriptUpdate
73102
if err := unmarshalC2V(msg, &cronScriptUpdate); err != nil {
74103
log.WithError(err).Error("Failed to unmarshal c2v message")
75104
return
76105
}
77106

78-
updateCb(&cronScriptUpdate)
107+
updatesCh <- &cronScriptUpdate
79108

80109
switch cronScriptUpdate.Msg.(type) {
81110
case *cvmsgspb.CronScriptUpdate_UpsertReq:

src/vizier/services/query_broker/script_runner/cloud_source_test.go

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -166,9 +166,10 @@ func TestCloudScriptsSource_InitialState(t *testing.T) {
166166
require.NoError(t, scriptSub.Unsubscribe())
167167
}()
168168

169-
_, stop, err := CloudSource(nc, fcs, "test")(context.Background(), dummyUpdateCb())
169+
source := NewCloudSource(nc, fcs, "test")
170+
err := source.Start(context.Background(), nil)
170171
require.NoError(t, err)
171-
defer stop()
172+
defer source.Stop()
172173

173174
select {
174175
case <-gotCronScripts:
@@ -216,8 +217,9 @@ func TestCloudScriptsSource_InitialState(t *testing.T) {
216217
}
217218
}()
218219

219-
updateCb, updatesCh := mockUpdateCb()
220-
_, _, err := CloudSource(nc, scs, "test")(context.Background(), updateCb)
220+
updatesCh := mockUpdatesCh()
221+
source := NewCloudSource(nc, scs, "test")
222+
err := source.Start(context.Background(), updatesCh)
221223
require.Error(t, err)
222224

223225
sendUpdates(t, nc, sentUpdates)
@@ -297,8 +299,9 @@ func TestCloudScriptsSource_InitialState(t *testing.T) {
297299
}
298300
}()
299301

300-
updateCb, updatesCh := mockUpdateCb()
301-
_, _, err := CloudSource(nc, scs, "test")(context.Background(), updateCb)
302+
updatesCh := mockUpdatesCh()
303+
source := NewCloudSource(nc, scs, "test")
304+
err := source.Start(context.Background(), updatesCh)
302305
require.Error(t, err)
303306

304307
sendUpdates(t, nc, sentUpdates)
@@ -447,10 +450,11 @@ func TestCloudScriptsSource_Updates(t *testing.T) {
447450
}
448451
}()
449452

450-
updateCb, updatesCh := mockUpdateCb()
451-
_, stop, err := CloudSource(nc, fcs, "test")(context.Background(), updateCb)
453+
updatesCh := mockUpdatesCh()
454+
source := NewCloudSource(nc, fcs, "test")
455+
err := source.Start(context.Background(), updatesCh)
452456
require.NoError(t, err)
453-
defer stop()
457+
defer source.Stop()
454458

455459
<-gotChecksumReq
456460
sendUpdates(t, nc, test.updates)
@@ -528,10 +532,11 @@ func TestCloudScriptsSource_Updates(t *testing.T) {
528532
}
529533
}()
530534

531-
updateCb, updatesCh := mockUpdateCb()
532-
_, stop, err := CloudSource(nc, fcs, "test")(context.Background(), updateCb)
535+
updatesCh := mockUpdatesCh()
536+
source := NewCloudSource(nc, fcs, "test")
537+
err := source.Start(context.Background(), updatesCh)
533538
require.NoError(t, err)
534-
stop()
539+
source.Stop()
535540

536541
sendUpdates(t, nc, sentUpdates)
537542

src/vizier/services/query_broker/script_runner/config_map_source.go

Lines changed: 47 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -33,38 +33,60 @@ import (
3333
"px.dev/pixie/src/utils"
3434
)
3535

36-
// ConfigMapSource constructs a [Source] that extracts cron scripts from config maps with the label "purpose=cron-script".
36+
// ConfigMapSource pulls cron scripts from config maps
37+
type ConfigMapSource struct {
38+
scripts map[string]*cvmsgspb.CronScript
39+
stop func()
40+
client clientv1.ConfigMapInterface
41+
}
42+
43+
// NewConfigMapSource constructs a [Source] that extracts cron scripts from config maps with the label "purpose=cron-script".
3744
// Each config map must contain
3845
// - a script.pxl with the pixel script
3946
// - a configs.yaml which will be stored in the Configs field of [cvmsgspb.CronScript]
4047
// - a cron.yaml that contains a "frequency_s" key
41-
func ConfigMapSource(client clientv1.ConfigMapInterface) Source {
42-
return func(baseCtx context.Context, updateCb func(*cvmsgspb.CronScriptUpdate)) (map[string]*cvmsgspb.CronScript, func(), error) {
43-
options := metav1.ListOptions{LabelSelector: "purpose=cron-script"}
44-
watcher, err := client.Watch(baseCtx, options)
45-
if err != nil {
46-
return nil, nil, err
47-
}
48-
go configMapUpdater(watcher, updateCb)
49-
configmaps, err := client.List(baseCtx, options)
48+
func NewConfigMapSource(client clientv1.ConfigMapInterface) *ConfigMapSource {
49+
return &ConfigMapSource{client: client}
50+
}
51+
52+
// Start watches for updates to matching configmaps and sends resulting updates on updatesCh
53+
func (source *ConfigMapSource) Start(baseCtx context.Context, updatesCh chan<- *cvmsgspb.CronScriptUpdate) error {
54+
options := metav1.ListOptions{LabelSelector: "purpose=cron-script"}
55+
watcher, err := source.client.Watch(baseCtx, options)
56+
if err != nil {
57+
return err
58+
}
59+
go configMapUpdater(watcher, updatesCh)
60+
configmaps, err := source.client.List(baseCtx, options)
61+
if err != nil {
62+
watcher.Stop()
63+
return err
64+
}
65+
scripts := map[string]*cvmsgspb.CronScript{}
66+
for _, configmap := range configmaps.Items {
67+
id, cronScript, err := configmapToCronScript(&configmap)
5068
if err != nil {
51-
watcher.Stop()
52-
return nil, nil, err
69+
logCronScriptParseError(err)
70+
continue
5371
}
54-
scripts := map[string]*cvmsgspb.CronScript{}
55-
for _, configmap := range configmaps.Items {
56-
id, cronScript, err := configmapToCronScript(&configmap)
57-
if err != nil {
58-
logCronScriptParseError(err)
59-
continue
60-
}
61-
scripts[id] = cronScript
62-
}
63-
return scripts, watcher.Stop, nil
72+
scripts[id] = cronScript
6473
}
74+
source.scripts = scripts
75+
source.stop = watcher.Stop
76+
return nil
77+
}
78+
79+
// GetInitialScripts returns the initial set of scripts that all updates will be based on
80+
func (source *ConfigMapSource) GetInitialScripts() map[string]*cvmsgspb.CronScript {
81+
return source.scripts
82+
}
83+
84+
// Stop stops further updates from being sent
85+
func (source *ConfigMapSource) Stop() {
86+
source.stop()
6587
}
6688

67-
func configMapUpdater(watcher watch.Interface, updateCb func(*cvmsgspb.CronScriptUpdate)) {
89+
func configMapUpdater(watcher watch.Interface, updatesCh chan<- *cvmsgspb.CronScriptUpdate) {
6890
for event := range watcher.ResultChan() {
6991
switch event.Type {
7092
case watch.Modified, watch.Added:
@@ -83,7 +105,7 @@ func configMapUpdater(watcher watch.Interface, updateCb func(*cvmsgspb.CronScrip
83105
RequestID: id,
84106
Timestamp: time.Now().Unix(),
85107
}
86-
updateCb(cronScriptUpdate)
108+
updatesCh <- cronScriptUpdate
87109
case watch.Deleted:
88110
configmap := event.Object.(*v1.ConfigMap)
89111
id, script, err := configmapToCronScript(configmap)
@@ -100,7 +122,7 @@ func configMapUpdater(watcher watch.Interface, updateCb func(*cvmsgspb.CronScrip
100122
RequestID: id,
101123
Timestamp: time.Now().Unix(),
102124
}
103-
updateCb(cronScriptUpdate)
125+
updatesCh <- cronScriptUpdate
104126
}
105127
}
106128
}

0 commit comments

Comments
 (0)