Skip to content

Commit ed9dbbb

Browse files
committed
[operator] add kube types and factory logic
1 parent 34c4f97 commit ed9dbbb

File tree

2 files changed

+124
-0
lines changed

2 files changed

+124
-0
lines changed

pkg/kube/informerfactory/factory.go

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
package informerfactory
22

33
import (
4+
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
45
"k8s.io/apimachinery/pkg/runtime/schema"
6+
57
"k8s.io/client-go/tools/cache"
8+
"sync"
69
)
710

811
type NewInformerFunc func() cache.SharedIndexInformer
@@ -19,6 +22,110 @@ type InformerFactory interface {
1922
Shutdown()
2023
}
2124

25+
type informerKey struct {
26+
gvr schema.GroupVersionResource
27+
labelSelector string
28+
fieldSelector string
29+
informerType kubetypes.InformerType
30+
namespace string
31+
}
32+
33+
type informerFactory struct {
34+
lock sync.Mutex
35+
informers map[informerKey]builtInformer
36+
startedInformers sets.Set[informerKey]
37+
wg sync.WaitGroup
38+
shuttingDown bool
39+
}
40+
41+
type builtInformer struct {
42+
informer cache.SharedIndexInformer
43+
objectTransform func(obj any) (any, error)
44+
}
45+
46+
func NewSharedInformerFactory() InformerFactory {
47+
return &informerFactory{
48+
informers: map[informerKey]builtInformer{},
49+
startedInformers: sets.New[informerKey](),
50+
}
51+
}
52+
2253
func (s StartableInformer) Start(stopCh <-chan struct{}) {
2354
s.start(stopCh)
2455
}
56+
57+
func (f *informerFactory) Start(stopCh <-chan struct{}) {
58+
f.lock.Lock()
59+
defer f.lock.Unlock()
60+
61+
if f.shuttingDown {
62+
return
63+
}
64+
65+
for informerType, informer := range f.informers {
66+
if !f.startedInformers.Contains(informerType) {
67+
informer := informer
68+
f.wg.Add(1)
69+
go func() {
70+
defer f.wg.Done()
71+
informer.informer.Run(stopCh)
72+
}()
73+
f.startedInformers.Insert(informerType)
74+
}
75+
}
76+
}
77+
78+
func (f *informerFactory) WaitForCacheSync(stopCh <-chan struct{}) bool {
79+
informers := func() []cache.SharedIndexInformer {
80+
f.lock.Lock()
81+
defer f.lock.Unlock()
82+
informers := make([]cache.SharedIndexInformer, 0, len(f.informers))
83+
for informerKey, informer := range f.informers {
84+
if f.startedInformers.Contains(informerKey) {
85+
informers = append(informers, informer.informer)
86+
}
87+
}
88+
return informers
89+
}()
90+
91+
for _, informer := range informers {
92+
if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
93+
return false
94+
}
95+
}
96+
return true
97+
}
98+
99+
func (f *informerFactory) Shutdown() {
100+
defer f.wg.Wait()
101+
102+
f.lock.Lock()
103+
defer f.lock.Unlock()
104+
f.shuttingDown = true
105+
}
106+
107+
func (f *informerFactory) InformerFor(resource schema.GroupVersionResource, opts kubetypes.InformerOptions, newFunc NewInformerFunc) StartableInformer {
108+
f.lock.Lock()
109+
defer f.lock.Unlock()
110+
111+
key := informerKey{
112+
gvr: resource,
113+
labelSelector: opts.LabelSelector,
114+
fieldSelector: opts.FieldSelector,
115+
informerType: opts.InformerType,
116+
namespace: opts.Namespace,
117+
}
118+
inf, exists := f.informers[key]
119+
if exists {
120+
checkInformerOverlap(inf, resource, opts)
121+
return f.makeStartableInformer(inf.informer, key)
122+
}
123+
124+
informer := newFunc()
125+
f.informers[key] = builtInformer{
126+
informer: informer,
127+
objectTransform: opts.ObjectTransform,
128+
}
129+
130+
return f.makeStartableInformer(informer, key)
131+
}

pkg/kube/kubetypes/types.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package kubetypes
2+
3+
type InformerOptions struct {
4+
LabelSelector string
5+
FieldSelector string
6+
Namespace string
7+
ObjectTransform func(obj any) (any, error)
8+
InformerType InformerType
9+
}
10+
11+
type InformerType int
12+
13+
const (
14+
StandardInformer InformerType = iota
15+
DynamicInformer
16+
MetadataInformer
17+
)

0 commit comments

Comments
 (0)