
Add ingest pipeline support for pull-based ingestion#20873

Merged
varunbharadwaj merged 24 commits into opensearch-project:main from imRishN:pipeline-integration
Mar 27, 2026

Conversation

Member

@imRishN imRishN commented Mar 15, 2026

Description

Adds final_pipeline execution support to the pull-based ingestion path. Documents are transformed by configured ingest pipelines before being written to Lucene.

  • Pipeline resolution from index settings with dynamic update
  • CompletableFuture sync bridge for async IngestService.executeBulkRequest()
  • Guardrails blocking _id and _routing mutations
  • Zero overhead when no pipeline is configured
  • Pipeline not invoked for delete operations
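The "CompletableFuture sync bridge" bullet above can be sketched in isolation. This is a minimal standalone sketch of the pattern, not the real IngestService API: `runAsync` is a hypothetical stand-in for the async `executeBulkRequest` callback shape described in this PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class SyncBridgeSketch {
    // Hypothetical stand-in for an async pipeline API: invokes the
    // completion callback on another thread with no error.
    static void runAsync(Consumer<Exception> onCompletion, Runnable onDropped) {
        CompletableFuture.runAsync(() -> onCompletion.accept(null));
    }

    // Bridge the async callbacks to a synchronous call.
    // Returns true if the document survived, false if it was dropped.
    static boolean executeSync(long timeoutSeconds) throws Exception {
        CompletableFuture<Void> future = new CompletableFuture<>();
        AtomicBoolean dropped = new AtomicBoolean(false);
        runAsync(e -> {
            if (e != null) future.completeExceptionally(e);
            else future.complete(null);
        }, () -> dropped.set(true));
        // Block the calling thread until the async work signals completion.
        future.get(timeoutSeconds, TimeUnit.SECONDS);
        return !dropped.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(executeSync(5)); // prints "true"
    }
}
```

Note that this same blocking pattern is the subject of the "Thread Blocking" review comment below: calling `future.get()` from a pool thread that the async work may also need can deadlock.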

Related Issues

Resolves:

  1. [Feature Request] Support ingest pipeline execution in pull-based ingestion #20875
  2. Add IT for final_pipeline support with streaming messages in Pull Based Ingestion #20879
  3. Add IT for field_mapping + final_pipeline combined for Pull Based Ingestion #20880

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

imRishN added 4 commits March 11, 2026 09:58
…based ingestion

Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
…tion

Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
Contributor

github-actions bot commented Mar 15, 2026

PR Reviewer Guide 🔍

(Review updated until commit 083f57c)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
📝 TODO sections

🔀 Multiple PR themes

Sub-PR theme: Core pipeline executor and message processor integration

Relevant files:

  • server/src/main/java/org/opensearch/indices/pollingingest/IngestPipelineExecutor.java
  • server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java
  • server/src/main/java/org/opensearch/indices/pollingingest/PartitionedBlockingQueueContainer.java
  • server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java
  • server/src/main/java/org/opensearch/index/engine/IngestionEngine.java
  • server/src/test/java/org/opensearch/indices/pollingingest/IngestPipelineExecutorTests.java
  • server/src/test/java/org/opensearch/indices/pollingingest/MessageProcessorTests.java
  • server/src/test/java/org/opensearch/index/engine/IngestionEngineTests.java

Sub-PR theme: IngestionEngineFactory IngestService supplier enforcement

Relevant files:

  • server/src/main/java/org/opensearch/indices/pollingingest/IngestionEngineFactory.java
  • server/src/test/java/org/opensearch/indices/pollingingest/IngestionEngineFactoryTests.java

Sub-PR theme: Kafka integration tests for ingest pipeline

Relevant files:

  • plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestPipelineFromKafkaIT.java
  • CHANGELOG.md

⚡ Recommended focus areas for review

Race Condition

In executePipelines, the dropped flag is set via onDropped callback and then checked after future.get(). However, the onCompletion callback completes the future, and onDropped is called before onCompletion. If the pipeline implementation calls onDropped without calling onCompletion, the future will never complete and the method will block until timeout. More critically, if onDropped is called after onCompletion (in a different thread), the dropped.get() check after future.get() may return false even though the document was dropped. The ordering guarantee between onDropped and onCompletion depends entirely on IngestService internals.

CompletableFuture<Void> future = new CompletableFuture<>();
AtomicBoolean dropped = new AtomicBoolean(false);

ingestService.executeBulkRequest(
    1,
    Collections.singletonList(indexRequest),
    (slot, e) -> future.completeExceptionally(e),
    (thread, e) -> {
        if (e != null) {
            future.completeExceptionally(e);
        } else {
            future.complete(null);
        }
    },
    slot -> dropped.set(true),
    ThreadPool.Names.WRITE
);

// Block until pipeline execution completes (with timeout)
try {
    future.get(PIPELINE_EXECUTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (TimeoutException e) {
    throw new RuntimeException("Ingest pipeline execution timed out after [" + PIPELINE_EXECUTION_TIMEOUT_SECONDS + "] seconds", e);
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new RuntimeException("Ingest pipeline execution was interrupted", e);
} catch (ExecutionException e) {
    throw new RuntimeException("Ingest pipeline execution failed", e.getCause());
}

if (dropped.get()) {
    return null;
}
Thread Blocking

executePipelines is called from the message processor thread and blocks synchronously via future.get() for up to 30 seconds. If IngestService.executeBulkRequest dispatches work to the WRITE thread pool (the same pool that may be used by the processor), this can cause a deadlock or thread starvation under load. The PR description acknowledges this is a "CompletableFuture sync bridge for async IngestService.executeBulkRequest()" but the risk of blocking the processor thread pool is not mitigated.

ingestService.executeBulkRequest(
    1,
    Collections.singletonList(indexRequest),
    (slot, e) -> future.completeExceptionally(e),
    (thread, e) -> {
        if (e != null) {
            future.completeExceptionally(e);
        } else {
            future.complete(null);
        }
    },
    slot -> dropped.set(true),
    ThreadPool.Names.WRITE
);

// Block until pipeline execution completes (with timeout)
try {
    future.get(PIPELINE_EXECUTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (TimeoutException e) {
    throw new RuntimeException("Ingest pipeline execution timed out after [" + PIPELINE_EXECUTION_TIMEOUT_SECONDS + "] seconds", e);
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new RuntimeException("Ingest pipeline execution was interrupted", e);
} catch (ExecutionException e) {
    throw new RuntimeException("Ingest pipeline execution failed", e.getCause());
}
Dropped Doc SeqNo

When a document is dropped by the pipeline, a NoOp engine operation is created with hardcoded seqNo=0 and primaryTerm=1. Using a fixed sequence number of 0 for every dropped document will cause sequence number conflicts if multiple documents are dropped, potentially corrupting the sequence number state of the shard.

operation = new Engine.NoOp(
    0,
    1,
    Engine.Operation.Origin.PRIMARY,
    System.nanoTime(),
    "Document dropped by ingest pipeline"
);
return new MessageOperation(operation, opType);
Null Supplier Result

The assertion assert ingestService != null only fires when assertions are enabled (the JVM -ea flag). In production, without assertions enabled, a null IngestService returned by the supplier will propagate to the IngestionEngine constructor, where Objects.requireNonNull(ingestService) in IngestPipelineExecutor will throw a NullPointerException with a less informative message. The assert message is good, but it won't fire in production.

IngestService ingestService = ingestServiceSupplier.get();
assert ingestService != null : "IngestService supplier returned null. This indicates an initialization ordering issue.";
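The fail-fast alternative the review alludes to can be sketched as follows: validate the supplier's result with a descriptive message that fires regardless of the -ea flag. This is a hypothetical helper, not code from the PR.

```java
import java.util.Objects;
import java.util.function.Supplier;

public class SupplierGuard {
    // Fail fast with a descriptive message even when JVM assertions
    // are disabled, instead of relying on `assert`.
    static <T> T requireFromSupplier(Supplier<T> supplier, String what) {
        return Objects.requireNonNull(
            supplier.get(),
            () -> what + " supplier returned null; check initialization ordering"
        );
    }

    public static void main(String[] args) {
        // Happy path: the supplied value is passed through.
        System.out.println(requireFromSupplier(() -> "ingest-service", "IngestService"));
        // Null path: throws NPE carrying the descriptive message.
        try {
            requireFromSupplier(() -> null, "IngestService");
        } catch (NullPointerException e) {
            System.out.println(e.getMessage());
        }
    }
}
```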

Contributor

github-actions bot commented Mar 15, 2026

PR Code Suggestions ✨

Latest suggestions up to 083f57c

Explore these optional code suggestions:

Category | Suggestion | Impact
Possible issue
Fix potential hang when pipeline item failure occurs

The onItemFailure callback calls future.completeExceptionally(e) but does not also
call future.complete(null) or signal completion in any way. If onCompletion is never
invoked after onItemFailure, the future.get() call will block until timeout. The
onItemFailure and onCompletion callbacks may be called independently, so the failure
should be recorded and the future should only be completed in onCompletion to avoid
a race between the two callbacks completing the future.

server/src/main/java/org/opensearch/indices/pollingingest/IngestPipelineExecutor.java [121-134]

+AtomicReference<Exception> itemFailure = new AtomicReference<>();
+
 ingestService.executeBulkRequest(
     1,
     Collections.singletonList(indexRequest),
-    (slot, e) -> future.completeExceptionally(e),
+    (slot, e) -> itemFailure.set(e),
     (thread, e) -> {
-        if (e != null) {
-            future.completeExceptionally(e);
+        Exception failure = e != null ? e : itemFailure.get();
+        if (failure != null) {
+            future.completeExceptionally(failure);
         } else {
             future.complete(null);
         }
     },
     slot -> dropped.set(true),
     ThreadPool.Names.WRITE
 );
Suggestion importance[1-10]: 8


Why: The onItemFailure callback calls future.completeExceptionally(e) independently of onCompletion. If onCompletion is also called afterward (which is the typical IngestService behavior), the second call to completeExceptionally or complete on an already-completed future is silently ignored. However, if onCompletion is NOT called after onItemFailure, the future will block until timeout. The suggested fix of recording the failure in onItemFailure and completing only in onCompletion is a more robust pattern that avoids both the race condition and potential hangs.

Medium
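The "silently ignored" behavior described in the rationale above is standard CompletableFuture semantics and can be verified directly; a later completion attempt on an already-completed future returns false and leaves the original result in place:

```java
import java.util.concurrent.CompletableFuture;

public class DoubleCompleteDemo {
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> f = new CompletableFuture<>();
        System.out.println(f.complete("first"));       // true: first completion wins
        System.out.println(f.completeExceptionally(    // false: silently ignored
            new RuntimeException("late failure")));
        System.out.println(f.get());                   // "first": original result kept
    }
}
```

This is why completing the future only once, from onCompletion, avoids the race: there is a single writer rather than two callbacks competing to set the result.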
Guard against null pipeline executor before invocation

When pipelineExecutor is null (e.g., in tests using the old two-argument
MessageProcessor constructor), calling pipelineExecutor.executePipelines(...) will
throw a NullPointerException. A null-check should be added before invoking the
executor, or the field should be guaranteed non-null by construction.

server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java [240-259]

 Map<String, Object> sourceMap = (Map<String, Object>) payloadMap.get(SOURCE);
 
 // Execute ingest pipelines
-try {
-    Map<String, Object> transformedSource = pipelineExecutor.executePipelines(id, sourceMap);
-    if (transformedSource == null) {
-        // Document dropped by pipeline
-        operation = new Engine.NoOp(
-            0,
-            1,
-            Engine.Operation.Origin.PRIMARY,
-            System.nanoTime(),
-            "Document dropped by ingest pipeline"
-        );
-        return new MessageOperation(operation, opType);
+if (pipelineExecutor != null) {
+    try {
+        Map<String, Object> transformedSource = pipelineExecutor.executePipelines(id, sourceMap);
+        if (transformedSource == null) {
+            // Document dropped by pipeline
+            operation = new Engine.NoOp(
+                0,
+                1,
+                Engine.Operation.Origin.PRIMARY,
+                System.nanoTime(),
+                "Document dropped by ingest pipeline"
+            );
+            return new MessageOperation(operation, opType);
+        }
+        sourceMap = transformedSource;
+    } catch (Exception e) {
+        throw new RuntimeException("Ingest pipeline execution failed", e);
     }
-    sourceMap = transformedSource;
-} catch (Exception e) {
-    throw new RuntimeException("Ingest pipeline execution failed", e);
 }
Suggestion importance[1-10]: 3


Why: Looking at the PR code, the MessageProcessor constructors all require a pipelineExecutor parameter, so pipelineExecutor should never be null in normal usage. The suggestion adds a null-check that may mask programming errors rather than fail fast. However, it could be useful for backward compatibility in tests.

Low
General
Avoid race condition after waiting for processed count

The totalProcessedCount() check is used to infer that dropped documents were
processed, but a document counted as "processed" may not yet have been fully handled
(e.g., the drop callback may be async). There is a potential race condition where
the assertion runs before the drop is fully committed. Consider waiting on
totalProcessedCount() >= 2 AND a small stabilization delay, or better, wait on a
drop-specific counter if available (e.g., totalDroppedCount()).

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestPipelineFromKafkaIT.java [95-103]

-// Wait until both messages are processed, then verify none were indexed (all dropped)
+// Wait until both messages are processed (dropped by pipeline), then verify none were indexed
 waitForState(() -> {
     PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
         .getPollingIngestStats();
     return stats != null && stats.getMessageProcessorStats().totalProcessedCount() >= 2;
 });
-refresh(indexName);
-SearchResponse response = client().prepareSearch(indexName).get();
-assertThat(response.getHits().getTotalHits().value(), is(0L));
+// Allow time for any async drop handling to complete
+assertBusy(() -> {
+    refresh(indexName);
+    SearchResponse response = client().prepareSearch(indexName).get();
+    assertThat(response.getHits().getTotalHits().value(), is(0L));
+});
Suggestion importance[1-10]: 4


Why: The concern about a potential race condition between totalProcessedCount() reaching 2 and the drop being fully reflected in the search index is valid for integration tests. Using assertBusy for the final assertion would make the test more robust, though the existing refresh() call may be sufficient in practice.

Low
Add descriptive null-check message for required IngestService

The constructor now requires a non-null ingestService via Objects.requireNonNull,
but the old no-arg constructor that passed null was removed. Any existing callers
that relied on the two-argument constructor or passed null will now fail at runtime.
Additionally, the test testConstructorWithNullIngestService was deleted, but the
IngestionEngineFactory previously supported a null supplier. Ensure all call sites
provide a valid IngestService and consider adding a null-check with a descriptive
error message.

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java [68-75]

 public IngestionEngine(EngineConfig engineConfig, IngestionConsumerFactory ingestionConsumerFactory, IngestService ingestService) {
     super(engineConfig);
     this.ingestionConsumerFactory = Objects.requireNonNull(ingestionConsumerFactory);
+    Objects.requireNonNull(ingestService, "IngestService must not be null for IngestionEngine");
     this.pipelineExecutor = new IngestPipelineExecutor(
-        Objects.requireNonNull(ingestService),
+        ingestService,
         engineConfig.getIndexSettings().getIndex().getName(),
         engineConfig.getIndexSettings()
     );
Suggestion importance[1-10]: 2


Why: The suggestion only adds a descriptive message to Objects.requireNonNull, which is a minor improvement. The existing_code and improved_code are functionally nearly identical, and the change has minimal impact on correctness since IngestPipelineExecutor already calls Objects.requireNonNull(ingestService) internally.

Low

Previous suggestions

Suggestions up to commit 9a50135
Category | Suggestion | Impact
Possible issue
Complete future immediately when document is dropped

The onItemFailure callback calls future.completeExceptionally(e) but does not also
call future.complete(null) or signal completion in any way. If onCompletion is never
invoked after onItemFailure in some code paths, the future.get() call will block
until the timeout. The onItemFailure callback should complete the future directly to
avoid unnecessary blocking.

server/src/main/java/org/opensearch/indices/pollingingest/IngestPipelineExecutor.java [121-134]

 ingestService.executeBulkRequest(
     1,
     Collections.singletonList(indexRequest),
     (slot, e) -> future.completeExceptionally(e),
     (thread, e) -> {
         if (e != null) {
             future.completeExceptionally(e);
         } else {
             future.complete(null);
         }
     },
-    slot -> dropped.set(true),
+    slot -> {
+        dropped.set(true);
+        future.complete(null);
+    },
     ThreadPool.Names.WRITE
 );
Suggestion importance[1-10]: 7


Why: The slot -> dropped.set(true) callback doesn't complete the future, so if onCompletion is not called after onDropped in some code paths, the future will block until timeout. However, looking at the test in IngestPipelineExecutorTests, onDropped is always followed by onCompletion, suggesting the current contract expects onCompletion to always be called. The suggestion is valid as a defensive improvement but may not be strictly necessary given the existing contract.

Medium
Guard against null pipeline executor

When pipelineExecutor is null (e.g., in tests using the old two-argument
MessageProcessor constructor), calling pipelineExecutor.executePipelines(...) will
throw a NullPointerException. A null check should be added before invoking the
executor, or the field should be guaranteed non-null at construction time.

server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java [242-259]

 // Execute ingest pipelines
-try {
-    Map<String, Object> transformedSource = pipelineExecutor.executePipelines(id, sourceMap);
-    if (transformedSource == null) {
-        // Document dropped by pipeline
-        operation = new Engine.NoOp(
-            0,
-            1,
-            Engine.Operation.Origin.PRIMARY,
-            System.nanoTime(),
-            "Document dropped by ingest pipeline"
-        );
-        return new MessageOperation(operation, opType);
+if (pipelineExecutor != null) {
+    try {
+        Map<String, Object> transformedSource = pipelineExecutor.executePipelines(id, sourceMap);
+        if (transformedSource == null) {
+            // Document dropped by pipeline
+            operation = new Engine.NoOp(
+                0,
+                1,
+                Engine.Operation.Origin.PRIMARY,
+                System.nanoTime(),
+                "Document dropped by ingest pipeline"
+            );
+            return new MessageOperation(operation, opType);
+        }
+        sourceMap = transformedSource;
+    } catch (Exception e) {
+        throw new RuntimeException("Ingest pipeline execution failed", e);
     }
-    sourceMap = transformedSource;
-} catch (Exception e) {
-    throw new RuntimeException("Ingest pipeline execution failed", e);
 }
Suggestion importance[1-10]: 3


Why: Looking at the PR code, pipelineExecutor is always passed through constructors and the old two-argument MessageProcessor constructor was removed. All construction paths now require a pipelineExecutor. The suggestion addresses a theoretical null case that doesn't exist in the current codebase, making it low impact.

Low
General
Fix test to not call completion after failure

The test calls onFailure followed by onCompletion with no exception, which means
future.completeExceptionally is called first and then future.complete(null) is
attempted. Since CompletableFuture ignores subsequent completions, the test may not
accurately reflect real failure behavior where onCompletion is not called after
onFailure. The test should only invoke onFailure without calling onCompletion to
properly test the failure path.

server/src/test/java/org/opensearch/indices/pollingingest/IngestPipelineExecutorTests.java [101-118]

 public void testExecutePipelines_Failure() {
     doAnswer(invocation -> {
         BiConsumer<Integer, Exception> onFailure = invocation.getArgument(2);
-        BiConsumer<Thread, Exception> onCompletion = invocation.getArgument(3);
         onFailure.accept(0, new RuntimeException("processor failed"));
-        onCompletion.accept(Thread.currentThread(), null);
         return null;
     }).when(ingestService).executeBulkRequest(anyInt(), any(), any(), any(), any(), anyString());
Suggestion importance[1-10]: 4

__

Why: The test calls onFailure then onCompletion with no exception, but since CompletableFuture ignores subsequent completions after completeExceptionally, the test still works correctly. However, the suggestion is valid in that it better reflects the real failure scenario and makes the test intent clearer.

Low
Add descriptive message to null check

The constructor now requires a non-null ingestService via Objects.requireNonNull,
but the old no-arg constructor that passed null was removed. Any existing callers
that relied on the two-argument constructor (without IngestService) will now fail
with a NullPointerException rather than a clear error. Ensure all call sites have
been updated, and consider adding a descriptive null-check message.

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java [68-75]

 public IngestionEngine(EngineConfig engineConfig, IngestionConsumerFactory ingestionConsumerFactory, IngestService ingestService) {
     super(engineConfig);
     this.ingestionConsumerFactory = Objects.requireNonNull(ingestionConsumerFactory);
     this.pipelineExecutor = new IngestPipelineExecutor(
-        Objects.requireNonNull(ingestService),
+        Objects.requireNonNull(ingestService, "IngestService must not be null for IngestionEngine"),
         engineConfig.getIndexSettings().getIndex().getName(),
         engineConfig.getIndexSettings()
     );
Suggestion importance[1-10]: 2


Why: This suggestion only adds a descriptive message to Objects.requireNonNull, which is a minor style improvement with minimal functional impact. The existing code already enforces non-null via Objects.requireNonNull.

Low
Suggestions up to commit e0dc3f4
Category | Suggestion | Impact
Possible issue
Avoid race condition on volatile pipeline field

resolvedFinalPipeline is a volatile field that can be updated concurrently by
updateFinalPipeline. Between the hasPipelines() check at the top of executePipelines
and this line, the value could change from non-null to null (or vice versa). Capture
the value in a local variable at the start of the method to ensure consistency
throughout the execution.

server/src/main/java/org/opensearch/indices/pollingingest/IngestPipelineExecutor.java [120]

-indexRequest.setFinalPipeline(resolvedFinalPipeline != null ? resolvedFinalPipeline : IngestService.NOOP_PIPELINE_NAME);
+final String currentFinalPipeline = resolvedFinalPipeline;
+if (currentFinalPipeline == null) {
+    return sourceMap;
+}
+// ... build IndexRequest ...
+indexRequest.setFinalPipeline(currentFinalPipeline);
Suggestion importance[1-10]: 7


Why: This is a valid concurrency concern: resolvedFinalPipeline is volatile and can be updated between the hasPipelines() check and the setFinalPipeline call. Capturing it in a local variable at the start of executePipelines would eliminate this TOCTOU race condition.

Medium
Ensure per-item pipeline failures propagate via future

The failureRef is set in the per-item failure callback, but future.get() is only
checked for ExecutionException (from completeExceptionally). If the per-item failure
callback fires but the completion callback fires without an exception, future.get()
will succeed and the failure check happens after. However, if
future.completeExceptionally is never called (only failureRef is set), the future
completes normally and the failure is caught by the post-get() check. This is
fragile: if the completion callback is never called (e.g., due to a bug or timeout
in the ingest service), the future will block until timeout. More critically, if
both failureRef is set AND future.completeExceptionally is called, the
ExecutionException wraps the completion exception, not the per-item failure.
Consider completing the future exceptionally when a per-item failure is detected to
ensure consistent error propagation.

server/src/main/java/org/opensearch/indices/pollingingest/IngestPipelineExecutor.java [130-136]

-ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), (slot, e) -> failureRef.set(e), (thread, e) -> {
+ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), (slot, e) -> {
+    failureRef.set(e);
+    future.completeExceptionally(e);
+}, (thread, e) -> {
     if (e != null) {
         future.completeExceptionally(e);
     } else {
         future.complete(null);
     }
 }, slot -> dropped.set(true), ThreadPool.Names.WRITE);
Suggestion importance[1-10]: 6

__

Why: The suggestion correctly identifies a potential inconsistency: if only failureRef is set (per-item failure) but future is completed normally, the error handling relies on a post-get() check. Completing the future exceptionally on per-item failure would make error propagation more consistent and robust, though the current code does handle it via failureRef.get() after future.get().

Low
Use correct sequence number for dropped document NoOp

Using hardcoded seqNo=0 and primaryTerm=1 for the Engine.NoOp operation may conflict
with actual sequence number tracking in the engine, potentially causing consistency
issues. The sequence number and primary term should be obtained from the engine's
current state (e.g., UNASSIGNED_SEQ_NO and the actual primary term) to avoid
corrupting sequence number accounting.

server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java [247-254]

 operation = new Engine.NoOp(
-    0,
-    1,
+    UNASSIGNED_SEQ_NO,
+    engine.config().getPrimaryTermSupplier().getAsLong(),
     Engine.Operation.Origin.PRIMARY,
     System.nanoTime(),
     "Document dropped by ingest pipeline"
 );
 return new MessageOperation(operation, opType);
Suggestion importance[1-10]: 6

__

Why: Using hardcoded seqNo=0 and primaryTerm=1 for Engine.NoOp could conflict with the engine's sequence number tracking. However, the improved_code references engine which is a field of MessageProcessor, not directly accessible in getOperation, so the suggested fix may not compile as-is without additional changes.

Low
General
Replace fixed sleep with reliable async assertion

Using Thread.sleep(5000) is a fragile approach for waiting in integration tests, as
it may be too short on slow CI environments or unnecessarily slow on fast ones. The
same pattern is repeated in testPipelineMutatingIdIsBlocked (with 10 seconds) and
testFieldMappingWithDropPipeline. Consider using waitForState with a condition that
checks the document count, or at minimum use the existing waitForState helper with a
timeout to make the test more reliable and faster.

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestPipelineFromKafkaIT.java [85-97]

 public void testFinalPipelineDropsDocument() throws Exception {
     createPipeline("drop_pipeline", "{\"processors\": [{\"drop\": {}}]}");
 
     produceData("1", "alice", "25");
     produceData("2", "bob", "30");
 
     createIndexWithPipeline("drop_pipeline", 1, 0);
 
-    Thread.sleep(5000);
-    refresh(indexName);
-    SearchResponse response = client().prepareSearch(indexName).get();
-    assertThat(response.getHits().getTotalHits().value(), is(0L));
+    // Wait briefly then assert no documents were indexed
+    assertBusy(() -> {
+        refresh(indexName);
+        SearchResponse response = client().prepareSearch(indexName).get();
+        assertThat(response.getHits().getTotalHits().value(), is(0L));
+    }, 15, TimeUnit.SECONDS);
 }
Suggestion importance[1-10]: 5


Why: Using Thread.sleep in integration tests is fragile and can cause flaky tests on slow CI environments. The suggestion to use assertBusy is a valid improvement for test reliability, though verifying "no documents" with a busy-wait is inherently tricky since the condition may be true before messages are even processed.

Low
Suggestions up to commit 04af53e
Category | Suggestion | Impact
Possible issue
Fix race condition in pipeline failure handling

The failureRef is set in the per-item failure callback, but future.get() will
succeed (not throw) even when a per-item failure occurs. After future.get() returns
normally, failureRef.get() is checked, but if the completion callback fires before
the failure callback sets failureRef, there is a race condition. Additionally, when
failureRef is set, the future should be completed to avoid blocking until timeout.
Consider completing the future in the failure callback as well, or using the failure
to complete the future exceptionally.

server/src/main/java/org/opensearch/indices/pollingingest/IngestPipelineExecutor.java [130-136]

-ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), (slot, e) -> failureRef.set(e), (thread, e) -> {
+ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), (slot, e) -> {
+    if (e != null) {
+        failureRef.set(e);
+    }
+}, (thread, e) -> {
     if (e != null) {
         future.completeExceptionally(e);
+    } else if (failureRef.get() != null) {
+        future.completeExceptionally(failureRef.get());
     } else {
         future.complete(null);
     }
 }, slot -> dropped.set(true), ThreadPool.Names.WRITE);
Suggestion importance[1-10]: 7


Why: The suggestion correctly identifies a real race condition: the per-item failure callback sets failureRef but doesn't complete the future, so if the completion callback fires before failureRef is set, the future completes normally and the failure check after future.get() could miss it. The improved code properly integrates failure handling into the completion callback.

Medium
Prevent TOCTOU race on volatile pipeline field

The resolvedFinalPipeline field is volatile and is read twice: once in
hasPipelines() and once here. Between these two reads, a concurrent call to
updateFinalPipeline could set it to null, causing a NullPointerException or
incorrect behavior. Capture the value in a local variable at the start of
executePipelines to ensure consistency throughout the method.

server/src/main/java/org/opensearch/indices/pollingingest/IngestPipelineExecutor.java [120]

-indexRequest.setFinalPipeline(resolvedFinalPipeline != null ? resolvedFinalPipeline : IngestService.NOOP_PIPELINE_NAME);
+final String currentFinalPipeline = resolvedFinalPipeline;
+if (currentFinalPipeline == null) {
+    return sourceMap;
+}
+// ... use currentFinalPipeline instead of resolvedFinalPipeline below
+indexRequest.setFinalPipeline(currentFinalPipeline);
Suggestion importance[1-10]: 6


Why: The suggestion correctly identifies a TOCTOU race on the volatile resolvedFinalPipeline field — it's read in hasPipelines() and again later in executePipelines. Capturing it in a local variable at the start of the method is a valid and standard fix for this pattern.

Low
General
Replace fixed sleeps with bounded polling assertions

Using Thread.sleep with fixed durations (5000ms, 10000ms) to wait for negative
conditions (asserting zero documents) is fragile and can cause flaky tests on slow
CI environments or false positives on fast ones. Consider using a waitForState with
a timeout that polls and verifies the condition, or at minimum use a consistent
helper that retries the assertion.

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestPipelineFromKafkaIT.java [93-97]

-public void testFinalPipelineDropsDocument() throws Exception {
-    ...
-    Thread.sleep(5000);
+// Instead of Thread.sleep(5000), use a bounded wait with a stable check:
+// For "drop" scenarios, wait for the poller to have processed messages, then assert 0 docs.
+// Example approach:
+assertBusy(() -> {
     refresh(indexName);
     SearchResponse response = client().prepareSearch(indexName).get();
     assertThat(response.getHits().getTotalHits().value(), is(0L));
-}
-...
-public void testPipelineMutatingIdIsBlocked() throws Exception {
-    ...
-    Thread.sleep(10000);
-    refresh(indexName);
-    SearchResponse response = client().prepareSearch(indexName).get();
-    assertThat(response.getHits().getTotalHits().value(), is(0L));
-}
+}, 30, TimeUnit.SECONDS);
Suggestion importance[1-10]: 5

__

Why: Using Thread.sleep with hardcoded durations for negative condition checks is a known source of flaky tests. The suggestion to use assertBusy with a timeout is a valid improvement for test reliability, though the existing_code snippet doesn't exactly match the PR diff format (it spans two separate methods).

Low
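The bounded-polling idea behind this suggestion can be hand-rolled as a standalone sketch. The real assertBusy helper lives in the OpenSearch test framework; the version below is illustrative only and is not that API.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Illustrative sketch: check a condition repeatedly with a short back-off
// until it holds or a deadline passes, instead of sleeping a fixed
// duration and hoping the timing was right.
public class BoundedPollSketch {

    public static boolean waitUntil(BooleanSupplier condition, long timeout, TimeUnit unit)
            throws InterruptedException {
        final long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (System.nanoTime() < deadline) {
            if (condition.getAsBoolean()) {
                return true; // condition met early: no wasted wall-clock time
            }
            Thread.sleep(50); // brief back-off between checks
        }
        return condition.getAsBoolean(); // final check at the deadline
    }

    public static void main(String[] args) throws InterruptedException {
        final long start = System.currentTimeMillis();
        // Becomes true after ~200ms; returns well before the 5s ceiling.
        boolean ok = waitUntil(() -> System.currentTimeMillis() - start > 200, 5, TimeUnit.SECONDS);
        System.out.println(ok);
    }
}
```

For the drop-pipeline tests the condition is negative (no documents should arrive), which pure polling cannot prove; a practical compromise is to first wait for a positive poller-progress signal and only then assert the document count is zero, which is what the suggestion's assertBusy variant approximates.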
Ensure non-null IngestService at construction

The constructor now requires a non-null IngestService via Objects.requireNonNull, and the test file passes mock(IngestService.class), which is fine. However, the old no-arg constructor that passed null has been removed, and callers that previously relied on null being acceptable will now throw NullPointerException at construction time rather than gracefully degrading. Ensure all call sites (including production code paths where IngestService may not yet be initialized) always provide a valid non-null IngestService.

server/src/main/java/org/opensearch/indices/pollingingest/IngestionEngine.java [68-75]

 public IngestionEngine(EngineConfig engineConfig, IngestionConsumerFactory ingestionConsumerFactory, IngestService ingestService) {
     super(engineConfig);
     this.ingestionConsumerFactory = Objects.requireNonNull(ingestionConsumerFactory);
+    Objects.requireNonNull(ingestService, "IngestService must not be null");
     this.pipelineExecutor = new IngestPipelineExecutor(
-        Objects.requireNonNull(ingestService),
+        ingestService,
         engineConfig.getIndexSettings().getIndex().getName(),
         engineConfig.getIndexSettings()
     );
Suggestion importance[1-10]: 2

__

Why: The improved_code is functionally equivalent to the existing code since IngestPipelineExecutor's constructor already calls Objects.requireNonNull(ingestService). Adding a redundant null check in IngestionEngine provides no additional safety and the suggestion is essentially a no-op improvement.

Low
Suggestions up to commit 28c9752
Category | Suggestion | Impact
Possible issue
Prevent race condition on volatile pipeline field reads

The field resolvedFinalPipeline is volatile and is read twice: once in
hasPipelines() and once here. Between these two reads, a concurrent call to
updateFinalPipeline could set it to null, causing a NullPointerException or
incorrect behavior. Capture the value in a local variable at the start of
executePipelines to ensure consistency throughout the method.

server/src/main/java/org/opensearch/indices/pollingingest/IngestPipelineExecutor.java [116]

-indexRequest.setFinalPipeline(resolvedFinalPipeline != null ? resolvedFinalPipeline : IngestService.NOOP_PIPELINE_NAME);
+final String currentFinalPipeline = resolvedFinalPipeline;
+if (currentFinalPipeline == null) {
+    return sourceMap;
+}
+// ... use currentFinalPipeline instead of resolvedFinalPipeline below
+indexRequest.setFinalPipeline(currentFinalPipeline);
Suggestion importance[1-10]: 7

__

Why: The resolvedFinalPipeline volatile field is read multiple times in executePipelines, and a concurrent updateFinalPipeline call between reads could cause inconsistency or NPE. Capturing the value in a local variable at the start of the method is a correct and important fix for thread safety.

Medium
Fix potential race condition in pipeline failure handling

The failureRef is set in the per-item failure callback, but future.get() is called
after without checking if the future completed exceptionally due to the completion
callback. If the completion callback fires with null error but failureRef is set,
the code correctly throws after future.get(). However, if the completion callback
fires before failureRef is set (race condition in async execution), the failure may
be missed. Consider completing the future exceptionally directly in the per-item
failure callback to ensure the failure is always propagated.

server/src/main/java/org/opensearch/indices/pollingingest/IngestPipelineExecutor.java [126-132]

-ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), (slot, e) -> failureRef.set(e), (thread, e) -> {
+ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), (slot, e) -> {
+    if (e != null) {
+        failureRef.set(e);
+        future.completeExceptionally(e);
+    }
+}, (thread, e) -> {
     if (e != null) {
         future.completeExceptionally(e);
     } else {
         future.complete(null);
     }
 }, slot -> dropped.set(true), ThreadPool.Names.WRITE);
Suggestion importance[1-10]: 6

__

Why: The suggestion identifies a valid potential race condition where failureRef could be set after future.get() returns if the completion callback fires before the per-item failure callback. Completing the future exceptionally in the per-item failure callback ensures the failure is always propagated. In practice, however, IngestService.executeBulkRequest likely invokes the per-item failure callback before the completion callback, making this a low-probability issue.

Low
General
Replace fixed sleep with reliable assertion in tests

Using Thread.sleep(5000) is a fragile approach for verifying that no documents are
indexed, as it relies on a fixed wait time that may be insufficient in slow
environments or wasteful in fast ones. Consider using a waitForState with a timeout
that asserts the count remains zero, or at minimum use a longer, configurable sleep.
This pattern appears in both testFinalPipelineDropsDocument and
testFieldMappingWithDropPipeline.

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestPipelineFromKafkaIT.java [93-96]

-Thread.sleep(5000);
-refresh(indexName);
-SearchResponse response = client().prepareSearch(indexName).get();
-assertThat(response.getHits().getTotalHits().value(), is(0L));
+// Wait a reasonable time and verify no documents are indexed
+assertBusy(() -> {
+    refresh(indexName);
+    SearchResponse response = client().prepareSearch(indexName).get();
+    assertThat(response.getHits().getTotalHits().value(), is(0L));
+}, 30, java.util.concurrent.TimeUnit.SECONDS);
Suggestion importance[1-10]: 5

__

Why: Using Thread.sleep(5000) is fragile in integration tests and can cause flakiness. Using assertBusy with a timeout is a more robust approach, though the test is specifically verifying a negative condition (no documents indexed), which makes assertBusy slightly less natural but still better than a fixed sleep.

Low
Add null check for indexSettings parameter

The indexSettings parameter is not null-checked before use, which will throw a
NullPointerException with no helpful message if null is passed. Add an
Objects.requireNonNull check for consistency with the other parameters and to
provide a clear error message.

server/src/main/java/org/opensearch/indices/pollingingest/IngestPipelineExecutor.java [56-61]

 public IngestPipelineExecutor(IngestService ingestService, String index, IndexSettings indexSettings) {
     this.ingestService = Objects.requireNonNull(ingestService);
     this.index = Objects.requireNonNull(index);
+    Objects.requireNonNull(indexSettings, "indexSettings must not be null");
     indexSettings.getScopedSettings().addSettingsUpdateConsumer(IndexSettings.FINAL_PIPELINE, this::updateFinalPipeline);
     updateFinalPipeline(IndexSettings.FINAL_PIPELINE.get(indexSettings.getSettings()));
 }
Suggestion importance[1-10]: 3

__

Why: While adding a null check for indexSettings improves consistency and provides a clearer error message, this is a minor defensive programming improvement. The NPE would still be thrown on the next line without the check, just with a less descriptive message.

Low
Suggestions up to commit b771d73
Category | Suggestion | Impact
Possible issue
Volatile field reads are not thread-safe across the method

Since executePipelines is only called when hasPipelines() returns true (i.e.,
resolvedFinalPipeline != null), the null check here is redundant. However,
resolvedFinalPipeline is volatile and could theoretically be set to null between the
hasPipelines() check and this line due to a concurrent dynamic settings update.
Capture the value in a local variable at the start of the method to ensure
consistency throughout the execution.

server/src/main/java/org/opensearch/indices/pollingingest/IngestPipelineExecutor.java [116]

-indexRequest.setFinalPipeline(resolvedFinalPipeline != null ? resolvedFinalPipeline : IngestService.NOOP_PIPELINE_NAME);
+public Map<String, Object> executePipelines(String id, Map<String, Object> sourceMap) throws Exception {
+    final String finalPipeline = resolvedFinalPipeline;
+    if (finalPipeline == null) {
+        return sourceMap;
+    }
 
+    IndexRequest indexRequest = new IndexRequest(index);
+    indexRequest.id(id);
+    indexRequest.source(sourceMap);
+
+    indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
+    indexRequest.setFinalPipeline(finalPipeline);
+    indexRequest.isPipelineResolved(true);
+
Suggestion importance[1-10]: 7

__

Why: This is a valid thread-safety concern — resolvedFinalPipeline is volatile and could change between the hasPipelines() check and its use later in the method. Capturing it in a local variable at the start of executePipelines is a correct and important fix to ensure consistent behavior during concurrent dynamic settings updates.

Medium
Unify failure handling into the future completion path

The failureRef is set in the per-item failure callback, but future.complete(null) is
called in the completion callback regardless of whether failureRef was set. A
per-item failure therefore never triggers future.completeExceptionally: the blocking
future.get() succeeds, and the failure is only detected by the check that runs
afterward. That post-get() check is correct but fragile, and if the completion
callback also reports an exception, the two paths compete. Consider completing the
future exceptionally when failureRef is set to make the failure path consistent and
avoid the dual-check pattern.

server/src/main/java/org/opensearch/indices/pollingingest/IngestPipelineExecutor.java [126-132]

 ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), (slot, e) -> failureRef.set(e), (thread, e) -> {
     if (e != null) {
         future.completeExceptionally(e);
+    } else if (failureRef.get() != null) {
+        future.completeExceptionally(failureRef.get());
     } else {
         future.complete(null);
     }
 }, slot -> dropped.set(true), ThreadPool.Names.WRITE);
Suggestion importance[1-10]: 6

__

Why: The suggestion correctly identifies a potential race condition where failureRef is set but future completes normally, requiring a post-get() check. Completing the future exceptionally when failureRef is set would make the failure path more consistent and eliminate the dual-check pattern. This is a valid improvement to error handling robustness.

Low
General
Pipeline error stage is unused in error handling

The pipeline exception is wrapped in a new RuntimeException and re-thrown, but the
outer process() method catches Exception and wraps it again in another
RuntimeException. This double-wrapping makes it harder to identify the root cause.
The pipeline failure should propagate with the original cause preserved, or the
error strategy should be applied at this level using the PIPELINE error stage that
was added to IngestionErrorStrategy.ErrorStage.

server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java [259-261]

 } catch (Exception e) {
-    throw new RuntimeException("Ingest pipeline execution failed", e);
+    throw new RuntimeException("Ingest pipeline execution failed: " + e.getMessage(), e);
 }
Suggestion importance[1-10]: 3

__

Why: The improved_code only changes the exception message string, which doesn't address the actual concern raised about double-wrapping or using the PIPELINE error stage. The suggestion content describes a more meaningful change than what the improved_code demonstrates, making it inconsistent.

Low
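The cause-preservation point behind this suggestion can be demonstrated standalone: as long as each wrapper passes the caught exception as the cause, the root failure stays reachable no matter how many layers wrap it. The names below are illustrative only.

```java
// Illustrative sketch: walking a cause chain to recover the root failure
// after multiple layers of RuntimeException wrapping.
public class CauseChainSketch {

    static Throwable rootCause(Throwable t) {
        while (t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    public static void main(String[] args) {
        Exception original = new IllegalArgumentException("bad field");
        // Two layers of wrapping, as in the pipeline-executor and
        // message-processor paths described above.
        RuntimeException once = new RuntimeException("Ingest pipeline execution failed", original);
        RuntimeException twice = new RuntimeException("message processing failed", once);
        // Double-wrapping obscures the message, but the chain is intact.
        System.out.println(rootCause(twice).getMessage()); // prints "bad field"
    }
}
```

Appending e.getMessage() to the wrapper message, as the improved_code does, only duplicates information already available through getCause(); the more meaningful fix the suggestion text describes is to handle the failure at the PIPELINE error stage.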
Replace fixed sleep with reliable wait mechanism

Using Thread.sleep(5000) as a fixed wait is fragile — it may be too short on slow CI
environments or unnecessarily slow otherwise. The same test class uses
waitForState() for polling-based assertions. Consider using a waitForState() with a
timeout that asserts the count remains 0, or at minimum use a longer sleep with a
comment explaining the rationale.

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestPipelineFromKafkaIT.java [85-97]

 public void testFinalPipelineDropsDocument() throws Exception {
-    ...
-    Thread.sleep(5000);
+    createPipeline("drop_pipeline", "{\"processors\": [{\"drop\": {}}]}");
+
+    produceData("1", "alice", "25");
+    produceData("2", "bob", "30");
+
+    createIndexWithPipeline("drop_pipeline", 1, 0);
+
+    // Wait briefly to allow any potential indexing to occur, then assert no docs indexed
+    Thread.sleep(8000);
     refresh(indexName);
     SearchResponse response = client().prepareSearch(indexName).get();
     assertThat(response.getHits().getTotalHits().value(), is(0L));
 }
Suggestion importance[1-10]: 3

__

Why: The improved_code still uses Thread.sleep (just with a longer duration of 8000ms instead of 5000ms), which doesn't meaningfully address the fragility concern. The suggestion content recommends using waitForState() but the improved code doesn't implement that approach.

Low

@github-actions
Contributor

❌ Gradle check result for 6771f6e: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

imRishN added 2 commits March 15, 2026 20:49
Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
@github-actions
Contributor

Persistent review updated to latest commit 9d488ed

@github-actions
Contributor

❌ Gradle check result for 083f57c: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Contributor

Failed to generate code suggestions for PR

@github-actions
Contributor

✅ Gradle check result for 8297892: SUCCESS

imRishN added 2 commits March 26, 2026 23:20
Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
@github-actions
Contributor

Failed to generate code suggestions for PR

@github-actions
Contributor

Failed to generate code suggestions for PR

@github-actions
Contributor

❌ Gradle check result for 2028d16: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
@github-actions
Contributor

Failed to generate code suggestions for PR

@github-actions
Contributor

Failed to generate code suggestions for PR

@github-actions
Contributor

❌ Gradle check result for 23edbca: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
@github-actions
Contributor

Failed to generate code suggestions for PR

@github-actions
Contributor

✅ Gradle check result for a7789ae: SUCCESS

Contributor

@varunbharadwaj varunbharadwaj left a comment


Looks good to me. Thanks for the change. We will revisit the same-thread pipeline execution in a subsequent PR.

@github-actions
Contributor

Failed to generate code suggestions for PR

@github-actions
Contributor

❌ Gradle check result for eeff066: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
@github-actions
Contributor

Failed to generate code suggestions for PR

@github-actions
Contributor

✅ Gradle check result for 7f06dcd: SUCCESS

@varunbharadwaj varunbharadwaj merged commit 0715181 into opensearch-project:main Mar 27, 2026
24 checks passed