Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Update API of Message in index to add the timestamp for lag calculation in ingestion polling ([#17977](https://github.com/opensearch-project/OpenSearch/pull/17977/))
- Enabled default throttling for all tasks submitted to cluster manager ([#17711](https://github.com/opensearch-project/OpenSearch/pull/17711))
- Add composite directory factory ([#17988](https://github.com/opensearch-project/OpenSearch/pull/17988))
- Add pull-based ingestion error metrics and make internal queue size configurable ([#18088](https://github.com/opensearch-project/OpenSearch/pull/18088))

### Changed
- Change the default max header size from 8KB to 16KB. ([#18024](https://github.com/opensearch-project/OpenSearch/pull/18024))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.RangeQueryBuilder;
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.indices.pollingingest.PollingIngestStats;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.transport.client.Requests;
Expand Down Expand Up @@ -135,6 +136,8 @@ public void testErrorStrategy() throws Exception {
// malformed message
produceData("2", "", "");
produceData("3", "name3", "25");
produceData("{\"_op_type\":\"invalid\",\"_source\":{\"name\":\"name4\", \"age\": 25}}");
produceData("5", "name5", "25");

internalCluster().startClusterManagerOnlyNode();
final String node = internalCluster().startDataOnlyNode();
Expand All @@ -147,6 +150,7 @@ public void testErrorStrategy() throws Exception {
.put("ingestion_source.type", "kafka")
.put("ingestion_source.error_strategy", "block")
.put("ingestion_source.pointer.init.reset", "earliest")
.put("ingestion_source.internal_queue_size", "1000")
.put("ingestion_source.param.topic", topicName)
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
.put("index.replication.type", "SEGMENT")
Expand All @@ -165,7 +169,15 @@ public void testErrorStrategy() throws Exception {
.get();
waitForState(() -> "drop".equalsIgnoreCase(getSettings(indexName, "index.ingestion_source.error_strategy")));
resumeIngestion(indexName);
waitForSearchableDocs(2, Arrays.asList(node));
waitForSearchableDocs(3, Arrays.asList(node));

PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
.getPollingIngestStats();
assertNotNull(stats);
assertThat(stats.getMessageProcessorStats().totalFailedCount(), is(1L));
assertThat(stats.getMessageProcessorStats().totalFailuresDroppedCount(), is(1L));
assertThat(stats.getConsumerStats().totalConsumerErrorCount(), is(0L));
assertThat(stats.getConsumerStats().totalPollerMessageDroppedCount(), is(1L));
}

public void testPauseAndResumeIngestion() throws Exception {
Expand Down Expand Up @@ -372,6 +384,13 @@ public void testExternalVersioning() throws Exception {
assertThat(rangeQueryResponse.getHits().getTotalHits().value(), is(2L));
return true;
});

// validate processor stats
PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
.getPollingIngestStats();
assertNotNull(stats);
assertThat(stats.getMessageProcessorStats().totalProcessedCount(), is(11L));
assertThat(stats.getMessageProcessorStats().totalVersionConflictsCount(), is(3L));
}

public void testExternalVersioningWithDisabledGCDeletes() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,19 @@ public Iterator<Setting<?>> settings() {
Setting.Property.Final
);

/**
* Defines the internal blocking queue size that is used to decouple poller and processor in pull-based ingestion.
*/
public static final String SETTING_INGESTION_SOURCE_INTERNAL_QUEUE_SIZE = "index.ingestion_source.internal_queue_size";
public static final Setting<Integer> INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING = Setting.intSetting(
SETTING_INGESTION_SOURCE_INTERNAL_QUEUE_SIZE,
100,
1,
100000,
Property.IndexScope,
Setting.Property.Final
);

public static final Setting.AffixSetting<Object> INGESTION_SOURCE_PARAMS_SETTING = Setting.prefixKeySetting(
"index.ingestion_source.param.",
key -> new Setting<>(key, "", (value) -> {
Expand Down Expand Up @@ -1086,13 +1099,15 @@ public IngestionSource getIngestionSource() {
final long maxPollSize = INGESTION_SOURCE_MAX_POLL_SIZE.get(settings);
final int pollTimeout = INGESTION_SOURCE_POLL_TIMEOUT.get(settings);
final int numProcessorThreads = INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING.get(settings);
final int blockingQueueSize = INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING.get(settings);

return new IngestionSource.Builder(ingestionSourceType).setParams(ingestionSourceParams)
.setPointerInitReset(pointerInitReset)
.setErrorStrategy(errorStrategy)
.setMaxPollSize(maxPollSize)
.setPollTimeout(pollTimeout)
.setNumProcessorThreads(numProcessorThreads)
.setBlockingQueueSize(blockingQueueSize)
.build();
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.Map;
import java.util.Objects;

import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING;
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_MAX_POLL_SIZE;
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING;
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_POLL_TIMEOUT;
Expand All @@ -33,6 +34,7 @@
private final long maxPollSize;
private final int pollTimeout;
private int numProcessorThreads;
private int blockingQueueSize;

private IngestionSource(
String type,
Expand All @@ -41,7 +43,8 @@
Map<String, Object> params,
long maxPollSize,
int pollTimeout,
int numProcessorThreads
int numProcessorThreads,
int blockingQueueSize
) {
this.type = type;
this.pointerInitReset = pointerInitReset;
Expand All @@ -50,6 +53,7 @@
this.maxPollSize = maxPollSize;
this.pollTimeout = pollTimeout;
this.numProcessorThreads = numProcessorThreads;
this.blockingQueueSize = blockingQueueSize;
}

public String getType() {
Expand Down Expand Up @@ -80,6 +84,10 @@
return numProcessorThreads;
}

public int getBlockingQueueSize() {
return blockingQueueSize;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -91,12 +99,22 @@
&& Objects.equals(params, ingestionSource.params)
&& Objects.equals(maxPollSize, ingestionSource.maxPollSize)
&& Objects.equals(pollTimeout, ingestionSource.pollTimeout)
&& Objects.equals(numProcessorThreads, ingestionSource.numProcessorThreads);
&& Objects.equals(numProcessorThreads, ingestionSource.numProcessorThreads)
&& Objects.equals(blockingQueueSize, ingestionSource.blockingQueueSize);
}

@Override
public int hashCode() {
return Objects.hash(type, pointerInitReset, params, errorStrategy, maxPollSize, pollTimeout, numProcessorThreads);
return Objects.hash(
type,
pointerInitReset,
params,
errorStrategy,
maxPollSize,
pollTimeout,
numProcessorThreads,
blockingQueueSize
);
}

@Override
Expand All @@ -119,6 +137,8 @@
+ pollTimeout
+ ", numProcessorThreads="
+ numProcessorThreads
+ ", blockingQueueSize="
+ blockingQueueSize
+ '}';
}

Expand Down Expand Up @@ -175,6 +195,7 @@
private long maxPollSize = INGESTION_SOURCE_MAX_POLL_SIZE.getDefault(Settings.EMPTY);
private int pollTimeout = INGESTION_SOURCE_POLL_TIMEOUT.getDefault(Settings.EMPTY);
private int numProcessorThreads = INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING.getDefault(Settings.EMPTY);
private int blockingQueueSize = INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING.getDefault(Settings.EMPTY);

public Builder(String type) {
this.type = type;
Expand All @@ -186,6 +207,7 @@
this.pointerInitReset = ingestionSource.pointerInitReset;
this.errorStrategy = ingestionSource.errorStrategy;
this.params = ingestionSource.params;
this.blockingQueueSize = ingestionSource.blockingQueueSize;

Check warning on line 210 in server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java#L210

Added line #L210 was not covered by tests
}

public Builder setPointerInitReset(PointerInitReset pointerInitReset) {
Expand Down Expand Up @@ -223,8 +245,22 @@
return this;
}

public Builder setBlockingQueueSize(int blockingQueueSize) {
this.blockingQueueSize = blockingQueueSize;
return this;
}

public IngestionSource build() {
return new IngestionSource(type, pointerInitReset, errorStrategy, params, maxPollSize, pollTimeout, numProcessorThreads);
return new IngestionSource(
type,
pointerInitReset,
errorStrategy,
params,
maxPollSize,
pollTimeout,
numProcessorThreads,
blockingQueueSize
);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexMetadata.INGESTION_SOURCE_MAX_POLL_SIZE,
IndexMetadata.INGESTION_SOURCE_POLL_TIMEOUT,
IndexMetadata.INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING,
IndexMetadata.INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING,

// Settings for search replica
IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ public void start() {
initialPollerState,
ingestionSource.getMaxPollSize(),
ingestionSource.getPollTimeout(),
ingestionSource.getNumProcessorThreads()
ingestionSource.getNumProcessorThreads(),
ingestionSource.getBlockingQueueSize()
);
streamPoller.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@
private Set<IngestionShardPointer> persistedPointers;

private final CounterMetric totalPolledCount = new CounterMetric();
private final CounterMetric totalConsumerErrorCount = new CounterMetric();
private final CounterMetric totalPollerMessageFailureCount = new CounterMetric();
private final CounterMetric totalPollerMessageDroppedCount = new CounterMetric();

// A pointer to the max persisted pointer for optimizing the check
@Nullable
Expand All @@ -76,13 +79,20 @@
State initialState,
long maxPollSize,
int pollTimeout,
int numProcessorThreads
int numProcessorThreads,
int blockingQueueSize
) {
this(
startPointer,
persistedPointers,
consumer,
new PartitionedBlockingQueueContainer(numProcessorThreads, consumer.getShardId(), ingestionEngine, errorStrategy),
new PartitionedBlockingQueueContainer(
numProcessorThreads,
consumer.getShardId(),
ingestionEngine,
errorStrategy,
blockingQueueSize
),
resetState,
resetValue,
errorStrategy,
Expand Down Expand Up @@ -227,6 +237,7 @@
// The user will have the option to manually update the offset and resume ingestion.
// todo: support retry?
logger.error("Pausing ingestion. Fatal error occurred in polling the shard {}: {}", consumer.getShardId(), e);
totalConsumerErrorCount.inc();

Check warning on line 240 in server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java#L240

Added line #L240 was not covered by tests
pause();
}
}
Expand Down Expand Up @@ -263,12 +274,15 @@
e
);
errorStrategy.handleError(e, IngestionErrorStrategy.ErrorStage.POLLING);
totalPollerMessageFailureCount.inc();

if (!errorStrategy.shouldIgnoreError(e, IngestionErrorStrategy.ErrorStage.POLLING)) {
if (errorStrategy.shouldIgnoreError(e, IngestionErrorStrategy.ErrorStage.POLLING) == false) {
// Blocking error encountered. Pause poller to stop processing remaining updates.
pause();
failedShardPointer = result.getPointer();
break;
} else {
totalPollerMessageDroppedCount.inc();
}
}
}
Expand Down Expand Up @@ -364,10 +378,20 @@

@Override
public PollingIngestStats getStats() {
MessageProcessorRunnable.MessageProcessorMetrics processorMetrics = blockingQueueContainer.getMessageProcessorMetrics();
PollingIngestStats.Builder builder = new PollingIngestStats.Builder();
// set processor stats
builder.setTotalProcessedCount(processorMetrics.processedCounter().count());
builder.setTotalInvalidMessageCount(processorMetrics.invalidMessageCounter().count());
builder.setTotalProcessorVersionConflictsCount(processorMetrics.versionConflictCounter().count());
builder.setTotalProcessorFailedCount(processorMetrics.failedMessageCounter().count());
builder.setTotalProcessorFailuresDroppedCount(processorMetrics.failedMessageDroppedCounter().count());
builder.setTotalProcessorThreadInterruptCount(processorMetrics.processorThreadInterruptCounter().count());
// set consumer stats
builder.setTotalPolledCount(totalPolledCount.count());
builder.setTotalProcessedCount(blockingQueueContainer.getTotalProcessedCount());
builder.setTotalSkippedCount(blockingQueueContainer.getTotalSkippedCount());
builder.setTotalConsumerErrorCount(totalConsumerErrorCount.count());
builder.setTotalPollerMessageFailureCount(totalPollerMessageFailureCount.count());
builder.setTotalPollerMessageDroppedCount(totalPollerMessageDroppedCount.count());
builder.setLagInMillis(computeLag());
return builder.build();
}
Expand Down
Loading
Loading