feat(aws/kinesis): Add global pending pool with bounded memory and more robust EFO retry handling#1

Merged
Matus Tomlein (matus-tomlein) merged 9 commits into efo-support from efo-support-suggestions
Feb 12, 2026
Conversation

Matus Tomlein (matus-tomlein), Collaborator, commented Feb 6, 2026

Summary

This PR improves the Enhanced Fan-Out (EFO) implementation for the aws_kinesis input with two major changes:

  1. Global pending pool for bounded memory - Replaces per-shard backpressure with a global pool that limits total pending records across all shards, ensuring predictable memory usage regardless of shard count.

  2. Self-sufficient subscription retry handling - Redesigns the EFO subscription goroutine to handle retries autonomously with its own backoff, eliminating coordination issues that could cause shards to stop processing.

Problem

When consuming from Kinesis with many shards under heavy load, we observed:

  • OOM issues: Unbounded per-shard pending buffers could exhaust memory
  • Reduced throughput after errors: "Connection reset by peer" errors from AWS would sometimes cause shards to stop resubscribing, leading to sustained low throughput
  • Backpressure causing connection resets: When downstream processing was slow, AWS would terminate idle EFO connections after ~5 minutes

Solution

Global Pending Pool

  • New globalPendingPool limits total pending records across all shards (default: 50,000)
  • WaitForSpace() blocks before fetching from Kinesis, applying backpressure at the source
  • Acquire()/Release() track space as records flow through the system
  • 30-second timeout on WaitForSpace() allows clean connection close before AWS terminates it

Self-Sufficient Retry Handling

  • Subscription goroutine has its own exponential backoff (300ms → 5s)
  • Inner retry loop handles all errors autonomously
  • Main consumer loop only logs errors and handles fatal cases
  • Eliminates coordination deadlocks between goroutines

Configuration

New EFO config option:

input:
  aws_kinesis:
    enhanced_fan_out:
      enabled: true
      max_pending_records: 50000  # Global limit across all shards


Copilot AI left a comment

Pull request overview

This PR enhances the AWS Kinesis input by introducing a global pending-records limiter for Enhanced Fan-Out (EFO) mode to improve backpressure and bound memory usage, and adds a configurable buffer for the message channel to improve throughput.

Changes:

  • Added a global pending pool (max_pending_records) to limit total buffered EFO records across shards.
  • Adjusted EFO consumer/subscription logic to acquire/release from the global pool and to keep reading records while flushing.
  • Added message_buffer_cap to allow buffering the internal message channel (applies to both polling and EFO modes).

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 6 comments.

Files changed:

  • internal/impl/aws/input_kinesis_pending_pool.go: Implements the new global pending pool used for cross-shard backpressure in EFO.
  • internal/impl/aws/input_kinesis_efo.go: Integrates the global pool into EFO subscription/consumer loops and adjusts record receive/flush behavior.
  • internal/impl/aws/input_kinesis.go: Adds config fields/validation and applies message_buffer_cap to the internal message channel; initializes the global pool when EFO is enabled.


Matus Tomlein (matus-tomlein) changed the title from "feat(aws_kinesis): improve EFO backpressure with global pending pool and configurable message buffering" to "feat(aws/kinesis): Add global pending pool with bounded memory and more robust EFO retry handling" on Feb 10, 2026

Copilot AI left a comment

Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated 6 comments.



…loops

When backpressure causes WaitForSpace to time out, we were returning
a nil error, which triggered immediate resubscription. But since the
backpressure condition hasn't resolved, this creates a loop:
Subscribe → Wait 30s → Timeout → Resubscribe immediately → Repeat

Now we return errBackpressureTimeout which is treated as a retryable
error, triggering exponential backoff before the next subscription
attempt. This gives the system time to drain pending records.

Also adds distinct debug logging for backpressure timeouts vs network
errors to aid in troubleshooting.

When multiple errors occur in rapid succession, the subscription
goroutine could block trying to send to errorsChan (buffer size 1).
This prevented it from receiving resubscription triggers, causing
a deadlock where:
1. Subscription goroutine blocks on errorsChan <- err
2. time.AfterFunc blocks on subscriptionTrigger <- sequence
3. Neither can proceed

Fix:
- Increased errorsChan buffer to 8
- Use non-blocking send to errorsChan
- If channel is full, handle retry locally with a small delay
  and re-queue on subscriptionTrigger

This ensures the subscription goroutine never blocks indefinitely
and can always accept resubscription triggers.

Previously, retry coordination was split between the subscription
goroutine and the main loop:
1. Subscription goroutine sent errors to errorsChan
2. Main loop received errors and scheduled resubscription via time.AfterFunc
3. time.AfterFunc sent to subscriptionTrigger
4. Subscription goroutine received from subscriptionTrigger

This created multiple potential deadlock/starvation scenarios:
- If errorsChan was full, subscription goroutine could block
- If subscriptionTrigger was full, time.AfterFunc could block
- Complex coordination made it easy for resubscription triggers to be lost

New design makes subscription goroutine fully autonomous:
- Has its own backoff (300ms initial, 5s max, never stops)
- Retries directly in an inner loop until success
- Sends to errorsChan non-blocking (for logging only)
- Main loop just logs errors, doesn't trigger resubscription

This eliminates all coordination issues since the subscription
goroutine never waits for the main loop to tell it to retry.

…redesign

Simplifications after making subscription goroutine self-sufficient:

1. Remove unused `boff` (backoff) from main consumer loop
   - Was previously used for scheduling resubscription via time.AfterFunc
   - No longer needed since subscription goroutine has its own backoff

2. Reduce errorsChan buffer from 8 to 1
   - Only used for logging/monitoring now, not control flow
   - Non-blocking sends handle overflow gracefully

3. Fix non-retryable error handling in subscription goroutine
   - Now checks for ResourceNotFoundException and InvalidArgumentException
   - Stops retrying and notifies main loop for proper shutdown
   - Previously would have kept retrying forever

4. Update comments to reflect new design

High priority fixes:

1. Replace busy-polling with proper cond.Wait() signaling
   - Acquire and WaitForSpace now use sync.Cond.Wait() instead of
     time.After(10ms) polling loop
   - Eliminates unnecessary CPU/GC overhead under sustained backpressure
   - Context cancellation and timeout handled via goroutine that
     broadcasts to wake up waiters

2. Handle count > max in Acquire
   - Return false immediately if requested count exceeds pool max
   - Prevents indefinite blocking when Kinesis returns a batch larger
     than max_pending_records

3. Use blocking send for non-retryable errors
   - Fatal errors (ResourceNotFoundException, InvalidArgumentException)
     now use blocking send to errorsChan with context guard
   - Ensures main loop always receives fatal errors for proper shutdown
   - Prevents shard from getting stuck without active subscription

Medium priority fixes:

4. Drain recordsChan on shutdown
   - Added drainRecordsChan() helper called after subscriptionWg.Wait()
   - Releases pool capacity for any records buffered in recordsChan
   - Prevents leaking pool capacity when consumer exits with buffered records

5. Add unit tests for globalPendingPool
   - Tests for Acquire/Release basic operations
   - Tests for Acquire blocking until space available
   - Tests for Acquire with count > max (immediate failure)
   - Tests for context cancellation
   - Tests for WaitForSpace with timeout
   - Tests for concurrent access