Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
637c0c8
Add Enhanced FanOut Support for aws_kinesis
jbeemster Jan 22, 2026
15da0a6
fix lint issues
jbeemster Jan 23, 2026
cf0d551
fix docusaurus issue
jbeemster Jan 23, 2026
8cd6433
Fix consumption data-gaps
jbeemster Jan 23, 2026
234b61f
fix remove redundant awsLogger interface
jbeemster Jan 23, 2026
fca7e0a
fix allow for unbuffered record channel
jbeemster Jan 23, 2026
5eb99d0
fix remove redundant check
jbeemster Jan 23, 2026
5a6962c
fix ensure closed shards are no longer processed
jbeemster Jan 27, 2026
2cd6d54
fix unclaimed shards not being completed
jbeemster Jan 27, 2026
f7b2158
Use a single query for both retrieving claims and checkpoints from DynamoDB
matus-tomlein Jan 28, 2026
be8e5bb
fix apply backpressure to avoid OOM
jbeemster Jan 30, 2026
f3255cb
Convert Infof to Debugf messages
matus-tomlein Jan 30, 2026
126c2db
Add LintRule to catch misconfiguration in cases both consumer_name and consumer_arn are set
matus-tomlein Jan 30, 2026
7955ecd
refactor: remove redundant Optional() from EFO config fields
matus-tomlein Jan 30, 2026
d05e44e
refactor: remove duplicate EFO validation
matus-tomlein Jan 30, 2026
8281d9d
fix: prevent race condition in EFO resubscription backoff
matus-tomlein Jan 30, 2026
1dd27da
fix: add context cancellation checks in EFO subscription goroutine
matus-tomlein Jan 30, 2026
f391410
fix: track last received sequence for EFO resubscription fallback
matus-tomlein Jan 30, 2026
cd25651
add int test for kinesis_efo (#2)
jem-davies Feb 12, 2026
bd55b1c
feat(aws/kinesis): Add global pending pool with bounded memory and mo…
matus-tomlein Feb 12, 2026
09ac2d1
fix lint; make docs (#3)
jem-davies Feb 19, 2026
0f6005b
feat(aws/kinesis): add Version to enhanced_fan_out field registration
matus-tomlein Mar 10, 2026
9ce5b7b
docs(aws/kinesis): clarify intentional ShardID=="" skip in GetCheckpointsAndClaims
matus-tomlein Mar 10, 2026
31316e1
refactor(aws/kinesis): apply go fix modernisations
matus-tomlein Mar 10, 2026
1834a24
refactor(aws/kinesis): apply go fix modernisations to EFO subscriptio…
matus-tomlein Mar 10, 2026
66a7290
fix(aws/kinesis): improve waitForActiveConsumer correctness and add u…
matus-tomlein Mar 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 134 additions & 10 deletions internal/impl/aws/input_kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,24 @@ const (
kiFieldRebalancePeriod = "rebalance_period"
kiFieldStartFromOldest = "start_from_oldest"
kiFieldBatching = "batching"
kiFieldEnhancedFanOut = "enhanced_fan_out"

// Enhanced Fan Out Fields
kiEFOFieldEnabled = "enabled"
kiEFOFieldConsumerName = "consumer_name"
kiEFOFieldConsumerARN = "consumer_arn"
kiEFOFieldRecordBufferCap = "record_buffer_cap"
kiEFOFieldMaxPendingRecordsGlobal = "max_pending_records"
)

// kiEFOConfig holds the parsed enhanced_fan_out configuration for the
// aws_kinesis input. It is only populated when the enhanced_fan_out object
// is present in the parsed config.
type kiEFOConfig struct {
	// Enabled turns on push-based Enhanced Fan Out consumption.
	Enabled bool
	// ConsumerName is the name used for EFO consumer registration;
	// auto-generated when left empty.
	ConsumerName string
	// ConsumerARN is an existing consumer ARN; when set, registration is
	// skipped. Mutually exclusive with ConsumerName (enforced by a lint rule).
	ConsumerARN string
	// RecordBufferCap is the per-shard record channel capacity; 0 means an
	// unbuffered channel. Validated to be >= 0 at parse time.
	RecordBufferCap int
	// MaxPendingRecordsGlobal bounds the total records buffered across all
	// shards before backpressure is applied. Validated to be >= 1 at parse
	// time.
	MaxPendingRecordsGlobal int
}

type kiConfig struct {
Streams []string
DynamoDB kiddbConfig
Expand All @@ -47,6 +63,7 @@ type kiConfig struct {
LeasePeriod string
RebalancePeriod string
StartFromOldest bool
EnhancedFanOut *kiEFOConfig
}

func kinesisInputConfigFromParsed(pConf *service.ParsedConfig) (conf kiConfig, err error) {
Expand All @@ -73,6 +90,34 @@ func kinesisInputConfigFromParsed(pConf *service.ParsedConfig) (conf kiConfig, e
if conf.StartFromOldest, err = pConf.FieldBool(kiFieldStartFromOldest); err != nil {
return
}
if pConf.Contains(kiFieldEnhancedFanOut) {
efoConf := &kiEFOConfig{}
efoNs := pConf.Namespace(kiFieldEnhancedFanOut)
if efoConf.Enabled, err = efoNs.FieldBool(kiEFOFieldEnabled); err != nil {
return
}
if efoConf.ConsumerName, err = efoNs.FieldString(kiEFOFieldConsumerName); err != nil {
return
}
if efoConf.ConsumerARN, err = efoNs.FieldString(kiEFOFieldConsumerARN); err != nil {
return
}
if efoConf.RecordBufferCap, err = efoNs.FieldInt(kiEFOFieldRecordBufferCap); err != nil {
return
}
if efoConf.RecordBufferCap < 0 {
err = errors.New("enhanced_fan_out.record_buffer_cap must be at least 0")
return
}
if efoConf.MaxPendingRecordsGlobal, err = efoNs.FieldInt(kiEFOFieldMaxPendingRecordsGlobal); err != nil {
return
}
if efoConf.MaxPendingRecordsGlobal < 1 {
err = errors.New("enhanced_fan_out.max_pending_records must be at least 1")
return
}
conf.EnhancedFanOut = efoConf
}
return
}

Expand Down Expand Up @@ -141,9 +186,36 @@ Use the `+"`batching`"+` fields to configure an optional [batching policy](/docs
service.NewBoolField(kiFieldStartFromOldest).
Description("Whether to consume from the oldest message when a sequence does not yet exist for the stream.").
Default(true),
service.NewObjectField(kiFieldEnhancedFanOut,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to use the .Version("1.16.0") func here too.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added .Version("1.16.0") to the enhanced_fan_out field.

service.NewBoolField(kiEFOFieldEnabled).
Description("Enable Enhanced Fan Out mode for push-based streaming with dedicated throughput.").
Default(false),
service.NewStringField(kiEFOFieldConsumerName).
Description("Consumer name for EFO registration. Auto-generated if empty: bento-clientID.").
Default(""),
service.NewStringField(kiEFOFieldConsumerARN).
Description("Existing consumer ARN to use. If provided, skips registration.").
Default("").
Advanced(),
service.NewIntField(kiEFOFieldRecordBufferCap).
Description("Buffer capacity for the internal records channel per shard. Lower values reduce memory usage when processing many shards. Set to 0 for unbuffered channel (minimal memory footprint).").
Default(0).
Advanced(),
service.NewIntField(kiEFOFieldMaxPendingRecordsGlobal).
Description("Maximum total number of records to buffer across all shards before applying backpressure to Kinesis subscriptions. This provides a global memory bound regardless of shard count. Higher values improve throughput by allowing shards to continue receiving data while processing, but increase memory usage. Total memory usage is approximately max_pending_records × average_record_size.").
Default(50000).
Advanced(),
).
Description("Enhanced Fan Out configuration for push-based streaming. Provides dedicated 2 MB/sec throughput per consumer per shard and lower latency (~70ms). Note: EFO incurs per shard-hour charges.").
Version("1.16.0").
Optional().
Advanced(),
).
Fields(config.SessionFields()...).
Field(service.NewBatchPolicyField(kiFieldBatching))
Field(service.NewBatchPolicyField(kiFieldBatching)).
LintRule(`root = match {
this.` + kiFieldEnhancedFanOut + `.` + kiEFOFieldConsumerName + ` != "" && this.` + kiFieldEnhancedFanOut + `.` + kiEFOFieldConsumerARN + ` != "" => ["cannot specify both ` + kiEFOFieldConsumerName + ` and ` + kiEFOFieldConsumerARN + ` in ` + kiFieldEnhancedFanOut + ` config"]
}`)
return spec
}

Expand Down Expand Up @@ -174,6 +246,7 @@ type streamInfo struct {
explicitShards []string
id string // Either a name or arn, extracted from config and used for balancing shards
arn string
efoManager *kinesisEFOManager // Enhanced Fan Out manager (if EFO is enabled)
}

type kinesisReader struct {
Expand All @@ -187,8 +260,10 @@ type kinesisReader struct {

boffPool sync.Pool

svc *kinesis.Client
checkpointer *awsKinesisCheckpointer
svc *kinesis.Client
checkpointer *awsKinesisCheckpointer
efoEnabled bool
globalPendingPool *globalPendingPool

streams []*streamInfo

Expand Down Expand Up @@ -319,6 +394,14 @@ func newKinesisReaderFromConfig(conf kiConfig, batcher service.BatchPolicy, sess
if k.rebalancePeriod, err = time.ParseDuration(k.conf.RebalancePeriod); err != nil {
return nil, fmt.Errorf("failed to parse rebalance period string: %v", err)
}

// Check if Enhanced Fan Out is enabled
if k.conf.EnhancedFanOut != nil && k.conf.EnhancedFanOut.Enabled {
k.efoEnabled = true
k.globalPendingPool = newGlobalPendingPool(k.conf.EnhancedFanOut.MaxPendingRecordsGlobal)
k.log.Debugf("Enhanced Fan Out enabled with global pending pool max: %d", k.conf.EnhancedFanOut.MaxPendingRecordsGlobal)
}

return &k, nil
}

Expand Down Expand Up @@ -657,9 +740,9 @@ func (k *kinesisReader) runBalancedShards() {
for {
for _, info := range k.streams {
allShards, err := collectShards(k.ctx, info.arn, k.svc)
var clientClaims map[string][]awsKinesisClientClaim
var checkpointData *awsKinesisCheckpointData
if err == nil {
clientClaims, err = k.checkpointer.AllClaims(k.ctx, info.id)
checkpointData, err = k.checkpointer.GetCheckpointsAndClaims(k.ctx, info.id)
}
if err != nil {
if k.ctx.Err() != nil {
Expand All @@ -669,11 +752,18 @@ func (k *kinesisReader) runBalancedShards() {
continue
}

clientClaims := checkpointData.ClientClaims
shardsWithCheckpoints := checkpointData.ShardsWithCheckpoints

totalShards := len(allShards)
unclaimedShards := make(map[string]string, totalShards)
for _, s := range allShards {
if !isShardFinished(s) {
unclaimedShards[*s.ShardId] = ""
// Include shard if:
// 1. It's not finished (still open), OR
// 2. It's finished but has a checkpoint (meaning it hasn't been fully consumed yet)
shardID := *s.ShardId
if !isShardFinished(s) || shardsWithCheckpoints[shardID] {
unclaimedShards[shardID] = ""
}
}
for clientID, claims := range clientClaims {
Expand All @@ -700,7 +790,12 @@ func (k *kinesisReader) runBalancedShards() {
continue
}
wg.Add(1)
if err = k.runConsumer(&wg, *info, shardID, sequence); err != nil {
if k.efoEnabled {
err = k.runEFOConsumer(&wg, *info, shardID, sequence)
} else {
err = k.runConsumer(&wg, *info, shardID, sequence)
}
if err != nil {
k.log.Errorf("Failed to start consumer: %v\n", err)
}
}
Expand Down Expand Up @@ -749,7 +844,12 @@ func (k *kinesisReader) runBalancedShards() {
info.id, randomShard, clientID, k.clientID,
)
wg.Add(1)
if err = k.runConsumer(&wg, *info, randomShard, sequence); err != nil {
if k.efoEnabled {
err = k.runEFOConsumer(&wg, *info, randomShard, sequence)
} else {
err = k.runConsumer(&wg, *info, randomShard, sequence)
}
if err != nil {
k.log.Errorf("Failed to start consumer: %v\n", err)
} else {
// If we successfully stole the shard then that's enough
Expand Down Expand Up @@ -790,7 +890,11 @@ func (k *kinesisReader) runExplicitShards() {
sequence, err := k.checkpointer.Claim(k.ctx, id, shardID, "")
if err == nil {
wg.Add(1)
err = k.runConsumer(&wg, info, shardID, sequence)
if k.efoEnabled {
err = k.runEFOConsumer(&wg, info, shardID, sequence)
} else {
err = k.runConsumer(&wg, info, shardID, sequence)
}
}
if err != nil {
if k.ctx.Err() != nil {
Expand Down Expand Up @@ -868,6 +972,26 @@ func (k *kinesisReader) Connect(ctx context.Context) error {
return err
}

// Initialize Enhanced Fan Out if enabled
if k.efoEnabled {
for _, stream := range k.streams {
// Create EFO manager for this stream
efoMgr, err := newKinesisEFOManager(k.conf.EnhancedFanOut, stream.arn, k.clientID, k.svc, k.log)
if err != nil {
return fmt.Errorf("failed to create EFO manager for stream %s: %w", stream.id, err)
}

// Register consumer and wait for ACTIVE status
consumerARN, err := efoMgr.ensureConsumerRegistered(ctx)
if err != nil {
return fmt.Errorf("failed to register EFO consumer for stream %s: %w", stream.id, err)
}

stream.efoManager = efoMgr
k.log.Debugf("Enhanced Fan Out consumer registered for stream %s with ARN: %s", stream.id, consumerARN)
}
}

if len(k.streams[0].explicitShards) > 0 {
go k.runExplicitShards()
} else {
Expand Down
105 changes: 71 additions & 34 deletions internal/impl/aws/input_kinesis_checkpointer.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,54 +180,91 @@ type awsKinesisClientClaim struct {
LeaseTimeout time.Time
}

// AllClaims returns a map of client IDs to shards claimed by that client,
// including the lease timeout of the claim.
func (k *awsKinesisCheckpointer) AllClaims(ctx context.Context, streamID string) (map[string][]awsKinesisClientClaim, error) {
clientClaims := make(map[string][]awsKinesisClientClaim)
var scanErr error

scanRes, err := k.svc.Scan(ctx, &dynamodb.ScanInput{
TableName: aws.String(k.conf.Table),
FilterExpression: aws.String("StreamID = :stream_id"),
// awsKinesisCheckpointData contains both the set of all shards with checkpoints
// and the map of client claims, retrieved in a single DynamoDB query.
type awsKinesisCheckpointData struct {
	// ShardsWithCheckpoints is a set of all shard IDs that have checkpoint
	// records, including orphaned checkpoints that carry no ClientID.
	ShardsWithCheckpoints map[string]bool
	// ClientClaims is a map of client IDs to the shards claimed by that
	// client, including each claim's lease timeout.
	ClientClaims map[string][]awsKinesisClientClaim
}

// GetCheckpointsAndClaims retrieves all checkpoint data for a stream.
//
// Returns:
// - ShardsWithCheckpoints: set of all shard IDs that have checkpoint records
// - ClientClaims: map of client IDs to their claimed shards (excludes entries without ClientID)
func (k *awsKinesisCheckpointer) GetCheckpointsAndClaims(ctx context.Context, streamID string) (*awsKinesisCheckpointData, error) {
result := &awsKinesisCheckpointData{
ShardsWithCheckpoints: make(map[string]bool),
ClientClaims: make(map[string][]awsKinesisClientClaim),
}

input := &dynamodb.QueryInput{
TableName: aws.String(k.conf.Table),
KeyConditionExpression: aws.String("StreamID = :stream_id"),
ExpressionAttributeValues: map[string]types.AttributeValue{
":stream_id": &types.AttributeValueMemberS{
Value: streamID,
},
},
})
if err != nil {
return nil, err
}

for _, i := range scanRes.Items {
var clientID string
if s, ok := i["ClientID"].(*types.AttributeValueMemberS); ok {
clientID = s.Value
} else {
continue
}
paginator := dynamodb.NewQueryPaginator(k.svc, input)

var claim awsKinesisClientClaim
if s, ok := i["ShardID"].(*types.AttributeValueMemberS); ok {
claim.ShardID = s.Value
}
if claim.ShardID == "" {
return nil, errors.New("failed to extract shard id from claim")
for paginator.HasMorePages() {
page, err := paginator.NextPage(ctx)
if err != nil {
return nil, fmt.Errorf("failed to query checkpoints: %w", err)
}

if s, ok := i["LeaseTimeout"].(*types.AttributeValueMemberS); ok {
if claim.LeaseTimeout, scanErr = time.Parse(time.RFC3339Nano, s.Value); scanErr != nil {
return nil, fmt.Errorf("failed to parse claim lease: %w", scanErr)
for _, item := range page.Items {
// Extract ShardID - required for all checkpoint entries
var shardID string
if s, ok := item["ShardID"].(*types.AttributeValueMemberS); ok {
shardID = s.Value
}
if shardID == "" {
// Skip malformed items without a ShardID rather than failing the
// whole query — this is intentionally lenient compared to the
// single-item getCheckpoint path which returns an error, because
// here we are scanning all checkpoints and a single bad row
// should not block progress for the rest.
continue
}

// Track all shards with checkpoints
result.ShardsWithCheckpoints[shardID] = true

// Extract client claim if ClientID exists
var clientID string
if s, ok := item["ClientID"].(*types.AttributeValueMemberS); ok {
clientID = s.Value
}
if clientID == "" {
// No client ID means this is an orphaned checkpoint (from final=true)
continue
}
}
if claim.LeaseTimeout.IsZero() {
return nil, errors.New("failed to extract lease timeout from claim")
}

clientClaims[clientID] = append(clientClaims[clientID], claim)
// Extract lease timeout for claims
var claim awsKinesisClientClaim
claim.ShardID = shardID

if s, ok := item["LeaseTimeout"].(*types.AttributeValueMemberS); ok {
var parseErr error
if claim.LeaseTimeout, parseErr = time.Parse(time.RFC3339Nano, s.Value); parseErr != nil {
return nil, fmt.Errorf("failed to parse claim lease for shard %s: %w", shardID, parseErr)
}
}
if claim.LeaseTimeout.IsZero() {
return nil, fmt.Errorf("failed to extract lease timeout from claim for shard %s", shardID)
}

result.ClientClaims[clientID] = append(result.ClientClaims[clientID], claim)
}
}

return clientClaims, scanErr
return result, nil
}

// Claim attempts to claim a shard for a particular stream ID. If fromClientID
Expand Down
Loading
Loading