From 4e7d99eb581e12c77a5cf9f1deb0c51638ac478a Mon Sep 17 00:00:00 2001 From: Yupeng Fu Date: Wed, 9 Apr 2025 08:55:31 -0700 Subject: [PATCH 1/3] make maxPollSize and pollTimeout in IngestionSource configurable Signed-off-by: Yupeng Fu --- .../cluster/metadata/IndexMetadata.java | 28 +++++++++++++ .../cluster/metadata/IngestionSource.java | 40 +++++++++++++++++-- .../index/engine/IngestionEngine.java | 4 +- .../pollingingest/DefaultStreamPoller.java | 5 ++- .../metadata/IngestionSourceTests.java | 12 +++++- 5 files changed, 81 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java index 04b10a43ffa10..be6680feb3f2a 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java @@ -807,6 +807,30 @@ public Iterator> settings() { Property.Dynamic ); + /** + * Defines the max poll size per batch for pull-based ingestion. + */ + public static final String SETTING_INGESTION_SOURCE_MAX_POLL_SIZE = "index.ingestion_source.poll.max_batch_size"; + public static final Setting INGESTION_SOURCE_ERROR_MAX_POLL_SIZE = Setting.longSetting( + SETTING_INGESTION_SOURCE_MAX_POLL_SIZE, + 1000, + 0, + Property.IndexScope, + Property.Dynamic + ); + + /** + * Defines the poll timeout for pull-based ingestion in milliseconds. + */ + public static final String SETTING_INGESTION_SOURCE_POLL_TIMEOUT = "index.ingestion_source.poll.timeout"; + public static final Setting INGESTION_SOURCE_POLL_TIMEOUT = Setting.intSetting( + SETTING_INGESTION_SOURCE_POLL_TIMEOUT, + 1000, + 0, + Property.IndexScope, + Property.Dynamic + ); + public static final Setting.AffixSetting INGESTION_SOURCE_PARAMS_SETTING = Setting.prefixKeySetting( "index.ingestion_source.param.", key -> new Setting<>(key, "", (value) -> { @@ -1047,9 +1071,13 @@ public IngestionSource getIngestionSource() { final IngestionErrorStrategy.ErrorStrategy errorStrategy = INGESTION_SOURCE_ERROR_STRATEGY_SETTING.get(settings); final Map ingestionSourceParams = INGESTION_SOURCE_PARAMS_SETTING.getAsMap(settings); + final long maxPollSize = INGESTION_SOURCE_ERROR_MAX_POLL_SIZE.get(settings); + final int pollTimeout = INGESTION_SOURCE_POLL_TIMEOUT.get(settings); return new IngestionSource.Builder(ingestionSourceType).setParams(ingestionSourceParams) .setPointerInitReset(pointerInitReset) .setErrorStrategy(errorStrategy) + .setMaxPollSize(maxPollSize) + .setPollTimeout(pollTimeout) .build(); } return null; diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java b/server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java index b8ffa890ce519..536c99788e7f5 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java @@ -25,17 +25,23 @@ public class IngestionSource { private final PointerInitReset pointerInitReset; private final IngestionErrorStrategy.ErrorStrategy errorStrategy; private final Map params; + private final long maxPollSize; + private final int pollTimeout; private IngestionSource( String type, PointerInitReset pointerInitReset, IngestionErrorStrategy.ErrorStrategy errorStrategy, - Map params + Map params, + long maxPollSize, + int pollTimeout ) { this.type = type; this.pointerInitReset = pointerInitReset; this.params = params; this.errorStrategy = errorStrategy; + this.maxPollSize = maxPollSize; + this.pollTimeout = pollTimeout; } public String getType() { @@ -54,6 +60,14 @@ public Map params() { return params; } + public long getMaxPollSize() { + return maxPollSize; + } + + public int getPollTimeout() { + return pollTimeout; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -62,12 +76,14 @@ public boolean equals(Object o) { return Objects.equals(type, ingestionSource.type) && Objects.equals(pointerInitReset, ingestionSource.pointerInitReset) && Objects.equals(errorStrategy, ingestionSource.errorStrategy) - && Objects.equals(params, ingestionSource.params); + && Objects.equals(params, ingestionSource.params) + && Objects.equals(maxPollSize, ingestionSource.maxPollSize) + && Objects.equals(pollTimeout, ingestionSource.pollTimeout); } @Override public int hashCode() { - return Objects.hash(type, pointerInitReset, params, errorStrategy); + return Objects.hash(type, pointerInitReset, params, errorStrategy, maxPollSize, pollTimeout); } @Override @@ -84,6 +100,10 @@ public String toString() { + '\'' + ", params=" + params + + ", maxPollSize=" + + maxPollSize + + ", pollTimeout=" + + pollTimeout + '}'; } @@ -137,6 +157,8 @@ public static class Builder { private PointerInitReset pointerInitReset; private IngestionErrorStrategy.ErrorStrategy errorStrategy; private Map params; + private long maxPollSize = 1000L; + private int pollTimeout = 1000; public Builder(String type) { this.type = type; @@ -165,13 +187,23 @@ public Builder setParams(Map params) { return this; } + public Builder setMaxPollSize(long maxPollSize) { + this.maxPollSize = maxPollSize; + return this; + } + public Builder addParam(String key, Object value) { this.params.put(key, value); return this; } + public Builder setPollTimeout(int pollTimeout) { + this.pollTimeout = pollTimeout; + return this; + } + public IngestionSource build() { - return new IngestionSource(type, pointerInitReset, errorStrategy, params); + return new IngestionSource(type, pointerInitReset, errorStrategy, params, maxPollSize, pollTimeout); } } diff --git a/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java b/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java index 6d5f112efe594..bd17ee2170121 100644 --- a/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java @@ -121,7 +121,9 @@ public void start() { resetState, resetValue, ingestionErrorStrategy, - initialPollerState + initialPollerState, + ingestionSource.getMaxPollSize(), + ingestionSource.getPollTimeout() ); streamPoller.start(); } diff --git a/server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java b/server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java index e1a4f7d3b4b7d..4b4a44e13d1df 100644 --- a/server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java +++ b/server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java @@ -32,7 +32,6 @@ public class DefaultStreamPoller implements StreamPoller { private static final Logger logger = LogManager.getLogger(DefaultStreamPoller.class); - // TODO: make this configurable public static final long MAX_POLL_SIZE = 1000; public static final int POLL_TIMEOUT = 1000; @@ -77,7 +76,9 @@ public DefaultStreamPoller( ResetState resetState, String resetValue, IngestionErrorStrategy errorStrategy, - State initialState + State initialState, + long maxPollSize, + int pollTimeout ) { this( startPointer, diff --git a/server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java index 1e24c5f7df4a0..11a12c5e753f8 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java @@ -36,6 +36,8 @@ public void testConstructorAndGetters() { assertEquals("1000", source.getPointerInitReset().getValue()); assertEquals(DROP, source.getErrorStrategy()); assertEquals(params, source.params()); + assertEquals(1000, source.getMaxPollSize()); + assertEquals(1000, source.getPollTimeout()); } public void testEquals() { @@ -44,6 +46,8 @@ public void testEquals() { IngestionSource source1 = new IngestionSource.Builder("type").setParams(params1) .setPointerInitReset(pointerInitReset) .setErrorStrategy(DROP) + .setMaxPollSize(500) + .setPollTimeout(500) .build(); Map params2 = new HashMap<>(); @@ -51,6 +55,8 @@ public void testEquals() { IngestionSource source2 = new IngestionSource.Builder("type").setParams(params2) .setPointerInitReset(pointerInitReset) .setErrorStrategy(DROP) + .setMaxPollSize(500) + .setPollTimeout(500) .build(); assertTrue(source1.equals(source2)); assertTrue(source2.equals(source1)); @@ -68,6 +74,8 @@ public void testHashCode() { IngestionSource source1 = new IngestionSource.Builder("type").setParams(params1) .setPointerInitReset(pointerInitReset) .setErrorStrategy(DROP) + .setMaxPollSize(500) + .setPollTimeout(500) .build(); Map params2 = new HashMap<>(); @@ -75,6 +83,8 @@ public void testHashCode() { IngestionSource source2 = new IngestionSource.Builder("type").setParams(params2) .setPointerInitReset(pointerInitReset) .setErrorStrategy(DROP) + .setMaxPollSize(500) + .setPollTimeout(500) .build(); assertEquals(source1.hashCode(), source2.hashCode()); @@ -93,7 +103,7 @@ public void testToString() { .setErrorStrategy(DROP) .build(); String expected = - "IngestionSource{type='type',pointer_init_reset='PointerInitReset{type='REWIND_BY_OFFSET', value=1000}',error_strategy='DROP', params={key=value}}"; + "IngestionSource{type='type',pointer_init_reset='PointerInitReset{type='REWIND_BY_OFFSET', value=1000}',error_strategy='DROP', params={key=value}, maxPollSize=1000, pollTimeout=1000}"; assertEquals(expected, source.toString()); } } From 2c11ad6da6564d66a7d9bcedb6b2842e102df000 Mon Sep 17 00:00:00 2001 From: Yupeng Fu Date: Wed, 9 Apr 2025 09:00:34 -0700 Subject: [PATCH 2/3] changelog Signed-off-by: Yupeng Fu --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 455184632ea35..af5c4077b4406 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Added Search Only strict routing setting ([#17803](https://github.com/opensearch-project/OpenSearch/pull/17803)) - Disable the index API for ingestion engine ([#17768](https://github.com/opensearch-project/OpenSearch/pull/17768)) - Add update and delete support in pull-based ingestion ([#17822](https://github.com/opensearch-project/OpenSearch/pull/17822)) +- Allow maxPollSize and pollTimeout in IngestionSource to be configurable ([#17863](https://github.com/opensearch-project/OpenSearch/pull/17863)) ### Changed - Migrate BC libs to their FIPS counterparts ([#14912](https://github.com/opensearch-project/OpenSearch/pull/14912)) From 51dfaf898070a6ad47c12f0f947f2a9423ca66e6 Mon Sep 17 00:00:00 2001 From: Yupeng Fu Date: Wed, 9 Apr 2025 13:25:21 -0700 Subject: [PATCH 3/3] comment Signed-off-by: Yupeng Fu --- .../org/opensearch/cluster/metadata/IndexMetadata.java | 4 ++-- .../org/opensearch/cluster/metadata/IngestionSource.java | 8 ++++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java index be6680feb3f2a..f32389ba1c951 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java @@ -811,7 +811,7 @@ public Iterator> settings() { * Defines the max poll size per batch for pull-based ingestion. */ public static final String SETTING_INGESTION_SOURCE_MAX_POLL_SIZE = "index.ingestion_source.poll.max_batch_size"; - public static final Setting INGESTION_SOURCE_ERROR_MAX_POLL_SIZE = Setting.longSetting( + public static final Setting INGESTION_SOURCE_MAX_POLL_SIZE = Setting.longSetting( SETTING_INGESTION_SOURCE_MAX_POLL_SIZE, 1000, 0, @@ -1071,7 +1071,7 @@ public IngestionSource getIngestionSource() { final IngestionErrorStrategy.ErrorStrategy errorStrategy = INGESTION_SOURCE_ERROR_STRATEGY_SETTING.get(settings); final Map ingestionSourceParams = INGESTION_SOURCE_PARAMS_SETTING.getAsMap(settings); - final long maxPollSize = INGESTION_SOURCE_ERROR_MAX_POLL_SIZE.get(settings); + final long maxPollSize = INGESTION_SOURCE_MAX_POLL_SIZE.get(settings); final int pollTimeout = INGESTION_SOURCE_POLL_TIMEOUT.get(settings); return new IngestionSource.Builder(ingestionSourceType).setParams(ingestionSourceParams) .setPointerInitReset(pointerInitReset) diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java b/server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java index 536c99788e7f5..d3c44c1e7027c 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java @@ -9,6 +9,7 @@ package org.opensearch.cluster.metadata; import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.settings.Settings; import org.opensearch.indices.pollingingest.IngestionErrorStrategy; import org.opensearch.indices.pollingingest.StreamPoller; @@ -16,6 +17,9 @@ import java.util.Map; import java.util.Objects; +import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_MAX_POLL_SIZE; +import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_POLL_TIMEOUT; + /** * Class encapsulating the configuration of an ingestion source. */ @@ -157,8 +161,8 @@ public static class Builder { private PointerInitReset pointerInitReset; private IngestionErrorStrategy.ErrorStrategy errorStrategy; private Map params; - private long maxPollSize = 1000L; - private int pollTimeout = 1000; + private long maxPollSize = INGESTION_SOURCE_MAX_POLL_SIZE.getDefault(Settings.EMPTY); + private int pollTimeout = INGESTION_SOURCE_POLL_TIMEOUT.getDefault(Settings.EMPTY); public Builder(String type) { this.type = type;