-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathretryablemessageprocessor.go
More file actions
135 lines (115 loc) · 4.55 KB
/
retryablemessageprocessor.go
File metadata and controls
135 lines (115 loc) · 4.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package messageprocessor
import (
"context"
"math"
"time"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/pkg/errors"
"github.com/rs/xstats"
)
// maxAttempts is the maximum number of times we'd like to retry processing a record before quitting.
// By default, the maximum number is set to 1, which means an application needs to set the property
// <APPNAME>_KINESIS-RETRY-CONFIG_MAXATTEMPTS to configure a higher value.
const maxAttempts = 1

// consumerRetriesExceeded is the stat emitted by this lib if max attempts are exceeded.
const consumerRetriesExceeded = "kinesis.consumer_error.retries_exceeded"
// MaxRetriesExceededError implements MessageError and is used to indicate to upstream applications/
// decorators that retries have been attempted and exhausted.
type MaxRetriesExceededError struct {
	Retryable bool  // whether the error should be retried; this lib always sets it to false
	OrigErr   error // the last error returned by the wrapped MessageProcessor (may be nil on misconfiguration)
	Wait      int   // seconds to wait before retrying; always 0 here since Retryable is false
}

// Error returns the message of the original error. It falls back to a static
// message when OrigErr is nil so callers never hit a nil-pointer panic
// (errors.Wrap returns nil when wrapping a nil error, so OrigErr can be nil).
func (t MaxRetriesExceededError) Error() string {
	if t.OrigErr == nil {
		return "max retries exceeded"
	}
	return t.OrigErr.Error()
}

// IsRetryable indicates whether or not the MessageError that was returned should be retried.
func (t MaxRetriesExceededError) IsRetryable() bool {
	return t.Retryable
}

// RetryAfter returns the number of seconds to wait before retrying. It is not
// relevant in the context of MaxRetriesExceededError as IsRetryable is set to false.
func (t MaxRetriesExceededError) RetryAfter() int {
	return t.Wait
}
// RetryableMessageProcessorConfig is the config for creating a RetryableMessageProcessor.
type RetryableMessageProcessorConfig struct {
	MaxAttempts int `description:"Maximum number of attempts to process kinesis message"`
}

// Name returns the name of the config root.
func (c *RetryableMessageProcessorConfig) Name() string {
	const configRoot = "kinesisretry"
	return configRoot
}
// RetryableMessageProcessorComponent implements the settings.Component interface.
type RetryableMessageProcessorComponent struct{}

// NewComponent populates default values.
func NewComponent() *RetryableMessageProcessorComponent {
	component := RetryableMessageProcessorComponent{}
	return &component
}
// Settings generates a config populated with defaults.
func (*RetryableMessageProcessorComponent) Settings() *RetryableMessageProcessorConfig {
	cfg := &RetryableMessageProcessorConfig{}
	cfg.MaxAttempts = maxAttempts
	return cfg
}
// New builds a decorator that wraps a MessageProcessor with retry behavior,
// carrying over the configured maximum number of attempts.
func (c *RetryableMessageProcessorComponent) New(_ context.Context, conf *RetryableMessageProcessorConfig) (func(MessageProcessor) MessageProcessor, error) { // nolint
	decorator := func(inner MessageProcessor) MessageProcessor {
		retryable := &RetryableMessageProcessor{
			maxAttempts: conf.MaxAttempts,
			wrapped:     inner,
		}
		return retryable
	}
	return decorator, nil
}
// RetryableMessageProcessor is a `MessageProcessor` decorator that re-attempts
// processing of messages 'maxAttempts' number of times in case of failures.
// 'maxAttempts' is a configurable parameter which can be set by the consumer of this lib.
// Exponential backoff has been implemented as a retry mechanism.
type RetryableMessageProcessor struct {
	maxAttempts int              // maximum number of processing attempts per record
	wrapped     MessageProcessor // underlying processor whose failures are retried
}
// ProcessMessage invokes the wrapped `MessageProcessor`. Attempts retries using exponential backoff
// (or the exact wait hinted by the error's RetryAfter) if the underlying 'MessageProcessor'
// returns a retryable error.
// If 'maxAttempts' are exceeded without successful processing, it emits a stat indicating the same
// and returns a non-retryable MaxRetriesExceededError wrapping the last failure.
func (t *RetryableMessageProcessor) ProcessMessage(ctx context.Context, record *kinesis.Record) MessageError {
	stat := xstats.FromContext(ctx)
	var messageProcErr MessageError
	var attemptNum int
	for attemptNum < t.maxAttempts {
		messageProcErr = t.wrapped.ProcessMessage(ctx, record)
		if messageProcErr == nil || !messageProcErr.IsRetryable() {
			// Success, or a permanent failure: further attempts are useless.
			break
		}
		attemptNum++
		if attemptNum >= t.maxAttempts {
			// Bug fix: do not sleep after the final attempt — no retry will follow,
			// so any backoff here only delays returning the error to the caller.
			break
		}
		if messageProcErr.RetryAfter() > 0 {
			// Wait for duration specified in 'Retry-After'
			waitRetryAfter(messageProcErr.RetryAfter())
		} else {
			// Or perform exponential backoff; attemptNum-1 is the zero-based index of
			// the attempt that just failed, preserving the 2^0, 2^1, ... schedule.
			waitToRetry(attemptNum - 1)
		}
	}
	if attemptNum >= t.maxAttempts {
		stat.Count(consumerRetriesExceeded, 1)
		// NOTE(review): if maxAttempts is misconfigured to <= 0 the loop never runs,
		// messageProcErr is nil, and errors.Wrap(nil, ...) yields a nil OrigErr —
		// treat maxAttempts < 1 as invalid configuration upstream.
		maxRetriesExceededErr := MaxRetriesExceededError{
			Retryable: false,
			OrigErr:   errors.Wrap(messageProcErr, "Max retries exceeded"),
			Wait:      0,
		}
		return maxRetriesExceededErr
	}
	return messageProcErr
}
// waitToRetry performs an exponential backoff for calls that need to be
// retried: it sleeps for 2^attemptNum seconds.
func waitToRetry(attemptNum int) {
	backoff := time.Duration(math.Pow(2, float64(attemptNum))) * time.Second
	time.Sleep(backoff)
}
// waitRetryAfter supports waiting for the exact duration specified in a 'Retry-After'
// header. The consumer of this library is responsible for obtaining/computing the wait
// time by parsing the underlying HTTP response and storing that info in the
// MessageError.RetryAfter field.
func waitRetryAfter(retryAfter int) {
	delay := time.Duration(retryAfter) * time.Second
	time.Sleep(delay)
}