-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathbatch_event_handler.go
More file actions
119 lines (104 loc) · 3.21 KB
/
batch_event_handler.go
File metadata and controls
119 lines (104 loc) · 3.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package kafka_wrapper
import (
"context"
"fmt"
"github.com/IBM/sarama"
"github.com/Trendyol/kafka-wrapper/execution_behaviour"
"github.com/Trendyol/kafka-wrapper/execution_behaviour/behavioral"
"github.com/Trendyol/kafka-wrapper/params"
)
// Fallback values applied by NewBatchEventHandler when the corresponding
// BatchEventHandlerConfig field is left at zero.
const (
	// defaultBatchRoutineCount is the number of worker goroutines started when
	// RoutineCount is not set.
	defaultBatchRoutineCount = 1
	// defaultBatchBufferSize is the capacity of the internal message channel
	// when BufferSize is not set.
	defaultBatchBufferSize = 100
)
// ContextEnricher is the hook a project implements to add its own data
// (tracing, otel, etc.) to the context used while processing a message.
type ContextEnricher interface {
	// Enrich is given the base context and the consumed message and returns
	// the context that is then passed to the message processor.
	// Implementations are expected to read headers from the message and carry
	// them into the returned context.
	Enrich(ctx context.Context, message *sarama.ConsumerMessage) context.Context
}
// BatchEventHandlerConfig is the configuration for the batch event handler
// created by NewBatchEventHandler. All fields are optional.
type BatchEventHandlerConfig struct {
	// RoutineCount is the number of workers that process messages (defaults to 1 if 0).
	RoutineCount int
	// BufferSize is the capacity of the channel that feeds the workers
	// (defaults to 100 if 0).
	BufferSize int
	// ContextEnricher is optional; if nil, messages are processed with a plain
	// context.Background(). When set, it is called once per message with
	// context.Background() and its result is used as the processing context.
	ContextEnricher ContextEnricher
	// Logger is optional; used for logging processing errors. If nil, errors
	// are printed with fmt.Printf instead.
	Logger params.Logger
}
// batchMessage is the unit of work handed from ConsumeClaim to a worker
// goroutine over the handler's channel.
type batchMessage struct {
	// session is the consumer group session the message was read in; the
	// worker uses it to mark the message once processing has finished.
	session sarama.ConsumerGroupSession
	// message is the consumed Kafka message to process.
	message *sarama.ConsumerMessage
	// processor is the executor the selector returned for the message's claim.
	processor behavioral.BehaviourExecutor
}
// batchEventHandler is the handler returned by NewBatchEventHandler. It
// forwards every consumed message to a pool of worker goroutines through msgCh.
type batchEventHandler struct {
	// cfg holds the caller-supplied configuration (never nil after construction).
	cfg *BatchEventHandlerConfig
	// selector picks the executor used for the messages of a claim.
	selector execution_behaviour.BehavioralSelector
	// msgCh is the buffered channel shared by ConsumeClaim (sender) and the
	// workers (receivers).
	msgCh chan *batchMessage
}
// NewBatchEventHandler returns the shared batch event handler with the given BehavioralSelector and config.
// Projects can use this handler by providing only a ContextEnricher (tracing/newrelic, etc.) and selector.
//
// cfg may be nil, in which case an empty configuration is used. A
// RoutineCount or BufferSize that is zero or negative falls back to the
// package default: a negative buffer size would make the channel allocation
// panic, and a negative routine count would start no workers at all, leaving
// ConsumeClaim blocked once the buffer fills up.
//
// The worker goroutines are started here and run until the internal channel
// is closed; nothing in this file closes it, so they live as long as the
// process.
func NewBatchEventHandler(selector execution_behaviour.BehavioralSelector, cfg *BatchEventHandlerConfig) EventHandler {
	if cfg == nil {
		cfg = &BatchEventHandlerConfig{}
	}

	routineCount := cfg.RoutineCount
	if routineCount <= 0 {
		routineCount = defaultBatchRoutineCount
	}

	bufferSize := cfg.BufferSize
	if bufferSize <= 0 {
		bufferSize = defaultBatchBufferSize
	}

	msgCh := make(chan *batchMessage, bufferSize)
	h := &batchEventHandler{
		cfg:      cfg,
		selector: selector,
		msgCh:    msgCh,
	}

	for i := 0; i < routineCount; i++ {
		go h.worker(msgCh)
	}
	return h
}
// Setup is the session-start hook of the consumer group handler. This handler
// has nothing to prepare, so it ignores the session and always returns nil.
func (h *batchEventHandler) Setup(_ sarama.ConsumerGroupSession) error {
	return nil
}
// Cleanup is the session-end hook of the consumer group handler. This handler
// holds no per-session resources, so it ignores the session and always
// returns nil.
func (h *batchEventHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
	return nil
}
// ConsumeClaim reads messages from the claim and hands each one, together
// with the session and the executor selected for this claim, to the worker
// pool through the handler's channel.
//
// It returns nil when the claim's message channel is closed or when the
// session context is done. The hand-off to the workers also watches the
// session context: if the buffer is full while the session is ending, the
// method returns instead of staying blocked on the send. A message that could
// not be handed off is neither processed nor marked by this handler.
func (h *batchEventHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	processor := h.selector.GetBehavioral(claim)
	done := session.Context().Done()

	for {
		select {
		case message, ok := <-claim.Messages():
			if !ok {
				return nil
			}
			work := &batchMessage{
				session:   session,
				message:   message,
				processor: processor,
			}
			// Do not block forever on a full buffer: give up as soon as the
			// session is cancelled so the caller can finish the session.
			select {
			case h.msgCh <- work:
			case <-done:
				return nil
			}
		case <-done:
			return nil
		}
	}
}
// worker drains msgCh until the channel is closed. For every item it builds
// the processing context (context.Background(), optionally passed through the
// configured ContextEnricher), runs the item's processor and then marks the
// message on the item's session.
//
// A processing error is only reported, through the configured Logger or with
// fmt.Printf when no Logger is set; the message is marked either way.
func (h *batchEventHandler) worker(msgCh chan *batchMessage) {
	for {
		item, open := <-msgCh
		if !open {
			return
		}

		msgCtx := context.Background()
		if enricher := h.cfg.ContextEnricher; enricher != nil {
			msgCtx = enricher.Enrich(msgCtx, item.message)
		}

		err := item.processor.Process(msgCtx, item.message)
		switch {
		case err == nil:
			// Processed successfully; nothing to report.
		case h.cfg.Logger != nil:
			h.cfg.Logger.Error(msgCtx, "batch event handler process error: %v", err)
		default:
			fmt.Printf("batch event handler process error: %v\n", err)
		}

		// Marked regardless of the processing outcome.
		item.session.MarkMessage(item.message, "")
	}
}