make producer.Kinesis respect api limits #140
relud wants to merge 1 commit into trivago:v0.4.x-dev from relud:kinesis_respect_hard_limits
Conversation
If I remember correctly, the rate limits were indirectly enforced by the chosen batch size.
it's true that rate limits can be enforced using batch size, but if gollum message size is unpredictable, and RecordMaxMessages is high enough, then some kinesis records may exceed the 1MB record size limit
Ah, ok - that's correct.
```go
recordMaxMessages: recordMaxMessages,
recordMaxSize:     1 << 20, // 1 MB per record is the api limit
requestMaxSize:    5 << 20, // 5 MB per request is the api limit
requestMaxRecords: 500,     // 500 records per request is the api limit
```
Are those negotiable?
If not they should be constants.
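A minimal sketch of what declaring them as package-level constants could look like (the constant names here are hypothetical, not from the PR; the limit values are the documented Kinesis API limits):

```go
package main

import "fmt"

// Hypothetical constant names; the limits themselves are fixed by the
// Kinesis PutRecords API and cannot be raised.
const (
	kinesisMaxBytesPerRecord    = 1 << 20 // 1 MB per record
	kinesisMaxBytesPerRequest   = 5 << 20 // 5 MB per PutRecords request
	kinesisMaxRecordsPerRequest = 500     // 500 records per PutRecords request
)

func main() {
	fmt.Println(kinesisMaxBytesPerRecord, kinesisMaxBytesPerRequest, kinesisMaxRecordsPerRequest)
}
```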
```diff
-	content  *kinesis.PutRecordsInput
-	original [][]*core.Message
+	content  []*kinesis.PutRecordsInput
+	original [][][]*core.Message
```
It is very confusing to have a 3D array here.
If I see this correctly you have a relationship between requests and originals.
So why don't you batch this together to get a clearer context?
- StreamData may have n requests, limited by requestMaxRecords
- each request may have n originals (and n records) limited by requestMaxSize
- each message is limited by recordMaxSize
So it should be something like

```go
type streamRequest struct {
	input    *kinesis.PutRecordsInput
	original []core.Message
}

type streamData struct {
	// ...
	requests []streamRequest
}
```

This should also make the AddRecord function much clearer, as the constraints now get a typed context.
```
@@ -115,9 +115,101 @@ const (
)

type streamData struct {
```
This type and related function should be moved to a separate file.
```go
func (sd *streamData) AddMessage(delimiter []byte, data []byte, streamID core.MessageStreamID, msg *core.Message) {
	record := sd.GetRecord()
	if len(record.Data) > 0 {
```
A benefit from encapsulating the request in an extra type is that you can store the size for each request in there, too. This will make the check for "am I too large" a simple

```go
func (sr streamRequest) OverLimit(data, delimiter []byte) bool {
	dataSize := len(data)
	if len(sr.records) > 0 {
		dataSize += len(delimiter)
	}
	return sr.currentSize+dataSize > sr.limit
}

// ...
// func (sd *streamData) AddMessage ... {
record := sd.GetCurrentRecord()
if record.OverLimit(data, delimiter) {
	record = sd.AddNewRecord(/* ... */)
}
record.AddMessage(/* ... */)
```

This will make it a lot clearer to understand and a lot less error-prone, as you don't have to expose internal state.
E.g. it was very confusing for me why you check record.Data twice, even though it wasn't directly visible where it is modified.
I'm no longer able to work on this.
@relud Thanks for letting us know. Do you still think it would make sense to integrate into Gollum?
yes, I definitely think it makes sense. these limits are hard limits that aws doesn't allow limit increases for.
Okay, thank you. I reopened the PR. I can't promise anything in terms of timeline, but we will look into it when we can. Thank you for your effort.
The Kinesis API has hard limits: no more than 500 records or 5 MB per PutRecords call, and no more than 1 MB per record.
This change makes sure that gollum respects those three limits by making multiple PutRecords calls when needed, and splitting data into more records when needed.
The result is that BatchMaxMessages and RecordMaxMessages can safely be set to higher values that would previously have caused errors.
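The three limits described above can be sketched as a standalone batching routine. This is a simplified illustration, not the PR's actual implementation: the function name and data layout are assumptions, and a single message larger than the record limit is deliberately not handled here (the real producer would have to drop or reroute it).

```go
package main

import "fmt"

const (
	maxRecordSize     = 1 << 20 // 1 MB per record (API limit)
	maxRequestSize    = 5 << 20 // 5 MB per PutRecords request (API limit)
	maxRequestRecords = 500     // records per PutRecords request (API limit)
)

// batchMessages packs delimiter-joined messages into records of at most
// maxRecordSize bytes, and records into requests of at most
// maxRequestRecords records and maxRequestSize bytes.
func batchMessages(msgs [][]byte, delimiter []byte) [][][]byte {
	var requests [][][]byte // each request is a slice of records
	var request [][]byte
	var record []byte
	requestSize := 0

	flushRecord := func() {
		if len(record) == 0 {
			return
		}
		// Start a new request if this record would break either request limit.
		if len(request) >= maxRequestRecords || requestSize+len(record) > maxRequestSize {
			requests = append(requests, request)
			request, requestSize = nil, 0
		}
		request = append(request, record)
		requestSize += len(record)
		record = nil
	}

	for _, msg := range msgs {
		needed := len(msg)
		if len(record) > 0 {
			needed += len(delimiter)
		}
		// Close the current record if this message would exceed the record limit.
		if len(record)+needed > maxRecordSize {
			flushRecord()
		}
		if len(record) > 0 {
			record = append(record, delimiter...)
		}
		record = append(record, msg...)
	}
	flushRecord()
	if len(request) > 0 {
		requests = append(requests, request)
	}
	return requests
}

func main() {
	// Twelve 600 KB messages: each fills its own record (two would exceed
	// 1 MB), and at most eight 600 KB records fit under the 5 MB request limit.
	msgs := make([][]byte, 12)
	for i := range msgs {
		msgs[i] = make([]byte, 600000)
	}
	for _, req := range batchMessages(msgs, []byte("\n")) {
		fmt.Println(len(req), "records")
	}
}
```

The same splitting decisions apply regardless of how large BatchMaxMessages or RecordMaxMessages are set, which is what makes raising those options safe.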