Add IT for ingest pipeline integration with pull based ingestion #20890

Closed
imRishN wants to merge 1 commit into opensearch-project:main from imRishN:pipeline-it

Conversation

@imRishN
Member

@imRishN imRishN commented Mar 16, 2026

Description

Add IT for ingest pipeline integration with pull based ingestion

Related Issues

Resolves #20879

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
github-actions bot added the enhancement, Indexing, and missing-component labels Mar 16, 2026
@github-actions
Contributor

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Flaky Sleep

Tests testFinalPipelineDropsDocument and testPipelineMutatingIdIsBlocked use hardcoded Thread.sleep(5000) and Thread.sleep(10000), respectively, instead of the waitForState polling pattern used elsewhere. This makes these tests fragile and slow: on a slow environment the fixed delay may elapse before ingestion has finished, and on a fast one the full delay is always paid.

Thread.sleep(5000);
refresh(indexName);
SearchResponse response = client().prepareSearch(indexName).get();
assertThat(response.getHits().getTotalHits().value(), is(0L));

Missing Assertion

testPipelineNotCalledForDeletes verifies that a document is deleted after a delete message is produced, but does not assert that the pipeline was NOT called during the delete operation (e.g., no processed field was added before deletion). The test name implies a behavioral guarantee that isn't fully validated.

public void testPipelineNotCalledForDeletes() throws Exception {
    createPipeline("add_field_pipeline", "{\"processors\": [{\"set\": {\"field\": \"processed\", \"value\": true}}]}");

    // Produce an index message, then a delete message
    produceData("1", "alice", "25", defaultMessageTimestamp, "index");
    createIndexWithPipeline("add_field_pipeline", 1, 0);

    // Wait for the document to be indexed
    waitForState(() -> {
        refresh(indexName);
        SearchResponse response = client().prepareSearch(indexName).setQuery(new TermQueryBuilder("_id", "1")).get();
        return response.getHits().getTotalHits().value() == 1;
    });

    // Now delete the document
    produceData("1", "alice", "25", defaultMessageTimestamp, "delete");

    // Verify document is deleted
    waitForState(() -> {
        refresh(indexName);
        SearchResponse response = client().prepareSearch(indexName).setQuery(new TermQueryBuilder("_id", "1")).get();
        return response.getHits().getTotalHits().value() == 0;
    });
}

Unbounded Wait

The waitForState helper is used throughout without a visible timeout. If the Kafka consumer or pipeline never processes a message (e.g., due to a bug), the test will hang indefinitely. Ensure waitForState has a reasonable timeout and throws a clear failure if exceeded.

waitForState(() -> {
    refresh(indexName);
    SearchResponse response = client().prepareSearch(indexName).get();
    if (response.getHits().getTotalHits().value() < 2) return false;
    Map<String, Object> source1 = response.getHits().getHits()[0].getSourceAsMap();
    Map<String, Object> source2 = response.getHits().getHits()[1].getSourceAsMap();
    return Boolean.TRUE.equals(source1.get("processed")) && Boolean.TRUE.equals(source2.get("processed"));
});
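
The concern above can be illustrated with a minimal sketch of a bounded polling helper. This is not the project's actual waitForState implementation; the class name BoundedWait, the parameter list, and the failure message are assumptions made for illustration only. It polls a condition until a deadline and fails with a descriptive error instead of hanging.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Hypothetical helper for illustration; not the project's real waitForState. */
final class BoundedWait {
    private BoundedWait() {}

    /**
     * Polls the condition until it returns true or the timeout elapses.
     * On timeout, throws an AssertionError that names what was being awaited.
     */
    static void waitForState(String description, Duration timeout, Duration pollInterval, BooleanSupplier condition) {
        long deadline = System.nanoTime() + timeout.toNanos();
        int attempts = 0;
        while (true) {
            attempts++;
            if (condition.getAsBoolean()) {
                return;
            }
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                throw new AssertionError(
                    "Timed out after " + timeout + " (" + attempts + " attempts) waiting for: " + description
                );
            }
            // Sleep for the poll interval, but never past the deadline.
            long sleepMillis = Math.min(pollInterval.toMillis(), Math.max(1L, remainingNanos / 1_000_000L));
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while waiting for: " + description, e);
            }
        }
    }
}
```

A call such as BoundedWait.waitForState("both documents processed", Duration.ofSeconds(30), Duration.ofMillis(200), condition) would then fail with a readable message if the consumer never catches up. The 30-second and 200-millisecond values are placeholders, not values taken from the PR.
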
Silent Cleanup Failure

In cleanUpPipelines, exceptions are silently ignored. If pipeline deletion fails for a non-trivial reason (e.g., cluster state issue), subsequent tests may be affected by leftover pipelines without any warning or log output.

public void cleanUpPipelines() {
    try {
        GetPipelineResponse response = client().admin().cluster().getPipeline(new GetPipelineRequest("*")).actionGet();
        for (PipelineConfiguration pipeline : response.pipelines()) {
            client().admin().cluster().deletePipeline(new DeletePipelineRequest(pipeline.getId())).actionGet();
        }
    } catch (Exception e) {
        // ignore
    }
}
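
One way to avoid swallowing cleanup errors is sketched below. It is a generic, self-contained illustration rather than a drop-in replacement for cleanUpPipelines: the CleanupSupport class and its deleteAll method are hypothetical, and the deleter callback stands in for whatever call actually removes a pipeline. Every deletion is still attempted, but any failure is reported at the end with the offending IDs attached.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical cleanup helper for illustration only. */
final class CleanupSupport {
    private CleanupSupport() {}

    /**
     * Attempts to delete every id, even if some deletions fail.
     * Returns the deleted ids when all succeed; otherwise throws an
     * IllegalStateException carrying one suppressed exception per failed id.
     */
    static List<String> deleteAll(Collection<String> ids, Consumer<String> deleter) {
        List<String> deleted = new ArrayList<>();
        IllegalStateException failure = null;
        for (String id : ids) {
            try {
                deleter.accept(id);
                deleted.add(id);
            } catch (RuntimeException e) {
                if (failure == null) {
                    failure = new IllegalStateException("Cleanup failed; leftover resources may affect later tests");
                }
                // Keep the per-id cause so the test log shows what was left behind.
                failure.addSuppressed(new RuntimeException("Failed to delete '" + id + "'", e));
            }
        }
        if (failure != null) {
            throw failure;
        }
        return deleted;
    }
}
```

Whether a cleanup failure should fail the test or only be logged is a design choice for the test suite; the sketch throws so that leftover state cannot go unnoticed.
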

@github-actions
Contributor

PR Code Suggestions ✨

Explore these optional code suggestions:

Category | Suggestion | Impact
Possible issue
Assert pipeline creation succeeds in helper

The createPipeline helper does not assert that the pipeline was successfully
created. If the putPipeline call fails silently, subsequent tests will produce
misleading failures. Add an assertAcked call (already used elsewhere in the class)
to ensure the pipeline creation is validated.

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestPipelineFromKafkaIT.java [671-676]

 private void createPipeline(String pipelineId, String pipelineJson) {
-    client().admin()
-        .cluster()
-        .putPipeline(new PutPipelineRequest(pipelineId, new BytesArray(pipelineJson), MediaTypeRegistry.JSON))
-        .actionGet();
+    assertAcked(
+        client().admin()
+            .cluster()
+            .putPipeline(new PutPipelineRequest(pipelineId, new BytesArray(pipelineJson), MediaTypeRegistry.JSON))
+            .actionGet()
+    );
 }
Suggestion importance[1-10]: 5

Why: Adding assertAcked to the createPipeline helper is a reasonable improvement to catch silent failures early and provide clearer error messages. However, putPipeline().actionGet() would typically throw an exception on failure, so the practical impact is moderate.

Low
General
Replace fixed sleep with polling mechanism

Using Thread.sleep(5000) is a fragile approach for waiting on asynchronous
operations, as it may cause flaky tests on slow environments or waste time on fast
ones. Replace the fixed sleep with the existing waitForState polling mechanism used
elsewhere in the test class to reliably wait for the expected condition.

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestPipelineFromKafkaIT.java [85-97]

 public void testFinalPipelineDropsDocument() throws Exception {
     createPipeline("drop_pipeline", "{\"processors\": [{\"drop\": {}}]}");
 
     produceData("1", "alice", "25");
     produceData("2", "bob", "30");
 
     createIndexWithPipeline("drop_pipeline", 1, 0);
 
-    Thread.sleep(5000);
+    // Wait a bit then verify no documents are indexed (drop pipeline discards all)
+    waitForState(() -> {
+        refresh(indexName);
+        SearchResponse response = client().prepareSearch(indexName).get();
+        // Since all docs are dropped, we can only poll until a stable state is reached
+        return response.getHits().getTotalHits().value() == 0;
+    });
     refresh(indexName);
     SearchResponse response = client().prepareSearch(indexName).get();
     assertThat(response.getHits().getTotalHits().value(), is(0L));
 }
Suggestion importance[1-10]: 4

Why: The suggestion to replace Thread.sleep(5000) with waitForState is valid for test reliability, but the waitForState approach for a "drop" scenario is logically problematic — it would immediately return true since the count starts at 0 and never increases. The improved code adds redundant polling that doesn't actually improve the test logic, making this a marginal improvement at best.

Low
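
The point made in the "Why" note, that polling for a zero count returns immediately, suggests a different shape for negative assertions: first wait for a positive signal that the messages were actually consumed, then assert that nothing was indexed. The sketch below shows that shape in plain Java. The AbsenceCheck class, its method name, and both suppliers are hypothetical; which signal the real test could observe (for example, some consumer-side counter) is an assumption and is not taken from the PR.

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

/** Hypothetical helper for illustration only. */
final class AbsenceCheck {
    private AbsenceCheck() {}

    /**
     * Waits (bounded) until allMessagesConsumed reports true, then asserts
     * that documentCount is zero. Checking the count only after a positive
     * signal prevents the assertion from passing before any work was done.
     */
    static void assertEmptyOnceConsumed(
        BooleanSupplier allMessagesConsumed,
        LongSupplier documentCount,
        long timeoutMillis,
        long pollMillis
    ) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!allMessagesConsumed.getAsBoolean()) {
            if (System.nanoTime() >= deadline) {
                throw new AssertionError("Timed out before all messages were consumed");
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while waiting for consumption", e);
            }
        }
        long count = documentCount.getAsLong();
        if (count != 0) {
            throw new AssertionError("Expected 0 documents after processing, found " + count);
        }
    }
}
```

In the drop-pipeline test, the first supplier would wrap whatever evidence of consumption is available and the second would wrap the refresh-and-search count; both are left abstract here because the PR does not show such a signal.
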

@github-actions
Contributor

❌ Gradle check result for c0a0d0b: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@varunbharadwaj
Contributor

Let's include these ITs along with the integration PR, to avoid backporting later on in case the 3.6 release is cut before both are merged.

@imRishN
Member Author

imRishN commented Mar 20, 2026

Yes, closing this PR

Labels

enhancement, Indexing, missing-component, skip-changelog

Development

Successfully merging this pull request may close these issues.

Add IT for final_pipeline support with streaming messages in Pull Based Ingestion

2 participants