Add ingest pipeline support for pull-based ingestion #20873
Merged: varunbharadwaj merged 24 commits into opensearch-project:main from imRishN:pipeline-integration on Mar 27, 2026
Commits
16f6360 Wire IngestService from IngestionEngine to MessageProcessor for pull-… (imRishN)
2425d7b Implement pipeline execution in MessageProcessor for pull-based inges… (imRishN)
1639bea Remove nullable (imRishN)
6771f6e Add ingest pipeline support for pull-based ingestion (imRishN)
0b295fb Minor (imRishN)
9d488ed Add changelog (imRishN)
3a35581 Add IT for ingest pipeline integration with pull based ingestion (imRishN)
be0f220 Add IT for field_mapping + final_pipeline combined for Pull Based Ing… (imRishN)
b771d73 Register listener in ctor (imRishN)
28c9752 Refactor to create a single IngestPipelineExecutor instance in Ingest… (imRishN)
04af53e Add javadoc (imRishN)
e0dc3f4 Merge remote-tracking branch 'origin/main' into pipeline-integration (imRishN)
9a50135 Address AI bot comments (imRishN)
083f57c Trigger build (imRishN)
ba7ac5b Add todo for sync pipeline (imRishN)
8297892 Merge remote-tracking branch 'origin/main' into pipeline-integration (imRishN)
d88b7e2 Address comments (imRishN)
d43597f Merge IT (imRishN)
2028d16 Merge remote-tracking branch 'origin/main' into pipeline-integration (imRishN)
d8bdfa1 Minor changes (imRishN)
23edbca Merge remote-tracking branch 'origin/main' into pipeline-integration (imRishN)
a7789ae Trigger build (imRishN)
eeff066 Merge remote-tracking branch 'origin/main' into pipeline-integration (imRishN)
7f06dcd Trigger build (imRishN)
869 changes: 869 additions & 0 deletions
...a/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestPipelineFromKafkaIT.java
Large diffs are not rendered by default.
175 changes: 175 additions & 0 deletions
server/src/main/java/org/opensearch/indices/pollingingest/IngestPipelineExecutor.java
@@ -0,0 +1,175 @@

    /*
     * SPDX-License-Identifier: Apache-2.0
     *
     * The OpenSearch Contributors require contributions made to
     * this file be licensed under the Apache-2.0 license or a
     * compatible open source license.
     */

    package org.opensearch.indices.pollingingest;

    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    import org.opensearch.action.index.IndexRequest;
    import org.opensearch.common.Nullable;
    import org.opensearch.index.IndexSettings;
    import org.opensearch.ingest.IngestService;
    import org.opensearch.threadpool.ThreadPool;

    import java.util.Collections;
    import java.util.Map;
    import java.util.Objects;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    import java.util.concurrent.atomic.AtomicBoolean;

    /**
     * Handles ingest pipeline resolution and execution for pull-based ingestion.
     *
     * <p>Resolves configured pipelines from index settings at initialization and executes them
     * synchronously by bridging IngestService's async callback API with CompletableFuture.
     * Also registers a dynamic settings listener to pick up runtime changes to {@code final_pipeline}.
     * Only {@code final_pipeline} is supported.
     *
     * <p>Unlike push-based indexing, pipeline execution in pull-based ingestion does not require the
     * node to have the {@code ingest} role. Transformations are executed locally on the node hosting the
     * shard, and requests are not forwarded to dedicated ingest nodes.
     */
    public class IngestPipelineExecutor {

        private static final Logger logger = LogManager.getLogger(IngestPipelineExecutor.class);

        // TODO: consider making this configurable via index settings if use cases with slow processors arise
        static final long PIPELINE_EXECUTION_TIMEOUT_SECONDS = 30;

        // TODO: explore synchronous pipeline execution (IngestService.executeBulkRequestSync) to avoid
        // thread pool dispatch and execute pipelines directly on the processor thread

        private final IngestService ingestService;
        private final String index;
        private volatile String resolvedFinalPipeline;

        /**
         * Creates an IngestPipelineExecutor for the given index.
         * Resolves the final pipeline from index settings and registers a dynamic settings listener.
         *
         * @param ingestService the ingest service for pipeline execution
         * @param index the index name
         * @param indexSettings the index settings to resolve a pipeline from and register listener on
         */
        public IngestPipelineExecutor(IngestService ingestService, String index, IndexSettings indexSettings) {
            this.ingestService = Objects.requireNonNull(ingestService);
            this.index = Objects.requireNonNull(index);
            indexSettings.getScopedSettings().addSettingsUpdateConsumer(IndexSettings.FINAL_PIPELINE, this::updateFinalPipeline);
            updateFinalPipeline(IndexSettings.FINAL_PIPELINE.get(indexSettings.getSettings()));
        }

        /**
         * Visible for testing. Creates an executor with a pre-resolved pipeline name,
         * bypassing resolution from index settings.
         *
         * @param ingestService the ingest service
         * @param index the index name
         * @param finalPipeline the resolved final pipeline name, or null if no pipeline is configured
         */
        IngestPipelineExecutor(IngestService ingestService, String index, @Nullable String finalPipeline) {
            this.ingestService = Objects.requireNonNull(ingestService);
            this.index = Objects.requireNonNull(index);
            this.resolvedFinalPipeline = finalPipeline;
        }

        /**
         * Updates the cached final pipeline name. Called on initial resolution and on dynamic settings change.
         */
        void updateFinalPipeline(String finalPipeline) {
            if (IngestService.NOOP_PIPELINE_NAME.equals(finalPipeline)) {
                resolvedFinalPipeline = null;
            } else {
                resolvedFinalPipeline = finalPipeline;
            }
        }

        /**
         * Executes final_pipeline on the source map synchronously using CompletableFuture to bridge
         * IngestService's async callback API.
         *
         * @param id document ID
         * @param sourceMap source map to transform
         * @return the transformed source map, or null if the document was dropped by the pipeline
         * @throws Exception if pipeline execution fails
         */
        public Map<String, Object> executePipelines(String id, Map<String, Object> sourceMap) throws Exception {
            final String finalPipeline = resolvedFinalPipeline;
            if (finalPipeline == null) {
                return sourceMap;
            }

            // Build IndexRequest to carry the document through the pipeline
            IndexRequest indexRequest = new IndexRequest(index);
            indexRequest.id(id);
            indexRequest.source(sourceMap);

            indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
            indexRequest.setFinalPipeline(finalPipeline);
            indexRequest.isPipelineResolved(true);

            final String originalId = id;
            final String originalRouting = indexRequest.routing();

            CompletableFuture<Void> future = new CompletableFuture<>();
            AtomicBoolean dropped = new AtomicBoolean(false);

            ingestService.executeBulkRequest(
                1,
                Collections.singletonList(indexRequest),
                (slot, e) -> future.completeExceptionally(e),
                (thread, e) -> {
                    if (e != null) {
                        future.completeExceptionally(e);
                    } else {
                        future.complete(null);
                    }
                },
                slot -> dropped.set(true),
                ThreadPool.Names.WRITE
            );

            // Block until pipeline execution completes (with timeout)
            try {
                future.get(PIPELINE_EXECUTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            } catch (TimeoutException e) {
                throw new RuntimeException("Ingest pipeline execution timed out after [" + PIPELINE_EXECUTION_TIMEOUT_SECONDS + "] seconds", e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Ingest pipeline execution was interrupted", e);
            } catch (ExecutionException e) {
                throw new RuntimeException("Ingest pipeline execution failed", e.getCause());
            }

            if (dropped.get()) {
                return null;
            }

            // verify _id and _routing have not been mutated
            if (Objects.equals(originalId, indexRequest.id()) == false) {
                throw new IllegalStateException(
                    "Ingest pipeline attempted to change _id from ["
                        + originalId
                        + "] to ["
                        + indexRequest.id()
                        + "]. _id mutations are not allowed in pull-based ingestion."
                );
            }
            if (Objects.equals(originalRouting, indexRequest.routing()) == false) {
                throw new IllegalStateException(
                    "Ingest pipeline attempted to change _routing. _routing mutations are not allowed in pull-based ingestion."
                );
            }

            // _index change is already blocked by final_pipeline semantics in IngestService

            return indexRequest.sourceAsMap();
        }
    }
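
The central technique in `executePipelines` is turning a callback-based API into a blocking call by completing a `CompletableFuture` from the callbacks and waiting on it with a timeout. The following is a minimal, self-contained sketch of that pattern only. `CallbackBridgeSketch`, `AsyncPipeline`, and `runBlocking` are hypothetical names invented for illustration and are not part of OpenSearch; the single-thread executor merely stands in for the write thread pool, and the millisecond timeout is an assumption made to keep the demo short.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

final class CallbackBridgeSketch {

    /** Hypothetical stand-in for a callback-based pipeline API (this is NOT IngestService). */
    interface AsyncPipeline {
        void execute(Map<String, Object> doc, Consumer<Exception> onCompletion, Runnable onDropped);
    }

    /**
     * Blocks the calling thread until the pipeline reports completion.
     * Returns the document (mutated in place), or null if the pipeline dropped it.
     */
    static Map<String, Object> runBlocking(AsyncPipeline pipeline, Map<String, Object> doc, long timeoutMillis) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        AtomicBoolean dropped = new AtomicBoolean(false);

        // The completion callback is the only thing that releases the waiting thread.
        pipeline.execute(doc, e -> {
            if (e != null) {
                future.completeExceptionally(e);
            } else {
                future.complete(null);
            }
        }, () -> dropped.set(true));

        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new RuntimeException("pipeline timed out after [" + timeoutMillis + "] ms", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag for the caller
            throw new RuntimeException("pipeline was interrupted", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("pipeline failed", e.getCause());
        }
        return dropped.get() ? null : doc;
    }

    public static void main(String[] args) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            // Transforms the document on another thread, then signals completion.
            AsyncPipeline upperCase = (doc, onCompletion, onDropped) -> worker.execute(() -> {
                doc.put("name", String.valueOf(doc.get("name")).toUpperCase(Locale.ROOT));
                onCompletion.accept(null);
            });
            // Marks the document as dropped before signalling completion.
            AsyncPipeline dropAll = (doc, onCompletion, onDropped) -> worker.execute(() -> {
                onDropped.run();
                onCompletion.accept(null);
            });

            Map<String, Object> doc = new HashMap<>();
            doc.put("name", "alice");
            System.out.println(runBlocking(upperCase, doc, 1000));
            System.out.println(runBlocking(dropAll, new HashMap<>(), 1000));
        } finally {
            worker.shutdown();
        }
    }
}
```

In this sketch, as in the PR's executor, a pipeline that never invokes its completion callback surfaces as a timeout rather than an indefinite hang, and a dropped document is reported to the caller as `null`.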