From bb290c6514803f6943fc2af0e7e4064dbc947803 Mon Sep 17 00:00:00 2001 From: jbeemster Date: Thu, 22 Jan 2026 13:04:13 +0100 Subject: [PATCH 01/10] Add Enhanced FanOut Support for aws_kinesis --- internal/impl/aws/input_kinesis.go | 112 ++++- internal/impl/aws/input_kinesis_efo.go | 460 ++++++++++++++++++ website/docs/components/inputs/aws_kinesis.md | 44 ++ 3 files changed, 613 insertions(+), 3 deletions(-) create mode 100644 internal/impl/aws/input_kinesis_efo.go diff --git a/internal/impl/aws/input_kinesis.go b/internal/impl/aws/input_kinesis.go index aaa21998de..0fc4d2a94a 100644 --- a/internal/impl/aws/input_kinesis.go +++ b/internal/impl/aws/input_kinesis.go @@ -37,8 +37,22 @@ const ( kiFieldRebalancePeriod = "rebalance_period" kiFieldStartFromOldest = "start_from_oldest" kiFieldBatching = "batching" + kiFieldEnhancedFanOut = "enhanced_fan_out" + + // Enhanced Fan Out Fields + kiEFOFieldEnabled = "enabled" + kiEFOFieldConsumerName = "consumer_name" + kiEFOFieldConsumerARN = "consumer_arn" + kiEFOFieldRecordBufferCap = "record_buffer_cap" ) +type kiEFOConfig struct { + Enabled bool + ConsumerName string + ConsumerARN string + RecordBufferCap int +} + type kiConfig struct { Streams []string DynamoDB kiddbConfig @@ -47,6 +61,7 @@ type kiConfig struct { LeasePeriod string RebalancePeriod string StartFromOldest bool + EnhancedFanOut *kiEFOConfig } func kinesisInputConfigFromParsed(pConf *service.ParsedConfig) (conf kiConfig, err error) { @@ -73,6 +88,27 @@ func kinesisInputConfigFromParsed(pConf *service.ParsedConfig) (conf kiConfig, e if conf.StartFromOldest, err = pConf.FieldBool(kiFieldStartFromOldest); err != nil { return } + if pConf.Contains(kiFieldEnhancedFanOut) { + efoConf := &kiEFOConfig{} + efoNs := pConf.Namespace(kiFieldEnhancedFanOut) + if efoConf.Enabled, err = efoNs.FieldBool(kiEFOFieldEnabled); err != nil { + return + } + if efoConf.ConsumerName, err = efoNs.FieldString(kiEFOFieldConsumerName); err != nil { + return + } + if efoConf.ConsumerARN, err = efoNs.FieldString(kiEFOFieldConsumerARN); err != nil { + return + } + if efoConf.RecordBufferCap, err = efoNs.FieldInt(kiEFOFieldRecordBufferCap); err != nil { + return + } + if efoConf.RecordBufferCap < 1 { + err = errors.New("enhanced_fan_out.record_buffer_cap must be at least 1") + return + } + conf.EnhancedFanOut = efoConf + } return } @@ -141,6 +177,27 @@ Use the `+"`batching`"+` fields to configure an optional [batching policy](/docs service.NewBoolField(kiFieldStartFromOldest). Description("Whether to consume from the oldest message when a sequence does not yet exist for the stream."). Default(true), + service.NewObjectField(kiFieldEnhancedFanOut, + service.NewBoolField(kiEFOFieldEnabled). + Description("Enable Enhanced Fan Out mode for push-based streaming with dedicated throughput."). + Default(false), + service.NewStringField(kiEFOFieldConsumerName). + Description("Consumer name for EFO registration. Auto-generated if empty: bento-{clientID}."). + Default(""). + Optional(), + service.NewStringField(kiEFOFieldConsumerARN). + Description("Existing consumer ARN to use. If provided, skips registration."). + Default(""). + Optional(). + Advanced(), + service.NewIntField(kiEFOFieldRecordBufferCap). + Description("Buffer capacity for the internal records channel per shard. Lower values reduce memory usage when processing many shards. Set to 1 for minimal memory footprint."). + Default(1). + Advanced(), + ). + Description("Enhanced Fan Out configuration for push-based streaming. Provides dedicated 2 MB/sec throughput per consumer per shard and lower latency (~70ms). Note: EFO incurs per shard-hour charges."). + Optional(). + Advanced(), ). Fields(config.SessionFields()...). Field(service.NewBatchPolicyField(kiFieldBatching)) @@ -174,6 +231,7 @@ type streamInfo struct { explicitShards []string id string // Either a name or arn, extracted from config and used for balancing shards arn string + efoManager *kinesisEFOManager // Enhanced Fan Out manager (if EFO is enabled) } type kinesisReader struct { @@ -189,6 +247,8 @@ type kinesisReader struct { svc *kinesis.Client checkpointer *awsKinesisCheckpointer + efoManager *kinesisEFOManager + efoEnabled bool streams []*streamInfo @@ -319,6 +379,18 @@ func newKinesisReaderFromConfig(conf kiConfig, batcher service.BatchPolicy, sess if k.rebalancePeriod, err = time.ParseDuration(k.conf.RebalancePeriod); err != nil { return nil, fmt.Errorf("failed to parse rebalance period string: %v", err) } + + // Check if Enhanced Fan Out is enabled + if k.conf.EnhancedFanOut != nil && k.conf.EnhancedFanOut.Enabled { + k.efoEnabled = true + k.log.Infof("Enhanced Fan Out enabled") + + // Validate EFO configuration + if k.conf.EnhancedFanOut.ConsumerName != "" && k.conf.EnhancedFanOut.ConsumerARN != "" { + return nil, errors.New("cannot specify both consumer_name and consumer_arn in enhanced_fan_out config") + } + } + return &k, nil } @@ -700,7 +772,12 @@ func (k *kinesisReader) runBalancedShards() { continue } wg.Add(1) - if err = k.runConsumer(&wg, *info, shardID, sequence); err != nil { + if k.efoEnabled { + err = k.runEFOConsumer(&wg, *info, shardID, sequence) + } else { + err = k.runConsumer(&wg, *info, shardID, sequence) + } + if err != nil { k.log.Errorf("Failed to start consumer: %v\n", err) } } @@ -749,7 +826,12 @@ func (k *kinesisReader) runBalancedShards() { info.id, randomShard, clientID, k.clientID, ) wg.Add(1) - if err = k.runConsumer(&wg, *info, randomShard, sequence); err != nil { + if k.efoEnabled { + err = k.runEFOConsumer(&wg, *info, randomShard, sequence) + } else { + err = k.runConsumer(&wg, *info, randomShard, sequence) + } + if err != nil { k.log.Errorf("Failed to start consumer: %v\n", err) } else { // If we successfully stole the shard then that's enough @@ -790,7 +872,11 @@ func (k *kinesisReader) runExplicitShards() { sequence, err := k.checkpointer.Claim(k.ctx, id, shardID, "") if err == nil { wg.Add(1) - err = k.runConsumer(&wg, info, shardID, sequence) + if k.efoEnabled { + err = k.runEFOConsumer(&wg, info, shardID, sequence) + } else { + err = k.runConsumer(&wg, info, shardID, sequence) + } } if err != nil { if k.ctx.Err() != nil { @@ -868,6 +954,26 @@ func (k *kinesisReader) Connect(ctx context.Context) error { return err } + // Initialize Enhanced Fan Out if enabled + if k.efoEnabled { + for _, stream := range k.streams { + // Create EFO manager for this stream + efoMgr, err := newKinesisEFOManager(k.conf.EnhancedFanOut, stream.arn, k.clientID, k.svc, k.log) + if err != nil { + return fmt.Errorf("failed to create EFO manager for stream %s: %w", stream.id, err) + } + + // Register consumer and wait for ACTIVE status + consumerARN, err := efoMgr.ensureConsumerRegistered(ctx) + if err != nil { + return fmt.Errorf("failed to register EFO consumer for stream %s: %w", stream.id, err) + } + + stream.efoManager = efoMgr + k.log.Infof("Enhanced Fan Out consumer registered for stream %s with ARN: %s", stream.id, consumerARN) + } + } + if len(k.streams[0].explicitShards) > 0 { go k.runExplicitShards() } else { diff --git a/internal/impl/aws/input_kinesis_efo.go b/internal/impl/aws/input_kinesis_efo.go new file mode 100644 index 0000000000..4bb46238b0 --- /dev/null +++ b/internal/impl/aws/input_kinesis_efo.go @@ -0,0 +1,460 @@ +package aws + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/kinesis" + "github.com/aws/aws-sdk-go-v2/service/kinesis/types" + "github.com/cenkalti/backoff/v4" +) + +// kinesisEFOManager handles Enhanced Fan Out consumer registration and lifecycle +type kinesisEFOManager struct { + streamARN string + consumerName string + consumerARN string + svc *kinesis.Client + log awsLogger +} + +type awsLogger interface { + Debugf(format string, v ...any) + Infof(format string, v ...any) + Warnf(format string, v ...any) + Errorf(format string, v ...any) +} + +// newKinesisEFOManager creates a new EFO manager +func newKinesisEFOManager(conf *kiEFOConfig, streamARN, clientID string, svc *kinesis.Client, log awsLogger) (*kinesisEFOManager, error) { + if conf == nil { + return nil, errors.New("enhanced fan out config is nil") + } + + if conf.ConsumerName != "" && conf.ConsumerARN != "" { + return nil, errors.New("cannot specify both consumer_name and consumer_arn") + } + + consumerName := conf.ConsumerName + if consumerName == "" && conf.ConsumerARN == "" { + consumerName = fmt.Sprintf("bento-%s", clientID) + } + + return &kinesisEFOManager{ + streamARN: streamARN, + consumerName: consumerName, + consumerARN: conf.ConsumerARN, + svc: svc, + log: log, + }, nil +} + +// ensureConsumerRegistered registers the consumer if needed and returns the consumer ARN +func (m *kinesisEFOManager) ensureConsumerRegistered(ctx context.Context) (string, error) { + if m.consumerARN != "" { + m.log.Infof("Using provided consumer ARN: %s", m.consumerARN) + return m.consumerARN, nil + } + + m.log.Infof("Registering Enhanced Fan Out consumer: %s for stream: %s", m.consumerName, m.streamARN) + + registerInput := &kinesis.RegisterStreamConsumerInput{ + StreamARN: aws.String(m.streamARN), + ConsumerName: aws.String(m.consumerName), + } + + output, err := m.svc.RegisterStreamConsumer(ctx, registerInput) + if err != nil { + var resourceInUse *types.ResourceInUseException + if errors.As(err, &resourceInUse) { + m.log.Infof("Consumer %s already exists, describing to get ARN", m.consumerName) + return m.describeAndWaitForActive(ctx) + } + return "", fmt.Errorf("failed to register consumer: %w", err) + } + + if output.Consumer == nil || output.Consumer.ConsumerARN == nil { + return "", errors.New("RegisterStreamConsumer succeeded but returned no consumer ARN") + } + + m.consumerARN = *output.Consumer.ConsumerARN + m.log.Infof("Registered consumer with ARN: %s, waiting for ACTIVE status", m.consumerARN) + + if err := m.waitForActiveConsumer(ctx); err != nil { + return "", fmt.Errorf("failed waiting for consumer to become active: %w", err) + } + + return m.consumerARN, nil +} + +// describeAndWaitForActive describes an existing consumer and waits for it to be active +func (m *kinesisEFOManager) describeAndWaitForActive(ctx context.Context) (string, error) { + describeInput := &kinesis.DescribeStreamConsumerInput{ + StreamARN: aws.String(m.streamARN), + ConsumerName: aws.String(m.consumerName), + } + + output, err := m.svc.DescribeStreamConsumer(ctx, describeInput) + if err != nil { + return "", fmt.Errorf("failed to describe consumer: %w", err) + } + + if output.ConsumerDescription == nil || output.ConsumerDescription.ConsumerARN == nil { + return "", errors.New("consumer description missing ARN") + } + + m.consumerARN = *output.ConsumerDescription.ConsumerARN + m.log.Infof("Found existing consumer with ARN: %s", m.consumerARN) + + if err := m.waitForActiveConsumer(ctx); err != nil { + return "", fmt.Errorf("failed waiting for consumer to become active: %w", err) + } + + return m.consumerARN, nil +} + +// waitForActiveConsumer waits for the consumer to reach ACTIVE status +func (m *kinesisEFOManager) waitForActiveConsumer(ctx context.Context) error { + waiterCtx, cancel := context.WithTimeout(ctx, 2*time.Minute) + defer cancel() + + ticker := time.NewTicker(2 * time.Second) + defer ticker.Stop() + + for { + select { + case <-waiterCtx.Done(): + return fmt.Errorf("timeout waiting for consumer to become ACTIVE: %w", waiterCtx.Err()) + case <-ticker.C: + describeInput := &kinesis.DescribeStreamConsumerInput{ + ConsumerARN: aws.String(m.consumerARN), + } + + output, err := m.svc.DescribeStreamConsumer(waiterCtx, describeInput) + if err != nil { + return fmt.Errorf("failed to describe consumer: %w", err) + } + + if output.ConsumerDescription != nil { + status := output.ConsumerDescription.ConsumerStatus + m.log.Debugf("Consumer status: %s", status) + + if status == types.ConsumerStatusActive { + m.log.Infof("Consumer is now ACTIVE") + return nil + } + + if status == types.ConsumerStatusDeleting { + return errors.New("consumer is being deleted") + } + } + } + } +} + +// runEFOConsumer consumes from a shard using Enhanced Fan Out +func (k *kinesisReader) runEFOConsumer(wg *sync.WaitGroup, info streamInfo, shardID, startingSequence string) error { + // Create record batcher (same as polling mode) + var recordBatcher *awsKinesisRecordBatcher + var err error + if recordBatcher, err = k.newAWSKinesisRecordBatcher(info, shardID, startingSequence); err != nil { + wg.Done() + if _, checkErr := k.checkpointer.Checkpoint(context.Background(), info.id, shardID, startingSequence, true); checkErr != nil { + k.log.Errorf("Failed to gracefully yield checkpoint: %v\n", checkErr) + } + return err + } + + // Backoff for error handling + boff := k.boffPool.Get().(backoff.BackOff) + + // Track consumer state + state := awsKinesisConsumerConsuming + var pendingMsg asyncMessage + + // Buffer for pending records from the subscription + var pending []types.Record + + // Channels for subscription control + subscriptionTrigger := make(chan string, 1) // Trigger for initial subscription or resubscription + subscriptionTrigger <- startingSequence // Start with initial sequence + + // Channels for timed batches and message flush + var nextTimedBatchChan <-chan time.Time + var nextFlushChan chan<- asyncMessage + commitCtx, commitCtxClose := context.WithTimeout(k.ctx, k.commitPeriod) + + go func() { + defer func() { + commitCtxClose() + recordBatcher.Close(context.Background(), state == awsKinesisConsumerFinished) + boff.Reset() + k.boffPool.Put(boff) + + reason := "" + switch state { + case awsKinesisConsumerFinished: + reason = " because the shard is closed" + if err := k.checkpointer.Delete(k.ctx, info.id, shardID); err != nil { + k.log.Errorf("Failed to remove checkpoint for finished stream '%v' shard '%v': %v", info.id, shardID, err) + } + case awsKinesisConsumerYielding: + reason = " because the shard has been claimed by another client" + if err := k.checkpointer.Yield(k.ctx, info.id, shardID, recordBatcher.GetSequence()); err != nil { + k.log.Errorf("Failed to yield checkpoint for stolen stream '%v' shard '%v': %v", info.id, shardID, err) + } + case awsKinesisConsumerClosing: + reason = " because the pipeline is shutting down" + if _, err := k.checkpointer.Checkpoint(context.Background(), info.id, shardID, recordBatcher.GetSequence(), true); err != nil { + k.log.Errorf("Failed to store final checkpoint for stream '%v' shard '%v': %v", info.id, shardID, err) + } + } + + wg.Done() + k.log.Debugf("Closing stream '%v' shard '%v' as client '%v'%v", info.id, shardID, k.checkpointer.clientID, reason) + }() + + k.log.Debugf("Consuming stream '%v' shard '%v' with Enhanced Fan Out as client '%v'", info.id, shardID, k.checkpointer.clientID) + + // Start subscription in a separate goroutine + bufferCap := 1 + if k.conf.EnhancedFanOut != nil && k.conf.EnhancedFanOut.RecordBufferCap > 0 { + bufferCap = k.conf.EnhancedFanOut.RecordBufferCap + } + recordsChan := make(chan []types.Record, bufferCap) + errorsChan := make(chan error, 1) + resubscribeChan := make(chan string, 1) + + var subscriptionWg sync.WaitGroup + subscriptionWg.Add(1) + go func() { + defer subscriptionWg.Done() + for sequence := range subscriptionTrigger { + continuationSeq, err := k.efoSubscribeAndStream(k.ctx, info, shardID, sequence, recordsChan) + if err != nil { + errorsChan <- err + } else { + // Schedule resubscription with continuation sequence + if continuationSeq != "" { + resubscribeChan <- continuationSeq + } else { + // Use latest checkpointed sequence + resubscribeChan <- recordBatcher.GetSequence() + } + } + } + }() + + // Main consumer loop (similar to polling consumer) + for { + if pendingMsg.msg == nil { + // If our consumer is finished and we've run out of pending + // records then we're done. + if len(pending) == 0 && state == awsKinesisConsumerFinished { + if pendingMsg, _ = recordBatcher.FlushMessage(k.ctx); pendingMsg.msg == nil { + close(subscriptionTrigger) + subscriptionWg.Wait() + return + } + } else if recordBatcher.HasPendingMessage() { + var err error + if pendingMsg, err = recordBatcher.FlushMessage(commitCtx); err != nil { + k.log.Errorf("Failed to dispatch message due to checkpoint error: %v\n", err) + } + } else if len(pending) > 0 { + var i int + var r types.Record + for i, r = range pending { + if recordBatcher.AddRecord(r) { + var err error + if pendingMsg, err = recordBatcher.FlushMessage(commitCtx); err != nil { + k.log.Errorf("Failed to dispatch message due to checkpoint error: %v\n", err) + } + break + } + } + pending = pending[i+1:] + } + } + + if pendingMsg.msg != nil { + nextFlushChan = k.msgChan + } else { + nextFlushChan = nil + } + + if nextTimedBatchChan == nil { + if tNext, exists := recordBatcher.UntilNext(); exists { + nextTimedBatchChan = time.After(tNext) + } + } + + select { + case <-commitCtx.Done(): + if k.ctx.Err() != nil { + state = awsKinesisConsumerClosing + close(subscriptionTrigger) + subscriptionWg.Wait() + return + } + + commitCtxClose() + commitCtx, commitCtxClose = context.WithTimeout(k.ctx, k.commitPeriod) + + if state == awsKinesisConsumerConsuming { + stillOwned, err := k.checkpointer.Checkpoint(k.ctx, info.id, shardID, recordBatcher.GetSequence(), false) + if err != nil { + k.log.Errorf("Failed to store checkpoint for Kinesis stream '%v' shard '%v': %v", info.id, shardID, err) + } else if !stillOwned { + state = awsKinesisConsumerYielding + close(subscriptionTrigger) + subscriptionWg.Wait() + return + } + } + + case <-nextTimedBatchChan: + nextTimedBatchChan = nil + + case nextFlushChan <- pendingMsg: + pendingMsg = asyncMessage{} + + case records := <-recordsChan: + // Received records from subscription + pending = append(pending, records...) + boff.Reset() + + case err := <-errorsChan: + // Subscription error occurred + var resourceNotFound *types.ResourceNotFoundException + var invalidArg *types.InvalidArgumentException + + if errors.As(err, &resourceNotFound) || errors.As(err, &invalidArg) { + k.log.Errorf("Non-retryable EFO error for shard %v: %v", shardID, err) + state = awsKinesisConsumerClosing + close(subscriptionTrigger) + subscriptionWg.Wait() + return + } + + // Retryable error - backoff and retry + k.log.Warnf("EFO subscription error for shard %v, will retry: %v", shardID, err) + backoffDuration := boff.NextBackOff() + time.AfterFunc(backoffDuration, func() { + // Trigger resubscription after backoff + select { + case subscriptionTrigger <- recordBatcher.GetSequence(): + default: + } + }) + + case sequence := <-resubscribeChan: + // Subscription completed successfully, schedule resubscription + time.AfterFunc(4*time.Minute+30*time.Second, func() { + select { + case subscriptionTrigger <- sequence: + case <-k.ctx.Done(): + } + }) + + case <-k.ctx.Done(): + state = awsKinesisConsumerClosing + close(subscriptionTrigger) + subscriptionWg.Wait() + return + } + } + }() + + return nil +} + +// efoSubscribeAndStream subscribes to a shard and streams records to a channel +func (k *kinesisReader) efoSubscribeAndStream(ctx context.Context, info streamInfo, shardID, startingSequence string, recordsChan chan<- []types.Record) (string, error) { + if info.efoManager == nil || info.efoManager.consumerARN == "" { + return "", errors.New("EFO manager or consumer ARN not initialized") + } + + // Build starting position + var startingPosition *types.StartingPosition + if startingSequence == "" { + // No sequence yet, use TRIM_HORIZON or LATEST based on config + if k.conf.StartFromOldest { + startingPosition = &types.StartingPosition{ + Type: types.ShardIteratorTypeTrimHorizon, + } + } else { + startingPosition = &types.StartingPosition{ + Type: types.ShardIteratorTypeLatest, + } + } + } else { + // Continue from last sequence + startingPosition = &types.StartingPosition{ + Type: types.ShardIteratorTypeAfterSequenceNumber, + SequenceNumber: aws.String(startingSequence), + } + } + + k.log.Debugf("Subscribing to shard %v with sequence %v", shardID, startingSequence) + + input := &kinesis.SubscribeToShardInput{ + ConsumerARN: aws.String(info.efoManager.consumerARN), + ShardId: aws.String(shardID), + StartingPosition: startingPosition, + } + + output, err := k.svc.SubscribeToShard(ctx, input) + if err != nil { + return "", fmt.Errorf("failed to subscribe to shard: %w", err) + } + + // Process the event stream + eventStream := output.GetStream() + defer eventStream.Close() + + continuationSeq := "" + for event := range eventStream.Events() { + switch e := event.(type) { + case *types.SubscribeToShardEventStreamMemberSubscribeToShardEvent: + // Got records event + shardEvent := e.Value + + // Send records to channel + if len(shardEvent.Records) > 0 { + select { + case recordsChan <- shardEvent.Records: + case <-ctx.Done(): + return continuationSeq, ctx.Err() + } + } + + // Update continuation sequence for next subscription + if shardEvent.ContinuationSequenceNumber != nil { + continuationSeq = *shardEvent.ContinuationSequenceNumber + } + + if shardEvent.MillisBehindLatest != nil { + k.log.Debugf("Shard %v is %d milliseconds behind latest", shardID, *shardEvent.MillisBehindLatest) + } + + default: + k.log.Warnf("Unknown event type received: %T", event) + } + } + + // Check for stream errors + if err := eventStream.Err(); err != nil { + // Check if it's just end of stream + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return continuationSeq, nil + } + return continuationSeq, fmt.Errorf("error receiving event: %w", err) + } + + return continuationSeq, nil +} diff --git a/website/docs/components/inputs/aws_kinesis.md b/website/docs/components/inputs/aws_kinesis.md index 7ba76182d3..91f5b01d71 100644 --- a/website/docs/components/inputs/aws_kinesis.md +++ b/website/docs/components/inputs/aws_kinesis.md @@ -69,6 +69,11 @@ input: rebalance_period: 30s lease_period: 30s start_from_oldest: true + enhanced_fan_out: + enabled: false + consumer_name: "" + consumer_arn: "" + record_buffer_cap: 1 region: "" endpoint: "" credentials: @@ -222,6 +227,45 @@ Whether to consume from the oldest message when a sequence does not yet exist fo Type: `bool` Default: `true` +### `enhanced_fan_out` + +Enhanced Fan Out configuration for push-based streaming. Provides dedicated 2 MB/sec throughput per consumer per shard and lower latency (~70ms). Note: EFO incurs per shard-hour charges. + + +Type: `object` + +### `enhanced_fan_out.enabled` + +Enable Enhanced Fan Out mode for push-based streaming with dedicated throughput. + + +Type: `bool` +Default: `false` + +### `enhanced_fan_out.consumer_name` + +Consumer name for EFO registration. Auto-generated if empty: bento-{clientID}. + + +Type: `string` +Default: `""` + +### `enhanced_fan_out.consumer_arn` + +Existing consumer ARN to use. If provided, skips registration. + + +Type: `string` +Default: `""` + +### `enhanced_fan_out.record_buffer_cap` + +Buffer capacity for the internal records channel per shard. Lower values reduce memory usage when processing many shards. Set to 1 for minimal memory footprint. + + +Type: `int` +Default: `1` + ### `region` The AWS region to target. From 9395743bed13c67c47b526938fe6ac647843ae60 Mon Sep 17 00:00:00 2001 From: jbeemster Date: Fri, 23 Jan 2026 09:48:08 +0100 Subject: [PATCH 02/10] fix lint issues --- internal/impl/aws/input_kinesis.go | 1 - internal/impl/aws/input_kinesis_efo.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/impl/aws/input_kinesis.go b/internal/impl/aws/input_kinesis.go index 0fc4d2a94a..be364af26f 100644 --- a/internal/impl/aws/input_kinesis.go +++ b/internal/impl/aws/input_kinesis.go @@ -247,7 +247,6 @@ type kinesisReader struct { svc *kinesis.Client checkpointer *awsKinesisCheckpointer - efoManager *kinesisEFOManager efoEnabled bool streams []*streamInfo diff --git a/internal/impl/aws/input_kinesis_efo.go b/internal/impl/aws/input_kinesis_efo.go index 4bb46238b0..2b5fb90974 100644 --- a/internal/impl/aws/input_kinesis_efo.go +++ b/internal/impl/aws/input_kinesis_efo.go @@ -41,7 +41,7 @@ func newKinesisEFOManager(conf *kiEFOConfig, streamARN, clientID string, svc *ki consumerName := conf.ConsumerName if consumerName == "" && conf.ConsumerARN == "" { - consumerName = fmt.Sprintf("bento-%s", clientID) + consumerName = "bento-" + clientID } return &kinesisEFOManager{ From 945d8d11aeb2ef8fbfa106ebea27c2c1a065ad15 Mon Sep 17 00:00:00 2001 From: jbeemster Date: Fri, 23 Jan 2026 10:05:28 +0100 Subject: [PATCH 03/10] fix docusaurus issue --- internal/impl/aws/input_kinesis.go | 2 +- website/docs/components/inputs/aws_kinesis.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/impl/aws/input_kinesis.go b/internal/impl/aws/input_kinesis.go index be364af26f..6cf883ae15 100644 --- a/internal/impl/aws/input_kinesis.go +++ b/internal/impl/aws/input_kinesis.go @@ -182,7 +182,7 @@ Use the `+"`batching`"+` fields to configure an optional [batching policy](/docs Description("Enable Enhanced Fan Out mode for push-based streaming with dedicated throughput."). Default(false), service.NewStringField(kiEFOFieldConsumerName). - Description("Consumer name for EFO registration. Auto-generated if empty: bento-{clientID}."). + Description("Consumer name for EFO registration. Auto-generated if empty: bento-clientID."). Default(""). Optional(), service.NewStringField(kiEFOFieldConsumerARN). diff --git a/website/docs/components/inputs/aws_kinesis.md b/website/docs/components/inputs/aws_kinesis.md index 91f5b01d71..3e3aecf912 100644 --- a/website/docs/components/inputs/aws_kinesis.md +++ b/website/docs/components/inputs/aws_kinesis.md @@ -244,7 +244,7 @@ Default: `false` ### `enhanced_fan_out.consumer_name` -Consumer name for EFO registration. Auto-generated if empty: bento-{clientID}. +Consumer name for EFO registration. Auto-generated if empty: bento-clientID. Type: `string` From 43907173878dac7bad6fbf86744161f535288be6 Mon Sep 17 00:00:00 2001 From: jbeemster Date: Fri, 23 Jan 2026 10:28:58 +0100 Subject: [PATCH 04/10] Fix consumption data-gaps --- internal/impl/aws/input_kinesis_efo.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/internal/impl/aws/input_kinesis_efo.go b/internal/impl/aws/input_kinesis_efo.go index 2b5fb90974..be39192d3e 100644 --- a/internal/impl/aws/input_kinesis_efo.go +++ b/internal/impl/aws/input_kinesis_efo.go @@ -353,13 +353,11 @@ func (k *kinesisReader) runEFOConsumer(wg *sync.WaitGroup, info streamInfo, shar }) case sequence := <-resubscribeChan: - // Subscription completed successfully, schedule resubscription - time.AfterFunc(4*time.Minute+30*time.Second, func() { - select { - case subscriptionTrigger <- sequence: - case <-k.ctx.Done(): - } - }) + // Subscription completed successfully, resubscribe immediately to maintain continuous data flow + select { + case subscriptionTrigger <- sequence: + case <-k.ctx.Done(): + } case <-k.ctx.Done(): state = awsKinesisConsumerClosing From 039ecd539a465566e3abc2af0e0b3fea181e38fd Mon Sep 17 00:00:00 2001 From: jbeemster Date: Fri, 23 Jan 2026 12:06:25 +0100 Subject: [PATCH 05/10] fix remove redundant awsLogger interface --- internal/impl/aws/input_kinesis_efo.go | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/internal/impl/aws/input_kinesis_efo.go b/internal/impl/aws/input_kinesis_efo.go index be39192d3e..dfa77dbe73 100644 --- a/internal/impl/aws/input_kinesis_efo.go +++ b/internal/impl/aws/input_kinesis_efo.go @@ -11,6 +11,8 @@ import ( "github.com/aws/aws-sdk-go-v2/service/kinesis" "github.com/aws/aws-sdk-go-v2/service/kinesis/types" "github.com/cenkalti/backoff/v4" + + "github.com/warpstreamlabs/bento/public/service" ) // kinesisEFOManager handles Enhanced Fan Out consumer registration and lifecycle @@ -19,18 +21,11 @@ type kinesisEFOManager struct { consumerName string consumerARN string svc *kinesis.Client - log awsLogger -} - -type awsLogger interface { - Debugf(format string, v ...any) - Infof(format string, v ...any) - Warnf(format string, v ...any) - Errorf(format string, v ...any) + log *service.Logger } // newKinesisEFOManager creates a new EFO manager -func newKinesisEFOManager(conf *kiEFOConfig, streamARN, clientID string, svc *kinesis.Client, log awsLogger) (*kinesisEFOManager, error) { +func newKinesisEFOManager(conf *kiEFOConfig, streamARN, clientID string, svc *kinesis.Client, log *service.Logger) (*kinesisEFOManager, error) { if conf == nil { return nil, errors.New("enhanced fan out config is nil") } From ed16f613e8a84ac0e03097eabf088e0f76de5826 Mon Sep 17 00:00:00 2001 From: jbeemster Date: Fri, 23 Jan 2026 12:08:50 +0100 Subject: [PATCH 06/10] fix allow for unbuffered record channel --- internal/impl/aws/input_kinesis.go | 8 ++++---- internal/impl/aws/input_kinesis_efo.go | 4 ++-- website/docs/components/inputs/aws_kinesis.md | 6 +++--- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/internal/impl/aws/input_kinesis.go b/internal/impl/aws/input_kinesis.go index 6cf883ae15..1d2c484a23 100644 --- a/internal/impl/aws/input_kinesis.go +++ b/internal/impl/aws/input_kinesis.go @@ -103,8 +103,8 @@ func kinesisInputConfigFromParsed(pConf *service.ParsedConfig) (conf kiConfig, e if efoConf.RecordBufferCap, err = efoNs.FieldInt(kiEFOFieldRecordBufferCap); err != nil { return } - if efoConf.RecordBufferCap < 1 { - err = errors.New("enhanced_fan_out.record_buffer_cap must be at least 1") + if efoConf.RecordBufferCap < 0 { + err = errors.New("enhanced_fan_out.record_buffer_cap must be at least 0") return } conf.EnhancedFanOut = efoConf @@ -191,8 +191,8 @@ Use the `+"`batching`"+` fields to configure an optional [batching policy](/docs Optional(). Advanced(), service.NewIntField(kiEFOFieldRecordBufferCap). - Description("Buffer capacity for the internal records channel per shard. Lower values reduce memory usage when processing many shards. Set to 1 for minimal memory footprint."). - Default(1). + Description("Buffer capacity for the internal records channel per shard. Lower values reduce memory usage when processing many shards. Set to 0 for unbuffered channel (minimal memory footprint)."). + Default(0). Advanced(), ). Description("Enhanced Fan Out configuration for push-based streaming. Provides dedicated 2 MB/sec throughput per consumer per shard and lower latency (~70ms). Note: EFO incurs per shard-hour charges."). diff --git a/internal/impl/aws/input_kinesis_efo.go b/internal/impl/aws/input_kinesis_efo.go index dfa77dbe73..6538c6a1df 100644 --- a/internal/impl/aws/input_kinesis_efo.go +++ b/internal/impl/aws/input_kinesis_efo.go @@ -216,8 +216,8 @@ func (k *kinesisReader) runEFOConsumer(wg *sync.WaitGroup, info streamInfo, shar k.log.Debugf("Consuming stream '%v' shard '%v' with Enhanced Fan Out as client '%v'", info.id, shardID, k.checkpointer.clientID) // Start subscription in a separate goroutine - bufferCap := 1 - if k.conf.EnhancedFanOut != nil && k.conf.EnhancedFanOut.RecordBufferCap > 0 { + bufferCap := 0 + if k.conf.EnhancedFanOut != nil { bufferCap = k.conf.EnhancedFanOut.RecordBufferCap } recordsChan := make(chan []types.Record, bufferCap) diff --git a/website/docs/components/inputs/aws_kinesis.md b/website/docs/components/inputs/aws_kinesis.md index 3e3aecf912..0caf8123e7 100644 --- a/website/docs/components/inputs/aws_kinesis.md +++ b/website/docs/components/inputs/aws_kinesis.md @@ -73,7 +73,7 @@ input: enabled: false consumer_name: "" consumer_arn: "" - record_buffer_cap: 1 + record_buffer_cap: 0 region: "" endpoint: "" credentials: @@ -260,11 +260,11 @@ Default: `""` ### `enhanced_fan_out.record_buffer_cap` -Buffer capacity for the internal records channel per shard. Lower values reduce memory usage when processing many shards. Set to 1 for minimal memory footprint. +Buffer capacity for the internal records channel per shard. Lower values reduce memory usage when processing many shards. Set to 0 for unbuffered channel (minimal memory footprint). Type: `int` -Default: `1` +Default: `0` ### `region` From 234af9fc1b26935c5a0e5d194ddbf720d72c8a18 Mon Sep 17 00:00:00 2001 From: jbeemster Date: Fri, 23 Jan 2026 12:32:17 +0100 Subject: [PATCH 07/10] fix remove redundant check --- internal/impl/aws/input_kinesis_efo.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/internal/impl/aws/input_kinesis_efo.go b/internal/impl/aws/input_kinesis_efo.go index 6538c6a1df..e2b155d273 100644 --- a/internal/impl/aws/input_kinesis_efo.go +++ b/internal/impl/aws/input_kinesis_efo.go @@ -105,6 +105,11 @@ func (m *kinesisEFOManager) describeAndWaitForActive(ctx context.Context) (strin m.consumerARN = *output.ConsumerDescription.ConsumerARN m.log.Infof("Found existing consumer with ARN: %s", m.consumerARN) + if output.ConsumerDescription.ConsumerStatus == types.ConsumerStatusActive { + m.log.Infof("Consumer is already ACTIVE") + return m.consumerARN, nil + } + if err := m.waitForActiveConsumer(ctx); err != nil { return "", fmt.Errorf("failed waiting for consumer to become active: %w", err) } From ea448e6f0712b0f191b3c72fbb0151ebe2de39a3 Mon Sep 17 00:00:00 2001 From: jbeemster Date: Tue, 27 Jan 2026 16:58:22 +0100 Subject: [PATCH 08/10] fix ensure closed shards are no longer processed --- internal/impl/aws/input_kinesis_efo.go | 37 ++++++++++++++++++++------ 1 file changed, 29 insertions(+), 8 deletions(-) diff --git a/internal/impl/aws/input_kinesis_efo.go b/internal/impl/aws/input_kinesis_efo.go index e2b155d273..3c9b2ecc1c 100644 --- a/internal/impl/aws/input_kinesis_efo.go +++ b/internal/impl/aws/input_kinesis_efo.go @@ -228,15 +228,24 @@ func (k *kinesisReader) runEFOConsumer(wg *sync.WaitGroup, info streamInfo, shar recordsChan := make(chan []types.Record, bufferCap) errorsChan := make(chan error, 1) resubscribeChan := make(chan string, 1) + shardFinishedChan := make(chan struct{}, 1) var subscriptionWg sync.WaitGroup subscriptionWg.Add(1) go func() { defer subscriptionWg.Done() for sequence := range subscriptionTrigger { - continuationSeq, err := k.efoSubscribeAndStream(k.ctx, info, shardID, sequence, recordsChan) + continuationSeq, shardFinished, err := k.efoSubscribeAndStream(k.ctx, info, shardID, sequence, recordsChan) if err != nil { errorsChan <- err + } else if shardFinished { + // Shard is closed, signal to main loop + // Don't resubscribe to closed shards + select { + case shardFinishedChan <- struct{}{}: + default: + } + return } else { // Schedule resubscription with continuation sequence if continuationSeq != "" { @@ -359,6 +368,10 @@ func (k *kinesisReader) runEFOConsumer(wg *sync.WaitGroup, info streamInfo, shar case <-k.ctx.Done(): } + case <-shardFinishedChan: + // Shard is closed, mark as finished so we can drain pending records + state = awsKinesisConsumerFinished + case <-k.ctx.Done(): state = awsKinesisConsumerClosing close(subscriptionTrigger) @@ -372,9 +385,10 @@ func (k *kinesisReader) runEFOConsumer(wg *sync.WaitGroup, info streamInfo, shar } // efoSubscribeAndStream subscribes to a shard and streams records to a channel -func (k *kinesisReader) efoSubscribeAndStream(ctx context.Context, info streamInfo, shardID, startingSequence string, recordsChan chan<- []types.Record) (string, error) { +// Returns: continuationSequence, shardFinished, error +func (k *kinesisReader) efoSubscribeAndStream(ctx context.Context, info streamInfo, shardID, startingSequence string, recordsChan chan<- []types.Record) (string, bool, error) { if info.efoManager == nil || info.efoManager.consumerARN == "" { - return "", errors.New("EFO manager or consumer ARN not initialized") + return "", false, errors.New("EFO manager or consumer ARN not initialized") } // Build starting position @@ -408,7 +422,7 @@ func (k *kinesisReader) efoSubscribeAndStream(ctx context.Context, info streamIn output, err := k.svc.SubscribeToShard(ctx, input) if err != nil { - return "", fmt.Errorf("failed to subscribe to shard: %w", err) + return "", false, fmt.Errorf("failed to subscribe to shard: %w", err) } // Process the event stream @@ -416,6 +430,7 @@ func (k *kinesisReader) efoSubscribeAndStream(ctx context.Context, info streamIn defer eventStream.Close() continuationSeq := "" + shardFinished := false for event := range eventStream.Events() { switch e := event.(type) { case *types.SubscribeToShardEventStreamMemberSubscribeToShardEvent: @@ -427,7 +442,7 @@ func (k *kinesisReader) efoSubscribeAndStream(ctx context.Context, info streamIn select { case recordsChan <- shardEvent.Records: case <-ctx.Done(): - return continuationSeq, ctx.Err() + return continuationSeq, false, ctx.Err() } } @@ -436,6 +451,12 @@ func (k *kinesisReader) efoSubscribeAndStream(ctx context.Context, info streamIn continuationSeq = *shardEvent.ContinuationSequenceNumber } + // Check if shard is closed (has child shards) + if len(shardEvent.ChildShards) > 0 { + k.log.Infof("Shard %v is closed, child shards: %v", shardID, len(shardEvent.ChildShards)) + shardFinished = true + } + if shardEvent.MillisBehindLatest != nil { k.log.Debugf("Shard %v is %d milliseconds behind latest", shardID, *shardEvent.MillisBehindLatest) } @@ -449,10 +470,10 @@ func (k *kinesisReader) efoSubscribeAndStream(ctx context.Context, info streamIn if err := eventStream.Err(); err != nil { // Check if it's just end of stream if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { - return continuationSeq, nil + return continuationSeq, shardFinished, nil } - return continuationSeq, fmt.Errorf("error receiving event: %w", err) + return continuationSeq, shardFinished, fmt.Errorf("error receiving event: %w", err) } - return continuationSeq, nil + return continuationSeq, shardFinished, nil } From d966932784e677b574ec38795de938dada7709c0 Mon Sep 17 00:00:00 2001 From: jbeemster Date: Tue, 27 Jan 2026 17:19:22 +0100 Subject: [PATCH 09/10] fix unclaimed shards not being completed --- internal/impl/aws/input_kinesis.go | 12 +++- .../impl/aws/input_kinesis_checkpointer.go | 27 +++++++++ internal/impl/aws/input_kinesis_test.go | 58 +++++++++++++++++++ 3 files changed, 95 insertions(+), 2 deletions(-) diff --git a/internal/impl/aws/input_kinesis.go b/internal/impl/aws/input_kinesis.go index 1d2c484a23..96e6246799 100644 --- a/internal/impl/aws/input_kinesis.go +++ b/internal/impl/aws/input_kinesis.go @@ -729,9 +729,13 @@ func (k *kinesisReader) runBalancedShards() { for _, info := range k.streams { allShards, err := collectShards(k.ctx, info.arn, k.svc) var clientClaims map[string][]awsKinesisClientClaim + var shardsWithCheckpoints map[string]bool if err == nil { clientClaims, err = k.checkpointer.AllClaims(k.ctx, info.id) } + if err == nil { + shardsWithCheckpoints, err = k.checkpointer.AllCheckpoints(k.ctx, info.id) + } if err != nil { if k.ctx.Err() != nil { return @@ -743,8 +747,12 @@ func (k *kinesisReader) runBalancedShards() { totalShards := len(allShards) unclaimedShards := make(map[string]string, totalShards) for _, s := range allShards { - if !isShardFinished(s) { - unclaimedShards[*s.ShardId] = "" + // Include shard if: + // 1. It's not finished (still open), OR + // 2. It's finished but has a checkpoint (meaning it hasn't been fully consumed yet) + shardID := *s.ShardId + if !isShardFinished(s) || shardsWithCheckpoints[shardID] { + unclaimedShards[shardID] = "" } } for clientID, claims := range clientClaims { diff --git a/internal/impl/aws/input_kinesis_checkpointer.go b/internal/impl/aws/input_kinesis_checkpointer.go index 5dcf78cf2b..f836f37eef 100644 --- a/internal/impl/aws/input_kinesis_checkpointer.go +++ b/internal/impl/aws/input_kinesis_checkpointer.go @@ -180,6 +180,33 @@ type awsKinesisClientClaim struct { LeaseTimeout time.Time } +// AllCheckpoints returns a set of all shard IDs that have checkpoint records +// in DynamoDB for the given stream, regardless of whether they are claimed or not. +func (k *awsKinesisCheckpointer) AllCheckpoints(ctx context.Context, streamID string) (map[string]bool, error) { + checkpoints := make(map[string]bool) + + scanRes, err := k.svc.Scan(ctx, &dynamodb.ScanInput{ + TableName: aws.String(k.conf.Table), + FilterExpression: aws.String("StreamID = :stream_id"), + ExpressionAttributeValues: map[string]types.AttributeValue{ + ":stream_id": &types.AttributeValueMemberS{ + Value: streamID, + }, + }, + }) + if err != nil { + return nil, err + } + + for _, i := range scanRes.Items { + if s, ok := i["ShardID"].(*types.AttributeValueMemberS); ok { + checkpoints[s.Value] = true + } + } + + return checkpoints, nil +} + // AllClaims returns a map of client IDs to shards claimed by that client, // including the lease timeout of the claim. func (k *awsKinesisCheckpointer) AllClaims(ctx context.Context, streamID string) (map[string][]awsKinesisClientClaim, error) { diff --git a/internal/impl/aws/input_kinesis_test.go b/internal/impl/aws/input_kinesis_test.go index 4f80a7c9e3..a44c8cc283 100644 --- a/internal/impl/aws/input_kinesis_test.go +++ b/internal/impl/aws/input_kinesis_test.go @@ -3,6 +3,8 @@ package aws import ( "testing" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/kinesis/types" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -64,3 +66,59 @@ func TestStreamIDParser(t *testing.T) { }) } } + +func TestIsShardFinished(t *testing.T) { + tests := []struct { + name string + shard types.Shard + expected bool + }{ + { + name: "open shard - no ending sequence", + shard: types.Shard{ + ShardId: aws.String("shardId-000000000001"), + SequenceNumberRange: &types.SequenceNumberRange{ + StartingSequenceNumber: aws.String("49671246667567228643283430150187087032206582658"), + }, + }, + expected: false, + }, + { + name: "closed shard - has ending sequence", + shard: types.Shard{ + ShardId: aws.String("shardId-000000000001"), + SequenceNumberRange: &types.SequenceNumberRange{ + StartingSequenceNumber: aws.String("49671246667567228643283430150187087032206582658"), + EndingSequenceNumber: aws.String("49671246667589458717803282320587893555896035326582658"), + }, + }, + expected: true, + }, + { + name: "closed shard - ending sequence is null string", + shard: types.Shard{ + ShardId: aws.String("shardId-000000000001"), + SequenceNumberRange: &types.SequenceNumberRange{ + StartingSequenceNumber: aws.String("49671246667567228643283430150187087032206582658"), + EndingSequenceNumber: aws.String("null"), + }, + }, + expected: false, + }, + { + name: "shard with no sequence number range", + shard: types.Shard{ + ShardId: aws.String("shardId-000000000001"), + }, + expected: false, + }, + } + + for _, test := range tests { + test := test + t.Run(test.name, func(t *testing.T) { + result := isShardFinished(test.shard) + assert.Equal(t, test.expected, result) + }) + } +} From f238c65613fcb2c310a6f946d7de20c8b8082640 Mon Sep 17 00:00:00 2001 From: jbeemster Date: Fri, 30 Jan 2026 11:38:42 +0100 Subject: [PATCH 10/10] fix apply backpressure to avoid OOM --- internal/impl/aws/input_kinesis_efo.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/internal/impl/aws/input_kinesis_efo.go b/internal/impl/aws/input_kinesis_efo.go index 3c9b2ecc1c..fe751f3208 100644 --- a/internal/impl/aws/input_kinesis_efo.go +++ b/internal/impl/aws/input_kinesis_efo.go @@ -186,6 +186,7 @@ func (k *kinesisReader) runEFOConsumer(wg *sync.WaitGroup, info streamInfo, shar // Channels for timed batches and message flush var nextTimedBatchChan <-chan time.Time var nextFlushChan chan<- asyncMessage + var nextRecordsChan <-chan []types.Record commitCtx, commitCtxClose := context.WithTimeout(k.ctx, k.commitPeriod) go func() { @@ -292,8 +293,14 @@ func (k *kinesisReader) runEFOConsumer(wg *sync.WaitGroup, info streamInfo, shar if pendingMsg.msg != nil { nextFlushChan = k.msgChan + nextRecordsChan = nil } else { nextFlushChan = nil + if len(pending) == 0 { + nextRecordsChan = recordsChan + } else { + nextRecordsChan = nil + } } if nextTimedBatchChan == nil { @@ -332,7 +339,7 @@ func (k *kinesisReader) runEFOConsumer(wg *sync.WaitGroup, info streamInfo, shar case nextFlushChan <- pendingMsg: pendingMsg = asyncMessage{} - case records := <-recordsChan: + case records := <-nextRecordsChan: // Received records from subscription pending = append(pending, records...) boff.Reset()