diff --git a/CHANGELOG.md b/CHANGELOG.md index 856a1e55850a2..49f6f11e7cf3b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Update API of Message in index to add the timestamp for lag calculation in ingestion polling ([#17977](https://github.com/opensearch-project/OpenSearch/pull/17977/)) - Enabled default throttling for all tasks submitted to cluster manager ([#17711](https://github.com/opensearch-project/OpenSearch/pull/17711)) - Add composite directory factory ([#17988](https://github.com/opensearch-project/OpenSearch/pull/17988)) +- Add pull-based ingestion error metrics and make internal queue size configurable ([#18088](https://github.com/opensearch-project/OpenSearch/pull/18088)) ### Changed - Change the default max header size from 8KB to 16KB. ([#18024](https://github.com/opensearch-project/OpenSearch/pull/18024)) diff --git a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java index 2c56fa574fdb5..97178046fac1b 100644 --- a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java +++ b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java @@ -22,6 +22,7 @@ import org.opensearch.index.query.BoolQueryBuilder; import org.opensearch.index.query.RangeQueryBuilder; import org.opensearch.index.query.TermQueryBuilder; +import org.opensearch.indices.pollingingest.PollingIngestStats; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.transport.client.Requests; @@ -135,6 +136,8 @@ public void testErrorStrategy() throws Exception { // malformed message produceData("2", "", ""); produceData("3", "name3", "25"); + produceData("{\"_op_type\":\"invalid\",\"_source\":{\"name\":\"name4\", \"age\": 25}}"); + produceData("5", "name5", "25"); internalCluster().startClusterManagerOnlyNode(); final String node = internalCluster().startDataOnlyNode(); @@ -147,6 +150,7 @@ public void testErrorStrategy() throws Exception { .put("ingestion_source.type", "kafka") .put("ingestion_source.error_strategy", "block") .put("ingestion_source.pointer.init.reset", "earliest") + .put("ingestion_source.internal_queue_size", "1000") .put("ingestion_source.param.topic", topicName) .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers()) .put("index.replication.type", "SEGMENT") @@ -165,7 +169,15 @@ public void testErrorStrategy() throws Exception { .get(); waitForState(() -> "drop".equalsIgnoreCase(getSettings(indexName, "index.ingestion_source.error_strategy"))); resumeIngestion(indexName); - waitForSearchableDocs(2, Arrays.asList(node)); + waitForSearchableDocs(3, Arrays.asList(node)); + + PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0] + .getPollingIngestStats(); + assertNotNull(stats); + assertThat(stats.getMessageProcessorStats().totalFailedCount(), is(1L)); + assertThat(stats.getMessageProcessorStats().totalFailuresDroppedCount(), is(1L)); + assertThat(stats.getConsumerStats().totalConsumerErrorCount(), is(0L)); + assertThat(stats.getConsumerStats().totalPollerMessageDroppedCount(), is(1L)); } public void testPauseAndResumeIngestion() throws Exception { @@ -372,6 +384,13 @@ public void testExternalVersioning() throws Exception { assertThat(rangeQueryResponse.getHits().getTotalHits().value(), is(2L)); return true; }); + + // validate processor stats + PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0] + .getPollingIngestStats(); + assertNotNull(stats); + assertThat(stats.getMessageProcessorStats().totalProcessedCount(), is(11L)); + assertThat(stats.getMessageProcessorStats().totalVersionConflictsCount(), is(3L)); } public void testExternalVersioningWithDisabledGCDeletes() throws Exception { diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java index 7d6a241aa3985..90b78ad3a0fe7 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java @@ -843,6 +843,19 @@ public Iterator> settings() { Setting.Property.Final ); + /** + * Defines the internal blocking queue size that is used to decouple poller and processor in pull-based ingestion. + */ + public static final String SETTING_INGESTION_SOURCE_INTERNAL_QUEUE_SIZE = "index.ingestion_source.internal_queue_size"; + public static final Setting INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING = Setting.intSetting( + SETTING_INGESTION_SOURCE_INTERNAL_QUEUE_SIZE, + 100, + 1, + 100000, + Property.IndexScope, + Setting.Property.Final + ); + public static final Setting.AffixSetting INGESTION_SOURCE_PARAMS_SETTING = Setting.prefixKeySetting( "index.ingestion_source.param.", key -> new Setting<>(key, "", (value) -> { @@ -1086,6 +1099,7 @@ public IngestionSource getIngestionSource() { final long maxPollSize = INGESTION_SOURCE_MAX_POLL_SIZE.get(settings); final int pollTimeout = INGESTION_SOURCE_POLL_TIMEOUT.get(settings); final int numProcessorThreads = INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING.get(settings); + final int blockingQueueSize = INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING.get(settings); return new IngestionSource.Builder(ingestionSourceType).setParams(ingestionSourceParams) .setPointerInitReset(pointerInitReset) @@ -1093,6 +1107,7 @@ public IngestionSource getIngestionSource() { .setMaxPollSize(maxPollSize) .setPollTimeout(pollTimeout) .setNumProcessorThreads(numProcessorThreads) + .setBlockingQueueSize(blockingQueueSize) .build(); } return null; diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java b/server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java index e975a085dc8bf..9feb847fe36ee 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java @@ -17,6 +17,7 @@ import java.util.Map; import java.util.Objects; +import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING; import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_MAX_POLL_SIZE; import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING; import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_POLL_TIMEOUT; @@ -33,6 +34,7 @@ public class IngestionSource { private final long maxPollSize; private final int pollTimeout; private int numProcessorThreads; + private int blockingQueueSize; private IngestionSource( String type, @@ -41,7 +43,8 @@ private IngestionSource( Map params, long maxPollSize, int pollTimeout, - int numProcessorThreads + int numProcessorThreads, + int blockingQueueSize ) { this.type = type; this.pointerInitReset = pointerInitReset; @@ -50,6 +53,7 @@ private IngestionSource( this.maxPollSize = maxPollSize; this.pollTimeout = pollTimeout; this.numProcessorThreads = numProcessorThreads; + this.blockingQueueSize = blockingQueueSize; } public String getType() { @@ -80,6 +84,10 @@ public int getNumProcessorThreads() { return numProcessorThreads; } + public int getBlockingQueueSize() { + return blockingQueueSize; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -91,12 +99,22 @@ public boolean equals(Object o) { && Objects.equals(params, ingestionSource.params) && Objects.equals(maxPollSize, ingestionSource.maxPollSize) && Objects.equals(pollTimeout, ingestionSource.pollTimeout) - && Objects.equals(numProcessorThreads, ingestionSource.numProcessorThreads); + && Objects.equals(numProcessorThreads, ingestionSource.numProcessorThreads) + && Objects.equals(blockingQueueSize, ingestionSource.blockingQueueSize); } @Override public int hashCode() { - return Objects.hash(type, pointerInitReset, params, errorStrategy, maxPollSize, pollTimeout, numProcessorThreads); + return Objects.hash( + type, + pointerInitReset, + params, + errorStrategy, + maxPollSize, + pollTimeout, + numProcessorThreads, + blockingQueueSize + ); } @Override @@ -119,6 +137,8 @@ public String toString() { + pollTimeout + ", numProcessorThreads=" + numProcessorThreads + + ", blockingQueueSize=" + + blockingQueueSize + '}'; } @@ -175,6 +195,7 @@ public static class Builder { private long maxPollSize = INGESTION_SOURCE_MAX_POLL_SIZE.getDefault(Settings.EMPTY); private int pollTimeout = INGESTION_SOURCE_POLL_TIMEOUT.getDefault(Settings.EMPTY); private int numProcessorThreads = INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING.getDefault(Settings.EMPTY); + private int blockingQueueSize = INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING.getDefault(Settings.EMPTY); public Builder(String type) { this.type = type; @@ -186,6 +207,7 @@ public Builder(IngestionSource ingestionSource) { this.pointerInitReset = ingestionSource.pointerInitReset; this.errorStrategy = ingestionSource.errorStrategy; this.params = ingestionSource.params; + this.blockingQueueSize = ingestionSource.blockingQueueSize; } public Builder setPointerInitReset(PointerInitReset pointerInitReset) { @@ -223,8 +245,22 @@ public Builder setNumProcessorThreads(int numProcessorThreads) { return this; } + public Builder setBlockingQueueSize(int blockingQueueSize) { + this.blockingQueueSize = blockingQueueSize; + return this; + } + public IngestionSource build() { - return new IngestionSource(type, pointerInitReset, errorStrategy, params, maxPollSize, pollTimeout, numProcessorThreads); + return new IngestionSource( + type, + pointerInitReset, + errorStrategy, + params, + maxPollSize, + pollTimeout, + numProcessorThreads, + blockingQueueSize + ); } } diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 7c937aa310f16..d60204bad47f5 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -273,6 +273,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexMetadata.INGESTION_SOURCE_MAX_POLL_SIZE, IndexMetadata.INGESTION_SOURCE_POLL_TIMEOUT, IndexMetadata.INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING, + IndexMetadata.INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING, // Settings for search replica IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING, diff --git a/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java b/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java index 730e3c44bdb45..399865dcbb0c7 100644 --- a/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java @@ -129,7 +129,8 @@ public void start() { initialPollerState, ingestionSource.getMaxPollSize(), ingestionSource.getPollTimeout(), - ingestionSource.getNumProcessorThreads() + ingestionSource.getNumProcessorThreads(), + ingestionSource.getBlockingQueueSize() ); streamPoller.start(); } diff --git a/server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java b/server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java index 25308848bbf03..411e359911093 100644 --- a/server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java +++ b/server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java @@ -58,6 +58,9 @@ public class DefaultStreamPoller implements StreamPoller { private Set persistedPointers; private final CounterMetric totalPolledCount = new CounterMetric(); + private final CounterMetric totalConsumerErrorCount = new CounterMetric(); + private final CounterMetric totalPollerMessageFailureCount = new CounterMetric(); + private final CounterMetric totalPollerMessageDroppedCount = new CounterMetric(); // A pointer to the max persisted pointer for optimizing the check @Nullable @@ -76,13 +79,20 @@ public DefaultStreamPoller( State initialState, long maxPollSize, int pollTimeout, - int numProcessorThreads + int numProcessorThreads, + int blockingQueueSize ) { this( startPointer, persistedPointers, consumer, - new PartitionedBlockingQueueContainer(numProcessorThreads, consumer.getShardId(), ingestionEngine, errorStrategy), + new PartitionedBlockingQueueContainer( + numProcessorThreads, + consumer.getShardId(), + ingestionEngine, + errorStrategy, + blockingQueueSize + ), resetState, resetValue, errorStrategy, @@ -227,6 +237,7 @@ protected void startPoll() { // The user will have the option to manually update the offset and resume ingestion. // todo: support retry? logger.error("Pausing ingestion. Fatal error occurred in polling the shard {}: {}", consumer.getShardId(), e); + totalConsumerErrorCount.inc(); pause(); } } @@ -263,12 +274,15 @@ private IngestionShardPointer processRecords( e ); errorStrategy.handleError(e, IngestionErrorStrategy.ErrorStage.POLLING); + totalPollerMessageFailureCount.inc(); - if (!errorStrategy.shouldIgnoreError(e, IngestionErrorStrategy.ErrorStage.POLLING)) { + if (errorStrategy.shouldIgnoreError(e, IngestionErrorStrategy.ErrorStage.POLLING) == false) { // Blocking error encountered. Pause poller to stop processing remaining updates. pause(); failedShardPointer = result.getPointer(); break; + } else { + totalPollerMessageDroppedCount.inc(); } } } @@ -364,10 +378,20 @@ public IngestionShardPointer getBatchStartPointer() { @Override public PollingIngestStats getStats() { + MessageProcessorRunnable.MessageProcessorMetrics processorMetrics = blockingQueueContainer.getMessageProcessorMetrics(); PollingIngestStats.Builder builder = new PollingIngestStats.Builder(); + // set processor stats + builder.setTotalProcessedCount(processorMetrics.processedCounter().count()); + builder.setTotalInvalidMessageCount(processorMetrics.invalidMessageCounter().count()); + builder.setTotalProcessorVersionConflictsCount(processorMetrics.versionConflictCounter().count()); + builder.setTotalProcessorFailedCount(processorMetrics.failedMessageCounter().count()); + builder.setTotalProcessorFailuresDroppedCount(processorMetrics.failedMessageDroppedCounter().count()); + builder.setTotalProcessorThreadInterruptCount(processorMetrics.processorThreadInterruptCounter().count()); + // set consumer stats builder.setTotalPolledCount(totalPolledCount.count()); - builder.setTotalProcessedCount(blockingQueueContainer.getTotalProcessedCount()); - builder.setTotalSkippedCount(blockingQueueContainer.getTotalSkippedCount()); + builder.setTotalConsumerErrorCount(totalConsumerErrorCount.count()); + builder.setTotalPollerMessageFailureCount(totalPollerMessageFailureCount.count()); + builder.setTotalPollerMessageDroppedCount(totalPollerMessageDroppedCount.count()); builder.setLagInMillis(computeLag()); return builder.build(); } diff --git a/server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java b/server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java index 810c4ae14d7a2..51e322ad7d16b 100644 --- a/server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java +++ b/server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java @@ -55,8 +55,7 @@ public class MessageProcessorRunnable implements Runnable, Closeable { private final BlockingQueue> blockingQueue; private final MessageProcessor messageProcessor; - private final CounterMetric processedCounter = new CounterMetric(); - private final CounterMetric skippedCounter = new CounterMetric(); + private final MessageProcessorMetrics messageProcessorMetrics = MessageProcessorMetrics.create(); // currentShardPointer tracks the most recent pointer that is being processed @Nullable @@ -118,11 +117,11 @@ static class MessageProcessor { * field used for range search, (2) a stored field for retrieval. * * @param shardUpdateMessage contains the message to process - * @param skippedCounter the counter for skipped messages + * @param messageProcessorMetrics message processor metrics */ - protected void process(ShardUpdateMessage shardUpdateMessage, CounterMetric skippedCounter) { + protected void process(ShardUpdateMessage shardUpdateMessage, MessageProcessorMetrics messageProcessorMetrics) { try { - Engine.Operation operation = getOperation(shardUpdateMessage, skippedCounter); + Engine.Operation operation = getOperation(shardUpdateMessage, messageProcessorMetrics); switch (operation.operationType()) { case INDEX: engine.indexInternal((Engine.Index) operation); @@ -149,21 +148,22 @@ protected void process(ShardUpdateMessage shardUpdateMessage, CounterMetric skip /** * Visible for testing. Get the engine operation from the message. * @param shardUpdateMessage an update message containing payload and pointer for the update - * @param skippedCounter the counter for skipped messages + * @param messageProcessorMetrics message processor metrics * @return the engine operation */ - protected Engine.Operation getOperation(ShardUpdateMessage shardUpdateMessage, CounterMetric skippedCounter) throws IOException { + protected Engine.Operation getOperation(ShardUpdateMessage shardUpdateMessage, MessageProcessorMetrics messageProcessorMetrics) + throws IOException { Map payloadMap = shardUpdateMessage.parsedPayloadMap(); IngestionShardPointer pointer = shardUpdateMessage.pointer(); if (payloadMap.containsKey(OP_TYPE) && !(payloadMap.get(OP_TYPE) instanceof String)) { - skippedCounter.inc(); + messageProcessorMetrics.invalidMessageCounter.inc(); logger.error("_op_type field is of type {} but not string, skipping the message", payloadMap.get(OP_TYPE).getClass()); return null; } if (payloadMap.containsKey(ID) == false) { - // TODO: add metric + messageProcessorMetrics.invalidMessageCounter.inc(); logger.error("ID field is missing, skipping the message"); return null; } @@ -185,19 +185,18 @@ protected Engine.Operation getOperation(ShardUpdateMessage shardUpdateMessage, C switch (opType) { case INDEX: if (!payloadMap.containsKey(SOURCE)) { - skippedCounter.inc(); + messageProcessorMetrics.invalidMessageCounter.inc(); logger.error("missing _source field, skipping the message"); return null; } if (!(payloadMap.get(SOURCE) instanceof Map)) { - skippedCounter.inc(); + messageProcessorMetrics.invalidMessageCounter.inc(); logger.error("_source field does not contain a map, skipping the message"); return null; } BytesReference source = convertToBytes(payloadMap.get(SOURCE)); SourceToParse sourceToParse = new SourceToParse(index, id, source, MediaTypeRegistry.xContentType(source), null); - // TODO: handle parsing err ParsedDocument doc = engine.getDocumentMapperForType().getDocumentMapper().parse(sourceToParse); ParseContext.Document document = doc.rootDoc(); // set the offset as the offset field @@ -223,12 +222,13 @@ protected Engine.Operation getOperation(ShardUpdateMessage shardUpdateMessage, C case DELETE: if (shardUpdateMessage.autoGeneratedIdTimestamp() != UNSET_AUTO_GENERATED_TIMESTAMP) { logger.info("Delete operation without ID received, and will be dropped."); + messageProcessorMetrics.invalidMessageCounter.inc(); operation = new Engine.NoOp( 0, 1, Engine.Operation.Origin.PRIMARY, System.nanoTime(), - "Delete operation is missing ID" + "Delete operation is missing ID. Skipping message." ); } else { operation = new Engine.Delete( @@ -246,6 +246,7 @@ protected Engine.Operation getOperation(ShardUpdateMessage shardUpdateMessage, C } break; default: + messageProcessorMetrics.invalidMessageCounter.inc(); logger.error("Unsupported operation type {}", opType); return null; } @@ -277,26 +278,28 @@ public void run() { shardUpdateMessage = blockingQueue.poll(1000, TimeUnit.MILLISECONDS); } } catch (InterruptedException e) { - // TODO: add metric + messageProcessorMetrics.processorThreadInterruptCounter.inc(); logger.debug("MessageProcessorRunnable poll interruptedException", e); Thread.currentThread().interrupt(); // Restore interrupt status } if (shardUpdateMessage != null) { try { - processedCounter.inc(); + messageProcessorMetrics.processedCounter.inc(); currentShardPointer = shardUpdateMessage.pointer(); - messageProcessor.process(shardUpdateMessage, skippedCounter); + messageProcessor.process(shardUpdateMessage, messageProcessorMetrics); shardUpdateMessage = null; } catch (VersionConflictEngineException e) { // Messages with version conflicts will be dropped. This should not have any impact to data // correctness as pull-based ingestion does not support partial updates. - // TODO: add metric + messageProcessorMetrics.versionConflictCounter.inc(); logger.debug("Dropping message due to version conflict. ShardPointer: " + shardUpdateMessage.pointer().asString(), e); shardUpdateMessage = null; } catch (Exception e) { + messageProcessorMetrics.failedMessageCounter.inc(); errorStrategy.handleError(e, IngestionErrorStrategy.ErrorStage.PROCESSING); if (errorStrategy.shouldIgnoreError(e, IngestionErrorStrategy.ErrorStage.PROCESSING)) { shardUpdateMessage = null; + messageProcessorMetrics.failedMessageDroppedCounter.inc(); } else { waitBeforeRetry(); } @@ -314,12 +317,8 @@ private void waitBeforeRetry() { } } - public CounterMetric getProcessedCounter() { - return processedCounter; - } - - public CounterMetric getSkippedCounter() { - return skippedCounter; + public MessageProcessorMetrics getMessageProcessorMetrics() { + return messageProcessorMetrics; } public IngestionErrorStrategy getErrorStrategy() { @@ -342,4 +341,38 @@ public IngestionShardPointer getCurrentShardPointer() { public void close() { closed = true; } + + /** + * Tracks MessageProcessor metrics. + */ + public record MessageProcessorMetrics(CounterMetric processedCounter, CounterMetric invalidMessageCounter, + CounterMetric versionConflictCounter, CounterMetric failedMessageCounter, CounterMetric failedMessageDroppedCounter, + CounterMetric processorThreadInterruptCounter) { + public static MessageProcessorMetrics create() { + return new MessageProcessorMetrics( + new CounterMetric(), + new CounterMetric(), + new CounterMetric(), + new CounterMetric(), + new CounterMetric(), + new CounterMetric() + ); + } + + public MessageProcessorMetrics combine(MessageProcessorMetrics other) { + MessageProcessorMetrics combinedMetrics = create(); + combinedMetrics.processedCounter.inc(this.processedCounter.count() + other.processedCounter.count()); + combinedMetrics.invalidMessageCounter.inc(this.invalidMessageCounter.count() + other.invalidMessageCounter.count()); + combinedMetrics.versionConflictCounter.inc(this.versionConflictCounter.count() + other.versionConflictCounter.count()); + combinedMetrics.failedMessageCounter.inc(this.failedMessageCounter.count() + other.failedMessageCounter.count()); + combinedMetrics.failedMessageDroppedCounter.inc( + this.failedMessageDroppedCounter.count() + other.failedMessageDroppedCounter.count() + ); + combinedMetrics.processorThreadInterruptCounter.inc( + this.processorThreadInterruptCounter.count() + other.processorThreadInterruptCounter.count() + ); + + return combinedMetrics; + } + } } diff --git a/server/src/main/java/org/opensearch/indices/pollingingest/PartitionedBlockingQueueContainer.java b/server/src/main/java/org/opensearch/indices/pollingingest/PartitionedBlockingQueueContainer.java index 90cd39a2ff6a9..c5a033c8b4739 100644 --- a/server/src/main/java/org/opensearch/indices/pollingingest/PartitionedBlockingQueueContainer.java +++ b/server/src/main/java/org/opensearch/indices/pollingingest/PartitionedBlockingQueueContainer.java @@ -10,7 +10,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.common.metrics.CounterMetric; import org.opensearch.common.util.RequestUtils; import org.opensearch.core.common.Strings; import org.opensearch.index.IngestionShardConsumer; @@ -39,7 +38,6 @@ */ public class PartitionedBlockingQueueContainer { private static final Logger logger = LogManager.getLogger(PartitionedBlockingQueueContainer.class); - private static final int defaultQueueSize = 100; private final int numPartitions; // partition mappings @@ -54,7 +52,8 @@ public PartitionedBlockingQueueContainer( int numPartitions, int shardId, IngestionEngine ingestionEngine, - IngestionErrorStrategy errorStrategy + IngestionErrorStrategy errorStrategy, + int blockingQueueSize ) { assert numPartitions > 0 : "Number of processor threads / partitions must be greater than 0"; partitionToQueueMap = new ConcurrentHashMap<>(); @@ -76,7 +75,7 @@ public PartitionedBlockingQueueContainer( r -> new Thread(r, String.format(Locale.ROOT, processorThreadName)) ); partitionToProcessorExecutorMap.put(partition, executorService); - partitionToQueueMap.put(partition, new ArrayBlockingQueue<>(defaultQueueSize)); + partitionToQueueMap.put(partition, new ArrayBlockingQueue<>(blockingQueueSize)); MessageProcessorRunnable messageProcessorRunnable = new MessageProcessorRunnable( partitionToQueueMap.get(partition), @@ -162,25 +161,14 @@ public void close() { } /** - * Return total number of processed updates across all partitions. + * Returns aggregated message processor metrics from all processor threads. */ - public long getTotalProcessedCount() { + public MessageProcessorRunnable.MessageProcessorMetrics getMessageProcessorMetrics() { return partitionToMessageProcessorMap.values() .stream() - .map(MessageProcessorRunnable::getProcessedCounter) - .mapToLong(CounterMetric::count) - .sum(); - } - - /** - * Return total number of skipped updates across all partitions. - */ - public long getTotalSkippedCount() { - return partitionToMessageProcessorMap.values() - .stream() - .map(MessageProcessorRunnable::getSkippedCounter) - .mapToLong(CounterMetric::count) - .sum(); + .map(MessageProcessorRunnable::getMessageProcessorMetrics) + .reduce(MessageProcessorRunnable.MessageProcessorMetrics::combine) + .orElseGet(MessageProcessorRunnable.MessageProcessorMetrics::create); } /** diff --git a/server/src/main/java/org/opensearch/indices/pollingingest/PollingIngestStats.java b/server/src/main/java/org/opensearch/indices/pollingingest/PollingIngestStats.java index 2fadb0ae203f4..08436608b0cc6 100644 --- a/server/src/main/java/org/opensearch/indices/pollingingest/PollingIngestStats.java +++ b/server/src/main/java/org/opensearch/indices/pollingingest/PollingIngestStats.java @@ -25,7 +25,6 @@ public class PollingIngestStats implements Writeable, ToXContentFragment { private final MessageProcessorStats messageProcessorStats; private final ConsumerStats consumerStats; - // TODO: add error stats from error handling sink public PollingIngestStats(MessageProcessorStats messageProcessorStats, ConsumerStats consumerStats) { this.messageProcessorStats = messageProcessorStats; @@ -34,19 +33,46 @@ public PollingIngestStats(MessageProcessorStats messageProcessorStats, ConsumerS public PollingIngestStats(StreamInput in) throws IOException { long totalProcessedCount = in.readLong(); - long totalSkippedCount = in.readLong(); - this.messageProcessorStats = new MessageProcessorStats(totalProcessedCount, totalSkippedCount); + long totalInvalidMessageCount = in.readLong(); + long totalProcessorVersionConflictsCount = in.readLong(); + long totalProcessorFailedCount = in.readLong(); + long totalProcessorFailuresDroppedCount = in.readLong(); + long totalProcessorThreadInterruptCount = in.readLong(); + this.messageProcessorStats = new MessageProcessorStats( + totalProcessedCount, + totalInvalidMessageCount, + totalProcessorVersionConflictsCount, + totalProcessorFailedCount, + totalProcessorFailuresDroppedCount, + totalProcessorThreadInterruptCount + ); long totalPolledCount = in.readLong(); long lagInMillis = in.readLong(); - this.consumerStats = new ConsumerStats(totalPolledCount, lagInMillis); + long totalConsumerErrorCount = in.readLong(); + long totalPollerMessageFailureCount = in.readLong(); + long totalPollerMessageDroppedCount = in.readLong(); + this.consumerStats = new ConsumerStats( + totalPolledCount, + lagInMillis, + totalConsumerErrorCount, + totalPollerMessageFailureCount, + totalPollerMessageDroppedCount + ); } @Override public void writeTo(StreamOutput out) throws IOException { out.writeLong(messageProcessorStats.totalProcessedCount); - out.writeLong(messageProcessorStats.totalSkippedCount); + out.writeLong(messageProcessorStats.totalInvalidMessageCount); + out.writeLong(messageProcessorStats.totalVersionConflictsCount); + out.writeLong(messageProcessorStats.totalFailedCount); + out.writeLong(messageProcessorStats.totalFailuresDroppedCount); + out.writeLong(messageProcessorStats.totalProcessorThreadInterruptCount); out.writeLong(consumerStats.totalPolledCount); out.writeLong(consumerStats.lagInMillis); + out.writeLong(consumerStats.totalConsumerErrorCount); + out.writeLong(consumerStats.totalPollerMessageFailureCount); + out.writeLong(consumerStats.totalPollerMessageDroppedCount); } @Override @@ -54,10 +80,17 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject("polling_ingest_stats"); builder.startObject("message_processor_stats"); builder.field("total_processed_count", messageProcessorStats.totalProcessedCount); - builder.field("total_skipped_count", messageProcessorStats.totalSkippedCount); + builder.field("total_invalid_message_count", messageProcessorStats.totalInvalidMessageCount); + builder.field("total_version_conflicts_count", messageProcessorStats.totalVersionConflictsCount); + builder.field("total_failed_count", messageProcessorStats.totalFailedCount); + builder.field("total_failures_dropped_count", messageProcessorStats.totalFailuresDroppedCount); + builder.field("total_processor_thread_interrupt_count", messageProcessorStats.totalProcessorThreadInterruptCount); builder.endObject(); builder.startObject("consumer_stats"); builder.field("total_polled_count", consumerStats.totalPolledCount); + builder.field("total_consumer_error_count", consumerStats.totalConsumerErrorCount); + builder.field("total_poller_message_failure_count", consumerStats.totalPollerMessageFailureCount); + builder.field("total_poller_message_dropped_count", consumerStats.totalPollerMessageDroppedCount); builder.field("lag_in_millis", consumerStats.lagInMillis); builder.endObject(); builder.endObject(); @@ -89,14 +122,16 @@ public int hashCode() { * Stats for message processor */ @ExperimentalApi - public record MessageProcessorStats(long totalProcessedCount, long totalSkippedCount) { + public record MessageProcessorStats(long totalProcessedCount, long totalInvalidMessageCount, long totalVersionConflictsCount, + long totalFailedCount, long totalFailuresDroppedCount, long totalProcessorThreadInterruptCount) { } /** * Stats for consumer (poller) */ @ExperimentalApi - public record ConsumerStats(long totalPolledCount, long lagInMillis) { + public record ConsumerStats(long totalPolledCount, long lagInMillis, long totalConsumerErrorCount, long totalPollerMessageFailureCount, + long totalPollerMessageDroppedCount) { } /** @@ -105,9 +140,16 @@ public record ConsumerStats(long totalPolledCount, long lagInMillis) { @ExperimentalApi public static class Builder { private long totalProcessedCount; - private long totalSkippedCount; + private long totalInvalidMessageCount; private long totalPolledCount; + private long totalVersionConflictsCount; + private long totalFailedCount; + private long totalFailuresDroppedCount; + private long totalProcessorThreadInterruptCount; private long lagInMillis; + private long totalConsumerErrorCount; + private long totalPollerMessageFailureCount; + private long totalPollerMessageDroppedCount; public Builder() {} @@ -121,8 +163,28 @@ public Builder setTotalPolledCount(long totalPolledCount) { return this; } - public Builder setTotalSkippedCount(long totalSkippedCount) { - this.totalSkippedCount = totalSkippedCount; + public Builder setTotalInvalidMessageCount(long totalInvalidMessageCount) { + this.totalInvalidMessageCount = totalInvalidMessageCount; + return this; + } + + public Builder setTotalProcessorVersionConflictsCount(long totalVersionConflictsCount) { + this.totalVersionConflictsCount = totalVersionConflictsCount; + return this; + } + + public Builder setTotalProcessorFailedCount(long totalFailedCount) { + this.totalFailedCount = totalFailedCount; + return this; + } + + public Builder setTotalProcessorFailuresDroppedCount(long totalFailuresDroppedCount) { + this.totalFailuresDroppedCount = totalFailuresDroppedCount; + return this; + } + + public Builder setTotalProcessorThreadInterruptCount(long totalProcessorThreadInterruptCount) { + this.totalProcessorThreadInterruptCount = totalProcessorThreadInterruptCount; return this; } @@ -131,9 +193,37 @@ public Builder setLagInMillis(long lagInMillis) { return this; } + public Builder setTotalConsumerErrorCount(long totalConsumerErrorCount) { + this.totalConsumerErrorCount = totalConsumerErrorCount; + return this; + } + + public Builder setTotalPollerMessageFailureCount(long totalPollerMessageFailureCount) { + this.totalPollerMessageFailureCount = totalPollerMessageFailureCount; + return this; + } + + public Builder setTotalPollerMessageDroppedCount(long totalPollerMessageDroppedCount) { + this.totalPollerMessageDroppedCount = totalPollerMessageDroppedCount; + return this; + } + public PollingIngestStats build() { - MessageProcessorStats messageProcessorStats = new MessageProcessorStats(totalProcessedCount, totalSkippedCount); - ConsumerStats consumerStats = new ConsumerStats(totalPolledCount, lagInMillis); + MessageProcessorStats messageProcessorStats = new MessageProcessorStats( + totalProcessedCount, + totalInvalidMessageCount, + totalVersionConflictsCount, + totalFailedCount, + totalFailuresDroppedCount, + totalProcessorThreadInterruptCount + ); + ConsumerStats consumerStats = new ConsumerStats( + totalPolledCount, + lagInMillis, + totalConsumerErrorCount, + totalPollerMessageFailureCount, + totalPollerMessageDroppedCount + ); return new PollingIngestStats(messageProcessorStats, consumerStats); } } diff --git a/server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java index f76305a327b04..4bd1199bf41ae 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java @@ -29,6 +29,7 @@ public void testConstructorAndGetters() { IngestionSource source = new IngestionSource.Builder("type").setParams(params) .setPointerInitReset(pointerInitReset) .setErrorStrategy(DROP) + .setBlockingQueueSize(1000) .build(); assertEquals("type", source.getType()); @@ -38,6 +39,7 @@ public void testConstructorAndGetters() { assertEquals(params, source.params()); assertEquals(1000, source.getMaxPollSize()); assertEquals(1000, source.getPollTimeout()); + assertEquals(1000, source.getBlockingQueueSize()); } public void testEquals() { @@ -103,7 +105,7 @@ public void testToString() { .setErrorStrategy(DROP) .build(); String expected = - "IngestionSource{type='type',pointer_init_reset='PointerInitReset{type='REWIND_BY_OFFSET', value=1000}',error_strategy='DROP', params={key=value}, maxPollSize=1000, pollTimeout=1000, numProcessorThreads=1}"; + "IngestionSource{type='type',pointer_init_reset='PointerInitReset{type='REWIND_BY_OFFSET', value=1000}',error_strategy='DROP', params={key=value}, maxPollSize=1000, pollTimeout=1000, numProcessorThreads=1, blockingQueueSize=100}"; assertEquals(expected, source.toString()); } } diff --git a/server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java b/server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java index ffc3ccd5ec0fe..3cb0f8a8e1cdc 100644 --- a/server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java +++ b/server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import static org.hamcrest.Matchers.is; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; @@ -305,7 +306,10 @@ public void testDropErrorIngestionStrategy() throws TimeoutException, Interrupte ); poller.start(); Thread.sleep(sleepTime); + PollingIngestStats pollingIngestStats = poller.getStats(); + assertThat(pollingIngestStats.getConsumerStats().totalPollerMessageFailureCount(), is(1L)); + assertThat(pollingIngestStats.getConsumerStats().totalPollerMessageDroppedCount(), is(1L)); verify(errorStrategy, times(1)).handleError(any(), eq(IngestionErrorStrategy.ErrorStage.POLLING)); verify(mockQueue, times(4)).put(any()); blockingQueueContainer.close(); @@ -359,6 +363,8 @@ public void testBlockErrorIngestionStrategy() throws TimeoutException, Interrupt poller.start(); Thread.sleep(sleepTime); + PollingIngestStats pollingIngestStats = poller.getStats(); + assertThat(pollingIngestStats.getConsumerStats().totalPollerMessageDroppedCount(), is(0L)); verify(errorStrategy, times(1)).handleError(any(), eq(IngestionErrorStrategy.ErrorStage.POLLING)); assertEquals(DefaultStreamPoller.State.PAUSED, poller.getState()); assertTrue(poller.isPaused()); diff --git a/server/src/test/java/org/opensearch/indices/pollingingest/MessageProcessorTests.java b/server/src/test/java/org/opensearch/indices/pollingingest/MessageProcessorTests.java index 4f8d9c0f7be3d..bef0be1e29bf0 100644 --- a/server/src/test/java/org/opensearch/indices/pollingingest/MessageProcessorTests.java +++ b/server/src/test/java/org/opensearch/indices/pollingingest/MessageProcessorTests.java @@ -8,7 +8,6 @@ package org.opensearch.indices.pollingingest; -import org.opensearch.common.metrics.CounterMetric; import org.opensearch.index.Message; import org.opensearch.index.engine.Engine; import org.opensearch.index.engine.FakeIngestionSource; @@ -59,7 +58,7 @@ public void testGetIndexOperation() throws IOException { Engine.Operation operation = processor.getOperation( new ShardUpdateMessage(pointer, mock(Message.class), IngestionUtils.getParsedPayloadMap(payload), 0), - new CounterMetric() + MessageProcessorRunnable.MessageProcessorMetrics.create() ); assertTrue(operation instanceof Engine.Index); @@ -75,7 +74,7 @@ public void testGetDeleteOperation() throws IOException { Engine.Operation operation = processor.getOperation( new ShardUpdateMessage(pointer, mock(Message.class), IngestionUtils.getParsedPayloadMap(payload), -1), - new CounterMetric() + MessageProcessorRunnable.MessageProcessorMetrics.create() ); assertTrue(operation instanceof Engine.Delete); @@ -89,7 +88,7 @@ public void testSkipNoSourceIndexOperation() throws IOException { Engine.Operation operation = processor.getOperation( new ShardUpdateMessage(pointer, mock(Message.class), IngestionUtils.getParsedPayloadMap(payload), -1), - new CounterMetric() + MessageProcessorRunnable.MessageProcessorMetrics.create() ); assertNull(operation); @@ -98,7 +97,7 @@ public void testSkipNoSourceIndexOperation() throws IOException { operation = processor.getOperation( new ShardUpdateMessage(pointer, mock(Message.class), IngestionUtils.getParsedPayloadMap(payload), 0), - new CounterMetric() + MessageProcessorRunnable.MessageProcessorMetrics.create() ); assertNull(operation); } @@ -109,7 +108,7 @@ public void testUnsupportedOperation() throws IOException { Engine.Operation operation = processor.getOperation( new ShardUpdateMessage(pointer, mock(Message.class), IngestionUtils.getParsedPayloadMap(payload), -1), - new CounterMetric() + MessageProcessorRunnable.MessageProcessorMetrics.create() ); assertNull(operation); } @@ -123,7 +122,7 @@ public void testMissingID() throws IOException { when(parsedDocument.rootDoc()).thenReturn(new ParseContext.Document()); Engine.Operation operation = processor.getOperation( new ShardUpdateMessage(pointer, mock(Message.class), IngestionUtils.getParsedPayloadMap(payload), -1), - new CounterMetric() + MessageProcessorRunnable.MessageProcessorMetrics.create() ); assertNull(operation); } @@ -137,8 +136,34 @@ public void testDeleteWithAutoGeneratedID() throws IOException { when(parsedDocument.rootDoc()).thenReturn(new ParseContext.Document()); Engine.Operation operation = processor.getOperation( new ShardUpdateMessage(pointer, mock(Message.class), IngestionUtils.getParsedPayloadMap(payload), System.currentTimeMillis()), - new CounterMetric() + MessageProcessorRunnable.MessageProcessorMetrics.create() ); assertTrue(operation instanceof Engine.NoOp); } + + public void testMessageProcessorMetrics() { + MessageProcessorRunnable.MessageProcessorMetrics metrics1 = MessageProcessorRunnable.MessageProcessorMetrics.create(); + metrics1.processedCounter().inc(100); + metrics1.invalidMessageCounter().inc(5); + metrics1.versionConflictCounter().inc(2); + metrics1.failedMessageCounter().inc(1); + metrics1.failedMessageDroppedCounter().inc(1); + metrics1.processorThreadInterruptCounter().inc(0); + + MessageProcessorRunnable.MessageProcessorMetrics metrics2 = MessageProcessorRunnable.MessageProcessorMetrics.create(); + metrics2.processedCounter().inc(100); + metrics2.invalidMessageCounter().inc(0); + metrics2.versionConflictCounter().inc(0); + metrics2.failedMessageCounter().inc(100); + metrics2.failedMessageDroppedCounter().inc(100); + metrics2.processorThreadInterruptCounter().inc(1); + + MessageProcessorRunnable.MessageProcessorMetrics combinedMetric = metrics1.combine(metrics2); + assertEquals(200, combinedMetric.processedCounter().count()); + assertEquals(5, combinedMetric.invalidMessageCounter().count()); + assertEquals(2, combinedMetric.versionConflictCounter().count()); + assertEquals(101, combinedMetric.failedMessageCounter().count()); + assertEquals(101, combinedMetric.failedMessageDroppedCounter().count()); + assertEquals(1, combinedMetric.processorThreadInterruptCounter().count()); + } } diff --git a/server/src/test/java/org/opensearch/indices/pollingingest/PartitionedBlockingQueueContainerTests.java b/server/src/test/java/org/opensearch/indices/pollingingest/PartitionedBlockingQueueContainerTests.java index 3a98ef36803e8..3c21cc31b3f1b 100644 --- a/server/src/test/java/org/opensearch/indices/pollingingest/PartitionedBlockingQueueContainerTests.java +++ b/server/src/test/java/org/opensearch/indices/pollingingest/PartitionedBlockingQueueContainerTests.java @@ -92,7 +92,7 @@ public void testAddMessage() throws TimeoutException, InterruptedException { // start processor threads and verify updates are processed blockingQueueContainer.startProcessorThreads(); updatesLatch.await(); - assertEquals(2, blockingQueueContainer.getTotalProcessedCount()); + assertEquals(2, blockingQueueContainer.getMessageProcessorMetrics().processedCounter().count()); verify(processor, times(2)).process(any(), any()); } diff --git a/server/src/test/java/org/opensearch/indices/pollingingest/PollingIngestStatsTests.java b/server/src/test/java/org/opensearch/indices/pollingingest/PollingIngestStatsTests.java index 0b9b61f1b8d6b..e7a372dbd2258 100644 --- a/server/src/test/java/org/opensearch/indices/pollingingest/PollingIngestStatsTests.java +++ b/server/src/test/java/org/opensearch/indices/pollingingest/PollingIngestStatsTests.java @@ -29,10 +29,24 @@ public void testToXContent() throws IOException { String expected = "{\"polling_ingest_stats\":{\"message_processor_stats\":{\"total_processed_count\":" + stats.getMessageProcessorStats().totalProcessedCount() - + ",\"total_skipped_count\":" - + stats.getMessageProcessorStats().totalSkippedCount() + + ",\"total_invalid_message_count\":" + + stats.getMessageProcessorStats().totalInvalidMessageCount() + + ",\"total_version_conflicts_count\":" + + stats.getMessageProcessorStats().totalVersionConflictsCount() + + ",\"total_failed_count\":" + + stats.getMessageProcessorStats().totalFailedCount() + + ",\"total_failures_dropped_count\":" + + stats.getMessageProcessorStats().totalFailuresDroppedCount() + + ",\"total_processor_thread_interrupt_count\":" + + stats.getMessageProcessorStats().totalProcessorThreadInterruptCount() + "},\"consumer_stats\":{\"total_polled_count\":" + stats.getConsumerStats().totalPolledCount() + + ",\"total_consumer_error_count\":" + + stats.getConsumerStats().totalConsumerErrorCount() + + ",\"total_poller_message_failure_count\":" + + stats.getConsumerStats().totalPollerMessageFailureCount() + + ",\"total_poller_message_dropped_count\":" + + stats.getConsumerStats().totalPollerMessageDroppedCount() + ",\"lag_in_millis\":" + stats.getConsumerStats().lagInMillis() + "}}}"; @@ -56,7 +70,7 @@ public void testSerialization() throws IOException { private PollingIngestStats createTestInstance() { return PollingIngestStats.builder() .setTotalProcessedCount(randomNonNegativeLong()) - .setTotalSkippedCount(randomNonNegativeLong()) + .setTotalInvalidMessageCount(randomNonNegativeLong()) .setTotalPolledCount(randomNonNegativeLong()) .setLagInMillis(randomNonNegativeLong()) .build();