Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ require (
k8s.io/apiserver v0.32.1
k8s.io/client-go v0.32.2
k8s.io/utils v0.0.0-20241210054802-24370beab758
knative.dev/hack v0.0.0-20250331013814-c577ed9f7775
knative.dev/hack v0.0.0-20251022160648-4a7a07f9b7b4
knative.dev/hack/schema v0.0.0-20250331013814-c577ed9f7775
knative.dev/pkg v0.0.0-20250415155312-ed3e2158b883
knative.dev/reconciler-test v0.0.0-20250415170512-23f86169156f
knative.dev/reconciler-test v0.0.0-20251023120746-56e79a32e54e
sigs.k8s.io/yaml v1.4.0
)

Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1136,14 +1136,14 @@ k8s.io/kube-openapi v0.0.0-20241212222426-2c72e554b1e7 h1:hcha5B1kVACrLujCKLbr8X
k8s.io/kube-openapi v0.0.0-20241212222426-2c72e554b1e7/go.mod h1:GewRfANuJ70iYzvn+i4lezLDAFzvjxZYK1gn1lWcfas=
k8s.io/utils v0.0.0-20241210054802-24370beab758 h1:sdbE21q2nlQtFh65saZY+rRM6x6aJJI8IUa1AmH/qa0=
k8s.io/utils v0.0.0-20241210054802-24370beab758/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
knative.dev/hack v0.0.0-20250331013814-c577ed9f7775 h1:UstB8/aowofYFHjLyZdPh1K7qB9BCx+lP1WuiCspYRE=
knative.dev/hack v0.0.0-20250331013814-c577ed9f7775/go.mod h1:R0ritgYtjLDO9527h5vb5X6gfvt5LCrJ55BNbVDsWiY=
knative.dev/hack v0.0.0-20251022160648-4a7a07f9b7b4 h1:CLFOOEXo5A278ScEdqOf4UnekCtYJ9l1PCsAi4eQp+E=
knative.dev/hack v0.0.0-20251022160648-4a7a07f9b7b4/go.mod h1:R0ritgYtjLDO9527h5vb5X6gfvt5LCrJ55BNbVDsWiY=
knative.dev/hack/schema v0.0.0-20250331013814-c577ed9f7775 h1:Sym8bvcHBX0J7CNwRvu5vpFBl3byyvEl0S7vcWTX6Lc=
knative.dev/hack/schema v0.0.0-20250331013814-c577ed9f7775/go.mod h1:KkibP1IazICP5ClxwN5D26LDSygsqbYnVGuGFTsHNOQ=
knative.dev/pkg v0.0.0-20250415155312-ed3e2158b883 h1:UeOY7009M0EHwdyW3P35Fc1U6FJHzBrj6Gf370do8zY=
knative.dev/pkg v0.0.0-20250415155312-ed3e2158b883/go.mod h1:ptwLYr04MAyeoRvhnhhz0FFkVZTdYJV2QWnw9sZyFSM=
knative.dev/reconciler-test v0.0.0-20250415170512-23f86169156f h1:4JZHD997Yav2K6JJU93sjxvcPXNHVY4lC1dWhzyeBXg=
knative.dev/reconciler-test v0.0.0-20250415170512-23f86169156f/go.mod h1:jrNdg5OPDhfxYxXDLqA4iv9zvfLhNYpYKmaQvz4ZpRM=
knative.dev/reconciler-test v0.0.0-20251023120746-56e79a32e54e h1:TTTOKCwJjCOf0Uix9fbaeuKWI46yCpDTZeqaM82G1S8=
knative.dev/reconciler-test v0.0.0-20251023120746-56e79a32e54e/go.mod h1:jrNdg5OPDhfxYxXDLqA4iv9zvfLhNYpYKmaQvz4ZpRM=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
Expand Down
2 changes: 1 addition & 1 deletion pkg/channel/event_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func (r *EventReceiver) ServeHTTP(response nethttp.ResponseWriter, request *neth
return
}

