Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Removed

### Fixed
- Fix ingest pipeline cannot be executed when retry the failed index requests for update_by_query API and reindex API ([#18003](https://github.com/opensearch-project/OpenSearch/pull/18003))
- With creation of FilterFieldType, we need unwrap all the MappedFieldType before using the instanceof check. ([#17951](https://github.com/opensearch-project/OpenSearch/pull/17951))
- Fix simultaneously creating a snapshot and updating the repository can potentially trigger an infinite loop ([#17532](https://github.com/opensearch-project/OpenSearch/pull/17532))
- Remove package org.opensearch.transport.grpc and replace with org.opensearch.plugin.transport.grpc ([#18031](https://github.com/opensearch-project/OpenSearch/pull/18031))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,14 @@ public BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>>
*/
protected abstract RequestWrapper<?> buildRequest(ScrollableHitSource.Hit doc);

/**
* Build the {@link BulkRequest} for a new bulk operation.
* @return the new BulkRequest
*/
protected BulkRequest buildBulkRequest() {
return new BulkRequest();
}

/**
* Copies the metadata from a hit to the request.
*/
Expand Down Expand Up @@ -254,7 +262,7 @@ protected boolean accept(ScrollableHitSource.Hit doc) {
}

private BulkRequest buildBulk(Iterable<? extends ScrollableHitSource.Hit> docs) {
BulkRequest bulkRequest = new BulkRequest();
BulkRequest bulkRequest = buildBulkRequest();
for (ScrollableHitSource.Hit doc : docs) {
if (accept(doc)) {
RequestWrapper<?> request = scriptApplier.apply(copyMetadata(buildRequest(doc), doc), doc);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.bulk.BackoffPolicy;
import org.opensearch.action.bulk.BulkItemResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
Expand Down Expand Up @@ -356,6 +357,11 @@ public BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>>
return super.buildScriptApplier();
}

@Override
protected BulkRequest buildBulkRequest() {
return new BulkRequest().pipeline(mainRequest.getDestination().getPipeline());
}

@Override
protected RequestWrapper<IndexRequest> buildRequest(ScrollableHitSource.Hit doc) {
IndexRequest index = new IndexRequest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ public BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>>
return super.buildScriptApplier();
}

@Override
protected org.opensearch.action.bulk.BulkRequest buildBulkRequest() {
return new org.opensearch.action.bulk.BulkRequest().pipeline(mainRequest.getPipeline());
}

@Override
protected RequestWrapper<IndexRequest> buildRequest(ScrollableHitSource.Hit doc) {
IndexRequest index = new IndexRequest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,39 @@
private final long seqNo;
private final long term;
private final boolean aborted;
private final FailureSource source;

/**
* The source of the failure, denotes which step has failure during the bulk processing.
*/
@PublicApi(since = "3.0.0")
public enum FailureSource {
UNKNOWN((byte) 0),
// Pipeline execution failure
PIPELINE((byte) 1),
VALIDATION((byte) 2),
WRITE_PROCESSING((byte) 3);

private final byte sourceType;

FailureSource(byte sourceType) {
this.sourceType = sourceType;
}

public byte getSourceType() {
return sourceType;
}

public static FailureSource fromSourceType(byte sourceType) {
return switch (sourceType) {
case 0 -> UNKNOWN;
case 1 -> PIPELINE;
case 2 -> VALIDATION;
case 3 -> WRITE_PROCESSING;
default -> throw new IllegalArgumentException("Unknown failure source: [" + sourceType + "]");

Check warning on line 229 in server/src/main/java/org/opensearch/action/bulk/BulkItemResponse.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/bulk/BulkItemResponse.java#L226-L229

Added lines #L226 - L229 were not covered by tests
};
}
}

public static final ConstructingObjectParser<Failure, Void> PARSER = new ConstructingObjectParser<>(
"bulk_failures",
Expand Down Expand Up @@ -224,7 +257,21 @@
ExceptionsHelper.status(cause),
SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
false
false,
FailureSource.UNKNOWN
);
}

public Failure(String index, String id, Exception cause, FailureSource source) {
this(
index,
id,
cause,
ExceptionsHelper.status(cause),
SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
false,
source
);
}

Expand All @@ -236,27 +283,47 @@
ExceptionsHelper.status(cause),
SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
aborted
aborted,
FailureSource.UNKNOWN
);
}

public Failure(String index, String id, Exception cause, RestStatus status) {
this(index, id, cause, status, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM, false);
this(
index,
id,
cause,
status,
SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
false,
FailureSource.UNKNOWN
);
}

/** For write failures after operation was assigned a sequence number. */
public Failure(String index, String id, Exception cause, long seqNo, long term) {
this(index, id, cause, ExceptionsHelper.status(cause), seqNo, term, false);
}

private Failure(String index, String id, Exception cause, RestStatus status, long seqNo, long term, boolean aborted) {
this(index, id, cause, ExceptionsHelper.status(cause), seqNo, term, false, FailureSource.UNKNOWN);
}

private Failure(
String index,
String id,
Exception cause,
RestStatus status,
long seqNo,
long term,
boolean aborted,
FailureSource source
) {
this.index = index;
this.id = id;
this.cause = cause;
this.status = status;
this.seqNo = seqNo;
this.term = term;
this.aborted = aborted;
this.source = source;
}

/**
Expand All @@ -275,6 +342,11 @@
seqNo = in.readZLong();
term = in.readVLong();
aborted = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
source = FailureSource.fromSourceType(in.readByte());
} else {
source = FailureSource.UNKNOWN;
}
}

@Override
Expand All @@ -288,6 +360,9 @@
out.writeZLong(seqNo);
out.writeVLong(term);
out.writeBoolean(aborted);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeByte(source.getSourceType());
}
}

/**
Expand Down Expand Up @@ -352,6 +427,10 @@
return aborted;
}

public FailureSource getSource() {
return source;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(INDEX_FIELD, index);
Expand Down
20 changes: 18 additions & 2 deletions server/src/main/java/org/opensearch/action/bulk/Retry.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
Expand Down Expand Up @@ -169,11 +171,25 @@ private void retry(BulkRequest bulkRequestForRetry) {
}

private BulkRequest createBulkRequestForRetry(BulkResponse bulkItemResponses) {
BulkRequest requestToReissue = new BulkRequest();
// global pipeline should be set in the new Bulk Request
String globalPipeline = this.currentBulkRequest.pipeline();
BulkRequest requestToReissue = new BulkRequest().pipeline(globalPipeline);
int index = 0;
for (BulkItemResponse bulkItemResponse : bulkItemResponses.getItems()) {
if (bulkItemResponse.isFailed()) {
requestToReissue.add(currentBulkRequest.requests().get(index));
DocWriteRequest<?> docWriteRequest = currentBulkRequest.requests().get(index);
// when executing pipeline failed with retryable exception, the pipeline needs to be executed again
if (bulkItemResponse.getFailure().getSource() == BulkItemResponse.Failure.FailureSource.PIPELINE
&& docWriteRequest instanceof IndexRequest indexRequest) {
// Reset pipeline configuration for retry, after the first execution, the pipeline was set to _none, so we need to
// reset it
// to the global pipeline if the global pipeline exists,
// if not, set to null to ensure the default pipeline can be resolved and set
// see org.opensearch.ingest.IngestService.resolvePipelines()
indexRequest.setPipeline(globalPipeline);
indexRequest.isPipelineResolved(false);
}
requestToReissue.add(docWriteRequest);
}
index++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1077,7 +1077,12 @@ synchronized void markItemAsFailed(int slot, Exception e) {
// 2) Add a bulk item failure for this request
// 3) Continue with the next request in the bulk.
failedSlots.set(slot);
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexRequest.index(), indexRequest.id(), e);
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(
indexRequest.index(),
indexRequest.id(),
e,
BulkItemResponse.Failure.FailureSource.PIPELINE
);
itemResponses.add(new BulkItemResponse(slot, indexRequest.opType(), failure));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.bulk.BulkItemResponse.Failure;
Expand All @@ -42,11 +43,15 @@
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.action.update.UpdateResponseTests;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.VersionUtils;

import java.io.IOException;

Expand Down Expand Up @@ -129,6 +134,90 @@ public void testFailureToAndFromXContent() throws IOException {
assertBulkItemResponse(expectedBulkItemResponse, parsedBulkItemResponse);
}

public void testSerializationForFailure() throws Exception {
final Failure failure = new Failure("index", "id", new OpenSearchException("test"));
try (BytesStreamOutput out = new BytesStreamOutput()) {
failure.writeTo(out);

final Failure deserializedFailure;
try (StreamInput in = out.bytes().streamInput()) {
deserializedFailure = new Failure(in);
}
assertEquals(failure.getIndex(), deserializedFailure.getIndex());
assertEquals(failure.getId(), deserializedFailure.getId());
assertEquals(failure.getMessage(), deserializedFailure.getMessage());
assertEquals(failure.getStatus(), deserializedFailure.getStatus());
assertEquals(failure.getSource(), deserializedFailure.getSource());
assertDeepEquals((OpenSearchException) failure.getCause(), (OpenSearchException) deserializedFailure.getCause());
}
}

public void testBwcSerialization() throws Exception {
{
final Failure failure = new Failure("index", "id", new OpenSearchException("test"));
final Version version = VersionUtils.randomCompatibleVersion(random(), Version.CURRENT);
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.setVersion(version);
failure.writeTo(out);

try (StreamInput in = out.bytes().streamInput()) {
in.setVersion(version);
String index = in.readString();
String id = in.readOptionalString();
Exception cause = in.readException();
RestStatus status = ExceptionsHelper.status(cause);
long seqNo = in.readZLong();
long term = in.readVLong();
boolean aborted = in.readBoolean();
Failure.FailureSource failureSource = Failure.FailureSource.UNKNOWN;
if (version.onOrAfter(Version.V_3_0_0)) {
failureSource = Failure.FailureSource.fromSourceType(in.readByte());
}
assertEquals(failure.getIndex(), index);
assertEquals(failure.getId(), id);
assertEquals(failure.getStatus(), status);
assertEquals(failure.getSource(), failureSource);
assertEquals(failure.getSeqNo(), seqNo);
assertEquals(failure.getTerm(), term);
assertEquals(failure.isAborted(), aborted);
assertDeepEquals((OpenSearchException) failure.getCause(), (OpenSearchException) cause);
}
}
}

{
final Failure failure = new Failure("index", "id", new OpenSearchException("test"));
final Version version = VersionUtils.randomCompatibleVersion(random(), Version.CURRENT);
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.setVersion(version);
out.writeString(failure.getIndex());
out.writeOptionalString(failure.getId());
out.writeException(failure.getCause());
out.writeZLong(failure.getSeqNo());
out.writeVLong(failure.getTerm());
out.writeBoolean(failure.isAborted());
if (version.onOrAfter(Version.V_3_0_0)) {
out.writeByte(failure.getSource().getSourceType());
}

final Failure deserializedFailure;
try (StreamInput in = out.bytes().streamInput()) {
in.setVersion(version);
deserializedFailure = new Failure(in);
}

assertEquals(failure.getIndex(), deserializedFailure.getIndex());
assertEquals(failure.getId(), deserializedFailure.getId());
assertEquals(failure.getStatus(), deserializedFailure.getStatus());
assertEquals(failure.getSource(), deserializedFailure.getSource());
assertEquals(failure.getSeqNo(), deserializedFailure.getSeqNo());
assertEquals(failure.getTerm(), deserializedFailure.getTerm());
assertEquals(failure.isAborted(), deserializedFailure.isAborted());
assertDeepEquals((OpenSearchException) failure.getCause(), (OpenSearchException) deserializedFailure.getCause());
}
}
}

public static void assertBulkItemResponse(BulkItemResponse expected, BulkItemResponse actual) {
assertEquals(expected.getItemId(), actual.getItemId());
assertEquals(expected.getIndex(), actual.getIndex());
Expand All @@ -145,7 +234,7 @@ public static void assertBulkItemResponse(BulkItemResponse expected, BulkItemRes
assertEquals(expectedFailure.getId(), actualFailure.getId());
assertEquals(expectedFailure.getMessage(), actualFailure.getMessage());
assertEquals(expectedFailure.getStatus(), actualFailure.getStatus());

assertEquals(expectedFailure.getSource(), actualFailure.getSource());
assertDeepEquals((OpenSearchException) expectedFailure.getCause(), (OpenSearchException) actualFailure.getCause());
} else {
DocWriteResponse expectedDocResponse = expected.getResponse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public void testBulkRequestModifier() {
assertThat(item.getFailure().getIndex(), equalTo("_index"));
assertThat(item.getFailure().getId(), equalTo(String.valueOf(j)));
assertThat(item.getFailure().getMessage(), equalTo("java.lang.RuntimeException"));
assertThat(item.getFailure().getSource(), equalTo(BulkItemResponse.Failure.FailureSource.PIPELINE));
} else {
assertThat(bulkResponse.getItems()[j], nullValue());
}
Expand Down
Loading
Loading