diff --git a/.github/workflows/kind-e2e.yaml b/.github/workflows/kind-e2e.yaml index 4c82a28fd17..b50c163512e 100644 --- a/.github/workflows/kind-e2e.yaml +++ b/.github/workflows/kind-e2e.yaml @@ -59,7 +59,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v2 with: - go-version: 1.18.x + go-version: 1.19.x - name: Install Dependencies working-directory: ./ diff --git a/go.mod b/go.mod index 46fefcf4ce9..f2092f5586e 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module knative.dev/eventing -go 1.18 +go 1.19 require ( github.com/ahmetb/gen-crd-api-reference-docs v0.3.1-0.20210420163308-c1402a70e2f1 diff --git a/pkg/adapter/v2/cloudevents.go b/pkg/adapter/v2/cloudevents.go index 6ecc37f534e..dae960b5e4b 100644 --- a/pkg/adapter/v2/cloudevents.go +++ b/pkg/adapter/v2/cloudevents.go @@ -31,11 +31,11 @@ import ( "github.com/cloudevents/sdk-go/v2/protocol" "github.com/cloudevents/sdk-go/v2/protocol/http" "go.opencensus.io/plugin/ochttp" - duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/tracing/propagation/tracecontextb3" "knative.dev/eventing/pkg/adapter/v2/util/crstatusevent" + "knative.dev/eventing/pkg/eventingtls" "knative.dev/eventing/pkg/metrics/source" obsclient "knative.dev/eventing/pkg/observability/client" ) @@ -92,8 +92,25 @@ func newCloudEventsClientCRStatus(env EnvConfigAccessor, ceOverrides *duckv1.Clo if sinkWait := env.GetSinktimeout(); sinkWait > 0 { pOpts = append(pOpts, setTimeOut(time.Duration(sinkWait)*time.Second)) } - var err error + if caCerts := env.GetCACerts(); (caCerts != nil && *caCerts != "") && eventingtls.IsHttpsSink(env.GetSink()) { + var err error + + clientConfig := eventingtls.NewDefaultClientConfig() + clientConfig.CACerts = caCerts + + transport := nethttp.DefaultTransport.(*nethttp.Transport).Clone() + transport.TLSClientConfig, err = eventingtls.GetTLSClientConfig(clientConfig) + if err != nil { + return nil, err + } + + pOpts = append(pOpts, http.WithRoundTripper(&ochttp.Transport{ + Base: transport, + Propagation: tracecontextb3.TraceContextEgress, + })) + } if ceOverrides == nil { + var err error ceOverrides, err = env.GetCloudEventOverrides() if err != nil { return nil, err diff --git a/pkg/adapter/v2/cloudevents_test.go b/pkg/adapter/v2/cloudevents_test.go index 7beda242ec1..2b9fd500ce4 100644 --- a/pkg/adapter/v2/cloudevents_test.go +++ b/pkg/adapter/v2/cloudevents_test.go @@ -26,9 +26,16 @@ import ( cloudevents "github.com/cloudevents/sdk-go/v2" v2client "github.com/cloudevents/sdk-go/v2/client" "github.com/cloudevents/sdk-go/v2/protocol/http" + cetest "github.com/cloudevents/sdk-go/v2/test" + "github.com/stretchr/testify/assert" + "k8s.io/utils/pointer" + duckv1 "knative.dev/pkg/apis/duck/v1" + + . "knative.dev/pkg/reconciler/testing" + "knative.dev/eventing/pkg/adapter/v2/test" + "knative.dev/eventing/pkg/eventingtls/eventingtlstesting" "knative.dev/eventing/pkg/metrics/source" - duckv1 "knative.dev/pkg/apis/duck/v1" ) type mockReporter struct { @@ -293,6 +300,68 @@ func TestNewCloudEventsClient_request(t *testing.T) { } } +func TestTLS(t *testing.T) { + + ctx, _ := SetupFakeContext(t) + ctx = cloudevents.ContextWithRetriesExponentialBackoff(ctx, 20*time.Millisecond, 5) + ctx, cancel := context.WithCancel(ctx) + t.Cleanup(cancel) + + ca := eventingtlstesting.StartServer(ctx, t, 8333) + + event := cetest.MinEvent() + + tt := []struct { + name string + sink string + caCerts *string + wantErr bool + }{ + { + name: "https sink URL, no CA certs fail", + sink: "https://localhost:8333", + wantErr: true, + }, + { + name: "https sink URL with ca certs", + sink: "https://localhost:8333", + caCerts: pointer.String(ca), + }, + { + name: "http sink URL with ca certs", + sink: "http://localhost:8333", + caCerts: pointer.String(ca), + wantErr: true, + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + reporter, err := source.NewStatsReporter() + assert.Nil(t, err) + + c, err := NewCloudEventsClientCRStatus( + &EnvConfig{ + Sink: tc.sink, + CACerts: tc.caCerts, + }, + reporter, + nil, + ) + assert.Nil(t, err) + + result := c.Send(ctx, event) + if tc.wantErr { + if cloudevents.IsACK(result) { + t.Fatalf("wantErr %v, got %v IsACK %v", tc.wantErr, result, cloudevents.IsACK(result)) + } + } else if cloudevents.IsNACK(result) || cloudevents.IsUndelivered(result) { + t.Fatalf("wantErr %v, got %v IsACK %v", tc.wantErr, result, cloudevents.IsACK(result)) + } + }) + } +} + func validateSent(t *testing.T, ce *test.TestCloudEventsClient, want string) { if got := len(ce.Sent()); got != 1 { t.Error("Expected 1 event to be sent, got", got) diff --git a/pkg/adapter/v2/config.go b/pkg/adapter/v2/config.go index ba499be49f9..9cfafcd792d 100644 --- a/pkg/adapter/v2/config.go +++ b/pkg/adapter/v2/config.go @@ -65,6 +65,11 @@ type EnvConfig struct { // Sink is the URI messages will be sent. Sink string `envconfig:"K_SINK"` + // CACerts are the Certification Authority (CA) certificates in PEM format + // according to https://www.rfc-editor.org/rfc/rfc7468. + // +optional + CACerts *string `envconfig:"K_CA_CERTS"` + // CEOverrides are the CloudEvents overrides to be applied to the outbound event. CEOverrides string `envconfig:"K_CE_OVERRIDES"` @@ -104,6 +109,9 @@ type EnvConfigAccessor interface { // Get the URI where messages will be forwarded to. GetSink() string + // GetCACerts gets the CACerts of the Sink. + GetCACerts() *string + // Get the namespace of the adapter. GetNamespace() string @@ -163,6 +171,10 @@ func (e *EnvConfig) GetSink() string { return e.Sink } +func (e *EnvConfig) GetCACerts() *string { + return e.CACerts +} + func (e *EnvConfig) GetNamespace() string { return e.Namespace } diff --git a/pkg/adapter/v2/config_test.go b/pkg/adapter/v2/config_test.go index e2537ef69dd..c9aaaf67a9a 100644 --- a/pkg/adapter/v2/config_test.go +++ b/pkg/adapter/v2/config_test.go @@ -19,7 +19,6 @@ package adapter import ( "encoding/json" "fmt" - "os" "testing" "time" @@ -36,25 +35,14 @@ type myEnvConfig struct { } func TestEnvConfig(t *testing.T) { - os.Setenv("K_SINK", "http://sink") - os.Setenv("NAMESPACE", "ns") - os.Setenv("K_METRICS_CONFIG", "metrics") - os.Setenv("K_LOGGING_CONFIG", "logging") - os.Setenv("K_TRACING_CONFIG", "tracing") - os.Setenv("K_LEADER_ELECTION_CONFIG", "leaderelection") - os.Setenv("K_SINK_TIMEOUT", "999") - os.Setenv("MODE", "mymode") // note: custom to this test impl - - defer func() { - os.Unsetenv("K_SINK") - os.Unsetenv("NAMESPACE") - os.Unsetenv("K_METRICS_CONFIG") - os.Unsetenv("K_LOGGING_CONFIG") - os.Unsetenv("K_TRACING_CONFIG") - os.Unsetenv("K_LEADER_ELECTION_CONFIG") - os.Unsetenv("MODE") - os.Unsetenv("K_SINK_TIMEOUT") - }() + t.Setenv("K_SINK", "http://sink") + t.Setenv("NAMESPACE", "ns") + t.Setenv("K_METRICS_CONFIG", "metrics") + t.Setenv("K_LOGGING_CONFIG", "logging") + t.Setenv("K_TRACING_CONFIG", "tracing") + t.Setenv("K_LEADER_ELECTION_CONFIG", "leaderelection") + t.Setenv("K_SINK_TIMEOUT", "999") + t.Setenv("MODE", "mymode") // note: custom to this test impl var env myEnvConfig err := envconfig.Process("", &env) @@ -81,13 +69,14 @@ func TestEnvConfig(t *testing.T) { if env.EnvSinkTimeout != "999" { t.Error("Expected env.EnvSinkTimeout to be 999, got:", env.EnvSinkTimeout) } + + if env.CACerts != nil { + t.Error("Expected CACerts to be nil") + } } func TestEmptySinkTimeout(t *testing.T) { - os.Setenv("K_SINK_TIMEOUT", "") - defer func() { - os.Unsetenv("K_SINK_TIMEOUT") - }() + t.Setenv("K_SINK_TIMEOUT", "") var env myEnvConfig err := envconfig.Process("", &env) @@ -117,10 +106,7 @@ func TestGetName(t *testing.T) { func TestGetName_Override(t *testing.T) { want := "custom-name" - os.Setenv("NAME", want) - defer func() { - os.Unsetenv("NAME") - }() + t.Setenv("NAME", want) var env myEnvConfig err := envconfig.Process("", &env) @@ -150,10 +136,7 @@ func TestGetNamespace(t *testing.T) { func TestGetNamespace_Override(t *testing.T) { want := "custom-namespace" - os.Setenv("NAMESPACE", want) - defer func() { - os.Unsetenv("NAMESPACE") - }() + t.Setenv("NAMESPACE", want) var env myEnvConfig err := envconfig.Process("", &env) @@ -171,10 +154,7 @@ func TestGetCloudEventOverrides(t *testing.T) { want.Extensions = map[string]string{"quack": "attack"} wantJson, _ := json.Marshal(want) - os.Setenv("K_CE_OVERRIDES", string(wantJson)) - defer func() { - os.Unsetenv("K_CE_OVERRIDES") - }() + t.Setenv("K_CE_OVERRIDES", string(wantJson)) var env myEnvConfig err := envconfig.Process("", &env) @@ -190,10 +170,7 @@ func TestGetCloudEventOverrides(t *testing.T) { } func TestGetCloudEventOverrides_BadJson(t *testing.T) { - os.Setenv("K_CE_OVERRIDES", "quack attack") - defer func() { - os.Unsetenv("K_CE_OVERRIDES") - }() + t.Setenv("K_CE_OVERRIDES", "quack attack") var env myEnvConfig err := envconfig.Process("", &env) @@ -207,10 +184,7 @@ func TestGetCloudEventOverrides_BadJson(t *testing.T) { } func TestGetLeaderElectionConfig(t *testing.T) { - os.Setenv("K_COMPONENT", "Gotham") - defer func() { - os.Unsetenv("K_COMPONENT") - }() + t.Setenv("K_COMPONENT", "Gotham") want := new(kle.ComponentConfig) want.Buckets = 2 @@ -224,10 +198,7 @@ func TestGetLeaderElectionConfig(t *testing.T) { fmt.Println(wantJson) - os.Setenv("K_LEADER_ELECTION_CONFIG", wantJson) - defer func() { - os.Unsetenv("K_LEADER_ELECTION_CONFIG") - }() + t.Setenv("K_LEADER_ELECTION_CONFIG", wantJson) var env myEnvConfig err := envconfig.Process("", &env) @@ -241,3 +212,31 @@ func TestGetLeaderElectionConfig(t *testing.T) { t.Errorf("GetLeaderElectionConfig (-want, +got) = %v", diff) } } + +func TestCACerts(t *testing.T) { + tt := []struct { + value string + }{ + {value: "foo"}, + {value: ""}, + } + + for _, tc := range tt { + t.Run(tc.value, func(t *testing.T) { + t.Setenv("K_CA_CERTS", tc.value) + + var env myEnvConfig + err := envconfig.Process("", &env) + if err != nil { + t.Error("Expected no error:", err) + } + + if env.CACerts == nil { + t.Error("Expected CACerts to be non nil") + } + if *env.CACerts != tc.value { + t.Errorf("Expected CACerts to be %v, got %v", tc.value, *env.CACerts) + } + }) + } +} diff --git a/pkg/eventingtls/eventingtls.go b/pkg/eventingtls/eventingtls.go new file mode 100644 index 00000000000..4e394952ab7 --- /dev/null +++ b/pkg/eventingtls/eventingtls.go @@ -0,0 +1,197 @@ +/* +Copyright 2023 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventingtls + +import ( + "context" + "crypto/tls" + "crypto/x509" + "fmt" + "strings" + "sync/atomic" + + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + coreinformersv1 "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "knative.dev/pkg/apis" + "knative.dev/pkg/controller" + "knative.dev/pkg/logging" +) + +const ( + // TLSKey is the key in the TLS secret for the private key of TLS servers + TLSKey = "tls.key" + // TLSCrt is the key in the TLS secret for the public key of TLS servers + TLSCrt = "tls.crt" + // DefaultMinTLSVersion is the default minimum TLS version for servers and clients. + DefaultMinTLSVersion = tls.VersionTLS12 +) + +type ClientConfig struct { + // CACerts are Certification Authority (CA) certificates in PEM format + // according to https://www.rfc-editor.org/rfc/rfc7468. + CACerts *string +} + +type ServerConfig struct { + // GetCertificate returns a Certificate based on the given + // ClientHelloInfo. It will only be called if the client supplies SNI + // information or if Certificates is empty. + // + // If GetCertificate is nil or returns nil, then the certificate is + // retrieved from NameToCertificate. If NameToCertificate is nil, the + // best element of Certificates will be used. + GetCertificate func(*tls.ClientHelloInfo) (*tls.Certificate, error) +} + +// GetCertificate returns a Certificate based on the given +// ClientHelloInfo. It will only be called if the client supplies SNI +// information or if Certificates is empty. +// +// If GetCertificate is nil or returns nil, then the certificate is +// retrieved from NameToCertificate. If NameToCertificate is nil, the +// best element of Certificates will be used. +type GetCertificate func(*tls.ClientHelloInfo) (*tls.Certificate, error) + +// GetCertificateFromSecret returns a GetCertificate function that will automatically return +// the latest certificate that is present in the provided secret. +// +// The secret is expected to have at least 2 keys in data: see TLSKey and TLSCrt constants for +// knowing the key names. +func GetCertificateFromSecret(ctx context.Context, informer coreinformersv1.SecretInformer, kube kubernetes.Interface, secret types.NamespacedName) GetCertificate { + + certHolder := atomic.Value{} + + logger := logging.FromContext(ctx).Desugar(). + With(zap.String("tls.secret", secret.String())) + + store := func(obj interface{}) { + s, ok := obj.(*corev1.Secret) + if !ok { + return + } + crt, crtOk := s.Data[TLSCrt] + key, keyOk := s.Data[TLSKey] + if !crtOk || !keyOk { + logger.Debug("Missing " + TLSCrt + " or " + TLSKey + " in the secret.data") + return + } + + logger.Debug("Loading key pair") + + certificate, err := tls.X509KeyPair(crt, key) + if err != nil { + logger.Error("Failed to create x.509 key pair", zap.Error(err)) + return + } + + logger.Debug("certificate stored") + certHolder.Store(&certificate) + } + + informer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: controller.FilterWithNameAndNamespace(secret.Namespace, secret.Name), + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: store, + UpdateFunc: func(_, newObj interface{}) { + store(newObj) + }, + DeleteFunc: nil, + }, + }) + + // Store the current value so that we have certHolder initialized. + firstValue, err := informer.Lister().Secrets(secret.Namespace).Get(secret.Name) + if err != nil { + // Try to get the secret from the API Server when the lister failed. + firstValue, err = kube.CoreV1().Secrets(secret.Namespace).Get(ctx, secret.Name, metav1.GetOptions{}) + if err != nil { + logger.Fatal(err.Error()) + } + } + store(firstValue) + + return func(info *tls.ClientHelloInfo) (*tls.Certificate, error) { + cert := certHolder.Load() + if cert == nil { + return nil, nil + } + return cert.(*tls.Certificate), nil + } +} + +// NewDefaultClientConfig returns a default ClientConfig. +func NewDefaultClientConfig() ClientConfig { + return ClientConfig{} +} + +// GetTLSClientConfig returns tls.Config based on the given ClientConfig. +func GetTLSClientConfig(config ClientConfig) (*tls.Config, error) { + pool, err := certPool(config.CACerts) + if err != nil { + return nil, err + } + + return &tls.Config{ + RootCAs: pool, + MinVersion: DefaultMinTLSVersion, + }, nil +} + +func NewDefaultServerConfig() ServerConfig { + return ServerConfig{} +} + +func GetTLSServerConfig(config ServerConfig) (*tls.Config, error) { + return &tls.Config{ + MinVersion: DefaultMinTLSVersion, + GetCertificate: config.GetCertificate, + }, nil +} + +// IsHttpsSink returns true if the sink has scheme equal to https. +func IsHttpsSink(sink string) bool { + s, err := apis.ParseURL(sink) + if err != nil { + return false + } + return strings.EqualFold(s.Scheme, "https") +} + +// certPool returns a x509.CertPool with the combined certs from: +// - the system cert pool +// - the given CA certificates +func certPool(caCerts *string) (*x509.CertPool, error) { + p, err := x509.SystemCertPool() + if err != nil { + return nil, err + } + + if caCerts == nil || *caCerts == "" { + return p, nil + } + + if ok := p.AppendCertsFromPEM([]byte(*caCerts)); !ok { + return p, fmt.Errorf("failed to append CA certs from PEM") + } + + return p, nil +} diff --git a/pkg/eventingtls/eventingtls_test.go b/pkg/eventingtls/eventingtls_test.go new file mode 100644 index 00000000000..ba6b7fd898b --- /dev/null +++ b/pkg/eventingtls/eventingtls_test.go @@ -0,0 +1,133 @@ +/* +Copyright 2023 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventingtls + +import ( + "crypto/tls" + "crypto/x509" + "testing" + + "k8s.io/utils/pointer" +) + +func TestGetClientConfig(t *testing.T) { + t.Parallel() + + sysCertPool, err := x509.SystemCertPool() + if err != nil { + t.Fatal(err) + } + sysCertPool = sysCertPool.Clone() + + pemCaCert := ` +-----BEGIN CERTIFICATE----- +MIIDLTCCAhWgAwIBAgIJAOjtl0zhGBvpMA0GCSqGSIb3DQEBCwUAMC0xEzARBgNV +BAoMCmlvLnN0cmltemkxFjAUBgNVBAMMDWNsdXN0ZXItY2EgdjAwHhcNMjAxMjIw +MDgzNzU4WhcNMjExMjIwMDgzNzU4WjAtMRMwEQYDVQQKDAppby5zdHJpbXppMRYw +FAYDVQQDDA1jbHVzdGVyLWNhIHYwMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB +CgKCAQEAy7rIo+UwJh5dL6PhUfDe9wRuLgOf1ZeZmabd++eLc2kWL1r6TO8X034n +CerkREfjF+MjDoK30z9xvEURThoSi20a4i/Cb39on9T0AgOr5qCSrqlN9n4KtRey +ZLnKKA5QyLAM6kzyyvIg4PVwWCWFTQSicDPzqd2OmH6jtogD50FkbaP7LcyrKnWf +64gcR9CCEAcrO8tJdhcZP2Slxg+RvupVjXK1rdZcI6/liZ3Jp4hzApSRN30x/8wU +5eJYAtzaeWUvJ0Yq/7BH7uY8J+2Hwh+shhi5K98HBAKeISwuIJEQrWmmUer8WGp1 +IcBZqXbkd4dBXuFa0chO0gSKvzjKpQIDAQABo1AwTjAdBgNVHQ4EFgQUeascji1L +C2voPwDAlPL6iz8TzncwHwYDVR0jBBgwFoAUeascji1LC2voPwDAlPL6iz8Tzncw +DAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEAIEL2uustCrpPas06LyoR +VR6QFHQJDcMgdL0CZFE46uLSgGupXO0ybmPP2ymBJ1zDNxx1qskNTwBsfBJLBAj6 +8LfJmhfw98QK8YQDJ/Xhx3fcVxjn6NjJ3RYOyb5bqSIGGCQZRmbMjerf71KMhP3X +rdYg2hVoCvfRcfP2G0jbWtMRK4+MlB3oEvhIvQQW1dw4sohw32HaNJnzb7dErEDB +Ha2zVM47CcNezdWYUD5NQzFqCRypgrIONafQI2S+Ck7aKOiqF03QSug4wizRbKhT +uYpQg59dUIOBebg0roRF326H2x6kFGn5L2o+TROrZeeXT8vyIl2R33o3E+ULpuw+ +Vw== +-----END CERTIFICATE----- +` + + tt := []struct { + name string + cfg ClientConfig + expected tls.Config + wantErr bool + }{ + { + name: "empty string CA certs", + cfg: ClientConfig{ + CACerts: pointer.String(""), + }, + expected: tls.Config{ + MinVersion: tls.VersionTLS12, + RootCAs: sysCertPool, + }, + }, + { + name: "nil CA certs", + cfg: ClientConfig{}, + expected: tls.Config{ + MinVersion: tls.VersionTLS12, + RootCAs: sysCertPool, + }, + }, + { + name: "Additional CA certs", + cfg: ClientConfig{ + CACerts: pointer.String(pemCaCert), + }, + expected: tls.Config{ + MinVersion: tls.VersionTLS12, + RootCAs: WithCerts(sysCertPool, pemCaCert), + }, + }, + { + name: "Additional broken CA certs", + cfg: ClientConfig{ + CACerts: pointer.String(pemCaCert[:len(pemCaCert)-30]), + }, + expected: tls.Config{ + MinVersion: tls.VersionTLS12, + }, + wantErr: true, + }, + } + + for i := range tt { + tc := &tt[i] + t.Run(tc.name, func(t *testing.T) { + got, err := GetTLSClientConfig(tc.cfg) + if tc.wantErr != (err != nil) { + t.Fatalf("Want err: %v, got %v", tc.wantErr, err) + } + if err != nil { + return + } + + if !got.RootCAs.Equal(tc.expected.RootCAs) { + t.Fatalf("Got RootCAs are not equal to expected RootCAs") + } + + if got.MinVersion != tc.expected.MinVersion { + t.Fatalf("want MinVersion %v, got %v", tc.expected.MinVersion, got.MinVersion) + } + }) + } +} + +func WithCerts(pool *x509.CertPool, caCerts string) *x509.CertPool { + pool = pool.Clone() + if ok := pool.AppendCertsFromPEM([]byte(caCerts)); !ok { + panic("Failed to append CA certs from PEM:\n" + caCerts) + } + return pool +} diff --git a/pkg/eventingtls/eventingtlstesting/eventingtlstesting.go b/pkg/eventingtls/eventingtlstesting/eventingtlstesting.go new file mode 100644 index 00000000000..59c7037f6f4 --- /dev/null +++ b/pkg/eventingtls/eventingtlstesting/eventingtlstesting.go @@ -0,0 +1,177 @@ +/* +Copyright 2023 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventingtlstesting + +import ( + "context" + nethttp "net/http" + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + kubeclient "knative.dev/pkg/client/injection/kube/client/fake" + secretinformer "knative.dev/pkg/injection/clients/namespacedkube/informers/core/v1/secret/fake" + + "knative.dev/eventing/pkg/eventingtls" + "knative.dev/eventing/pkg/kncloudevents" +) + +func StartServer(ctx context.Context, t *testing.T, port int) string { + + secret := types.NamespacedName{ + Namespace: "knative-tests", + Name: "tls-secret", + } + + ca, key, crt := loadCerts() + + _ = secretinformer.Get(ctx).Informer().GetStore().Add(&corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: secret.Namespace, + Name: secret.Name, + }, + Data: map[string][]byte{ + "tls.key": key, + "tls.crt": crt, + }, + Type: corev1.SecretTypeTLS, + }) + + serverTLSConfig := eventingtls.NewDefaultServerConfig() + serverTLSConfig.GetCertificate = eventingtls.GetCertificateFromSecret(ctx, secretinformer.Get(ctx), kubeclient.Get(ctx), secret) + tlsConfig, err := eventingtls.GetTLSServerConfig(serverTLSConfig) + assert.Nil(t, err) + + receiver := kncloudevents.NewHTTPMessageReceiver(port, + kncloudevents.WithTLSConfig(tlsConfig), + ) + + go func() { + err := receiver.StartListen(ctx, nethttp.HandlerFunc(func(writer nethttp.ResponseWriter, request *nethttp.Request) { + if request.TLS == nil { + // It's not on TLS, fail request + writer.WriteHeader(nethttp.StatusInternalServerError) + return + } + writer.WriteHeader(nethttp.StatusOK) + })) + if err != nil { + panic(err) + } + }() + + return string(ca) +} + +func loadCerts() (ca, key, crt []byte) { + /* + Provisioned using: + openssl req -x509 -nodes -new -sha256 -days 1024 -newkey rsa:2048 -keyout RootCA.key -out RootCA.pem -subj "/C=US/CN=Knative-Example-Root-CA" + openssl x509 -outform pem -in RootCA.pem -out RootCA.crt + openssl req -new -nodes -newkey rsa:2048 -keyout localhost.key -out localhost.csr -subj "/C=US/ST=YourState/L=YourCity/O=Example-Certificates/CN=localhost.local" + openssl x509 -req -sha256 -days 1024 -in localhost.csr -CA RootCA.pem -CAkey RootCA.key -CAcreateserial -extfile domains.ext -out localhost.crt + + Copy: + - RootCA.crt for ca + - localhost.key for key + - localhost.crt for crt + + domains.ext file: + authorityKeyIdentifier=keyid,issuer + basicConstraints=CA:FALSE + keyUsage = digitalSignature, nonRepudiation, keyEncipherment, dataEncipherment + subjectAltName = @alt_names + [alt_names] + DNS.1 = localhost + */ + return []byte(` +-----BEGIN CERTIFICATE----- +MIIDPzCCAiegAwIBAgIUYuysnNGPwBjbiDRc+/9s9Jl3N8YwDQYJKoZIhvcNAQEL +BQAwLzELMAkGA1UEBhMCVVMxIDAeBgNVBAMMF0tuYXRpdmUtRXhhbXBsZS1Sb290 +LUNBMB4XDTIzMDQwNTEzMTQxMloXDTI2MDEyMzEzMTQxMlowLzELMAkGA1UEBhMC +VVMxIDAeBgNVBAMMF0tuYXRpdmUtRXhhbXBsZS1Sb290LUNBMIIBIjANBgkqhkiG +9w0BAQEFAAOCAQ8AMIIBCgKCAQEAyEwyvWKc/SJzblAc/pNIE7UJHIpbEUDtwOom +YvytwcMhI73zlSVhAcOagwnn3AvBg3McGPyLGghr9EuXBE1Vx584Pw1cmKOwbyiC +SQtaRwbztzM555T4Rtrk4tdKm+WHD/HiYAB/s+OnPJ6F6yBedT6nW08HlTP5lJX1 +U21+OAiOSU4zx+YYlkRbHq8aYggB1YM+hdRSStl9Mc/nw6TWlVsd2LjppXgoxSKl +YTB4ZwnaKmrIRa9hFf1DVY/nTlmUP2iGr9131CLs3/5QyoFRWI6ayfnRSkmVwKLS +8AW/b4jh+qJVIaeLCw5QF4RuqsE5VaUj6wlEqWM4eI+5Uaj+5QIDAQABo1MwUTAd +BgNVHQ4EFgQUklwJ+26zi+P3w3TNBBq62yMS7zYwHwYDVR0jBBgwFoAUklwJ+26z +i+P3w3TNBBq62yMS7zYwDwYDVR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOC +AQEAWwlatEXUTiB4O3M/fLSZ4JlAA1bq2U+dafiUiq5Ym0F1/UGu7YD74LGm4n03 +X9QU4jVwAkxL8pFV68NEBFJXOwFRyVQ1THAfhzij5teMAd4aqaffEPF0YfE8+rdg +MSQx9n/OOeeyqWlaAqI3D9SEoSFPk5Xbfdzu6zGggizJwIYus77LOYxS7hvGxCci +dTnEHvGoP14/13F/2vZLSaH9qrAv3cTenVYRN1QSSVI0V2XAhz+HAOjO2muaaYEG +2eKiYvHvG0p5aCRIZYi4z3q6QAr9z+nyRyO1Tw/CnbCOeULQoOZWLy8xE9zBOE1t +JQArXobwA4IZrx13xxsMafyt0A== +-----END CERTIFICATE----- +`), []byte(` +-----BEGIN PRIVATE KEY----- +MIIEvAIBADANBgkqhkiG9w0BAQEFAASCBKYwggSiAgEAAoIBAQC5teo+En6U5nhq +n7ScuanqswUmPlgs9j/8l21Rhb4T+ezlYKGQGhbJyFFMuiCE1Rjn8bpCwi7Nnv12 +Y2nzFhEv2Jx0yL3Tqx0Q593myqKDq7326EtbO7wmDT0XD03twH5i9XZ0L0ihPWn1 +mjUyWxhnHhoFpXrsnQECJorZY6aTrFbGVYelIaj5AriwiqyL0fET8pueI2GwLjgW +HFSHX8XsGAlcLUhkQG0Z+VO9usy4M1Wpt+cL6cnTiQ+sRmZ6uvaj8fKOT1Slk/oU +eAi4WqFkChGzGzLik0QrhKGTdw3uUvI1F2sdQj0GYzXaWqRz+tP9qnXdzk1GrszK +KSlmWBTLAgMBAAECggEAM5nqhmjZJ0KKvwW1R78HCaHIkoHMOmIKEYN56qcA08gk +HPAmtEWrg1HX1Tv6gS49B2XRXW9bVeMRhm3FKLg++k5z2rdUl5X6M5JZxCEV2wRD +enG9TpJgiyouiVPFUYSlGZYe3dWtlq/b21SH54AMXcqtbFg4ubo+Z3ySJCleRbWp +iUHoTXB9oy31HMca8LaGkcsk4JlSGpThK2mF6zGI5Lz1sjYfBwrBinHImrMuH821 +1JbXLjoAAoHcM/DykQe2vXe0gKJKzbScC3KAI5TimvCuwGdtfAsPWkrQoUQB8Q2N +M7DTAHqbbWGdxVWzntVb5ilFDRg3n3sVAp2AxALvcQKBgQDvlvVEyQqNDfJ8ONDS +zBoK0RN3xu8+gNYguiXMy2oJWAGmomdNLn4UWqrIRmQhBFLob+dms0V/zKn6f1y9 +DZhAt86yPdi0/xXSWEw3UeAcVQcJbbUFc9GjFXjWB1nMyZgk5laBy8+ht5Vly/hC +q/oTLLM7yXVlyBZRru7VU5GDBQKBgQDGbjlarvr6PUgSQS1UJGkny815NKjsVMgG +tdA3iT7dwINxcTUbqwkp6qTKjXgeH2VsJzhWqagcE9W88uumpKIqZGKnd/vgEKss +UTVpVxzVTivcl0y70lZKC/pgycO+ML4WiI+ASPZG9W9wFNvGdWlHnPZIMFVZ/SNB +smcSAa9hjwKBgByR7Md6DccKPbswbz5j1ksp6V9kGo1igaY/bFiCfS+GDhRX02ex +vpkgwrLFKhWB1X0gMwDdKdF2j2Juo5lrsJcvE/fPRjM3I9wEaXpDSi02unMWYPq4 +d+wxmEo1cDDqbTkhOnmZ2zWWlbsg2obgyR5WOz1K5bPwazDsYlCP+Y8dAoGAE2rD +yADpZEVM4SRpmBs8Av3pbFvfz8h4DlgKOPUAJtjow9gNF1kEO4rPd1aik2gFF7E6 +zRgq8BxsxOGMd7ESgU1zbenKxuE6rsp/jIBOvPy6RAq2IobxlKtZY9E6i0jfwPq5 ++BarqsPnlLMl0mS42Z4dZ3D7WSPxKEOZ3GQ30jcCgYBT2wsPDdGrRsBVw3C6MAhu +D8cdmsXujQXkDr51Yop7BSnFPz+Q7zWOlzwqcXEw0h/9TwWa0C5LAuKdYUkQAa26 +0dsxUFD/WmdxtLqPbIwq4YkUgmYSTK0LgzDtnhYU3RwK7HZ+TMUCOxe5CrjOzlvp +uEg9BsZ6abqL1PQfsez+fw== +-----END PRIVATE KEY----- +`), []byte(` +-----BEGIN CERTIFICATE----- +MIIDmjCCAoKgAwIBAgIUYzA4bTMXevuk3pl2Mn8hpCYL2C0wDQYJKoZIhvcNAQEL +BQAwLzELMAkGA1UEBhMCVVMxIDAeBgNVBAMMF0tuYXRpdmUtRXhhbXBsZS1Sb290 +LUNBMB4XDTIzMDQwNTEzMTUyNFoXDTI2MDEyMzEzMTUyNFowbTELMAkGA1UEBhMC +VVMxEjAQBgNVBAgMCVlvdXJTdGF0ZTERMA8GA1UEBwwIWW91ckNpdHkxHTAbBgNV +BAoMFEV4YW1wbGUtQ2VydGlmaWNhdGVzMRgwFgYDVQQDDA9sb2NhbGhvc3QubG9j +YWwwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQC5teo+En6U5nhqn7Sc +uanqswUmPlgs9j/8l21Rhb4T+ezlYKGQGhbJyFFMuiCE1Rjn8bpCwi7Nnv12Y2nz +FhEv2Jx0yL3Tqx0Q593myqKDq7326EtbO7wmDT0XD03twH5i9XZ0L0ihPWn1mjUy +WxhnHhoFpXrsnQECJorZY6aTrFbGVYelIaj5AriwiqyL0fET8pueI2GwLjgWHFSH +X8XsGAlcLUhkQG0Z+VO9usy4M1Wpt+cL6cnTiQ+sRmZ6uvaj8fKOT1Slk/oUeAi4 +WqFkChGzGzLik0QrhKGTdw3uUvI1F2sdQj0GYzXaWqRz+tP9qnXdzk1GrszKKSlm +WBTLAgMBAAGjcDBuMB8GA1UdIwQYMBaAFJJcCftus4vj98N0zQQautsjEu82MAkG +A1UdEwQCMAAwCwYDVR0PBAQDAgTwMBQGA1UdEQQNMAuCCWxvY2FsaG9zdDAdBgNV +HQ4EFgQUnu/3vqA3VEzm128x/hLyZzR9JlgwDQYJKoZIhvcNAQELBQADggEBAFc+ +1cKt/CNjHXUsirgEhry2Mm96R6Yxuq//mP2+SEjdab+FaXPZkjHx118u3PPX5uTh +gTT7rMfka6J5xzzQNqJbRMgNpdEFH1bbc11aYuhi0khOAe0cpQDtktyuDJQMMv3/ +3wu6rLr6fmENo0gdcyUY9EiYrglWGtdXhlo4ySRY8UZkUScG2upvyOhHTxVCAjhP +efbMkNjmDuZOMK+wqanqr5YV6zMPzkQK7DspfRgasMAQmugQu7r2MZpXg8Ilhro1 +s/wImGnMVk5RzpBVrq2VB9SkX/ThTVYEC/Sd9BQM364MCR+TA1l8/ptaLFLuwyw8 +O2dgzikq8iSy1BlRsVw= +-----END CERTIFICATE----- +`) +} diff --git a/pkg/kncloudevents/message_receiver.go b/pkg/kncloudevents/message_receiver.go index 9632d27bcb7..8916e4f2c0c 100644 --- a/pkg/kncloudevents/message_receiver.go +++ b/pkg/kncloudevents/message_receiver.go @@ -18,6 +18,7 @@ package kncloudevents import ( "context" + "crypto/tls" "fmt" "net" "net/http" @@ -78,6 +79,17 @@ func WithDrainQuietPeriod(duration time.Duration) HTTPMessageReceiverOption { } } +// WithTLSConfig configures the TLS config for the receiver. +func WithTLSConfig(cfg *tls.Config) HTTPMessageReceiverOption { + return func(h *HTTPMessageReceiver) { + if h.server == nil { + h.server = newServer() + } + + h.server.TLSConfig = cfg + } +} + // WithWriteTimeout sets the HTTP server's WriteTimeout. It covers the time between end of reading // Request Header to end of writing response. func WithWriteTimeout(duration time.Duration) HTTPMessageReceiverOption { @@ -123,7 +135,11 @@ func (recv *HTTPMessageReceiver) StartListen(ctx context.Context, handler http.H errChan := make(chan error, 1) go func() { close(recv.Ready) - errChan <- recv.server.Serve(recv.listener) + if recv.server.TLSConfig == nil { + errChan <- recv.server.Serve(recv.listener) + } else { + errChan <- recv.server.ServeTLS(recv.listener, "", "") + } }() // wait for the server to return or ctx.Done(). diff --git a/vendor/knative.dev/pkg/injection/clients/namespacedkube/informers/core/v1/secret/fake/fake.go b/vendor/knative.dev/pkg/injection/clients/namespacedkube/informers/core/v1/secret/fake/fake.go new file mode 100644 index 00000000000..89384a18087 --- /dev/null +++ b/vendor/knative.dev/pkg/injection/clients/namespacedkube/informers/core/v1/secret/fake/fake.go @@ -0,0 +1,38 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fake + +import ( + context "context" + + controller "knative.dev/pkg/controller" + injection "knative.dev/pkg/injection" + secret "knative.dev/pkg/injection/clients/namespacedkube/informers/core/v1/secret" + fake "knative.dev/pkg/injection/clients/namespacedkube/informers/factory/fake" +) + +var Get = secret.Get + +func init() { + injection.Fake.RegisterInformer(withInformer) +} + +func withInformer(ctx context.Context) (context.Context, controller.Informer) { + f := fake.Get(ctx) + inf := f.Core().V1().Secrets() + return context.WithValue(ctx, secret.Key{}, inf), inf.Informer() +} diff --git a/vendor/knative.dev/pkg/injection/clients/namespacedkube/informers/factory/fake/fake.go b/vendor/knative.dev/pkg/injection/clients/namespacedkube/informers/factory/fake/fake.go new file mode 100644 index 00000000000..d92469c85ed --- /dev/null +++ b/vendor/knative.dev/pkg/injection/clients/namespacedkube/informers/factory/fake/fake.go @@ -0,0 +1,42 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fake + +import ( + context "context" + + informers "k8s.io/client-go/informers" + fake "knative.dev/pkg/client/injection/kube/client/fake" + controller "knative.dev/pkg/controller" + injection "knative.dev/pkg/injection" + factory "knative.dev/pkg/injection/clients/namespacedkube/informers/factory" + "knative.dev/pkg/system" +) + +var Get = factory.Get + +func init() { + injection.Fake.RegisterInformerFactory(withInformerFactory) +} + +func withInformerFactory(ctx context.Context) context.Context { + c := fake.Get(ctx) + return context.WithValue(ctx, factory.Key{}, + informers.NewSharedInformerFactoryWithOptions(c, controller.GetResyncPeriod(ctx), + // This factory scopes things to the system namespace. + informers.WithNamespace(system.Namespace()))) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 2d18ef4a712..8419bed65d3 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1311,7 +1311,9 @@ knative.dev/pkg/injection knative.dev/pkg/injection/clients/dynamicclient knative.dev/pkg/injection/clients/dynamicclient/fake knative.dev/pkg/injection/clients/namespacedkube/informers/core/v1/secret +knative.dev/pkg/injection/clients/namespacedkube/informers/core/v1/secret/fake knative.dev/pkg/injection/clients/namespacedkube/informers/factory +knative.dev/pkg/injection/clients/namespacedkube/informers/factory/fake knative.dev/pkg/injection/sharedmain knative.dev/pkg/kflag knative.dev/pkg/kmap