/// Here we do the OIDC audience verification
// Here we do the OIDC audience verification
features := feature.FromContext(ctx)
if features.IsOIDCAuthentication() {
r.logger.Debug("OIDC authentication is enabled")
Expand Down
27 changes: 13 additions & 14 deletions pkg/channel/event_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestEventReceiver_ServeHTTP(t *testing.T) {
},
"path based channel reference": {
path: "/new-namespace/new-channel",
host: "test-name.test-namespace.svc." + network.GetClusterDomainName(),
host: host(),
receiverFunc: func(ctx context.Context, r ChannelReference, m event.Event, additionalHeaders nethttp.Header) error {
if r.Namespace != "new-namespace" || r.Name != "new-channel" {
return fmt.Errorf("bad channel reference %v", r)
Expand All @@ -107,9 +107,9 @@ func TestEventReceiver_ServeHTTP(t *testing.T) {
"x-requEst-id": {"1234"},
"knatIve-will-pass-through": {"true", "always"},
},
host: "test-name.test-namespace.svc." + network.GetClusterDomainName(),
host: host(),
receiverFunc: func(ctx context.Context, r ChannelReference, e event.Event, additionalHeaders nethttp.Header) error {
if r.Namespace != "test-namespace" || r.Name != "test-name" {
if r.Namespace != "test-namespace" || r.Name != "test-channel" {
return fmt.Errorf("test receiver func -- bad reference: %v", r)
}

Expand Down Expand Up @@ -138,7 +138,7 @@ func TestEventReceiver_ServeHTTP(t *testing.T) {
},
"OPTIONS okay": {
method: nethttp.MethodOptions,
host: "test-name.test-namespace.svc." + network.GetClusterDomainName(),
host: host(),
expected: nethttp.StatusOK,
responseValidator: func(res httptest.ResponseRecorder) error {
expectedHeaders := nethttp.Header{
Expand All @@ -164,7 +164,7 @@ func TestEventReceiver_ServeHTTP(t *testing.T) {
tc.path = "/"
}
if tc.host == "" {
tc.host = "test-channel.test-namespace.svc." + network.GetClusterDomainName()
tc.host = host()
}

f := tc.receiverFunc
Expand Down Expand Up @@ -217,7 +217,7 @@ func TestEventReceiver_ServerStart_trace_propagation(t *testing.T) {
done := make(chan struct{}, 1)

receiverFunc := func(ctx context.Context, r ChannelReference, e event.Event, additionalHeaders nethttp.Header) error {
if r.Namespace != "test-namespace" || r.Name != "test-name" {
if r.Namespace != "test-namespace" || r.Name != "test-channel" {
return fmt.Errorf("test receiver func -- bad reference: %v", r)
}

Expand All @@ -232,8 +232,6 @@ func TestEventReceiver_ServerStart_trace_propagation(t *testing.T) {

// Default the common things.
method := nethttp.MethodPost
host := "test-name.test-namespace.svc." + network.GetClusterDomainName()

reporter := NewStatsReporter("testcontainer", "testpod")
logger, _ := zap.NewDevelopment()

Expand Down Expand Up @@ -261,7 +259,7 @@ func TestEventReceiver_ServerStart_trace_propagation(t *testing.T) {
Propagation: tracecontextb3.TraceContextEgress,
}))
require.NoError(t, err)
p.RequestTemplate.Host = host
p.RequestTemplate.Host = host()

c, err := cloudevents.NewClient(p, client.WithObservabilityService(obsclient.New()))
require.NoError(t, err)
Expand All @@ -277,8 +275,6 @@ func TestEventReceiver_ServerStart_trace_propagation(t *testing.T) {

func TestEventReceiver_WrongRequest(t *testing.T) {
reporter := NewStatsReporter("testcontainer", "testpod")
host := "http://test-channel.test-namespace.svc." + network.GetClusterDomainName() + "/"

f := func(_ context.Context, _ ChannelReference, _ event.Event, _ nethttp.Header) error {
return errors.New("test induced receiver function error")
}
Expand All @@ -287,7 +283,7 @@ func TestEventReceiver_WrongRequest(t *testing.T) {
t.Fatalf("Error creating new event receiver. Error:%s", err)
}

req := httptest.NewRequest(nethttp.MethodPost, host, bytes.NewReader([]byte("{}")))
req := httptest.NewRequest(nethttp.MethodPost, "http://"+host()+"/", bytes.NewReader([]byte("{}")))
req.Header.Set("content-type", "application/json")

res := httptest.ResponseRecorder{}
Expand All @@ -299,7 +295,6 @@ func TestEventReceiver_WrongRequest(t *testing.T) {
}

func TestEventReceiver_UnknownHost(t *testing.T) {
host := "http://test-channel.test-namespace.svc." + network.GetClusterDomainName() + "/"
reporter := NewStatsReporter("testcontainer", "testpod")

f := func(_ context.Context, _ ChannelReference, _ event.Event, _ nethttp.Header) error {
Expand All @@ -323,7 +318,7 @@ func TestEventReceiver_UnknownHost(t *testing.T) {
}

req := httptest.NewRequest("POST", "http://localhost:8080/", nil)
req.Host = host
req.Host = "http://" + host() + "/"

err = http.WriteRequest(context.TODO(), binding.ToMessage(&event), req)
if err != nil {
Expand All @@ -337,3 +332,7 @@ func TestEventReceiver_UnknownHost(t *testing.T) {
t.Fatal("Unexpected status code. Expected 404. Actual", res.Code)
}
}

func host() string {
return fmt.Sprintf("test-channel%s.test-namespace.svc.%s", K8ServiceNameSuffix, network.GetClusterDomainName())
}
10 changes: 10 additions & 0 deletions pkg/channel/references.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ import (
"strings"
)

const (
// K8ServiceNameSuffix is added to the k8 service name which is owned by the channel
K8ServiceNameSuffix = "-kn-channel"
)

// ChannelReference references a Channel within the cluster by name and
// namespace.
type ChannelReference struct {
Expand All @@ -38,6 +43,11 @@ func ParseChannelFromHost(host string) (ChannelReference, error) {
if len(chunks) < 2 {
return ChannelReference{}, BadRequestError(fmt.Sprintf("bad host format %q", host))
}

if channelName, found := strings.CutSuffix(chunks[0], K8ServiceNameSuffix); found {
chunks[0] = channelName
}

return ChannelReference{
Name: chunks[0],
Namespace: chunks[1],
Expand Down
22 changes: 15 additions & 7 deletions pkg/channel/references_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,24 @@ func TestParseChannelFromHost(t *testing.T) {
wantErr bool
expectedChannelRef ChannelReference
}{
"host based with channel k8 service suffix": {
host: fmt.Sprintf("%s%s.%s.svc.cluster.local", referencesTestChannelName, K8ServiceNameSuffix, referencesTestNamespace),
wantErr: false,
expectedChannelRef: ChannelReference{
Namespace: referencesTestNamespace,
Name: referencesTestChannelName,
},
},
"host based": {
host: "test-channel.test-namespace.svc.cluster.local",
host: fmt.Sprintf("%s.%s.svc.cluster.local", referencesTestChannelName, referencesTestNamespace),
wantErr: false,
expectedChannelRef: ChannelReference{
Namespace: "test-namespace",
Name: "test-channel",
Namespace: referencesTestNamespace,
Name: referencesTestChannelName,
},
},
"bad host format should return error": {
host: "test-channel",
host: referencesTestChannelName,
wantErr: true,
},
}
Expand Down Expand Up @@ -91,11 +99,11 @@ func TestParseChannelFromPath(t *testing.T) {
expectedChannelRef ChannelReference
}{
"path based": {
path: "/new-namespace/new-channel/",
path: fmt.Sprintf("/%s/%s/", referencesTestNamespace, referencesTestChannelName),
wantErr: false,
expectedChannelRef: ChannelReference{
Namespace: "new-namespace",
Name: "new-channel",
Namespace: referencesTestNamespace,
Name: referencesTestChannelName,
},
},

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"knative.dev/pkg/resolver"

"knative.dev/eventing/pkg/apis/feature"
"knative.dev/eventing/pkg/channel"
fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake"
"knative.dev/eventing/pkg/eventingtls"
"knative.dev/eventing/pkg/eventingtls/eventingtlstesting"
Expand Down Expand Up @@ -70,6 +71,7 @@ const (
dlsName = "test-dls"
testNS = "test-namespace"
imcName = "test-imc"
imcSvcName = imcName + channel.K8ServiceNameSuffix
imageName = "test-image"
maxIdleConns = 2000
maxIdleConnsPerHost = 200
Expand All @@ -83,7 +85,7 @@ const (
var (
channelServiceAddress = duckv1.Addressable{
Name: pointer.String("http"),
URL: apis.HTTP("test-imc-kn-channel.test-namespace.svc.cluster.local"),
URL: apis.HTTP(fmt.Sprintf("%s.test-namespace.svc.cluster.local", imcSvcName)),
}

channelAudience = fmt.Sprintf("messaging.knative.dev/inmemorychannel/%s/%s", testNS, imcName)
Expand Down Expand Up @@ -426,11 +428,11 @@ func TestAllCases(t *testing.T) {
WithInMemoryChannelDeploymentReady(),
WithInMemoryChannelServiceReady(),
WithInMemoryChannelEndpointsReady(),
WithInMemoryChannelChannelServiceNotReady("ChannelServiceFailed", `Channel Service failed: inmemorychannel: test-namespace/test-imc does not own Service: "test-imc-kn-channel"`),
WithInMemoryChannelChannelServiceNotReady("ChannelServiceFailed", fmt.Sprintf(`Channel Service failed: inmemorychannel: test-namespace/test-imc does not own Service: "%s"`, imcSvcName)),
),
}},
WantEvents: []string{
Eventf(corev1.EventTypeWarning, "InternalError", `inmemorychannel: test-namespace/test-imc does not own Service: "test-imc-kn-channel"`),
Eventf(corev1.EventTypeWarning, "InternalError", fmt.Sprintf(`inmemorychannel: test-namespace/test-imc does not own Service: "%s"`, imcSvcName)),
},
}, {
Name: "Works, channel exists with subscribers",
Expand Down Expand Up @@ -986,7 +988,7 @@ func makeChannelService(imc *v1.InMemoryChannel) *corev1.Service {
},
ObjectMeta: metav1.ObjectMeta{
Namespace: testNS,
Name: fmt.Sprintf("%s-kn-channel", imcName),
Name: imcSvcName,
Labels: map[string]string{
resources.MessagingRoleLabel: resources.MessagingRole,
},
Expand Down Expand Up @@ -1016,7 +1018,7 @@ func makeChannelServiceNotOwnedByUs(imc *v1.InMemoryChannel) *corev1.Service {
},
ObjectMeta: metav1.ObjectMeta{
Namespace: testNS,
Name: fmt.Sprintf("%s-kn-channel", imcName),
Name: imcSvcName,
Labels: map[string]string{
resources.MessagingRoleLabel: resources.MessagingRole,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/eventing/pkg/channel"
"knative.dev/pkg/kmeta"
"knative.dev/pkg/network"
)
Expand All @@ -35,7 +36,7 @@ const (
type K8sServiceOption func(*corev1.Service) error

func CreateChannelServiceName(name string) string {
return kmeta.ChildName(name, "-kn-channel")
return kmeta.ChildName(name, channel.K8ServiceNameSuffix)
}

// ExternalService is a functional option for CreateK8sService to create a K8s service of type ExternalName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,21 @@ package resources

import (
"errors"
"fmt"
"testing"

"github.com/google/go-cmp/cmp"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/eventing/pkg/channel"
"knative.dev/pkg/kmeta"
"knative.dev/pkg/network"
)

const (
serviceName = "my-test-service"
imcName = "my-test-imc"
imcSvcName = imcName + channel.K8ServiceNameSuffix
testNS = "my-test-ns"
dispatcherNS = "dispatcher-namespace"
dispatcherName = "dispatcher-name"
Expand All @@ -44,7 +45,7 @@ func TestCreateExternalServiceAddress(t *testing.T) {
}

func TestCreateChannelServiceAddress(t *testing.T) {
if want, got := "my-test-imc-kn-channel", CreateChannelServiceName(imcName); want != got {
if want, got := imcSvcName, CreateChannelServiceName(imcName); want != got {
t.Errorf("Want: %q got %q", want, got)
}
}
Expand All @@ -62,7 +63,7 @@ func TestNewK8sService(t *testing.T) {
Kind: "Service",
},
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-kn-channel", imcName),
Name: CreateChannelServiceName(imcName),
Namespace: testNS,
Labels: map[string]string{
MessagingRoleLabel: MessagingRole,
Expand Down Expand Up @@ -105,7 +106,7 @@ func TestNewK8sServiceWithExternal(t *testing.T) {
Kind: "Service",
},
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-kn-channel", imcName),
Name: CreateChannelServiceName(imcName),
Namespace: testNS,
Labels: map[string]string{
MessagingRoleLabel: MessagingRole,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package dispatcher

import (
"context"
"fmt"
"net/http"
"reflect"
"testing"
Expand All @@ -43,6 +44,7 @@ import (
"knative.dev/eventing/pkg/apis/feature"
v1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/channel"
"knative.dev/eventing/pkg/channel/fanout"
fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake"
"knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1/inmemorychannel"
Expand All @@ -63,7 +65,7 @@ const (

var (
channelServiceAddress = duckv1.Addressable{
URL: apis.HTTP("test-imc-kn-channel.test-namespace.svc.cluster.local"),
URL: apis.HTTP(fmt.Sprintf("%s%s.%s.svc.cluster.local", imcName, channel.K8ServiceNameSuffix, testNS)),
}

linear = eventingduckv1.BackoffPolicyLinear
Expand Down
Loading
Loading