Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
16f6360
Wire IngestService from IngestionEngine to MessageProcessor for pull-…
imRishN Mar 3, 2026
2425d7b
Implement pipeline execution in MessageProcessor for pull-based inges…
imRishN Mar 6, 2026
1639bea
Remove nullable
imRishN Mar 11, 2026
6771f6e
Add ingest pipeline support for pull-based ingestion
imRishN Mar 15, 2026
0b295fb
Minor
imRishN Mar 15, 2026
9d488ed
Add changelog
imRishN Mar 15, 2026
3a35581
Add IT for ingest pipeline integration with pull based ingestion
imRishN Mar 16, 2026
be0f220
Add IT for field_mapping + final_pipeline combined for Pull Based Ing…
imRishN Mar 17, 2026
b771d73
Register listener in ctor
imRishN Mar 23, 2026
28c9752
Refactor to create a single IngestPipelineExecutor instance in Ingest…
imRishN Mar 23, 2026
04af53e
Add javadoc
imRishN Mar 23, 2026
e0dc3f4
Merge remote-tracking branch 'origin/main' into pipeline-integration
imRishN Mar 23, 2026
9a50135
Address AI bot comments
imRishN Mar 23, 2026
083f57c
Trigger build
imRishN Mar 24, 2026
ba7ac5b
Add todo for sync pipeline
imRishN Mar 25, 2026
8297892
Merge remote-tracking branch 'origin/main' into pipeline-integration
imRishN Mar 25, 2026
d88b7e2
Address comments
imRishN Mar 26, 2026
d43597f
Merge IT
imRishN Mar 26, 2026
2028d16
Merge remote-tracking branch 'origin/main' into pipeline-integration
imRishN Mar 26, 2026
d8bdfa1
Minor changes
imRishN Mar 27, 2026
23edbca
Merge remote-tracking branch 'origin/main' into pipeline-integration
imRishN Mar 27, 2026
a7789ae
Trigger build
imRishN Mar 27, 2026
eeff066
Merge remote-tracking branch 'origin/main' into pipeline-integration
imRishN Mar 27, 2026
7f06dcd
Trigger build
imRishN Mar 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Implement FieldMappingIngestionMessageMapper for pull-based ingestion ([#20729](https://github.com/opensearch-project/OpenSearch/pull/20729))
- Added support of WarmerRefreshListener in NRTReplicationEngine to trigger warmer after replication on replica shards ([#20650](https://github.com/opensearch-project/OpenSearch/pull/20650))
- WLM group custom search settings - groundwork and timeout ([#20536](https://github.com/opensearch-project/OpenSearch/issues/20536))
- Add ingest pipeline support for pull-based ingestion ([#20873](https://github.com/opensearch-project/OpenSearch/issues/20873))
- Expose JVM runtime metrics via telemetry framework ([#20844](https://github.com/opensearch-project/OpenSearch/pull/20844))
- Add intra segment support for single-value metric aggregations ([#20503](https://github.com/opensearch-project/OpenSearch/pull/20503))
- Add new setting property 'Sensitive' for tiering dynamic settings ([#20901](https://github.com/opensearch-project/OpenSearch/pull/20901))
Expand Down
2 changes: 2 additions & 0 deletions plugins/ingestion-kafka/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ dependencies {
testImplementation "org.apache.commons:commons-lang3:${versions.commonslang}"
testImplementation "commons-io:commons-io:${versions.commonsio}"
testImplementation 'org.awaitility:awaitility:4.2.0'
testImplementation project(':modules:ingest-common')
testImplementation project(':modules:lang-painless')
}

internalClusterTest{
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.index.translog.TranslogStats;
import org.opensearch.index.translog.listener.CompositeTranslogEventListener;
import org.opensearch.indices.pollingingest.DefaultStreamPoller;
import org.opensearch.indices.pollingingest.IngestPipelineExecutor;
import org.opensearch.indices.pollingingest.IngestionErrorStrategy;
import org.opensearch.indices.pollingingest.IngestionSettings;
import org.opensearch.indices.pollingingest.PollingIngestStats;
Expand All @@ -62,17 +63,17 @@ public class IngestionEngine extends InternalEngine {
private StreamPoller streamPoller;
private final IngestionConsumerFactory ingestionConsumerFactory;
private final DocumentMapperForType documentMapperForType;
private final IngestService ingestService;
private final IngestPipelineExecutor pipelineExecutor;
private volatile IngestionShardPointer lastCommittedBatchStartPointer;

public IngestionEngine(EngineConfig engineConfig, IngestionConsumerFactory ingestionConsumerFactory) {
this(engineConfig, ingestionConsumerFactory, null);
}

public IngestionEngine(EngineConfig engineConfig, IngestionConsumerFactory ingestionConsumerFactory, IngestService ingestService) {
super(engineConfig);
this.ingestionConsumerFactory = Objects.requireNonNull(ingestionConsumerFactory);
this.ingestService = ingestService;
this.pipelineExecutor = new IngestPipelineExecutor(
Objects.requireNonNull(ingestService),
engineConfig.getIndexSettings().getIndex().getName(),
engineConfig.getIndexSettings()
);
this.documentMapperForType = engineConfig.getDocumentMapperForTypeSupplier().get();
registerDynamicIndexSettingsHandlers();
}
Expand Down Expand Up @@ -156,6 +157,7 @@ private void initializeStreamPoller(
.pointerBasedLagUpdateInterval(ingestionSource.getPointerBasedLagUpdateInterval().millis())
.mapperType(ingestionSource.getMapperType())
.mapperSettings(ingestionSource.getMapperSettings())
.pipelineExecutor(pipelineExecutor)
.warmupConfig(ingestionSource.getWarmupConfig())
.build();
registerStreamPollerListener();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,22 @@ private DefaultStreamPoller(
long pointerBasedLagUpdateIntervalMs,
IngestionMessageMapper.MapperType mapperType,
Map<String, Object> mapperSettings,
IngestPipelineExecutor pipelineExecutor,
IngestionSource.WarmupConfig warmupConfig
) {
this(
startPointer,
consumerFactory,
consumerClientId,
shardId,
new PartitionedBlockingQueueContainer(numProcessorThreads, shardId, ingestionEngine, errorStrategy, blockingQueueSize),
new PartitionedBlockingQueueContainer(
numProcessorThreads,
shardId,
ingestionEngine,
errorStrategy,
blockingQueueSize,
pipelineExecutor
),
resetState,
resetValue,
errorStrategy,
Expand Down Expand Up @@ -754,6 +762,7 @@ public static class Builder {
private long pointerBasedLagUpdateIntervalMs = 10000;
private IngestionMessageMapper.MapperType mapperType = IngestionMessageMapper.MapperType.DEFAULT;
private Map<String, Object> mapperSettings = Collections.emptyMap();
private IngestPipelineExecutor pipelineExecutor;
// Warmup configuration - default matches IndexMetadata settings
private IngestionSource.WarmupConfig warmupConfig = new IngestionSource.WarmupConfig(TimeValue.timeValueMillis(-1), 100L);

Expand Down Expand Up @@ -864,7 +873,14 @@ public Builder mapperSettings(Map<String, Object> mapperSettings) {
}

/**
* Set warmup enabled
* Set pipeline executor for ingest pipeline execution
*/
public Builder pipelineExecutor(IngestPipelineExecutor pipelineExecutor) {
// Stored as-is with no validation; the field has no initializer at its declaration,
// and build() passes whatever is held here on to the poller's constructor.
this.pipelineExecutor = pipelineExecutor;
return this;
}

/**
* Set warmup configuration
*/
public Builder warmupConfig(IngestionSource.WarmupConfig warmupConfig) {
Expand Down Expand Up @@ -893,6 +909,7 @@ public DefaultStreamPoller build() {
pointerBasedLagUpdateIntervalMs,
mapperType,
mapperSettings,
pipelineExecutor,
warmupConfig
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.pollingingest;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.common.Nullable;
import org.opensearch.index.IndexSettings;
import org.opensearch.ingest.IngestService;
import org.opensearch.threadpool.ThreadPool;

import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Resolves and runs the ingest pipeline for documents that arrive through pull-based ingestion.
 *
 * <p>Only {@code final_pipeline} is supported. The pipeline name is read from the index settings
 * when the executor is created and is kept up to date through a settings update consumer, so a
 * runtime change to {@code final_pipeline} is picked up without rebuilding the executor.
 * IngestService exposes a callback-based API; this class turns it into a blocking call by having
 * the callbacks complete a {@link CompletableFuture} that the caller waits on.
 *
 * <p>In contrast to push-based indexing, running a pipeline here does not require the node to
 * carry the {@code ingest} role. The transformation runs locally on the node that hosts the shard
 * and the request is never forwarded to a dedicated ingest node.
 */
public class IngestPipelineExecutor {

    private static final Logger logger = LogManager.getLogger(IngestPipelineExecutor.class);

    // TODO: consider making this configurable via index settings if use cases with slow processors arise
    static final long PIPELINE_EXECUTION_TIMEOUT_SECONDS = 30;

    // TODO: explore synchronous pipeline execution (IngestService.executeBulkRequestSync) to avoid
    // thread pool dispatch and execute pipelines directly on the processor thread

    private final IngestService ingestService;
    private final String index;
    // Written by the settings update consumer, read by executePipelines; null means "no pipeline".
    private volatile String resolvedFinalPipeline;

    /**
     * Builds an executor bound to one index.
     * A consumer for {@code final_pipeline} updates is registered on the index-scoped settings,
     * then the currently configured value is resolved and cached.
     *
     * @param ingestService the ingest service used to run pipelines
     * @param index the name of the index documents are ingested into
     * @param indexSettings the index settings the pipeline is resolved from and the listener is registered on
     */
    public IngestPipelineExecutor(IngestService ingestService, String index, IndexSettings indexSettings) {
        this.ingestService = Objects.requireNonNull(ingestService);
        this.index = Objects.requireNonNull(index);
        // Register the consumer before the initial read, in the same order as before.
        indexSettings.getScopedSettings().addSettingsUpdateConsumer(IndexSettings.FINAL_PIPELINE, this::updateFinalPipeline);
        final String configuredPipeline = IndexSettings.FINAL_PIPELINE.get(indexSettings.getSettings());
        updateFinalPipeline(configuredPipeline);
    }

    /**
     * Visible for testing. Builds an executor whose pipeline name is supplied directly instead of
     * being resolved from index settings; no settings listener is registered.
     *
     * @param ingestService the ingest service
     * @param index the index name
     * @param finalPipeline the already-resolved final pipeline name, or null when none is configured
     */
    IngestPipelineExecutor(IngestService ingestService, String index, @Nullable String finalPipeline) {
        this.ingestService = Objects.requireNonNull(ingestService);
        this.index = Objects.requireNonNull(index);
        this.resolvedFinalPipeline = finalPipeline;
    }

    /**
     * Replaces the cached final pipeline name. Used for the initial resolution and for every
     * dynamic settings change. The no-op pipeline name is stored as null.
     */
    void updateFinalPipeline(String finalPipeline) {
        final boolean isNoop = IngestService.NOOP_PIPELINE_NAME.equals(finalPipeline);
        resolvedFinalPipeline = isNoop ? null : finalPipeline;
    }

    /**
     * Runs the configured {@code final_pipeline} against a document source and waits for the result.
     * When no pipeline is configured the given map is returned untouched.
     *
     * @param id document ID
     * @param sourceMap source map to transform
     * @return the transformed source map, or null if the document was dropped by the pipeline
     * @throws Exception if pipeline execution fails
     */
    public Map<String, Object> executePipelines(String id, Map<String, Object> sourceMap) throws Exception {
        // Read the volatile field once so the whole call works against a single pipeline name.
        final String pipelineName = resolvedFinalPipeline;
        if (pipelineName == null) {
            return sourceMap;
        }

        final IndexRequest request = newPipelineRequest(id, sourceMap, pipelineName);
        final String routingBefore = request.routing();

        final CompletableFuture<Void> completion = new CompletableFuture<>();
        final AtomicBoolean wasDropped = new AtomicBoolean(false);

        ingestService.executeBulkRequest(
            1,
            Collections.singletonList(request),
            (slot, failure) -> completion.completeExceptionally(failure),
            (thread, failure) -> {
                if (failure == null) {
                    completion.complete(null);
                } else {
                    completion.completeExceptionally(failure);
                }
            },
            slot -> wasDropped.set(true),
            ThreadPool.Names.WRITE
        );

        awaitCompletion(completion);

        if (wasDropped.get()) {
            return null;
        }

        ensureIdAndRoutingUnchanged(id, routingBefore, request);

        // _index change is already blocked by final_pipeline semantics in IngestService
        return request.sourceAsMap();
    }

    /**
     * Builds the IndexRequest that carries the document through the pipeline: the default
     * pipeline is set to the no-op name, the final pipeline to the given name, and the request
     * is marked as already resolved.
     */
    private IndexRequest newPipelineRequest(String id, Map<String, Object> sourceMap, String pipelineName) {
        final IndexRequest request = new IndexRequest(index);
        request.id(id);
        request.source(sourceMap);
        request.setPipeline(IngestService.NOOP_PIPELINE_NAME);
        request.setFinalPipeline(pipelineName);
        request.isPipelineResolved(true);
        return request;
    }

    /**
     * Blocks until the pipeline callbacks complete the future or the timeout elapses, converting
     * every checked failure into a RuntimeException.
     */
    private static void awaitCompletion(CompletableFuture<Void> completion) {
        try {
            completion.get(PIPELINE_EXECUTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            throw new RuntimeException("Ingest pipeline execution failed", e.getCause());
        } catch (InterruptedException e) {
            // restore the interrupt flag for callers further up the stack
            Thread.currentThread().interrupt();
            throw new RuntimeException("Ingest pipeline execution was interrupted", e);
        } catch (TimeoutException e) {
            throw new RuntimeException("Ingest pipeline execution timed out after [" + PIPELINE_EXECUTION_TIMEOUT_SECONDS + "] seconds", e);
        }
    }

    /**
     * Rejects pipelines that rewrote {@code _id} or {@code _routing} on the request.
     */
    private static void ensureIdAndRoutingUnchanged(String idBefore, String routingBefore, IndexRequest request) {
        final String idAfter = request.id();
        if (Objects.equals(idBefore, idAfter) == false) {
            throw new IllegalStateException(
                "Ingest pipeline attempted to change _id from [" + idBefore + "] to [" + idAfter
                    + "]. _id mutations are not allowed in pull-based ingestion."
            );
        }
        if (Objects.equals(routingBefore, request.routing()) == false) {
            throw new IllegalStateException(
                "Ingest pipeline attempted to change _routing. _routing mutations are not allowed in pull-based ingestion."
            );
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.opensearch.indices.pollingingest;

import org.opensearch.cluster.metadata.IngestionSource;
import org.opensearch.common.Nullable;
import org.opensearch.index.IngestionConsumerFactory;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.EngineConfig;
Expand All @@ -27,12 +26,11 @@
public class IngestionEngineFactory implements EngineFactory {

private final IngestionConsumerFactory ingestionConsumerFactory;
@Nullable
private final Supplier<IngestService> ingestServiceSupplier;

public IngestionEngineFactory(IngestionConsumerFactory ingestionConsumerFactory, Supplier<IngestService> ingestServiceSupplier) {
this.ingestionConsumerFactory = Objects.requireNonNull(ingestionConsumerFactory);
this.ingestServiceSupplier = ingestServiceSupplier;
this.ingestServiceSupplier = Objects.requireNonNull(ingestServiceSupplier);
}

/**
Expand All @@ -45,9 +43,8 @@ public Engine newReadWriteEngine(EngineConfig config) {
IngestionSource ingestionSource = config.getIndexSettings().getIndexMetadata().getIngestionSource();
boolean isAllActiveIngestion = ingestionSource != null && ingestionSource.isAllActiveIngestionEnabled();

IngestService ingestService = ingestServiceSupplier != null ? ingestServiceSupplier.get() : null;
assert ingestService != null || ingestServiceSupplier == null
: "IngestService supplier returned null. This indicates a initialization ordering issue.";
IngestService ingestService = ingestServiceSupplier.get();
assert ingestService != null : "IngestService supplier returned null. This indicates a initialization ordering issue.";

if (isAllActiveIngestion) {
// use ingestion engine on both primary and replica in all-active mode
Expand Down
Loading
Loading