Skip to content

Commit 2c80e5d

Browse files
m25nwarshawd
andcommitted
use informer pattern in ConfigMapSource
Co-authored-by: Devon Warshaw <[email protected]> Signed-off-by: Matthew Conger-Eldeen <[email protected]>
1 parent d9d8786 commit 2c80e5d

File tree

5 files changed

+157
-176
lines changed

5 files changed

+157
-176
lines changed

src/vizier/services/query_broker/script_runner/BUILD.bazel

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,13 @@ go_library(
4848
"@in_gopkg_yaml_v2//:yaml_v2",
4949
"@io_k8s_api//core/v1:core",
5050
"@io_k8s_apimachinery//pkg/apis/meta/v1:meta",
51-
"@io_k8s_apimachinery//pkg/watch",
52-
"@io_k8s_client_go//kubernetes/typed/core/v1:core",
51+
"@io_k8s_apimachinery//pkg/labels",
52+
"@io_k8s_client_go//informers",
53+
"@io_k8s_client_go//informers/core/v1:core",
54+
"@io_k8s_client_go//kubernetes",
55+
"@io_k8s_client_go//listers/core/v1:core",
5356
"@io_k8s_client_go//rest",
57+
"@io_k8s_client_go//tools/cache",
5458
"@org_golang_google_grpc//codes",
5559
"@org_golang_google_grpc//metadata",
5660
"@org_golang_google_grpc//status",
@@ -84,11 +88,7 @@ pl_go_test(
8488
"@com_github_stretchr_testify//require",
8589
"@io_k8s_api//core/v1:core",
8690
"@io_k8s_apimachinery//pkg/apis/meta/v1:meta",
87-
"@io_k8s_apimachinery//pkg/labels",
88-
"@io_k8s_apimachinery//pkg/runtime",
89-
"@io_k8s_apimachinery//pkg/watch",
9091
"@io_k8s_client_go//kubernetes/fake",
91-
"@io_k8s_client_go//testing",
9292
"@org_golang_google_grpc//:go_default_library",
9393
"@org_golang_google_grpc//codes",
9494
"@org_golang_google_grpc//status",

src/vizier/services/query_broker/script_runner/config_map_source.go

Lines changed: 98 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -20,111 +20,151 @@ package scriptrunner
2020

2121
import (
2222
"context"
23+
"reflect"
24+
"sync/atomic"
2325
"time"
2426

2527
log "github.com/sirupsen/logrus"
2628
"gopkg.in/yaml.v2"
27-
v1 "k8s.io/api/core/v1"
29+
corev1 "k8s.io/api/core/v1"
2830
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29-
"k8s.io/apimachinery/pkg/watch"
30-
clientv1 "k8s.io/client-go/kubernetes/typed/core/v1"
31+
"k8s.io/apimachinery/pkg/labels"
32+
"k8s.io/client-go/informers"
33+
informercorev1 "k8s.io/client-go/informers/core/v1"
34+
"k8s.io/client-go/kubernetes"
35+
listercorev1 "k8s.io/client-go/listers/core/v1"
36+
"k8s.io/client-go/tools/cache"
3137

3238
"px.dev/pixie/src/shared/cvmsgspb"
3339
"px.dev/pixie/src/utils"
3440
)
3541

3642
// ConfigMapSource pulls cron scripts from config maps.
3743
type ConfigMapSource struct {
38-
stop func()
39-
client clientv1.ConfigMapInterface
44+
stop func()
45+
informer informercorev1.ConfigMapInformer
4046
}
4147

4248
// NewConfigMapSource constructs a [Source] that extracts cron scripts from config maps with the label "purpose=cron-script".
4349
// Each config map must contain
4450
// - a script.pxl with the pixel script
4551
// - a configs.yaml which will be stored in the Configs field of [cvmsgspb.CronScript]
4652
// - a cron.yaml that contains a "frequency_s" key
47-
func NewConfigMapSource(client clientv1.ConfigMapInterface) *ConfigMapSource {
48-
return &ConfigMapSource{client: client}
53+
func NewConfigMapSource(client kubernetes.Interface, namespace string) *ConfigMapSource {
54+
return &ConfigMapSource{
55+
informer: informers.NewSharedInformerFactoryWithOptions(
56+
client,
57+
12*time.Hour,
58+
informers.WithNamespace(namespace),
59+
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
60+
options.LabelSelector = "purpose=cron-script"
61+
}),
62+
).Core().V1().ConfigMaps(),
63+
}
4964
}
5065

5166
// Start watches for updates to matching configmaps and sends resulting updates on updatesCh.
52-
func (source *ConfigMapSource) Start(baseCtx context.Context, updatesCh chan<- *cvmsgspb.CronScriptUpdate) (map[string]*cvmsgspb.CronScript, error) {
53-
options := metav1.ListOptions{LabelSelector: "purpose=cron-script"}
54-
watcher, err := source.client.Watch(baseCtx, options)
67+
func (source *ConfigMapSource) Start(ctx context.Context, updatesCh chan<- *cvmsgspb.CronScriptUpdate) (map[string]*cvmsgspb.CronScript, error) {
68+
stopCh := make(chan struct{})
69+
isInitialized := &atomic.Bool{}
70+
_, err := source.informer.Informer().AddEventHandler(configMapEventHandlers(isInitialized, updatesCh))
71+
if err != nil {
72+
return nil, err
73+
}
74+
go source.informer.Informer().Run(stopCh)
75+
cache.WaitForCacheSync(ctx.Done(), source.informer.Informer().HasSynced)
76+
initialScripts, err := getInitialScripts(source.informer.Lister())
5577
if err != nil {
78+
close(stopCh)
5679
return nil, err
5780
}
58-
go configMapUpdater(watcher, updatesCh)
59-
configmaps, err := source.client.List(baseCtx, options)
81+
isInitialized.Store(true)
82+
source.stop = func() { close(stopCh) }
83+
return initialScripts, nil
84+
}
85+
86+
func getInitialScripts(lister listercorev1.ConfigMapLister) (map[string]*cvmsgspb.CronScript, error) {
87+
configMaps, err := lister.List(labels.Everything())
6088
if err != nil {
61-
watcher.Stop()
6289
return nil, err
6390
}
6491
scripts := map[string]*cvmsgspb.CronScript{}
65-
for _, configmap := range configmaps.Items {
66-
id, cronScript, err := configmapToCronScript(&configmap)
92+
for _, configMap := range configMaps {
93+
id, cronScript, err := configmapToCronScript(configMap)
6794
if err != nil {
6895
logCronScriptParseError(err)
6996
continue
7097
}
7198
scripts[id] = cronScript
7299
}
73-
source.stop = watcher.Stop
74100
return scripts, nil
75101
}
76102

77-
// Stop stops further updates from being sent.
78-
func (source *ConfigMapSource) Stop() {
79-
source.stop()
80-
}
81-
82-
func configMapUpdater(watcher watch.Interface, updatesCh chan<- *cvmsgspb.CronScriptUpdate) {
83-
for event := range watcher.ResultChan() {
84-
switch event.Type {
85-
case watch.Modified, watch.Added:
86-
configmap := event.Object.(*v1.ConfigMap)
87-
id, script, err := configmapToCronScript(configmap)
88-
if err != nil {
89-
logCronScriptParseError(err)
90-
continue
91-
}
92-
cronScriptUpdate := &cvmsgspb.CronScriptUpdate{
93-
Msg: &cvmsgspb.CronScriptUpdate_UpsertReq{
94-
UpsertReq: &cvmsgspb.RegisterOrUpdateCronScriptRequest{
95-
Script: script,
96-
},
97-
},
98-
RequestID: id,
99-
Timestamp: time.Now().Unix(),
103+
func configMapEventHandlers(isInitialized *atomic.Bool, updatesCh chan<- *cvmsgspb.CronScriptUpdate) cache.ResourceEventHandlerFuncs {
104+
return cache.ResourceEventHandlerFuncs{
105+
AddFunc: func(obj interface{}) {
106+
configmap := obj.(*corev1.ConfigMap)
107+
if !isInitialized.Load() {
108+
return
100109
}
101-
updatesCh <- cronScriptUpdate
102-
case watch.Deleted:
103-
configmap := event.Object.(*v1.ConfigMap)
104-
id, script, err := configmapToCronScript(configmap)
105-
if err != nil {
106-
logCronScriptParseError(err)
107-
continue
110+
updatesCh <- makeUpdate(configmap)
111+
},
112+
UpdateFunc: func(oldObj, newObj interface{}) {
113+
if reflect.DeepEqual(oldObj, newObj) {
114+
return
108115
}
109-
cronScriptUpdate := &cvmsgspb.CronScriptUpdate{
110-
Msg: &cvmsgspb.CronScriptUpdate_DeleteReq{
111-
DeleteReq: &cvmsgspb.DeleteCronScriptRequest{
112-
ScriptID: script.ID,
113-
},
114-
},
115-
RequestID: id,
116-
Timestamp: time.Now().Unix(),
117-
}
118-
updatesCh <- cronScriptUpdate
119-
}
116+
configmap := newObj.(*corev1.ConfigMap)
117+
updatesCh <- makeUpdate(configmap)
118+
},
119+
DeleteFunc: func(obj interface{}) {
120+
configmap := obj.(*corev1.ConfigMap)
121+
updatesCh <- makeDelete(configmap)
122+
},
120123
}
121124
}
122125

126+
func makeDelete(configmap *corev1.ConfigMap) *cvmsgspb.CronScriptUpdate {
127+
_, script, err := configmapToCronScript(configmap)
128+
if err != nil {
129+
logCronScriptParseError(err)
130+
return nil
131+
}
132+
return &cvmsgspb.CronScriptUpdate{
133+
Msg: &cvmsgspb.CronScriptUpdate_DeleteReq{
134+
DeleteReq: &cvmsgspb.DeleteCronScriptRequest{
135+
ScriptID: script.ID,
136+
},
137+
},
138+
Timestamp: time.Now().Unix(),
139+
}
140+
}
141+
142+
func makeUpdate(configmap *corev1.ConfigMap) *cvmsgspb.CronScriptUpdate {
143+
_, script, err := configmapToCronScript(configmap)
144+
if err != nil {
145+
logCronScriptParseError(err)
146+
return nil
147+
}
148+
return &cvmsgspb.CronScriptUpdate{
149+
Msg: &cvmsgspb.CronScriptUpdate_UpsertReq{
150+
UpsertReq: &cvmsgspb.RegisterOrUpdateCronScriptRequest{
151+
Script: script,
152+
},
153+
},
154+
Timestamp: time.Now().Unix(),
155+
}
156+
}
157+
158+
// Stop stops further updates from being sent.
159+
func (source *ConfigMapSource) Stop() {
160+
source.stop()
161+
}
162+
123163
func logCronScriptParseError(err error) {
124164
log.WithError(err).Error("Failed to parse cron.yaml from configmap cron script")
125165
}
126166

127-
func configmapToCronScript(configmap *v1.ConfigMap) (string, *cvmsgspb.CronScript, error) {
167+
func configmapToCronScript(configmap *corev1.ConfigMap) (string, *cvmsgspb.CronScript, error) {
128168
id := string(configmap.UID)
129169
cronScript := &cvmsgspb.CronScript{
130170
ID: utils.ProtoFromUUIDStrOrNil(id),

0 commit comments

Comments
 (0)