ElasticSource finishes prematurely #3596

@jvargas-lumu

Description

ElasticSource terminates streams prematurely when using fetchThreshold > 0 with Elasticsearch scroll queries, resulting in incomplete data processing despite more documents being available in the source.

The failure shows up as follows:

  • Stream completes after processing only a fraction of available documents
  • No errors or exceptions thrown (silent failure)
  • Behavior is inconsistent and depends on timing/downstream processing speed
  • Issue does not occur when fetchThreshold = 0

Observed Behavior

// Configuration that exhibits the problem
val settings = SourceSettings(
  scrollQuery,
  maxItems = Long.MaxValue,
  fetchThreshold = 100, // Issue occurs when > 0
  warm = true
)

Source.fromGraph(new ElasticSource(client, settings))
  .via(slowProcessingFlow) // Slow downstream processing
  .runWith(Sink.seq)

// Expected: 90 documents from Elasticsearch index
// Actual: Only 10-15 documents processed before stream completion
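
Because the stream completes without an error, the shortfall is easy to miss. A minimal sketch of a guard that turns the silent failure into an explicit one is shown here. `CompletenessCheck`, `Shortfall`, and `verify` are hypothetical names used for illustration and are not part of the library. The sketch assumes the expected document count is known in advance (for example from a separate count query).

```scala
// Hypothetical helper, not part of the library: compares the number of hits
// the stream delivered with the number the index is known to contain.
object CompletenessCheck {
  /** Describes a stream that completed with fewer documents than expected. */
  final case class Shortfall(expected: Long, received: Long) {
    def message: String =
      s"stream completed after $received of $expected documents"
  }

  /** Returns the hits unchanged when the count matches, otherwise a Left. */
  def verify[A](expected: Long, received: Seq[A]): Either[Shortfall, Seq[A]] = {
    val got = received.size.toLong
    if (got == expected) Right(received)
    else Left(Shortfall(expected, got))
  }
}
```

Applied to the result of `Sink.seq` above, `CompletenessCheck.verify(90L, hits)` would return a `Left` for the 10-15 document case instead of letting the short result pass unnoticed.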

Reproduction Conditions

  • Using Elasticsearch scroll queries (.scroll("5m").size(<a number greater than threshold>))
  • fetchThreshold > 0 in SourceSettings
  • Downstream processing that creates backpressure (slower consumption than fetch rate)
  • Multiple documents available in the source index
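
For reference, the behavior expected under these conditions can be modeled in plain Scala. This is only a sketch of threshold-based prefetching as I understand the intent of fetchThreshold (refill the buffer once it drops to the threshold). It is not the ElasticSource implementation, and `PrefetchModel`, `FakeScroll`, and `drain` are hypothetical names. The point it illustrates is the completion rule: the stream should finish only when the scroll is exhausted and the buffer is empty.

```scala
import scala.collection.mutable

// Plain-Scala model of threshold-based prefetching. All names are hypothetical;
// this is not the ElasticSource code.
object PrefetchModel {
  /** Stand-in for a scroll cursor: hands out `total` ids in pages of `pageSize`. */
  final class FakeScroll(total: Int, pageSize: Int) {
    private var next = 0
    def nextPage(): Vector[Int] = {
      val page = (next until math.min(next + pageSize, total)).toVector
      next += page.size
      page
    }
  }

  /** Drains the scroll, refilling the buffer whenever it drops to `fetchThreshold`. */
  def drain(scroll: FakeScroll, fetchThreshold: Int): Vector[Int] = {
    val buffer = mutable.Queue.empty[Int]
    val out = Vector.newBuilder[Int]
    var exhausted = false

    def fetch(): Unit = {
      val page = scroll.nextPage()
      // An empty page is the only signal that the scroll has no more hits.
      if (page.isEmpty) exhausted = true else buffer ++= page
    }

    // Complete only when the scroll is exhausted AND the buffer is empty.
    while (!(exhausted && buffer.isEmpty)) {
      // Prefetch when the buffer is at or below the threshold.
      if (!exhausted && buffer.size <= fetchThreshold) fetch()
      // One downstream pull: emit a single buffered element, if any.
      if (buffer.nonEmpty) out += buffer.dequeue()
    }
    out.result()
  }
}
```

In this model a 90-document scroll yields all 90 ids whether the threshold is 0 or greater than 0, which is the behavior the report expects from the real source.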

Workaround

Setting fetchThreshold = 0 resolves the issue but eliminates prefetching benefits:

val settings = SourceSettings(searchQuery, maxItems = Long.MaxValue, fetchThreshold = 0, warm = true)

Suspected Code Location
