Skip to content

Commit b771d73

Browse files
committed
Register listener in ctor
Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
1 parent be0f220 commit b771d73

4 files changed

Lines changed: 15 additions & 31 deletions

File tree

server/src/main/java/org/opensearch/indices/pollingingest/IngestPipelineExecutor.java

Lines changed: 8 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,9 @@
2929
/**
3030
* Handles ingest pipeline resolution and execution for pull-based ingestion.
3131
*
32-
* <p>Resolves configured pipelines from index settings (lazy, cached) and executes them
32+
* <p>Resolves configured pipelines from index settings at initialization and executes them
3333
* synchronously by bridging IngestService's async callback API with CompletableFuture.
34+
* Also registers a dynamic settings listener to pick up runtime changes to {@code final_pipeline}.
3435
* Only {@code final_pipeline} is supported.
3536
*/
3637
public class IngestPipelineExecutor {
@@ -42,25 +43,26 @@ public class IngestPipelineExecutor {
4243

4344
private final IngestService ingestService;
4445
private final String index;
45-
46-
// Cached pipeline names — resolved lazily on the first document
4746
private volatile String resolvedFinalPipeline;
48-
private volatile boolean pipelinesResolved = false;
4947

5048
/**
5149
* Creates an IngestPipelineExecutor for the given index.
50+
* Resolves the final pipeline from index settings and registers a dynamic settings listener.
5251
*
5352
* @param ingestService the ingest service for pipeline execution
5453
* @param index the index name
54+
* @param indexSettings the index settings to resolve a pipeline from and register listener on
5555
*/
56-
public IngestPipelineExecutor(IngestService ingestService, String index) {
56+
public IngestPipelineExecutor(IngestService ingestService, String index, IndexSettings indexSettings) {
5757
this.ingestService = Objects.requireNonNull(ingestService);
5858
this.index = Objects.requireNonNull(index);
59+
indexSettings.getScopedSettings().addSettingsUpdateConsumer(IndexSettings.FINAL_PIPELINE, this::updateFinalPipeline);
60+
updateFinalPipeline(IndexSettings.FINAL_PIPELINE.get(indexSettings.getSettings()));
5961
}
6062

6163
/**
6264
* Visible for testing. Creates an executor with a pre-resolved pipeline name,
63-
* bypassing lazy resolution from index settings.
65+
* bypassing resolution from index settings.
6466
*
6567
* @param ingestService the ingest service
6668
* @param index the index name
@@ -70,23 +72,6 @@ public IngestPipelineExecutor(IngestService ingestService, String index) {
7072
this.ingestService = Objects.requireNonNull(ingestService);
7173
this.index = Objects.requireNonNull(index);
7274
this.resolvedFinalPipeline = finalPipeline;
73-
this.pipelinesResolved = true;
74-
}
75-
76-
/**
77-
* Resolves pipeline names from index settings. Called lazily on first document and cached.
78-
* Also registers a dynamic settings listener for final_pipeline updates.
79-
*/
80-
void resolvePipelineNames(IndexSettings indexSettings) {
81-
if (pipelinesResolved) {
82-
return;
83-
}
84-
updateFinalPipeline(IndexSettings.FINAL_PIPELINE.get(indexSettings.getSettings()));
85-
86-
// Register dynamic settings listener for final_pipeline updates
87-
indexSettings.getScopedSettings().addSettingsUpdateConsumer(IndexSettings.FINAL_PIPELINE, this::updateFinalPipeline);
88-
89-
pipelinesResolved = true;
9075
}
9176

9277
/**

server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,7 @@ static class MessageProcessor {
126126
String indexName = engine.config().getIndexSettings().getIndex().getName();
127127
this.engine = engine;
128128
this.index = indexName;
129-
this.pipelineExecutor = new IngestPipelineExecutor(ingestService, indexName);
130-
this.pipelineExecutor.resolvePipelineNames(engine.config().getIndexSettings());
129+
this.pipelineExecutor = new IngestPipelineExecutor(ingestService, indexName, engine.config().getIndexSettings());
131130
}
132131

133132
/**

server/src/test/java/org/opensearch/indices/pollingingest/IngestPipelineExecutorTests.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,17 +39,17 @@ public void setUp() throws Exception {
3939
// --- Construction tests ---
4040

4141
public void testConstructorRequiresNonNullIngestService() {
42-
expectThrows(NullPointerException.class, () -> new IngestPipelineExecutor(null, "test_index"));
42+
expectThrows(NullPointerException.class, () -> new IngestPipelineExecutor(null, "test_index", (String) null));
4343
}
4444

4545
public void testConstructorRequiresNonNullIndex() {
46-
expectThrows(NullPointerException.class, () -> new IngestPipelineExecutor(ingestService, null));
46+
expectThrows(NullPointerException.class, () -> new IngestPipelineExecutor(ingestService, null, (String) null));
4747
}
4848

4949
// --- Pipeline resolution tests ---
5050

5151
public void testHasPipelines_NoPipelineConfigured() {
52-
IngestPipelineExecutor executor = new IngestPipelineExecutor(ingestService, "test_index", null);
52+
IngestPipelineExecutor executor = new IngestPipelineExecutor(ingestService, "test_index", (String) null);
5353
assertFalse(executor.hasPipelines());
5454
}
5555

@@ -59,7 +59,7 @@ public void testHasPipelines_PipelineConfigured() {
5959
}
6060

6161
public void testUpdateFinalPipeline_SetsPipeline() {
62-
IngestPipelineExecutor executor = new IngestPipelineExecutor(ingestService, "test_index", null);
62+
IngestPipelineExecutor executor = new IngestPipelineExecutor(ingestService, "test_index", (String) null);
6363
assertFalse(executor.hasPipelines());
6464

6565
executor.updateFinalPipeline("new-pipeline");
@@ -77,7 +77,7 @@ public void testUpdateFinalPipeline_NoopClearsPipeline() {
7777
// --- Execution: no pipeline configured ---
7878

7979
public void testExecutePipelines_NoPipeline_ReturnsSourceUnchanged() throws Exception {
80-
IngestPipelineExecutor executor = new IngestPipelineExecutor(ingestService, "test_index", null);
80+
IngestPipelineExecutor executor = new IngestPipelineExecutor(ingestService, "test_index", (String) null);
8181

8282
Map<String, Object> source = new HashMap<>();
8383
source.put("name", "alice");

server/src/test/java/org/opensearch/indices/pollingingest/MessageProcessorTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public void setUp() throws Exception {
6464
processor = new MessageProcessorRunnable.MessageProcessor(
6565
ingestionEngine,
6666
"index",
67-
new IngestPipelineExecutor(mock(IngestService.class), "index", null)
67+
new IngestPipelineExecutor(mock(IngestService.class), "index", (String) null)
6868
);
6969
}
7070

0 commit comments

Comments
 (0)