Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Remove unnecessary looping in field data cache clear ([#19116](https://github.com/opensearch-project/OpenSearch/pull/19116))
- [Flaky Test] Fix flaky test IngestFromKinesisIT.testAllActiveIngestion ([#19380](https://github.com/opensearch-project/OpenSearch/pull/19380))
- Fix lag metric for pull-based ingestion when streaming source is empty ([#19393](https://github.com/opensearch-project/OpenSearch/pull/19393))
- Fix ingestion state xcontent serialization in IndexMetadata and fail fast on mapping errors ([#19320](https://github.com/opensearch-project/OpenSearch/pull/19320))

### Dependencies
- Update to Gradle 9.1.0 ([#19329](https://github.com/opensearch-project/OpenSearch/pull/19329))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ public void testKafkaIngestion_RewindByTimeStamp() {
.put("ingestion_source.param.topic", "test")
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
.put("ingestion_source.param.auto.offset.reset", "latest")
.put("ingestion_source.all_active", true)
.build(),
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
);
Expand Down Expand Up @@ -134,6 +135,7 @@ public void testKafkaIngestion_RewindByOffset() {
.put("ingestion_source.param.topic", "test")
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
.put("ingestion_source.param.auto.offset.reset", "latest")
.put("ingestion_source.all_active", true)
.build(),
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ public void testKinesisIngestion_RewindByOffset() throws InterruptedException {
"ingestion_source.param.endpoint_override",
localstack.getEndpointOverride(LocalStackContainer.Service.KINESIS).toString()
)
.put("ingestion_source.all_active", true)
.build(),
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -909,7 +909,8 @@ public Iterator<Setting<?>> settings() {

/**
* Defines if all-active pull-based ingestion is enabled. In this mode, replicas will directly consume from the
* streaming source and process the updates. This mode is currently not supported along with segment replication.
* streaming source and process the updates. In the default document replication mode, this setting must be enabled.
* This mode is currently not supported with segment replication.
*/
public static final String SETTING_INGESTION_SOURCE_ALL_ACTIVE_INGESTION = "index.ingestion_source.all_active";
public static final Setting<Boolean> INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING = Setting.boolSetting(
Expand All @@ -923,21 +924,33 @@ public void validate(final Boolean value) {}
@Override
public void validate(final Boolean value, final Map<Setting<?>, Object> settings) {
final Object replicationType = settings.get(INDEX_REPLICATION_TYPE_SETTING);
if (ReplicationType.SEGMENT.equals(replicationType) && value) {
final Object ingestionSourceType = settings.get(INGESTION_SOURCE_TYPE_SETTING);
boolean isPullBasedIngestionEnabled = NONE_INGESTION_SOURCE_TYPE.equals(ingestionSourceType) == false;

if (isPullBasedIngestionEnabled && ReplicationType.SEGMENT.equals(replicationType) && value) {
throw new IllegalArgumentException(
"To enable "
+ INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING.getKey()
+ ", "
+ INDEX_REPLICATION_TYPE_SETTING.getKey()
+ " should not be set to "
"Replication type "
+ ReplicationType.SEGMENT
+ " is not supported in pull-based ingestion when "
+ INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING.getKey()
+ " is enabled"
);
}

if (isPullBasedIngestionEnabled && ReplicationType.DOCUMENT.equals(replicationType) && value == false) {
throw new IllegalArgumentException(
"Replication type "
+ ReplicationType.DOCUMENT
+ " is not supported in pull-based ingestion when "
+ INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING.getKey()
+ " is not enabled"
);
}
}

@Override
public Iterator<Setting<?>> settings() {
final List<Setting<?>> settings = List.of(INDEX_REPLICATION_TYPE_SETTING);
final List<Setting<?>> settings = List.of(INDEX_REPLICATION_TYPE_SETTING, INGESTION_SOURCE_TYPE_SETTING);
return settings.iterator();
}
},
Expand Down Expand Up @@ -981,6 +994,7 @@ public Iterator<Setting<?>> settings() {
public static final String TRANSLOG_METADATA_KEY = "translog_metadata";
public static final String CONTEXT_KEY = "context";
public static final String INGESTION_SOURCE_KEY = "ingestion_source";
public static final String INGESTION_STATUS_KEY = "ingestion_status";

public static final String INDEX_STATE_FILE_PREFIX = "state-";

Expand Down Expand Up @@ -2304,6 +2318,13 @@ public static void toXContent(IndexMetadata indexMetadata, XContentBuilder build
indexMetadata.context.toXContent(builder, params);
}

if (indexMetadata.getCreationVersion().onOrAfter(Version.V_3_3_0) && indexMetadata.ingestionStatus != null) {
// The ingestionStatus field was introduced in OS 3.x, but it is included in XContent serialization only from OS 3.3
// onwards.
builder.field(INGESTION_STATUS_KEY);
indexMetadata.ingestionStatus.toXContent(builder, params);
}

builder.endObject();
}

Expand Down Expand Up @@ -2387,6 +2408,8 @@ public static IndexMetadata fromXContent(XContentParser parser) throws IOExcepti
parser.skipChildren();
} else if (CONTEXT_KEY.equals(currentFieldName)) {
builder.context(Context.fromXContent(parser));
} else if (INGESTION_STATUS_KEY.equals(currentFieldName)) {
builder.ingestionStatus(IngestionStatus.fromXContent(parser));
} else {
// assume it's custom index metadata
builder.putCustom(currentFieldName, parser.mapStrings());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,18 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;

import java.io.IOException;

/**
* Indicates pull-based ingestion status.
*/
@ExperimentalApi
public record IngestionStatus(boolean isPaused) implements Writeable {
public record IngestionStatus(boolean isPaused) implements Writeable, ToXContent {
public static final String IS_PAUSED = "is_paused";

public IngestionStatus(StreamInput in) throws IOException {
this(in.readBoolean());
Expand All @@ -30,6 +34,37 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(isPaused);
}

/**
 * Serializes this status as a self-contained object holding one boolean field keyed by {@link #IS_PAUSED}.
 *
 * @param builder the builder to write into
 * @param params  serialization parameters; not consulted by this implementation
 * @return the builder that was passed in, so calls can be chained
 * @throws IOException if writing to the builder fails
 */
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
    return builder.startObject().field(IS_PAUSED, isPaused).endObject();
}

/**
 * Parses an {@link IngestionStatus} from its XContent object form.
 *
 * <p>If the parser has not been advanced yet (no current token), it is moved to the first token. When the
 * current token is not {@code START_OBJECT}, nothing is consumed and a status with {@code isPaused == false}
 * is returned.
 *
 * <p>Only the top-level {@link #IS_PAUSED} field is read. Any other field is skipped together with its whole
 * value, so a nested object or array under an unrecognized key can neither end the parse early nor have one
 * of its inner keys mistaken for {@link #IS_PAUSED}.
 *
 * @param parser the parser positioned on, or just before, the start of the status object
 * @return the parsed status; {@code isPaused} is {@code false} when the field is absent
 * @throws IOException if the underlying parser fails
 */
public static IngestionStatus fromXContent(XContentParser parser) throws IOException {
    boolean isPaused = false;

    XContentParser.Token token = parser.currentToken();
    if (token == null) {
        token = parser.nextToken();
    }

    if (token == XContentParser.Token.START_OBJECT) {
        while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
            if (token != XContentParser.Token.FIELD_NAME) {
                continue;
            }

            String fieldName = parser.currentName();
            // Move onto the field's value before deciding how to consume it.
            parser.nextToken();
            if (IS_PAUSED.equals(fieldName)) {
                isPaused = parser.booleanValue();
            } else {
                // Unknown field: consume the entire value so that a nested END_OBJECT is not
                // taken for the end of this status object.
                parser.skipChildren();
            }
        }
    }

    return new IngestionStatus(isPaused);
}

/**
 * Returns the default ingestion status, in which ingestion is not paused.
 *
 * @return a status whose {@code isPaused} flag is {@code false}
 */
public static IngestionStatus getDefaultValue() {
    final boolean pausedByDefault = false;
    return new IngestionStatus(pausedByDefault);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ protected void startPoll() {
// Currently we do not have a good way to skip past the failing messages.
// The user will have the option to manually update the offset and resume ingestion.
// todo: support retry?
logger.error("Pausing ingestion. Fatal error occurred in polling the shard {}: {}", shardId, e);
logger.error("Pausing ingestion. Fatal error occurred in polling the shard {} for index {}: {}", shardId, indexName, e);
totalConsumerErrorCount.inc();
pause();
}
Expand Down Expand Up @@ -276,7 +276,13 @@ private IngestionShardPointer processRecords(
result.getPointer().asString()
);
} catch (Exception e) {
logger.error("Error in processing a record. Shard {}, pointer {}: {}", shardId, result.getPointer().asString(), e);
logger.error(
"[Default Poller] Error processing record. Index={}, Shard={}, pointer={}: error={}",
indexName,
shardId,
result.getPointer().asString(),
e
);
errorStrategy.handleError(e, IngestionErrorStrategy.ErrorStage.POLLING);
totalPollerMessageFailureCount.inc();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.opensearch.index.engine.IngestionEngine;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.index.mapper.IdFieldMapper;
import org.opensearch.index.mapper.MapperParsingException;
import org.opensearch.index.mapper.ParseContext;
import org.opensearch.index.mapper.ParsedDocument;
import org.opensearch.index.mapper.SourceToParse;
Expand Down Expand Up @@ -65,33 +66,50 @@ public class MessageProcessorRunnable implements Runnable, Closeable {
private volatile boolean closed = false;
private volatile IngestionErrorStrategy errorStrategy;

private final String indexName;
private final int shardId;

/**
* Constructor.
*
* @param blockingQueue the blocking queue to poll messages from
* @param engine the ingestion engine
* @param errorStrategy the error strategy/policy to use
*/
public MessageProcessorRunnable(
BlockingQueue<ShardUpdateMessage<? extends IngestionShardPointer, ? extends Message>> blockingQueue,
IngestionEngine engine,
IngestionErrorStrategy errorStrategy
) {
this(blockingQueue, new MessageProcessor(engine), errorStrategy);
this(
blockingQueue,
new MessageProcessor(engine),
errorStrategy,
engine.config().getShardId().getIndexName(),
engine.config().getShardId().getId()
);
}

/**
* Constructor visible for testing.
* @param blockingQueue the blocking queue to poll messages from
* @param messageProcessor the message processor
* @param errorStrategy the error strategy/policy to use
* @param indexName the index name
* @param shardId the shard ID
*/
MessageProcessorRunnable(
BlockingQueue<ShardUpdateMessage<? extends IngestionShardPointer, ? extends Message>> blockingQueue,
MessageProcessor messageProcessor,
IngestionErrorStrategy errorStrategy
IngestionErrorStrategy errorStrategy,
String indexName,
int shardId
) {
this.blockingQueue = Objects.requireNonNull(blockingQueue);
this.messageProcessor = messageProcessor;
this.errorStrategy = errorStrategy;
this.indexName = indexName;
this.shardId = shardId;
}

static class MessageProcessor {
Expand Down Expand Up @@ -309,9 +327,10 @@ public void run() {
logger.debug("Dropping message due to version conflict. ShardPointer: " + shardUpdateMessage.pointer().asString(), e);
shardUpdateMessage = null;
} catch (Exception e) {
logger.error("[Message Processor] Error processing message. Index={}, Shard={}, error={}", indexName, shardId, e);
messageProcessorMetrics.failedMessageCounter.inc();
errorStrategy.handleError(e, IngestionErrorStrategy.ErrorStage.PROCESSING);
boolean retriesExhausted = retryCount >= MIN_RETRY_COUNT || e instanceof IllegalArgumentException;
boolean retriesExhausted = hasExhaustedRetries(e, retryCount);
if (retriesExhausted && errorStrategy.shouldIgnoreError(e, IngestionErrorStrategy.ErrorStage.PROCESSING)) {
logDroppedMessage(shardUpdateMessage);
shardUpdateMessage = null;
Expand All @@ -336,6 +355,15 @@ private void waitBeforeRetry() {
}
}

/**
 * Decides whether a failed message should no longer be retried.
 *
 * @param e          the exception raised while processing the message
 * @param retryCount the number of retries already attempted for the message
 * @return {@code true} when {@code retryCount} has reached {@code MIN_RETRY_COUNT}, or when the failure is
 *         a validation/parsing error ({@link IllegalArgumentException} or {@code MapperParsingException})
 */
private boolean hasExhaustedRetries(Exception e, int retryCount) {
    final boolean retryBudgetSpent = retryCount >= MIN_RETRY_COUNT;
    // Validation and parsing errors are never retried.
    final boolean nonRetryableFailure = e instanceof IllegalArgumentException || e instanceof MapperParsingException;
    return retryBudgetSpent || nonRetryableFailure;
}

public MessageProcessorMetrics getMessageProcessorMetrics() {
return messageProcessorMetrics;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,10 @@ public void testToXContent() throws IOException {
+ " \"0\" : [ ]\n"
+ " },\n"
+ " \"rollover_info\" : { },\n"
+ " \"system\" : false\n"
+ " \"system\" : false,\n"
+ " \"ingestion_status\" : {\n"
+ " \"is_paused\" : false\n"
+ " }\n"
+ " }\n"
+ " },\n"
+ " \"index-graveyard\" : {\n"
Expand Down Expand Up @@ -252,7 +255,10 @@ public void testToXContent() throws IOException {
+ " \"0\" : [ ]\n"
+ " },\n"
+ " \"rollover_info\" : { },\n"
+ " \"system\" : false\n"
+ " \"system\" : false,\n"
+ " \"ingestion_status\" : {\n"
+ " \"is_paused\" : false\n"
+ " }\n"
+ " }\n"
+ " },\n"
+ " \"index-graveyard\" : {\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,10 @@ public void testToXContent() throws IOException {
+ " \"time\" : 1\n"
+ " }\n"
+ " },\n"
+ " \"system\" : false\n"
+ " \"system\" : false,\n"
+ " \"ingestion_status\" : {\n"
+ " \"is_paused\" : false\n"
+ " }\n"
+ " }\n"
+ " },\n"
+ " \"index-graveyard\" : {\n"
Expand Down Expand Up @@ -477,7 +480,10 @@ public void testToXContent_FlatSettingTrue_ReduceMappingFalse() throws IOExcepti
+ " \"time\" : 1\n"
+ " }\n"
+ " },\n"
+ " \"system\" : false\n"
+ " \"system\" : false,\n"
+ " \"ingestion_status\" : {\n"
+ " \"is_paused\" : false\n"
+ " }\n"
+ " }\n"
+ " },\n"
+ " \"index-graveyard\" : {\n"
Expand Down Expand Up @@ -686,7 +692,10 @@ public void testToXContent_FlatSettingFalse_ReduceMappingTrue() throws IOExcepti
+ " \"time\" : 1\n"
+ " }\n"
+ " },\n"
+ " \"system\" : false\n"
+ " \"system\" : false,\n"
+ " \"ingestion_status\" : {\n"
+ " \"is_paused\" : false\n"
+ " }\n"
+ " }\n"
+ " },\n"
+ " \"index-graveyard\" : {\n"
Expand Down Expand Up @@ -835,7 +844,10 @@ public void testToXContentSameTypeName() throws IOException {
+ " \"0\" : [ ]\n"
+ " },\n"
+ " \"rollover_info\" : { },\n"
+ " \"system\" : false\n"
+ " \"system\" : false,\n"
+ " \"ingestion_status\" : {\n"
+ " \"is_paused\" : false\n"
+ " }\n"
+ " }\n"
+ " },\n"
+ " \"index-graveyard\" : {\n"
Expand Down
Loading
Loading