Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion test/rekt/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"knative.dev/reconciler-test/pkg/environment"
"knative.dev/reconciler-test/pkg/k8s"
"knative.dev/reconciler-test/pkg/knative"
"knative.dev/reconciler-test/pkg/tracing"

"knative.dev/eventing/test/rekt/features/broker"
b "knative.dev/eventing/test/rekt/resources/broker"
Expand All @@ -42,10 +43,11 @@ func TestBrokerWithManyTriggers(t *testing.T) {
knative.WithTracingConfig,
k8s.WithEventListener,
environment.Managed(t),
tracing.WithGatherer(t),
environment.WithPollTimings(5*time.Second, 4*time.Minute),
)

env.Test(ctx, t, broker.BrokerWithManyTriggers())
env.TestSet(ctx, t, broker.ManyTriggers())
}

// TestBrokerWorkFlowWithTransformation test broker transformation respectively follow
Expand Down
101 changes: 49 additions & 52 deletions test/rekt/features/broker/feature.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ import (
"knative.dev/reconciler-test/pkg/resources/service"
)

func BrokerWithManyTriggers() *feature.Feature {
f := feature.NewFeatureNamed("broker With Many Triggers")
func ManyTriggers() *feature.FeatureSet {
fs := &feature.FeatureSet{Name: "Broker with many triggers"}

// Construct different type, source and extensions of events
any := eventingv1.TriggerAnyFilter
Expand All @@ -66,20 +66,20 @@ func BrokerWithManyTriggers() *feature.Feature {
nonMatchingExtensionValue := "nonmatchingextval"

eventFilters1 := make(map[string]eventTestCase)
eventFilters1["dumper-1"] = neweventTestCase(any, any)
eventFilters1["dumper-12"] = neweventTestCase(eventType1, any)
eventFilters1["dumper-123"] = neweventTestCase(any, eventSource1)
eventFilters1["dumper-1234"] = neweventTestCase(eventType1, eventSource1)
eventFilters1["dumper-1"] = newEventTestCase(any, any)
eventFilters1["dumper-12"] = newEventTestCase(eventType1, any)
eventFilters1["dumper-123"] = newEventTestCase(any, eventSource1)
eventFilters1["dumper-1234"] = newEventTestCase(eventType1, eventSource1)

eventFilters2 := make(map[string]eventTestCase)
eventFilters2["dumper-12345"] = neweventTestCaseWithExtensions(any, any, map[string]interface{}{extensionName1: extensionValue1})
eventFilters2["dumper-123456"] = neweventTestCaseWithExtensions(any, any, map[string]interface{}{extensionName1: extensionValue1, extensionName2: extensionValue2})
eventFilters2["dumper-1234567"] = neweventTestCaseWithExtensions(any, any, map[string]interface{}{extensionName2: extensionValue2})
eventFilters2["dumper-654321"] = neweventTestCaseWithExtensions(eventType1, any, map[string]interface{}{extensionName1: extensionValue1})
eventFilters2["dumper-54321"] = neweventTestCaseWithExtensions(any, any, map[string]interface{}{extensionName1: any})
eventFilters2["dumper-4321"] = neweventTestCaseWithExtensions(any, eventSource1, map[string]interface{}{extensionName1: extensionValue1})
eventFilters2["dumper-321"] = neweventTestCaseWithExtensions(any, eventSource1, map[string]interface{}{extensionName1: extensionValue1, extensionName2: extensionValue2})
eventFilters2["dumper-21"] = neweventTestCaseWithExtensions(any, eventSource2, map[string]interface{}{extensionName1: extensionValue1, extensionName2: extensionValue1})
eventFilters2["dumper-12345"] = newEventTestCaseWithExtensions(any, any, map[string]interface{}{extensionName1: extensionValue1})
eventFilters2["dumper-123456"] = newEventTestCaseWithExtensions(any, any, map[string]interface{}{extensionName1: extensionValue1, extensionName2: extensionValue2})
eventFilters2["dumper-1234567"] = newEventTestCaseWithExtensions(any, any, map[string]interface{}{extensionName2: extensionValue2})
eventFilters2["dumper-654321"] = newEventTestCaseWithExtensions(eventType1, any, map[string]interface{}{extensionName1: extensionValue1})
eventFilters2["dumper-54321"] = newEventTestCaseWithExtensions(any, any, map[string]interface{}{extensionName1: any})
eventFilters2["dumper-4321"] = newEventTestCaseWithExtensions(any, eventSource1, map[string]interface{}{extensionName1: extensionValue1})
eventFilters2["dumper-321"] = newEventTestCaseWithExtensions(any, eventSource1, map[string]interface{}{extensionName1: extensionValue1, extensionName2: extensionValue2})
eventFilters2["dumper-21"] = newEventTestCaseWithExtensions(any, eventSource2, map[string]interface{}{extensionName1: extensionValue1, extensionName2: extensionValue1})

tests := []struct {
name string
Expand Down Expand Up @@ -115,17 +115,16 @@ func BrokerWithManyTriggers() *feature.Feature {
},
}

// Map to save the expected matchers per dumper so that we can verify the delivery.
// matcherBySink is to verify the sink and corresponding matcher
matcherBySink := make(map[string][]eventshub.EventInfoMatcher)
for _, testcase := range tests {

// Create the broker
brokerName := feature.MakeRandomK8sName("broker")
f.Setup("install broker", broker.Install(brokerName, broker.WithEnvConfig()...))
f.Setup("broker is ready", broker.IsReady(brokerName))
f.Setup("broker is addressable", broker.IsAddressable(brokerName))
f := feature.NewFeatureNamed(testcase.name)

// Create the broker
brokerName := feature.MakeRandomK8sName("broker")
f.Setup("install broker", broker.Install(brokerName, broker.WithEnvConfig()...))
f.Setup("broker is ready", broker.IsReady(brokerName))
f.Setup("broker is addressable", broker.IsAddressable(brokerName))

for _, testcase := range tests {
for sink, eventFilter := range testcase.eventFilters {
f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiver))
filter := eventingv1.TriggerFilterAttributes{
Expand All @@ -141,20 +140,19 @@ func BrokerWithManyTriggers() *feature.Feature {
}

// Install the trigger
via := feature.MakeRandomK8sName("via")
f.Setup("install trigger", trigger.Install(via, brokerName, cfg...))
f.Setup("trigger goes ready", trigger.IsReady(via))
f.Setup("install trigger", trigger.Install(sink, brokerName, cfg...))
f.Setup("trigger goes ready", trigger.IsReady(sink))
}

for _, event := range testcase.eventsToSend {
eventToSend := cloudevents.NewEvent()
eventToSend.SetID(uuid.New().String())
eventToSend.SetID(event.toID())
eventToSend.SetType(event.Type)
eventToSend.SetSource(event.Source)
for k, v := range event.Extensions {
eventToSend.SetExtension(k, v)
}
data := fmt.Sprintf(`{"msg":"%s"}`, uuid.New())
data := fmt.Sprintf(`{"msg":"%s"}`, eventToSend.ID())
eventToSend.SetData(cloudevents.ApplicationJSON, []byte(data))

source := feature.MakeRandomK8sName("source")
Expand All @@ -164,41 +162,32 @@ func BrokerWithManyTriggers() *feature.Feature {
eventshub.InputEvent(eventToSend),
))

// Sent event matcher
sentEventMatcher := test.AllOf(
test.HasId(eventToSend.ID()),
event.toEventMatcher(),
f.Assert("source sent event", eventasssert.OnStore(source).
MatchSentEvent(test.HasId(eventToSend.ID())).
AtLeast(1),
)

// Check on every dumper whether we should expect this event or not
for sink, eventFilter := range testcase.eventFilters {
sink := sink // capture variable
matcher := event.toEventMatcher() // capture variable

// Check on every dumper whether we should expect this event or not
if eventFilter.toEventMatcher()(eventToSend) == nil {
// This filter should match this event
matcherBySink[sink] = append(
matcherBySink[sink],
eventasssert.MatchEvent(sentEventMatcher),
)
}
}
}

// Let's check that all expected matchers are fulfilled
for sink, matchers := range matcherBySink {
for _, matcher := range matchers {
// One match per event is enough
f.Stable("test message without explicit prefer header should have the header").
Must("delivers events", func(ctx context.Context, t feature.T) {
f.Assert(fmt.Sprintf("%s receive event %s", sink, eventToSend.ID()), func(ctx context.Context, t feature.T) {
eventasssert.OnStore(sink).
Match(features.HasKnNamespaceHeader(environment.FromContext(ctx).Namespace())).
Match(matcher).
MatchReceivedEvent(test.HasId(eventToSend.ID())).
MatchReceivedEvent(matcher).
AtLeast(1)(ctx, t)
})
}
}
}

fs.Features = append(fs.Features, f)
}

return f
return fs
}

func BrokerWorkFlowWithTransformation() *feature.FeatureSet {
Expand Down Expand Up @@ -1009,11 +998,11 @@ type eventTestCase struct {
Extensions map[string]interface{}
}

func neweventTestCase(tp, source string) eventTestCase {
func newEventTestCase(tp, source string) eventTestCase {
return eventTestCase{Type: tp, Source: source}
}

func neweventTestCaseWithExtensions(tp string, source string, extensions map[string]interface{}) eventTestCase {
func newEventTestCaseWithExtensions(tp string, source string, extensions map[string]interface{}) eventTestCase {
return eventTestCase{Type: tp, Source: source, Extensions: extensions}
}

Expand Down Expand Up @@ -1042,3 +1031,11 @@ func (tc eventTestCase) toEventMatcher() test.EventMatcher {

return test.AllOf(matchers...)
}

func (tc eventTestCase) toID() string {
id := fmt.Sprintf("%s-%s", tc.Type, tc.Source)
for k, v := range tc.Extensions {
id += fmt.Sprintf("-%s_%s", k, v)
}
return id
}
4 changes: 3 additions & 1 deletion test/rekt/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (

// Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters).
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"

_ "knative.dev/pkg/system/testing"
"knative.dev/reconciler-test/pkg/tracing"

"knative.dev/reconciler-test/pkg/environment"
)
Expand All @@ -38,6 +38,8 @@ var global environment.GlobalEnvironment

// TestMain is the first entry point for `go test`.
func TestMain(m *testing.M) {
defer tracing.Cleanup()

global = environment.NewStandardGlobalEnvironment()

// Run the tests.
Expand Down
50 changes: 50 additions & 0 deletions vendor/knative.dev/reconciler-test/pkg/tracing/tracing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
Copyright 2022 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package tracing

import (
"context"
"log"

"go.uber.org/zap"
"knative.dev/pkg/logging"
"knative.dev/pkg/test/zipkin"
"knative.dev/reconciler-test/pkg/environment"
"knative.dev/reconciler-test/pkg/feature"
"knative.dev/reconciler-test/pkg/knative"
"knative.dev/reconciler-test/pkg/milestone"
)

// WithGatherer registers the trace gatherer with the environment and will be
// receiving milestone events.
func WithGatherer(t feature.T) environment.EnvOpts {
return func(ctx context.Context, env environment.Environment) (context.Context, error) {
gatherer, err := milestone.NewTracingGatherer(ctx, env.Namespace(),
knative.KnativeNamespaceFromContext(ctx), t)
if err == nil {
ctx, err = environment.WithEmitter(gatherer)(ctx, env)
}
if err != nil {
logging.FromContext(ctx).Error("failed to create tracing gatherer", zap.Error(err))
}
return ctx, nil
}
}

func Cleanup() {
zipkin.CleanupZipkinTracingSetup(log.Printf)
}
1 change: 1 addition & 0 deletions vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1416,6 +1416,7 @@ knative.dev/reconciler-test/pkg/resources/pod
knative.dev/reconciler-test/pkg/resources/secret
knative.dev/reconciler-test/pkg/resources/service
knative.dev/reconciler-test/pkg/state
knative.dev/reconciler-test/pkg/tracing
# sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2
## explicit; go 1.18
sigs.k8s.io/json
Expand Down