Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/kind-e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.18.x
go-version: 1.19.x

- name: Install Dependencies
working-directory: ./
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module knative.dev/eventing

go 1.18
go 1.19
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is needed to be able to use x509.CertPool.Clone()


require (
github.com/ahmetb/gen-crd-api-reference-docs v0.3.1-0.20210420163308-c1402a70e2f1
Expand Down
21 changes: 19 additions & 2 deletions pkg/adapter/v2/cloudevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ import (
"github.com/cloudevents/sdk-go/v2/protocol"
"github.com/cloudevents/sdk-go/v2/protocol/http"
"go.opencensus.io/plugin/ochttp"

duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/tracing/propagation/tracecontextb3"

"knative.dev/eventing/pkg/adapter/v2/util/crstatusevent"
"knative.dev/eventing/pkg/eventingtls"
"knative.dev/eventing/pkg/metrics/source"
obsclient "knative.dev/eventing/pkg/observability/client"
)
Expand Down Expand Up @@ -92,8 +92,25 @@ func newCloudEventsClientCRStatus(env EnvConfigAccessor, ceOverrides *duckv1.Clo
if sinkWait := env.GetSinktimeout(); sinkWait > 0 {
pOpts = append(pOpts, setTimeOut(time.Duration(sinkWait)*time.Second))
}
var err error
if caCerts := env.GetCACerts(); (caCerts != nil && *caCerts != "") && eventingtls.IsHttpsSink(env.GetSink()) {
var err error

clientConfig := eventingtls.NewDefaultClientConfig()
clientConfig.CACerts = caCerts

transport := nethttp.DefaultTransport.(*nethttp.Transport).Clone()
transport.TLSClientConfig, err = eventingtls.GetTLSClientConfig(clientConfig)
if err != nil {
return nil, err
}

pOpts = append(pOpts, http.WithRoundTripper(&ochttp.Transport{
Base: transport,
Propagation: tracecontextb3.TraceContextEgress,
}))
}
if ceOverrides == nil {
var err error
ceOverrides, err = env.GetCloudEventOverrides()
if err != nil {
return nil, err
Expand Down
71 changes: 70 additions & 1 deletion pkg/adapter/v2/cloudevents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,16 @@ import (
cloudevents "github.com/cloudevents/sdk-go/v2"
v2client "github.com/cloudevents/sdk-go/v2/client"
"github.com/cloudevents/sdk-go/v2/protocol/http"
cetest "github.com/cloudevents/sdk-go/v2/test"
"github.com/stretchr/testify/assert"
"k8s.io/utils/pointer"
duckv1 "knative.dev/pkg/apis/duck/v1"

. "knative.dev/pkg/reconciler/testing"

"knative.dev/eventing/pkg/adapter/v2/test"
"knative.dev/eventing/pkg/eventingtls/eventingtlstesting"
"knative.dev/eventing/pkg/metrics/source"
duckv1 "knative.dev/pkg/apis/duck/v1"
)

type mockReporter struct {
Expand Down Expand Up @@ -293,6 +300,68 @@ func TestNewCloudEventsClient_request(t *testing.T) {
}
}

func TestTLS(t *testing.T) {

ctx, _ := SetupFakeContext(t)
ctx = cloudevents.ContextWithRetriesExponentialBackoff(ctx, 20*time.Millisecond, 5)
ctx, cancel := context.WithCancel(ctx)
t.Cleanup(cancel)

ca := eventingtlstesting.StartServer(ctx, t, 8333)

event := cetest.MinEvent()

tt := []struct {
name string
sink string
caCerts *string
wantErr bool
}{
{
name: "https sink URL, no CA certs fail",
sink: "https://localhost:8333",
wantErr: true,
},
{
name: "https sink URL with ca certs",
sink: "https://localhost:8333",
caCerts: pointer.String(ca),
},
{
name: "http sink URL with ca certs",
sink: "http://localhost:8333",
caCerts: pointer.String(ca),
wantErr: true,
},
}

for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
reporter, err := source.NewStatsReporter()
assert.Nil(t, err)

c, err := NewCloudEventsClientCRStatus(
&EnvConfig{
Sink: tc.sink,
CACerts: tc.caCerts,
},
reporter,
nil,
)
assert.Nil(t, err)

result := c.Send(ctx, event)
if tc.wantErr {
if cloudevents.IsACK(result) {
t.Fatalf("wantErr %v, got %v IsACK %v", tc.wantErr, result, cloudevents.IsACK(result))
}
} else if cloudevents.IsNACK(result) || cloudevents.IsUndelivered(result) {
t.Fatalf("wantErr %v, got %v IsACK %v", tc.wantErr, result, cloudevents.IsACK(result))
}
})
}
}

func validateSent(t *testing.T, ce *test.TestCloudEventsClient, want string) {
if got := len(ce.Sent()); got != 1 {
t.Error("Expected 1 event to be sent, got", got)
Expand Down
12 changes: 12 additions & 0 deletions pkg/adapter/v2/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ type EnvConfig struct {
// Sink is the URI messages will be sent.
Sink string `envconfig:"K_SINK"`

// CACerts are the Certification Authority (CA) certificates in PEM format
// according to https://www.rfc-editor.org/rfc/rfc7468.
// +optional
CACerts *string `envconfig:"K_CA_CERTS"`

// CEOverrides are the CloudEvents overrides to be applied to the outbound event.
CEOverrides string `envconfig:"K_CE_OVERRIDES"`

Expand Down Expand Up @@ -104,6 +109,9 @@ type EnvConfigAccessor interface {
// Get the URI where messages will be forwarded to.
GetSink() string

// GetCACerts gets the CACerts of the Sink.
GetCACerts() *string

// Get the namespace of the adapter.
GetNamespace() string

Expand Down Expand Up @@ -163,6 +171,10 @@ func (e *EnvConfig) GetSink() string {
return e.Sink
}

func (e *EnvConfig) GetCACerts() *string {
return e.CACerts
}

func (e *EnvConfig) GetNamespace() string {
return e.Namespace
}
Expand Down
95 changes: 47 additions & 48 deletions pkg/adapter/v2/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package adapter
import (
"encoding/json"
"fmt"
"os"
"testing"
"time"

Expand All @@ -36,25 +35,14 @@ type myEnvConfig struct {
}

func TestEnvConfig(t *testing.T) {
os.Setenv("K_SINK", "http://sink")
os.Setenv("NAMESPACE", "ns")
os.Setenv("K_METRICS_CONFIG", "metrics")
os.Setenv("K_LOGGING_CONFIG", "logging")
os.Setenv("K_TRACING_CONFIG", "tracing")
os.Setenv("K_LEADER_ELECTION_CONFIG", "leaderelection")
os.Setenv("K_SINK_TIMEOUT", "999")
os.Setenv("MODE", "mymode") // note: custom to this test impl

defer func() {
os.Unsetenv("K_SINK")
os.Unsetenv("NAMESPACE")
os.Unsetenv("K_METRICS_CONFIG")
os.Unsetenv("K_LOGGING_CONFIG")
os.Unsetenv("K_TRACING_CONFIG")
os.Unsetenv("K_LEADER_ELECTION_CONFIG")
os.Unsetenv("MODE")
os.Unsetenv("K_SINK_TIMEOUT")
}()
t.Setenv("K_SINK", "http://sink")
t.Setenv("NAMESPACE", "ns")
t.Setenv("K_METRICS_CONFIG", "metrics")
t.Setenv("K_LOGGING_CONFIG", "logging")
t.Setenv("K_TRACING_CONFIG", "tracing")
t.Setenv("K_LEADER_ELECTION_CONFIG", "leaderelection")
t.Setenv("K_SINK_TIMEOUT", "999")
t.Setenv("MODE", "mymode") // note: custom to this test impl

var env myEnvConfig
err := envconfig.Process("", &env)
Expand All @@ -81,13 +69,14 @@ func TestEnvConfig(t *testing.T) {
if env.EnvSinkTimeout != "999" {
t.Error("Expected env.EnvSinkTimeout to be 999, got:", env.EnvSinkTimeout)
}

if env.CACerts != nil {
t.Error("Expected CACerts to be nil")
}
}

func TestEmptySinkTimeout(t *testing.T) {
os.Setenv("K_SINK_TIMEOUT", "")
defer func() {
os.Unsetenv("K_SINK_TIMEOUT")
}()
t.Setenv("K_SINK_TIMEOUT", "")

var env myEnvConfig
err := envconfig.Process("", &env)
Expand Down Expand Up @@ -117,10 +106,7 @@ func TestGetName(t *testing.T) {
func TestGetName_Override(t *testing.T) {
want := "custom-name"

os.Setenv("NAME", want)
defer func() {
os.Unsetenv("NAME")
}()
t.Setenv("NAME", want)

var env myEnvConfig
err := envconfig.Process("", &env)
Expand Down Expand Up @@ -150,10 +136,7 @@ func TestGetNamespace(t *testing.T) {
func TestGetNamespace_Override(t *testing.T) {
want := "custom-namespace"

os.Setenv("NAMESPACE", want)
defer func() {
os.Unsetenv("NAMESPACE")
}()
t.Setenv("NAMESPACE", want)

var env myEnvConfig
err := envconfig.Process("", &env)
Expand All @@ -171,10 +154,7 @@ func TestGetCloudEventOverrides(t *testing.T) {
want.Extensions = map[string]string{"quack": "attack"}
wantJson, _ := json.Marshal(want)

os.Setenv("K_CE_OVERRIDES", string(wantJson))
defer func() {
os.Unsetenv("K_CE_OVERRIDES")
}()
t.Setenv("K_CE_OVERRIDES", string(wantJson))

var env myEnvConfig
err := envconfig.Process("", &env)
Expand All @@ -190,10 +170,7 @@ func TestGetCloudEventOverrides(t *testing.T) {
}

func TestGetCloudEventOverrides_BadJson(t *testing.T) {
os.Setenv("K_CE_OVERRIDES", "quack attack")
defer func() {
os.Unsetenv("K_CE_OVERRIDES")
}()
t.Setenv("K_CE_OVERRIDES", "quack attack")

var env myEnvConfig
err := envconfig.Process("", &env)
Expand All @@ -207,10 +184,7 @@ func TestGetCloudEventOverrides_BadJson(t *testing.T) {
}

func TestGetLeaderElectionConfig(t *testing.T) {
os.Setenv("K_COMPONENT", "Gotham")
defer func() {
os.Unsetenv("K_COMPONENT")
}()
t.Setenv("K_COMPONENT", "Gotham")

want := new(kle.ComponentConfig)
want.Buckets = 2
Expand All @@ -224,10 +198,7 @@ func TestGetLeaderElectionConfig(t *testing.T) {

fmt.Println(wantJson)

os.Setenv("K_LEADER_ELECTION_CONFIG", wantJson)
defer func() {
os.Unsetenv("K_LEADER_ELECTION_CONFIG")
}()
t.Setenv("K_LEADER_ELECTION_CONFIG", wantJson)

var env myEnvConfig
err := envconfig.Process("", &env)
Expand All @@ -241,3 +212,31 @@ func TestGetLeaderElectionConfig(t *testing.T) {
t.Errorf("GetLeaderElectionConfig (-want, +got) = %v", diff)
}
}

func TestCACerts(t *testing.T) {
tt := []struct {
value string
}{
{value: "foo"},
{value: ""},
}

for _, tc := range tt {
t.Run(tc.value, func(t *testing.T) {
t.Setenv("K_CA_CERTS", tc.value)

var env myEnvConfig
err := envconfig.Process("", &env)
if err != nil {
t.Error("Expected no error:", err)
}

if env.CACerts == nil {
t.Error("Expected CACerts to be non nil")
}
if *env.CACerts != tc.value {
t.Errorf("Expected CACerts to be %v, got %v", tc.value, *env.CACerts)
}
})
}
}
Loading