Skip to content

Commit 87feb4f

Browse files
committed
Update filter handler to use CloudEventsRequest
1 parent 5530c39 commit 87feb4f

1 file changed

Lines changed: 14 additions & 19 deletions

File tree

pkg/broker/filter/filter_handler.go

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"fmt"
2424
"io"
2525
"net/http"
26-
"net/url"
2726
"strings"
2827
"time"
2928

@@ -35,7 +34,7 @@ import (
3534
"go.opencensus.io/trace"
3635
"go.uber.org/zap"
3736
channelAttributes "knative.dev/eventing/pkg/channel/attributes"
38-
"knative.dev/pkg/apis"
37+
duckv1 "knative.dev/pkg/apis/duck/v1"
3938
"knative.dev/pkg/logging"
4039

4140
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
@@ -82,8 +81,6 @@ var HeaderProxyAllowList = map[string]struct{}{
8281

8382
// Handler parses Cloud Events, determines if they pass a filter, and sends them to a subscriber.
8483
type Handler struct {
85-
// sender sends requests to downstream services
86-
sender *kncloudevents.HTTPMessageSender
8784
// reporter reports stats of status code and dispatch time
8885
reporter StatsReporter
8986

@@ -100,13 +97,7 @@ func NewHandler(logger *zap.Logger, triggerLister eventinglisters.TriggerLister,
10097
MaxIdleConnsPerHost: defaultMaxIdleConnectionsPerHost,
10198
})
10299

103-
sender, err := kncloudevents.NewHTTPMessageSenderWithTarget("")
104-
if err != nil {
105-
return nil, fmt.Errorf("failed to create message sender: %w", err)
106-
}
107-
108100
return &Handler{
109-
sender: sender,
110101
reporter: reporter,
111102
triggerLister: triggerLister,
112103
logger: logger,
@@ -211,10 +202,14 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
211202

212203
h.reportArrivalTime(event, reportArgs)
213204

214-
h.send(ctx, writer, request.Header, subscriberURI.URL(), reportArgs, event, ttl)
205+
target := duckv1.Addressable{
206+
URL: t.Status.SubscriberURI,
207+
CACerts: t.Status.SubscriberCACerts,
208+
}
209+
h.send(ctx, writer, request.Header, target, reportArgs, event, ttl)
215210
}
216211

217-
func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers http.Header, target *url.URL, reportArgs *ReportArgs, event *cloudevents.Event, ttl int32) {
212+
func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers http.Header, target duckv1.Addressable, reportArgs *ReportArgs, event *cloudevents.Event, ttl int32) {
218213
// send the event to trigger's subscriber
219214
response, responseErr := h.sendEvent(ctx, headers, target, event, reportArgs)
220215

@@ -235,7 +230,7 @@ func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers
235230

236231
// Read Response body to responseErr
237232
errExtensionInfo := broker.ErrExtensionInfo{
238-
ErrDestination: (*apis.URL)(target),
233+
ErrDestination: target.URL,
239234
ErrResponseBody: responseErr.ResponseBody,
240235
}
241236
errExtensionBytes, msErr := json.Marshal(errExtensionInfo)
@@ -249,23 +244,23 @@ func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers
249244
return
250245
}
251246

252-
h.logger.Debug("Successfully dispatched message", zap.Any("target", target.String()))
247+
h.logger.Debug("Successfully dispatched message", zap.Any("target", target))
253248

254249
// If there is an event in the response write it to the response
255-
statusCode, err := h.writeResponse(ctx, writer, response, ttl, target.String())
250+
statusCode, err := h.writeResponse(ctx, writer, response, ttl, target.URL.String())
256251
if err != nil {
257252
h.logger.Error("failed to write response", zap.Error(err))
258253
}
259254
_ = h.reporter.ReportEventCount(reportArgs, statusCode)
260255
}
261256

262-
func (h *Handler) sendEvent(ctx context.Context, headers http.Header, target *url.URL, event *cloudevents.Event, reporterArgs *ReportArgs) (*http.Response, ErrHandler) {
257+
func (h *Handler) sendEvent(ctx context.Context, headers http.Header, target duckv1.Addressable, event *cloudevents.Event, reporterArgs *ReportArgs) (*http.Response, ErrHandler) {
263258
responseErr := ErrHandler{
264259
ResponseCode: NoResponse,
265260
}
266261

267262
// Send the event to the subscriber
268-
req, err := h.sender.NewCloudEventRequestWithTarget(ctx, target.String())
263+
req, err := kncloudevents.NewCloudEventRequest(ctx, target)
269264
if err != nil {
270265
responseErr.err = fmt.Errorf("failed to create the request: %w", err)
271266
return nil, responseErr
@@ -279,14 +274,14 @@ func (h *Handler) sendEvent(ctx context.Context, headers http.Header, target *ur
279274
// Following the spec https://github.com/knative/specs/blob/main/specs/eventing/data-plane.md#derived-reply-events
280275
additionalHeaders.Set("prefer", "reply")
281276

282-
err = kncloudevents.WriteHTTPRequestWithAdditionalHeaders(ctx, message, req, additionalHeaders)
277+
err = kncloudevents.WriteRequestWithAdditionalHeaders(ctx, message, req, additionalHeaders)
283278
if err != nil {
284279
responseErr.err = fmt.Errorf("failed to write request: %w", err)
285280
return nil, responseErr
286281
}
287282

288283
start := time.Now()
289-
resp, err := h.sender.Send(req)
284+
resp, err := req.Send()
290285
dispatchTime := time.Since(start)
291286
if err != nil {
292287
responseErr.ResponseCode = http.StatusInternalServerError

0 commit comments

Comments
 (0)