Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Added Search Only strict routing setting ([#17803](https://github.com/opensearch-project/OpenSearch/pull/17803))
- Disable the index API for ingestion engine ([#17768](https://github.com/opensearch-project/OpenSearch/pull/17768))
- Add update and delete support in pull-based ingestion ([#17822](https://github.com/opensearch-project/OpenSearch/pull/17822))
- Allow maxPollSize and pollTimeout in IngestionSource to be configurable ([#17863](https://github.com/opensearch-project/OpenSearch/pull/17863))

### Changed
- Migrate BC libs to their FIPS counterparts ([#14912](https://github.com/opensearch-project/OpenSearch/pull/14912))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,30 @@ public Iterator<Setting<?>> settings() {
Property.Dynamic
);

/**
* Defines the max poll size per batch for pull-based ingestion.
*/
public static final String SETTING_INGESTION_SOURCE_MAX_POLL_SIZE = "index.ingestion_source.poll.max_batch_size";
public static final Setting<Long> INGESTION_SOURCE_MAX_POLL_SIZE = Setting.longSetting(
SETTING_INGESTION_SOURCE_MAX_POLL_SIZE,
1000,
0,
Property.IndexScope,
Property.Dynamic
);

/**
* Defines the poll timeout for pull-based ingestion in milliseconds.
*/
public static final String SETTING_INGESTION_SOURCE_POLL_TIMEOUT = "index.ingestion_source.poll.timeout";
public static final Setting<Integer> INGESTION_SOURCE_POLL_TIMEOUT = Setting.intSetting(
SETTING_INGESTION_SOURCE_POLL_TIMEOUT,
1000,
0,
Property.IndexScope,
Property.Dynamic
);

public static final Setting.AffixSetting<Object> INGESTION_SOURCE_PARAMS_SETTING = Setting.prefixKeySetting(
"index.ingestion_source.param.",
key -> new Setting<>(key, "", (value) -> {
Expand Down Expand Up @@ -1047,9 +1071,13 @@ public IngestionSource getIngestionSource() {

final IngestionErrorStrategy.ErrorStrategy errorStrategy = INGESTION_SOURCE_ERROR_STRATEGY_SETTING.get(settings);
final Map<String, Object> ingestionSourceParams = INGESTION_SOURCE_PARAMS_SETTING.getAsMap(settings);
final long maxPollSize = INGESTION_SOURCE_MAX_POLL_SIZE.get(settings);
final int pollTimeout = INGESTION_SOURCE_POLL_TIMEOUT.get(settings);
return new IngestionSource.Builder(ingestionSourceType).setParams(ingestionSourceParams)
.setPointerInitReset(pointerInitReset)
.setErrorStrategy(errorStrategy)
.setMaxPollSize(maxPollSize)
.setPollTimeout(pollTimeout)
.build();
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,17 @@
package org.opensearch.cluster.metadata;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.settings.Settings;
import org.opensearch.indices.pollingingest.IngestionErrorStrategy;
import org.opensearch.indices.pollingingest.StreamPoller;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_MAX_POLL_SIZE;
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_POLL_TIMEOUT;

/**
* Class encapsulating the configuration of an ingestion source.
*/
Expand All @@ -25,17 +29,23 @@ public class IngestionSource {
private final PointerInitReset pointerInitReset;
private final IngestionErrorStrategy.ErrorStrategy errorStrategy;
private final Map<String, Object> params;
private final long maxPollSize;
private final int pollTimeout;

private IngestionSource(
String type,
PointerInitReset pointerInitReset,
IngestionErrorStrategy.ErrorStrategy errorStrategy,
Map<String, Object> params
Map<String, Object> params,
long maxPollSize,
int pollTimeout
) {
this.type = type;
this.pointerInitReset = pointerInitReset;
this.params = params;
this.errorStrategy = errorStrategy;
this.maxPollSize = maxPollSize;
this.pollTimeout = pollTimeout;
}

public String getType() {
Expand All @@ -54,6 +64,14 @@ public Map<String, Object> params() {
return params;
}

public long getMaxPollSize() {
return maxPollSize;
}

public int getPollTimeout() {
return pollTimeout;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -62,12 +80,14 @@ public boolean equals(Object o) {
return Objects.equals(type, ingestionSource.type)
&& Objects.equals(pointerInitReset, ingestionSource.pointerInitReset)
&& Objects.equals(errorStrategy, ingestionSource.errorStrategy)
&& Objects.equals(params, ingestionSource.params);
&& Objects.equals(params, ingestionSource.params)
&& Objects.equals(maxPollSize, ingestionSource.maxPollSize)
&& Objects.equals(pollTimeout, ingestionSource.pollTimeout);
}

@Override
public int hashCode() {
return Objects.hash(type, pointerInitReset, params, errorStrategy);
return Objects.hash(type, pointerInitReset, params, errorStrategy, maxPollSize, pollTimeout);
}

@Override
Expand All @@ -84,6 +104,10 @@ public String toString() {
+ '\''
+ ", params="
+ params
+ ", maxPollSize="
+ maxPollSize
+ ", pollTimeout="
+ pollTimeout
+ '}';
}

Expand Down Expand Up @@ -137,6 +161,8 @@ public static class Builder {
private PointerInitReset pointerInitReset;
private IngestionErrorStrategy.ErrorStrategy errorStrategy;
private Map<String, Object> params;
private long maxPollSize = INGESTION_SOURCE_MAX_POLL_SIZE.getDefault(Settings.EMPTY);
private int pollTimeout = INGESTION_SOURCE_POLL_TIMEOUT.getDefault(Settings.EMPTY);

public Builder(String type) {
this.type = type;
Expand Down Expand Up @@ -165,13 +191,23 @@ public Builder setParams(Map<String, Object> params) {
return this;
}

public Builder setMaxPollSize(long maxPollSize) {
this.maxPollSize = maxPollSize;
return this;
}

public Builder addParam(String key, Object value) {
this.params.put(key, value);
return this;
}

public Builder setPollTimeout(int pollTimeout) {
this.pollTimeout = pollTimeout;
return this;
}

public IngestionSource build() {
return new IngestionSource(type, pointerInitReset, errorStrategy, params);
return new IngestionSource(type, pointerInitReset, errorStrategy, params, maxPollSize, pollTimeout);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ public void start() {
resetState,
resetValue,
ingestionErrorStrategy,
initialPollerState
initialPollerState,
ingestionSource.getMaxPollSize(),
ingestionSource.getPollTimeout()
);
streamPoller.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
public class DefaultStreamPoller implements StreamPoller {
private static final Logger logger = LogManager.getLogger(DefaultStreamPoller.class);

// TODO: make this configurable
public static final long MAX_POLL_SIZE = 1000;
public static final int POLL_TIMEOUT = 1000;

Expand Down Expand Up @@ -77,7 +76,9 @@ public DefaultStreamPoller(
ResetState resetState,
String resetValue,
IngestionErrorStrategy errorStrategy,
State initialState
State initialState,
long maxPollSize,
int pollTimeout
) {
this(
startPointer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public void testConstructorAndGetters() {
assertEquals("1000", source.getPointerInitReset().getValue());
assertEquals(DROP, source.getErrorStrategy());
assertEquals(params, source.params());
assertEquals(1000, source.getMaxPollSize());
assertEquals(1000, source.getPollTimeout());
}

public void testEquals() {
Expand All @@ -44,13 +46,17 @@ public void testEquals() {
IngestionSource source1 = new IngestionSource.Builder("type").setParams(params1)
.setPointerInitReset(pointerInitReset)
.setErrorStrategy(DROP)
.setMaxPollSize(500)
.setPollTimeout(500)
.build();

Map<String, Object> params2 = new HashMap<>();
params2.put("key", "value");
IngestionSource source2 = new IngestionSource.Builder("type").setParams(params2)
.setPointerInitReset(pointerInitReset)
.setErrorStrategy(DROP)
.setMaxPollSize(500)
.setPollTimeout(500)
.build();
assertTrue(source1.equals(source2));
assertTrue(source2.equals(source1));
Expand All @@ -68,13 +74,17 @@ public void testHashCode() {
IngestionSource source1 = new IngestionSource.Builder("type").setParams(params1)
.setPointerInitReset(pointerInitReset)
.setErrorStrategy(DROP)
.setMaxPollSize(500)
.setPollTimeout(500)
.build();

Map<String, Object> params2 = new HashMap<>();
params2.put("key", "value");
IngestionSource source2 = new IngestionSource.Builder("type").setParams(params2)
.setPointerInitReset(pointerInitReset)
.setErrorStrategy(DROP)
.setMaxPollSize(500)
.setPollTimeout(500)
.build();
assertEquals(source1.hashCode(), source2.hashCode());

Expand All @@ -93,7 +103,7 @@ public void testToString() {
.setErrorStrategy(DROP)
.build();
String expected =
"IngestionSource{type='type',pointer_init_reset='PointerInitReset{type='REWIND_BY_OFFSET', value=1000}',error_strategy='DROP', params={key=value}}";
"IngestionSource{type='type',pointer_init_reset='PointerInitReset{type='REWIND_BY_OFFSET', value=1000}',error_strategy='DROP', params={key=value}, maxPollSize=1000, pollTimeout=1000}";
assertEquals(expected, source.toString());
}
}
Loading