ElasticSource finishes prematurely #3596
Description
ElasticSource terminates streams prematurely when fetchThreshold > 0 is used with Elasticsearch scroll queries. Downstream stages receive only part of the data even though more documents remain in the index.
When ElasticSource runs a scroll-based query with fetchThreshold > 0, the stream can complete before all available documents have been emitted. This manifests as:
- The stream completes after only a fraction of the available documents have been processed
- No errors or exceptions are thrown (silent failure)
- Behavior is nondeterministic and depends on timing and downstream processing speed
- The issue does not occur when fetchThreshold = 0
Observed Behavior
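```scala
// Configuration that exhibits the problem.
// `client`, `scrollQuery` (a scroll-enabled search request) and `slowProcessingFlow`
// are defined elsewhere; the implicits required by ElasticSource and runWith
// (e.g. a Materializer) are assumed to be in scope.
val settings = SourceSettings(
  scrollQuery,
  maxItems = Long.MaxValue,
  fetchThreshold = 100, // Issue occurs when > 0
  warm = true
)

Source.fromGraph(new ElasticSource(client, settings))
  .via(slowProcessingFlow) // Slow downstream processing
  .runWith(Sink.seq)

// Expected: 90 documents from the Elasticsearch index
// Actual: only 10-15 documents processed before the stream completes
```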
Reproduction Conditions
- Using Elasticsearch scroll queries (`.scroll("5m").size(<a number greater than fetchThreshold>)`)
- fetchThreshold > 0 in SourceSettings
- Downstream processing that creates backpressure (slower consumption than fetch rate)
- Multiple documents available in the source index
Workaround
Setting fetchThreshold = 0 resolves the issue but eliminates prefetching benefits:
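```scala
// Workaround: with no prefetch threshold the source only fetches when its buffer is empty
val settings = SourceSettings(scrollQuery, maxItems = Long.MaxValue, fetchThreshold = 0, warm = true)
```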
Suspected Code Location
Line 64 in b600f74:
```scala
if (searchr.hits.hits.length == 0) {
```
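One plausible reading of this line, inferred from the symptoms rather than confirmed against the elastic4s source: with fetchThreshold > 0, a prefetch can be issued while hits are still buffered, and if that scroll page comes back empty, the `length == 0` branch completes the stage before the buffered hits are emitted. The sketch below is a minimal, synchronous Scala model of that hypothesis. `FakeScroll`, `PrefetchingSource`, `guardEmptyPage` and `ScrollModel.drain` are hypothetical names made up for this illustration; this is not the elastic4s implementation, and because its fetches are synchronous it drops more hits than the asynchronous stage would.

```scala
import scala.collection.mutable

// Hypothetical stand-in for an Elasticsearch scroll: returns `total` ids in
// pages of `pageSize`, then empty pages once everything has been served.
final class FakeScroll(total: Int, pageSize: Int) {
  private var served = 0

  def nextPage(): Vector[Int] = {
    val page = (served until math.min(served + pageSize, total)).toVector
    served += page.size
    page
  }
}

// Hypothetical, simplified model of a prefetching scroll source (not elastic4s code).
// Fetches are synchronous here; in the real stage they complete asynchronously.
final class PrefetchingSource(scroll: FakeScroll, fetchThreshold: Int, guardEmptyPage: Boolean) {
  private val buffer = mutable.Queue.empty[Int]
  private var exhausted = false // the scroll has returned an empty page
  private var completed = false // the source has signalled completion

  private def fetch(): Unit = {
    val page = scroll.nextPage()
    if (page.isEmpty) {
      exhausted = true
      // Unguarded variant mirrors the suspected bug: complete on an empty page
      // even if hits are still buffered. Guarded variant waits for the buffer to drain.
      if (!guardEmptyPage || buffer.isEmpty) completed = true
    } else {
      buffer ++= page
    }
  }

  // One downstream pull: the next hit, or None once the source has completed.
  def pull(): Option[Int] = {
    if (!completed && buffer.isEmpty && !exhausted) fetch()
    if (completed) None
    else if (buffer.isEmpty) { completed = true; None } // exhausted and fully drained
    else {
      val next = buffer.dequeue()
      // Prefetch once the buffer drops below the threshold; never true for threshold 0.
      if (!exhausted && buffer.size < fetchThreshold) fetch()
      Some(next)
    }
  }
}

object ScrollModel {
  // Pull until the source completes and collect everything it emitted.
  def drain(source: PrefetchingSource): Vector[Int] =
    Iterator.continually(source.pull()).takeWhile(_.isDefined).map(_.get).toVector

  def main(args: Array[String]): Unit = {
    def run(threshold: Int, guarded: Boolean): Int =
      drain(new PrefetchingSource(new FakeScroll(total = 90, pageSize = 100), threshold, guarded)).size

    println(s"fetchThreshold = 100, unguarded: ${run(100, guarded = false)} of 90")
    println(s"fetchThreshold = 100, guarded: ${run(100, guarded = true)} of 90")
    println(s"fetchThreshold = 0, unguarded: ${run(0, guarded = false)} of 90")
  }
}
```

Under these assumptions, the unguarded model with fetchThreshold = 100 emits 1 of 90 hits, while both the guarded variant and the fetchThreshold = 0 configuration emit all 90, matching the reported workaround. The guard illustrates one possible fix: treat an empty page as "scroll exhausted" and complete only once the buffer has drained.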