feat(aws/kinesis): Add global pending pool with bounded memory and more robust EFO retry handling #1
Merged
Matus Tomlein (matus-tomlein) merged 9 commits into efo-support on Feb 12, 2026
Conversation
Copilot started reviewing on behalf of Matus Tomlein (matus-tomlein), February 6, 2026 14:15
Pull request overview
This PR enhances the AWS Kinesis input by introducing a global pending-records limiter for Enhanced Fan-Out (EFO) mode to improve backpressure and bound memory usage, and adds a configurable buffer for the message channel to improve throughput.
Changes:
- Added a global pending pool (`max_pending_records`) to limit total buffered EFO records across shards.
- Adjusted EFO consumer/subscription logic to acquire/release from the global pool and to keep reading records while flushing.
- Added `message_buffer_cap` to allow buffering the internal message channel (applies to both polling and EFO modes).
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| `internal/impl/aws/input_kinesis_pending_pool.go` | Implements the new global pending pool used for cross-shard backpressure in EFO. |
| `internal/impl/aws/input_kinesis_efo.go` | Integrates the global pool into EFO subscription/consumer loops and adjusts record receive/flush behavior. |
| `internal/impl/aws/input_kinesis.go` | Adds config fields/validation and applies `message_buffer_cap` to the internal message channel; initializes the global pool when EFO is enabled. |
… we proactively close and resubscribe rather than waiting for AWS to reset the connection
Force-pushed from de50036 to 3daa700
Copilot started reviewing on behalf of Matus Tomlein (matus-tomlein), February 10, 2026 12:09
Pull request overview
Copilot reviewed 5 out of 5 changed files in this pull request and generated 6 comments.
…loops

When backpressure causes WaitForSpace to time out, we were returning a nil error, which triggered immediate resubscription. But since the backpressure condition hasn't resolved, this creates a loop of:

Subscribe → Wait 30s → Timeout → Resubscribe immediately → Repeat

Now we return errBackpressureTimeout, which is treated as a retryable error, triggering exponential backoff before the next subscription attempt. This gives the system time to drain pending records.

Also adds distinct debug logging for backpressure timeouts vs network errors to aid in troubleshooting.
When multiple errors occur in rapid succession, the subscription goroutine could block trying to send to errorsChan (buffer size 1). This prevented it from receiving resubscription triggers, causing a deadlock where:

1. The subscription goroutine blocks on `errorsChan <- err`
2. `time.AfterFunc` blocks on `subscriptionTrigger <- sequence`
3. Neither can proceed

Fix:

- Increased the errorsChan buffer to 8
- Use a non-blocking send to errorsChan
- If the channel is full, handle the retry locally with a small delay and re-queue on subscriptionTrigger

This ensures the subscription goroutine never blocks indefinitely and can always accept resubscription triggers.
Previously, retry coordination was split between the subscription goroutine and the main loop:

1. The subscription goroutine sent errors to errorsChan
2. The main loop received errors and scheduled resubscription via time.AfterFunc
3. time.AfterFunc sent to subscriptionTrigger
4. The subscription goroutine received from subscriptionTrigger

This created multiple potential deadlock/starvation scenarios:

- If errorsChan was full, the subscription goroutine could block
- If subscriptionTrigger was full, time.AfterFunc could block
- Complex coordination made it easy for resubscription triggers to be lost

The new design makes the subscription goroutine fully autonomous:

- It has its own backoff (300ms initial, 5s max, never stops)
- It retries directly in an inner loop until success
- It sends to errorsChan non-blocking (for logging only)
- The main loop just logs errors and does not trigger resubscription

This eliminates the coordination issues, since the subscription goroutine never waits for the main loop to tell it to retry.
…redesign

Simplifications after making the subscription goroutine self-sufficient:

1. Remove the unused `boff` (backoff) from the main consumer loop
   - Was previously used for scheduling resubscription via time.AfterFunc
   - No longer needed since the subscription goroutine has its own backoff
2. Reduce the errorsChan buffer from 8 to 1
   - Only used for logging/monitoring now, not control flow
   - Non-blocking sends handle overflow gracefully
3. Fix non-retryable error handling in the subscription goroutine
   - Now checks for ResourceNotFoundException and InvalidArgumentException
   - Stops retrying and notifies the main loop for proper shutdown
   - Previously it would have kept retrying forever
4. Update comments to reflect the new design
High-priority fixes:

1. Replace busy-polling with proper `cond.Wait()` signaling
   - `Acquire` and `WaitForSpace` now use `sync.Cond.Wait()` instead of a `time.After(10ms)` polling loop
   - Eliminates unnecessary CPU/GC overhead under sustained backpressure
   - Context cancellation and timeout handled via a goroutine that broadcasts to wake up waiters
2. Handle `count > max` in `Acquire`
   - Return false immediately if the requested count exceeds the pool max
   - Prevents indefinite blocking when Kinesis returns a batch larger than `max_pending_records`
3. Use a blocking send for non-retryable errors
   - Fatal errors (ResourceNotFoundException, InvalidArgumentException) now use a blocking send to errorsChan with a context guard
   - Ensures the main loop always receives fatal errors for proper shutdown
   - Prevents a shard from getting stuck without an active subscription

Medium-priority fixes:

4. Drain recordsChan on shutdown
   - Added a `drainRecordsChan()` helper called after `subscriptionWg.Wait()`
   - Releases pool capacity for any records buffered in recordsChan
   - Prevents leaking pool capacity when the consumer exits with buffered records
5. Add unit tests for `globalPendingPool`
   - Acquire/Release basic operations
   - Acquire blocking until space is available
   - Acquire with `count > max` (immediate failure)
   - Context cancellation
   - WaitForSpace with timeout
   - Concurrent access
Force-pushed from 62bf5c0 to 3769ca6
Summary
This PR improves the Enhanced Fan-Out (EFO) implementation for the `aws_kinesis` input with two major changes:

1. **Global pending pool for bounded memory**: replaces per-shard backpressure with a global pool that limits total pending records across all shards, ensuring predictable memory usage regardless of shard count.
2. **Self-sufficient subscription retry handling**: redesigns the EFO subscription goroutine to handle retries autonomously with its own backoff, eliminating coordination issues that could cause shards to stop processing.
Problem
When consuming from Kinesis with many shards under heavy load, we observed:
Solution
Global Pending Pool
- `globalPendingPool` limits total pending records across all shards (default: 50,000)
- `WaitForSpace()` blocks before fetching from Kinesis, applying backpressure at the source
- `Acquire()`/`Release()` track space as records flow through the system
- A timeout on `WaitForSpace()` allows a clean connection close before AWS terminates it

Self-Sufficient Retry Handling
Configuration
New EFO config option:
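A hedged sketch of how the options from this PR might appear in a config. Only `max_pending_records` (default 50,000 per this PR) and `message_buffer_cap` come from the PR text; the stream name, the illustrative buffer value, and the surrounding schema are assumptions, with the authoritative schema in `input_kinesis.go`:

```yaml
input:
  aws_kinesis:
    streams: [ my_stream ]     # hypothetical stream name
    max_pending_records: 50000 # default per this PR; applies to EFO mode
    message_buffer_cap: 100    # illustrative value; polling and EFO modes
```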