Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/channel/event_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func (r *EventReceiver) ServeHTTP(response nethttp.ResponseWriter, request *neth
return
}

/// Here we do the OIDC audience verification
// Here we do the OIDC audience verification
features := feature.FromContext(ctx)
if features.IsOIDCAuthentication() {
r.logger.Debug("OIDC authentication is enabled")
Expand Down
28 changes: 13 additions & 15 deletions pkg/channel/event_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestEventReceiver_ServeHTTP(t *testing.T) {
},
"path based channel reference": {
path: "/new-namespace/new-channel",
host: "test-name.test-namespace.svc." + network.GetClusterDomainName(),
host: host(),
receiverFunc: func(ctx context.Context, r ChannelReference, m event.Event, additionalHeaders nethttp.Header) error {
if r.Namespace != "new-namespace" || r.Name != "new-channel" {
return fmt.Errorf("bad channel reference %v", r)
Expand All @@ -107,9 +107,9 @@ func TestEventReceiver_ServeHTTP(t *testing.T) {
"x-requEst-id": {"1234"},
"knatIve-will-pass-through": {"true", "always"},
},
host: "test-name.test-namespace.svc." + network.GetClusterDomainName(),
host: host(),
receiverFunc: func(ctx context.Context, r ChannelReference, e event.Event, additionalHeaders nethttp.Header) error {
if r.Namespace != "test-namespace" || r.Name != "test-name" {
if r.Namespace != "test-namespace" || r.Name != "test-channel" {
return fmt.Errorf("test receiver func -- bad reference: %v", r)
}

Expand Down Expand Up @@ -138,7 +138,7 @@ func TestEventReceiver_ServeHTTP(t *testing.T) {
},
"OPTIONS okay": {
method: nethttp.MethodOptions,
host: "test-name.test-namespace.svc." + network.GetClusterDomainName(),
host: host(),
expected: nethttp.StatusOK,
responseValidator: func(res httptest.ResponseRecorder) error {
expectedHeaders := nethttp.Header{
Expand All @@ -163,7 +163,7 @@ func TestEventReceiver_ServeHTTP(t *testing.T) {
tc.path = "/"
}
if tc.host == "" {
tc.host = "test-channel.test-namespace.svc." + network.GetClusterDomainName()
tc.host = host()
}

f := tc.receiverFunc
Expand Down Expand Up @@ -238,7 +238,7 @@ func TestEventReceiver_ServerStart_trace_propagation(t *testing.T) {
done := make(chan struct{}, 1)

receiverFunc := func(ctx context.Context, r ChannelReference, e event.Event, additionalHeaders nethttp.Header) error {
if r.Namespace != "test-namespace" || r.Name != "test-name" {
if r.Namespace != "test-namespace" || r.Name != "test-channel" {
return fmt.Errorf("test receiver func -- bad reference: %v", r)
}

Expand All @@ -253,8 +253,6 @@ func TestEventReceiver_ServerStart_trace_propagation(t *testing.T) {

// Default the common things.
method := nethttp.MethodPost
host := "test-name.test-namespace.svc." + network.GetClusterDomainName()

logger, _ := zap.NewDevelopment()

r, err := NewEventReceiver(receiverFunc, logger)
Expand All @@ -276,7 +274,7 @@ func TestEventReceiver_ServerStart_trace_propagation(t *testing.T) {
),
))
require.NoError(t, err)
p.RequestTemplate.Host = host
p.RequestTemplate.Host = host()

c, err := cloudevents.NewClient(p)
require.NoError(t, err)
Expand All @@ -291,8 +289,6 @@ func TestEventReceiver_ServerStart_trace_propagation(t *testing.T) {
}

func TestEventReceiver_WrongRequest(t *testing.T) {
host := "http://test-channel.test-namespace.svc." + network.GetClusterDomainName() + "/"

f := func(_ context.Context, _ ChannelReference, _ event.Event, _ nethttp.Header) error {
return errors.New("test induced receiver function error")
}
Expand All @@ -301,7 +297,7 @@ func TestEventReceiver_WrongRequest(t *testing.T) {
t.Fatalf("Error creating new event receiver. Error:%s", err)
}

req := httptest.NewRequest(nethttp.MethodPost, host, bytes.NewReader([]byte("{}")))
req := httptest.NewRequest(nethttp.MethodPost, "http://"+host()+"/", bytes.NewReader([]byte("{}")))
req.Header.Set("content-type", "application/json")

res := httptest.ResponseRecorder{}
Expand All @@ -313,8 +309,6 @@ func TestEventReceiver_WrongRequest(t *testing.T) {
}

func TestEventReceiver_UnknownHost(t *testing.T) {
host := "http://test-channel.test-namespace.svc." + network.GetClusterDomainName() + "/"

f := func(_ context.Context, _ ChannelReference, _ event.Event, _ nethttp.Header) error {
return errors.New("test induced receiver function error")
}
Expand All @@ -335,7 +329,7 @@ func TestEventReceiver_UnknownHost(t *testing.T) {
}

req := httptest.NewRequest("POST", "http://localhost:8080/", nil)
req.Host = host
req.Host = "http://" + host() + "/"

err = http.WriteRequest(context.TODO(), binding.ToMessage(&event), req)
if err != nil {
Expand All @@ -349,3 +343,7 @@ func TestEventReceiver_UnknownHost(t *testing.T) {
t.Fatal("Unexpected status code. Expected 404. Actual", res.Code)
}
}

func host() string {
return fmt.Sprintf("test-channel%s.test-namespace.svc.%s", K8ServiceNameSuffix, network.GetClusterDomainName())
}
10 changes: 10 additions & 0 deletions pkg/channel/references.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ import (
"strings"
)

const (
// K8ServiceNameSuffix is added to the k8 service name which is owned by the channel
K8ServiceNameSuffix = "-kn-channel"
)

// ChannelReference references a Channel within the cluster by name and
// namespace.
type ChannelReference struct {
Expand All @@ -38,6 +43,11 @@ func ParseChannelFromHost(host string) (ChannelReference, error) {
if len(chunks) < 2 {
return ChannelReference{}, BadRequestError(fmt.Sprintf("bad host format %q", host))
}

if channelName, found := strings.CutSuffix(chunks[0], K8ServiceNameSuffix); found {
chunks[0] = channelName
}

return ChannelReference{
Name: chunks[0],
Namespace: chunks[1],
Expand Down
22 changes: 15 additions & 7 deletions pkg/channel/references_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,24 @@ func TestParseChannelFromHost(t *testing.T) {
wantErr bool
expectedChannelRef ChannelReference
}{
"host based with channel k8 service suffix": {
host: fmt.Sprintf("%s%s.%s.svc.cluster.local", referencesTestChannelName, K8ServiceNameSuffix, referencesTestNamespace),
wantErr: false,
expectedChannelRef: ChannelReference{
Namespace: referencesTestNamespace,
Name: referencesTestChannelName,
},
},
"host based": {
host: "test-channel.test-namespace.svc.cluster.local",
host: fmt.Sprintf("%s.%s.svc.cluster.local", referencesTestChannelName, referencesTestNamespace),
wantErr: false,
expectedChannelRef: ChannelReference{
Namespace: "test-namespace",
Name: "test-channel",
Namespace: referencesTestNamespace,
Name: referencesTestChannelName,
},
},
"bad host format should return error": {
host: "test-channel",
host: referencesTestChannelName,
wantErr: true,
},
}
Expand Down Expand Up @@ -91,11 +99,11 @@ func TestParseChannelFromPath(t *testing.T) {
expectedChannelRef ChannelReference
}{
"path based": {
path: "/new-namespace/new-channel/",
path: fmt.Sprintf("/%s/%s/", referencesTestNamespace, referencesTestChannelName),
wantErr: false,
expectedChannelRef: ChannelReference{
Namespace: "new-namespace",
Name: "new-channel",
Namespace: referencesTestNamespace,
Name: referencesTestChannelName,
},
},

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"knative.dev/pkg/resolver"

"knative.dev/eventing/pkg/apis/feature"
"knative.dev/eventing/pkg/channel"
fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake"
"knative.dev/eventing/pkg/eventingtls"
"knative.dev/eventing/pkg/eventingtls/eventingtlstesting"
Expand Down Expand Up @@ -70,6 +71,7 @@ const (
dlsName = "test-dls"
testNS = "test-namespace"
imcName = "test-imc"
imcSvcName = imcName + channel.K8ServiceNameSuffix
imageName = "test-image"
maxIdleConns = 2000
maxIdleConnsPerHost = 200
Expand All @@ -83,7 +85,7 @@ const (
var (
channelServiceAddress = duckv1.Addressable{
Name: pointer.String("http"),
URL: apis.HTTP("test-imc-kn-channel.test-namespace.svc.cluster.local"),
URL: apis.HTTP(fmt.Sprintf("%s.test-namespace.svc.cluster.local", imcSvcName)),
}

channelAudience = fmt.Sprintf("messaging.knative.dev/inmemorychannel/%s/%s", testNS, imcName)
Expand Down Expand Up @@ -426,11 +428,11 @@ func TestAllCases(t *testing.T) {
WithInMemoryChannelDeploymentReady(),
WithInMemoryChannelServiceReady(),
WithInMemoryChannelEndpointsReady(),
WithInMemoryChannelChannelServiceNotReady("ChannelServiceFailed", `Channel Service failed: inmemorychannel: test-namespace/test-imc does not own Service: "test-imc-kn-channel"`),
WithInMemoryChannelChannelServiceNotReady("ChannelServiceFailed", fmt.Sprintf(`Channel Service failed: inmemorychannel: test-namespace/test-imc does not own Service: "%s"`, imcSvcName)),
),
}},
WantEvents: []string{
Eventf(corev1.EventTypeWarning, "InternalError", `inmemorychannel: test-namespace/test-imc does not own Service: "test-imc-kn-channel"`),
Eventf(corev1.EventTypeWarning, "InternalError", fmt.Sprintf(`inmemorychannel: test-namespace/test-imc does not own Service: "%s"`, imcSvcName)),
},
}, {
Name: "Works, channel exists with subscribers",
Expand Down Expand Up @@ -986,7 +988,7 @@ func makeChannelService(imc *v1.InMemoryChannel) *corev1.Service {
},
ObjectMeta: metav1.ObjectMeta{
Namespace: testNS,
Name: fmt.Sprintf("%s-kn-channel", imcName),
Name: imcSvcName,
Labels: map[string]string{
resources.MessagingRoleLabel: resources.MessagingRole,
},
Expand Down Expand Up @@ -1016,7 +1018,7 @@ func makeChannelServiceNotOwnedByUs(imc *v1.InMemoryChannel) *corev1.Service {
},
ObjectMeta: metav1.ObjectMeta{
Namespace: testNS,
Name: fmt.Sprintf("%s-kn-channel", imcName),
Name: imcSvcName,
Labels: map[string]string{
resources.MessagingRoleLabel: resources.MessagingRole,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/eventing/pkg/channel"
"knative.dev/pkg/kmeta"
"knative.dev/pkg/network"
)
Expand All @@ -35,7 +36,7 @@ const (
type K8sServiceOption func(*corev1.Service) error

func CreateChannelServiceName(name string) string {
return kmeta.ChildName(name, "-kn-channel")
return kmeta.ChildName(name, channel.K8ServiceNameSuffix)
}

// ExternalService is a functional option for CreateK8sService to create a K8s service of type ExternalName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,21 @@ package resources

import (
"errors"
"fmt"
"testing"

"github.com/google/go-cmp/cmp"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/eventing/pkg/channel"
"knative.dev/pkg/kmeta"
"knative.dev/pkg/network"
)

const (
serviceName = "my-test-service"
imcName = "my-test-imc"
imcSvcName = imcName + channel.K8ServiceNameSuffix
testNS = "my-test-ns"
dispatcherNS = "dispatcher-namespace"
dispatcherName = "dispatcher-name"
Expand All @@ -44,7 +45,7 @@ func TestCreateExternalServiceAddress(t *testing.T) {
}

func TestCreateChannelServiceAddress(t *testing.T) {
if want, got := "my-test-imc-kn-channel", CreateChannelServiceName(imcName); want != got {
if want, got := imcSvcName, CreateChannelServiceName(imcName); want != got {
t.Errorf("Want: %q got %q", want, got)
}
}
Expand All @@ -62,7 +63,7 @@ func TestNewK8sService(t *testing.T) {
Kind: "Service",
},
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-kn-channel", imcName),
Name: CreateChannelServiceName(imcName),
Namespace: testNS,
Labels: map[string]string{
MessagingRoleLabel: MessagingRole,
Expand Down Expand Up @@ -105,7 +106,7 @@ func TestNewK8sServiceWithExternal(t *testing.T) {
Kind: "Service",
},
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-kn-channel", imcName),
Name: CreateChannelServiceName(imcName),
Namespace: testNS,
Labels: map[string]string{
MessagingRoleLabel: MessagingRole,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package dispatcher

import (
"context"
"fmt"
"net/http"
"reflect"
"testing"
Expand Down Expand Up @@ -48,6 +49,7 @@ import (
"knative.dev/eventing/pkg/apis/feature"
v1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/channel"
"knative.dev/eventing/pkg/channel/fanout"
fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake"
"knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1/inmemorychannel"
Expand All @@ -68,7 +70,7 @@ const (

var (
channelServiceAddress = duckv1.Addressable{
URL: apis.HTTP("test-imc-kn-channel.test-namespace.svc.cluster.local"),
URL: apis.HTTP(fmt.Sprintf("%s%s.%s.svc.cluster.local", imcName, channel.K8ServiceNameSuffix, testNS)),
}

linear = eventingduckv1.BackoffPolicyLinear
Expand Down
11 changes: 6 additions & 5 deletions test/conformance/helpers/channel_tracing_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/openzipkin/zipkin-go/model"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

ch "knative.dev/eventing/pkg/channel"
tracinghelper "knative.dev/eventing/test/conformance/helpers/tracing"
testlib "knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/recordevents"
Expand Down Expand Up @@ -143,7 +144,7 @@ func setupChannelTracingWithReply(
Span: tracinghelper.MatchHTTPSpanNoReply(
model.Server,
tracinghelper.WithHTTPHostAndPath(
fmt.Sprintf("%s-kn-channel.%s.svc", channelName, client.Namespace),
fmt.Sprintf("%s%s.%s.svc", channelName, ch.K8ServiceNameSuffix, client.Namespace),
"/",
),
),
Expand Down Expand Up @@ -177,14 +178,14 @@ func setupChannelTracingWithReply(
},
{
// 6. Channel Dispatcher span
Span: channelSpan(eventID, fmt.Sprintf("%s-kn-channel.%s.svc", replyChannelName, client.Namespace), ""),
Span: channelSpan(eventID, fmt.Sprintf("%s%s.%s.svc", replyChannelName, ch.K8ServiceNameSuffix, client.Namespace), ""),
Children: []tracinghelper.TestSpanTree{
{
// 7. Channel sends reply from Mutator Pod to the reply Channel.
Span: tracinghelper.MatchHTTPSpanNoReply(
model.Client,
tracinghelper.WithHTTPURL(
fmt.Sprintf("%s-kn-channel.%s.svc", replyChannelName, client.Namespace),
fmt.Sprintf("%s%s.%s.svc", replyChannelName, ch.K8ServiceNameSuffix, client.Namespace),
"",
),
),
Expand All @@ -194,7 +195,7 @@ func setupChannelTracingWithReply(
Span: tracinghelper.MatchHTTPSpanNoReply(
model.Server,
tracinghelper.WithHTTPHostAndPath(
fmt.Sprintf("%s-kn-channel.%s.svc", replyChannelName, client.Namespace),
fmt.Sprintf("%s%s.%s.svc", replyChannelName, ch.K8ServiceNameSuffix, client.Namespace),
"/",
),
),
Expand Down Expand Up @@ -245,7 +246,7 @@ func setupChannelTracingWithReply(
Span: tracinghelper.MatchHTTPSpanNoReply(
model.Client,
tracinghelper.WithHTTPURL(
fmt.Sprintf("%s-kn-channel.%s.svc", channelName, client.Namespace),
fmt.Sprintf("%s%s.%s.svc", channelName, ch.K8ServiceNameSuffix, client.Namespace),
"",
),
tracinghelper.WithLocalEndpointServiceName("sender"),
Expand Down
Loading