This repository was archived by the owner on Oct 2, 2025. It is now read-only.

make producer.Kinesis respect api limits #140

Closed

relud wants to merge 1 commit into trivago:v0.4.x-dev from relud:kinesis_respect_hard_limits
Conversation

@relud (Contributor) commented Jun 9, 2017

The Kinesis API has hard limits: no more than 500 records or 5 MB per PutRecords call, and no more than 1 MB per record.

This change makes Gollum respect all three limits by splitting a batch into multiple PutRecords calls when needed, and into additional records when needed.

The result is that BatchMaxMessages and RecordMaxMessages can safely be set to higher values than would previously have caused errors.
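The splitting described above can be sketched as follows. This is a minimal illustration, not the patch itself; the function and constant names are assumptions, and only the per-request count and size limits are shown.

```go
package main

import "fmt"

// Kinesis PutRecords hard limits (per the AWS documentation):
const (
	requestMaxSize    = 5 << 20 // 5 MiB of data per PutRecords call
	requestMaxRecords = 500     // 500 records per PutRecords call
)

// batchRecords groups record payloads into batches, each small enough
// for a single PutRecords call, by starting a new batch whenever the
// next record would exceed the count or total-size limit.
func batchRecords(records [][]byte) [][][]byte {
	var batches [][][]byte
	var current [][]byte
	size := 0
	for _, r := range records {
		if len(current) >= requestMaxRecords || size+len(r) > requestMaxSize {
			batches = append(batches, current)
			current = nil
			size = 0
		}
		current = append(current, r)
		size += len(r)
	}
	if len(current) > 0 {
		batches = append(batches, current)
	}
	return batches
}

func main() {
	// 1200 tiny records trip only the 500-records-per-call limit,
	// so they split into batches of 500, 500, and 200.
	records := make([][]byte, 1200)
	for i := range records {
		records[i] = []byte("x")
	}
	fmt.Println(len(batchRecords(records)))
}
```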

@arnecls (Contributor) commented Jul 7, 2017

If I remember correctly, the rate limits were indirectly enforced by the chosen batch size.
Need to check that.

@relud (Contributor, Author) commented Jul 7, 2017

It's true that rate limits can be enforced using batch size, but Gollum message sizes are unpredictable, and if RecordMaxMessages is high enough, some Kinesis records may exceed the 1 MB record size limit.

@arnecls (Contributor) commented Jul 7, 2017

Ah, ok - that's correct.
The current batch implementation only looks at the count, not the size.

recordMaxMessages: recordMaxMessages,
recordMaxSize: 1<<20, // 1 MB per record is the api limit
requestMaxSize: 5<<20, // 5 MB per request is the api limit
requestMaxRecords: 500, // 500 records per request is the api limit
Review comment (Contributor):

Are those negotiable?
If not, they should be constants.
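Since these are hard API limits that cannot be raised, declaring them as package-level constants instead of struct fields could look like this. A minimal sketch; the constant names are illustrative, not from the patch.

```go
package main

import "fmt"

// Kinesis PutRecords hard limits. AWS does not grant increases for
// these, so they are fixed constants rather than configurable fields.
const (
	kinesisRecordMaxSize     = 1 << 20 // 1 MiB per record
	kinesisRequestMaxSize    = 5 << 20 // 5 MiB per request
	kinesisRequestMaxRecords = 500     // 500 records per request
)

func main() {
	fmt.Println(kinesisRecordMaxSize, kinesisRequestMaxSize, kinesisRequestMaxRecords)
}
```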

content *kinesis.PutRecordsInput
original [][]*core.Message
content []*kinesis.PutRecordsInput
original [][][]*core.Message
Review comment (Contributor):

It is very confusing to have a 3D array here.
If I see this correctly, you have a relationship between requests and originals.
So why don't you batch them together to get a clearer context?

  • StreamData may have n requests, limited by requestMaxRecords
  • each request may have n originals (and n records) limited by requestMaxSize
  • each message is limited by recordMaxSize

So it should be something like

type streamRequest struct {
    input *kinesis.PutRecordsInput
    original []core.Message
}

type streamData struct {
    // ...
    requests []streamRequest
}

This should also make the AddRecord function much clearer as the constraints now get a typed context.
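The pairing the review suggests can be demonstrated with a small runnable sketch. The type names follow the comment above; the stand-in message/input types and the `add` logic are assumptions for illustration, and only the record-count limit is enforced here.

```go
package main

import "fmt"

// Stand-ins for *core.Message and *kinesis.PutRecordsInput:
type message struct{ data []byte }
type putRecordsInput struct{ records [][]byte }

// streamRequest pairs one PutRecords request with the original
// messages it carries, replacing the parallel 3D array.
type streamRequest struct {
	input    putRecordsInput
	original []message
}

type streamData struct {
	requests []streamRequest
}

const requestMaxRecords = 500 // Kinesis hard limit per request

// add appends a message, opening a new request whenever the current
// one is full, so the constraint lives in one typed place.
func (sd *streamData) add(msg message) {
	if len(sd.requests) == 0 ||
		len(sd.requests[len(sd.requests)-1].input.records) >= requestMaxRecords {
		sd.requests = append(sd.requests, streamRequest{})
	}
	req := &sd.requests[len(sd.requests)-1]
	req.input.records = append(req.input.records, msg.data)
	req.original = append(req.original, msg)
}

func main() {
	sd := &streamData{}
	for i := 0; i < 600; i++ {
		sd.add(message{data: []byte("payload")})
	}
	// 600 messages split into requests of 500 and 100.
	fmt.Println(len(sd.requests))
}
```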

@@ -115,9 +115,101 @@ const (
)

type streamData struct {
Review comment (Contributor):

This type and related function should be moved to a separate file.


func (sd *streamData) AddMessage(delimiter []byte, data []byte, streamID core.MessageStreamID, msg *core.Message) {
record := sd.GetRecord()
if len(record.Data) > 0 {
Review comment (Contributor):

A benefit from encapsulating the request in an extra type is that you can store the size for each request in there, too. This will make the check for "am I too large" a simple

func (sr streamRequest) OverLimit(data, delimiter []byte) bool {
    dataSize := len(data)
    if len(sr.records) > 0 {
        dataSize += len(delimiter)
    }
    return sr.currentSize + dataSize > sr.limit
}

// ...
// func (sd *streamData) AddMessage ... {
    record := sd.GetCurrentRecord()
    if record.OverLimit(data, delimiter) {
        record = sd.AddNewRecord(/* ... */)
    }
    record.AddMessage(/* ... */)

This will make it a lot clearer to understand and a lot less error-prone, as you don't have to expose internal state.
E.g. it was very confusing for me why you check record.Data twice when it wasn't directly visible where it is modified.
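A self-contained version of the size-tracking encapsulation sketched above, runnable as written. The field names (`records`, `currentSize`, `limit`) are assumptions; in practice the limit would be the 5 MiB request cap.

```go
package main

import "fmt"

// streamRequest tracks its own accumulated size, so the "am I too
// large" check needs no external state.
type streamRequest struct {
	records     [][]byte
	currentSize int
	limit       int
}

// OverLimit reports whether appending data (plus a delimiter, if the
// request already holds records) would exceed the size limit.
func (sr streamRequest) OverLimit(data, delimiter []byte) bool {
	dataSize := len(data)
	if len(sr.records) > 0 {
		dataSize += len(delimiter)
	}
	return sr.currentSize+dataSize > sr.limit
}

func main() {
	// Tiny limit for demonstration only.
	sr := streamRequest{limit: 10}
	sr.records = append(sr.records, []byte("12345"))
	sr.currentSize = 5
	fmt.Println(sr.OverLimit([]byte("1234"), []byte("\n")))  // 5+4+1 = 10: fits
	fmt.Println(sr.OverLimit([]byte("12345"), []byte("\n"))) // 5+5+1 = 11: over
}
```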

@relud (Contributor, Author) commented Nov 13, 2017

I'm no longer able to work on this.

@relud relud closed this Nov 13, 2017
@andygrunwald (Contributor) commented:

@relud Thanks for letting us know. Do you still think it would make sense to integrate this into Gollum?
If yes, we might want to check it out in detail.

@andygrunwald andygrunwald reopened this Nov 16, 2017
@relud (Contributor, Author) commented Nov 16, 2017

Yes, I definitely think it makes sense. These limits are hard limits; AWS doesn't allow limit increases for them.

@andygrunwald (Contributor) commented:

Okay, thank you. I reopened the PR. I can't promise anything in terms of timeline, but we will check it out when we can. Thank you for your effort.

@relud relud closed this Mar 10, 2020
@relud relud deleted the kinesis_respect_hard_limits branch March 10, 2020 20:22