Skip to content

Commit ee844df

Browse files
committed
Adds tests, role binding. Dispatcher sets up TLS config
1 parent 6c269a6 commit ee844df

12 files changed

Lines changed: 194 additions & 44 deletions

File tree

config/channels/in-memory-channel/200-imc-dispatcher-serviceaccount.yaml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,17 @@ roleRef:
3636
kind: ClusterRole
3737
name: imc-dispatcher
3838
apiGroup: rbac.authorization.k8s.io
39+
---
40+
apiVersion: rbac.authorization.k8s.io/v1
41+
kind: RoleBinding
42+
metadata:
43+
name: tls-role-binding
44+
namespace: knative-eventing
45+
subjects:
46+
- kind: ServiceAccount
47+
name: imc-dispatcher
48+
apiGroup: ""
49+
roleRef:
50+
kind: Role
51+
name: tls-role
52+
apiGroup: rbac.authorization.k8s.io

config/tls/role.yaml

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# Copyright 2023 The Knative Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
# This is a role to read Secrets within the system namespace for eventing.
16+
# Role should be used by any component setting up a TLS server
17+
18+
apiVersion: rbac.authorization.k8s.io/v1
19+
kind: Role
20+
metadata:
21+
name: tls-role
22+
namespace: knative-eventing
23+
rules:
24+
- apiGroups:
25+
- ""
26+
resources:
27+
- secrets
28+
verbs:
29+
- get
30+
- list
31+
- watch

pkg/channel/fanout/fanout_message_handler_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,12 @@ func testFanoutMessageHandler(t *testing.T, async bool, receiverFunc channel.Unb
325325
t.Fatal(err)
326326
}
327327

328+
calledChan := make(chan bool, 1)
329+
recvOptionFunc := func(*channel.MessageReceiver) error {
330+
calledChan <- true
331+
return nil
332+
}
333+
328334
h, err := NewFanoutMessageHandler(
329335
logger,
330336
channel.NewMessageDispatcher(logger),
@@ -333,7 +339,9 @@ func testFanoutMessageHandler(t *testing.T, async bool, receiverFunc channel.Unb
333339
AsyncHandler: async,
334340
},
335341
reporter,
342+
recvOptionFunc,
336343
)
344+
<-calledChan
337345
if err != nil {
338346
t.Fatal("NewHandler failed =", err)
339347
}

pkg/channel/multichannelfanout/multi_channel_fanout_message_handler.go

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,6 @@ type MultiChannelMessageHandler interface {
4545
CountChannelHandlers() int
4646
}
4747

48-
// makeChannelKeyFromConfig creates the channel key for a given channelConfig. It is a helper around
49-
// MakeChannelKey.
50-
func makeChannelKeyFromConfig(config ChannelConfig) string {
51-
return config.HostName
52-
}
53-
5448
// Handler is an http.Handler that introspects the incoming request to determine what Channel it is
5549
// on, and then delegates handling of that request to the single fanout.FanoutMessageHandler corresponding to
5650
// that Channel.
@@ -70,21 +64,26 @@ func NewMessageHandler(_ context.Context, logger *zap.Logger, messageDispatcher
7064

7165
// NewMessageHandlerWithConfig creates a new Handler with the specified configuration. This is really meant for tests
7266
// where you want to apply a fully specified configuration for tests. Reconciler operates on single channel at a time.
73-
func NewMessageHandlerWithConfig(_ context.Context, logger *zap.Logger, messageDispatcher channel.MessageDispatcher, conf Config, reporter channel.StatsReporter) (*MessageHandler, error) {
67+
func NewMessageHandlerWithConfig(_ context.Context, logger *zap.Logger, messageDispatcher channel.MessageDispatcher, conf Config, reporter channel.StatsReporter, recvOptions ...channel.MessageReceiverOptions) (*MessageHandler, error) {
7468
handlers := make(map[string]fanout.MessageHandler, len(conf.ChannelConfigs))
7569

7670
for _, cc := range conf.ChannelConfigs {
77-
key := makeChannelKeyFromConfig(cc)
78-
handler, err := fanout.NewFanoutMessageHandler(logger, messageDispatcher, cc.FanoutConfig, reporter)
79-
if err != nil {
80-
logger.Error("Failed creating new fanout handler.", zap.Error(err))
81-
return nil, err
82-
}
83-
if _, present := handlers[key]; present {
84-
logger.Error("Duplicate channel key", zap.String("channelKey", key))
85-
return nil, fmt.Errorf("duplicate channel key: %v", key)
71+
keys := []string{cc.HostName, cc.Path}
72+
for _, key := range keys {
73+
if key == "" {
74+
continue
75+
}
76+
handler, err := fanout.NewFanoutMessageHandler(logger, messageDispatcher, cc.FanoutConfig, reporter, recvOptions...)
77+
if err != nil {
78+
logger.Error("Failed creating new fanout handler.", zap.Error(err))
79+
return nil, err
80+
}
81+
if _, present := handlers[key]; present {
82+
logger.Error("Duplicate channel key", zap.String("channelKey", key))
83+
return nil, fmt.Errorf("duplicate channel key: %v", key)
84+
}
85+
handlers[key] = handler
8686
}
87-
handlers[key] = handler
8887
}
8988
return &MessageHandler{
9089
logger: logger,

pkg/channel/multichannelfanout/multi_channel_fanout_message_handler_test.go

Lines changed: 78 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -122,19 +122,63 @@ func TestServeHTTPMessageHandler(t *testing.T) {
122122
config Config
123123
eventID *string
124124
respStatusCode int
125-
key string
125+
hostKey string
126+
pathKey string
127+
recvOptions []channel.MessageReceiverOptions
126128
expectedStatusCode int
127129
}{
128-
"non-existent channel": {
130+
"non-existent channel host based": {
129131
config: Config{},
130-
key: "default.does-not-exist",
132+
hostKey: "default.does-not-exist",
133+
expectedStatusCode: http.StatusInternalServerError,
134+
},
135+
"non-existent channel path based": {
136+
hostKey: "first-channel.default",
137+
config: Config{
138+
ChannelConfigs: []ChannelConfig{
139+
{
140+
Namespace: "ns",
141+
Name: "name",
142+
HostName: "first-channel.default",
143+
FanoutConfig: fanout.Config{
144+
Subscriptions: []fanout.Subscription{
145+
{
146+
Reply: replaceDomain,
147+
},
148+
},
149+
},
150+
},
151+
},
152+
},
153+
pathKey: "some-namespace/wrong-channel",
131154
expectedStatusCode: http.StatusInternalServerError,
132155
},
133156
"bad host": {
134157
config: Config{},
135-
key: "no-dot",
158+
hostKey: "no-dot",
136159
expectedStatusCode: http.StatusInternalServerError,
137160
},
161+
"malformed path": {
162+
hostKey: "first-channel.default",
163+
config: Config{
164+
ChannelConfigs: []ChannelConfig{
165+
{
166+
Namespace: "ns",
167+
Name: "name",
168+
HostName: "first-channel.default",
169+
FanoutConfig: fanout.Config{
170+
Subscriptions: []fanout.Subscription{
171+
{
172+
Reply: replaceDomain,
173+
},
174+
},
175+
},
176+
},
177+
},
178+
},
179+
pathKey: "missing-forward-slash",
180+
expectedStatusCode: http.StatusBadRequest,
181+
},
138182
"pass through failure": {
139183
config: Config{
140184
ChannelConfigs: []ChannelConfig{
@@ -153,7 +197,7 @@ func TestServeHTTPMessageHandler(t *testing.T) {
153197
},
154198
},
155199
respStatusCode: http.StatusInternalServerError,
156-
key: "first-channel.default",
200+
hostKey: "first-channel.default",
157201
expectedStatusCode: http.StatusInternalServerError,
158202
},
159203
"invalid event": {
@@ -188,7 +232,7 @@ func TestServeHTTPMessageHandler(t *testing.T) {
188232
},
189233
eventID: ptr.String(""), // invalid id
190234
respStatusCode: http.StatusOK,
191-
key: "second-channel.default",
235+
hostKey: "second-channel.default",
192236
expectedStatusCode: http.StatusBadRequest,
193237
},
194238
"choose channel": {
@@ -222,8 +266,33 @@ func TestServeHTTPMessageHandler(t *testing.T) {
222266
},
223267
},
224268
respStatusCode: http.StatusOK,
225-
key: "second-channel.default",
269+
hostKey: "second-channel.default",
270+
expectedStatusCode: http.StatusAccepted,
271+
},
272+
"path based": {
273+
config: Config{
274+
ChannelConfigs: []ChannelConfig{
275+
{
276+
277+
Namespace: "ns",
278+
Name: "name",
279+
HostName: "first-channel.default",
280+
Path: "default/first-channel",
281+
FanoutConfig: fanout.Config{
282+
Subscriptions: []fanout.Subscription{
283+
{
284+
Subscriber: replaceDomain,
285+
},
286+
},
287+
},
288+
},
289+
},
290+
},
291+
respStatusCode: http.StatusOK,
292+
hostKey: "host.should.be.ignored",
293+
pathKey: "default/first-channel",
226294
expectedStatusCode: http.StatusAccepted,
295+
recvOptions: []channel.MessageReceiverOptions{channel.ResolveMessageChannelFromPath(channel.ParseChannelFromPath)},
227296
},
228297
}
229298
for n, tc := range testCases {
@@ -236,7 +305,7 @@ func TestServeHTTPMessageHandler(t *testing.T) {
236305

237306
logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))
238307
reporter := channel.NewStatsReporter("testcontainer", "testpod")
239-
h, err := NewMessageHandlerWithConfig(context.TODO(), logger, channel.NewMessageDispatcher(logger), tc.config, reporter)
308+
h, err := NewMessageHandlerWithConfig(context.TODO(), logger, channel.NewMessageDispatcher(logger), tc.config, reporter, tc.recvOptions...)
240309
if err != nil {
241310
t.Fatalf("Unexpected NewHandler error: '%v'", err)
242311
}
@@ -255,7 +324,7 @@ func TestServeHTTPMessageHandler(t *testing.T) {
255324
event.SetSource("testsource")
256325
event.SetData(cloudevents.ApplicationJSON, "{}")
257326

258-
req := httptest.NewRequest(http.MethodPost, "http://"+tc.key+"/", nil)
327+
req := httptest.NewRequest(http.MethodPost, "http://"+tc.hostKey+"/"+tc.pathKey, nil)
259328
err = bindingshttp.WriteRequest(ctx, binding.ToMessage(&event), req)
260329
if err != nil {
261330
t.Fatal(err)

pkg/eventingtls/eventingtlstesting/eventingtlstesting.go

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -49,17 +49,7 @@ func StartServer(ctx context.Context, t *testing.T, port int, requests chan<- *n
4949
Name: "tls-secret",
5050
}
5151

52-
_ = secretinformer.Get(ctx).Informer().GetStore().Add(&corev1.Secret{
53-
ObjectMeta: metav1.ObjectMeta{
54-
Namespace: secret.Namespace,
55-
Name: secret.Name,
56-
},
57-
Data: map[string][]byte{
58-
"tls.key": Key,
59-
"tls.crt": Crt,
60-
},
61-
Type: corev1.SecretTypeTLS,
62-
})
52+
_ = secretinformer.Get(ctx).Informer().GetStore().Add(GetTLSSecret(secret.Namespace, secret.Name))
6353

6454
serverTLSConfig := eventingtls.NewDefaultServerConfig()
6555
serverTLSConfig.GetCertificate = eventingtls.GetCertificateFromSecret(ctx, secretinformer.Get(ctx), kubeclient.Get(ctx), secret)
@@ -187,3 +177,17 @@ O2dgzikq8iSy1BlRsVw=
187177
-----END CERTIFICATE-----
188178
`)
189179
}
180+
181+
func GetTLSSecret(namespace, name string) *corev1.Secret {
182+
return &corev1.Secret{
183+
ObjectMeta: metav1.ObjectMeta{
184+
Namespace: namespace,
185+
Name: name,
186+
},
187+
Data: map[string][]byte{
188+
"tls.key": Key,
189+
"tls.crt": Crt,
190+
},
191+
Type: corev1.SecretTypeTLS,
192+
}
193+
}

pkg/reconciler/inmemorychannel/dispatcher/controller.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ import (
2121
"time"
2222

2323
"knative.dev/pkg/injection"
24+
"knative.dev/pkg/system"
2425

26+
"k8s.io/apimachinery/pkg/types"
2527
"k8s.io/client-go/tools/cache"
2628

2729
"github.com/google/uuid"
@@ -50,6 +52,8 @@ import (
5052
inmemorychannelinformer "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/inmemorychannel"
5153
inmemorychannelreconciler "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1/inmemorychannel"
5254
"knative.dev/eventing/pkg/inmemorychannel"
55+
kubeclient "knative.dev/pkg/client/injection/kube/client"
56+
secretinformer "knative.dev/pkg/injection/clients/namespacedkube/informers/core/v1/secret"
5357
)
5458

5559
const (
@@ -58,6 +62,7 @@ const (
5862
httpPort = 8080
5963
httpsPort = 8443
6064
finalizerName = "imc-dispatcher"
65+
tlsSecretName = "imc-dispatcher-server-tls"
6166
)
6267

6368
type envConfig struct {
@@ -151,15 +156,24 @@ func NewController(
151156
httpDispatcher := inmemorychannel.NewMessageDispatcher(httpArgs)
152157
httpReceiver := httpDispatcher.GetReceiver()
153158

159+
secret := types.NamespacedName{
160+
Namespace: system.Namespace(),
161+
Name: tlsSecretName,
162+
}
163+
serverTLSConfig := eventingtls.NewDefaultServerConfig()
164+
serverTLSConfig.GetCertificate = eventingtls.GetCertificateFromSecret(ctx, secretinformer.Get(ctx), kubeclient.Get(ctx), secret)
165+
tlsConfig, err := eventingtls.GetTLSServerConfig(serverTLSConfig)
166+
if err != nil {
167+
logger.Panicf("unable to get tls config: %s", err)
168+
}
154169
httpsArgs := &inmemorychannel.InMemoryMessageDispatcherArgs{
155170
Port: httpsPort,
156171
ReadTimeout: readTimeout,
157172
WriteTimeout: writeTimeout,
158173
Handler: sh,
159174
Logger: logger.Desugar(),
160175

161-
// TODO: add tls config
162-
HTTPMessageReceiverOptions: []kncloudevents.HTTPMessageReceiverOption{},
176+
HTTPMessageReceiverOptions: []kncloudevents.HTTPMessageReceiverOption{kncloudevents.WithTLSConfig(tlsConfig)},
163177
}
164178
httpsDispatcher := inmemorychannel.NewMessageDispatcher(httpsArgs)
165179
httpsReceiver := httpsDispatcher.GetReceiver()

0 commit comments

Comments
 (0)