Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
bulkRequest.setRefreshPolicy(request.param("refresh"));
bulkRequest.add(request.requiredContent(), defaultIndex, defaultRouting,
null, defaultPipeline, defaultRequireAlias, true, request.getXContentType());
null, defaultPipeline, defaultRequireAlias, true, request.getXContentType(),
Comment thread
pgomulka marked this conversation as resolved.
request.getRestApiVersion());

// short circuit the call to the transport layer
return channel -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,8 @@ XContentParser createParser(NamedXContentRegistry xContentRegistry, DeprecationH
XContentParser createParserForCompatibility(NamedXContentRegistry xContentRegistry, DeprecationHandler deprecationHandler,
InputStream is, RestApiVersion restApiVersion) throws IOException;

XContentParser createParserForCompatibility(NamedXContentRegistry xContentRegistry,
DeprecationHandler deprecationHandler, byte[] data, int offset, int length,
RestApiVersion restApiVersion) throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,11 @@ public XContentParser createParserForCompatibility(NamedXContentRegistry xConten
return new CborXContentParser(xContentRegistry, deprecationHandler, cborFactory.createParser(is), restApiVersion);
}

@Override
public XContentParser createParserForCompatibility(NamedXContentRegistry xContentRegistry, DeprecationHandler deprecationHandler,
byte[] data, int offset, int length, RestApiVersion restApiVersion)
throws IOException {
return new CborXContentParser(xContentRegistry, deprecationHandler, cborFactory.createParser(data, offset, length), restApiVersion);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,11 @@ public XContentParser createParserForCompatibility(NamedXContentRegistry xConten
return new JsonXContentParser(xContentRegistry, deprecationHandler, jsonFactory.createParser(is), restApiVersion);
}

@Override
public XContentParser createParserForCompatibility(NamedXContentRegistry xContentRegistry, DeprecationHandler deprecationHandler,
byte[] data, int offset, int length, RestApiVersion restApiVersion)
throws IOException {
return new JsonXContentParser(xContentRegistry, deprecationHandler, jsonFactory.createParser(data, offset, length), restApiVersion);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,12 @@ public XContentParser createParserForCompatibility(NamedXContentRegistry xConten
RestApiVersion restApiVersion) throws IOException {
return new SmileXContentParser(xContentRegistry, deprecationHandler, smileFactory.createParser(is), restApiVersion);
}

@Override
public XContentParser createParserForCompatibility(NamedXContentRegistry xContentRegistry, DeprecationHandler deprecationHandler,
byte[] data, int offset, int length, RestApiVersion restApiVersion)
throws IOException {
return new SmileXContentParser(xContentRegistry, deprecationHandler, smileFactory.createParser(data, offset, length),
restApiVersion);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,13 @@ public XContentParser createParserForCompatibility(NamedXContentRegistry xConten
return new YamlXContentParser(xContentRegistry, deprecationHandler, yamlFactory.createParser(is), restApiVersion);
}

@Override
public XContentParser createParserForCompatibility(NamedXContentRegistry xContentRegistry,
DeprecationHandler deprecationHandler, byte[] data, int offset, int length,
RestApiVersion restApiVersion) throws IOException {
return new YamlXContentParser(xContentRegistry, deprecationHandler, yamlFactory.createParser(data, offset, length),
restApiVersion);
}


}
11 changes: 0 additions & 11 deletions rest-api-spec/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -122,17 +122,6 @@ tasks.named("yamlRestCompatTest").configure {
//type information is not stored, hence the the index will be found
'termvectors/50_mix_typeless_typeful/Term vectors with typeless API on an index that has types',
// 85 - 13 = 72 tests won't be fixed
'bulk/11_basic_with_types/Array of objects',
'bulk/11_basic_with_types/Empty _id',
'bulk/11_basic_with_types/Empty _id with op_type create',
'bulk/11_basic_with_types/empty action',
'bulk/21_list_of_strings_with_types/List of strings',
'bulk/31_big_string_with_types/One big string',
'bulk/41_source_with_types/Source filtering',
'bulk/51_refresh_with_types/refresh=empty string immediately makes changes are visible in search',
'bulk/51_refresh_with_types/refresh=true immediately makes changes are visible in search',
'bulk/51_refresh_with_types/refresh=wait_for waits until changes are visible in search',
'bulk/81_cas_with_types/Compare And Swap Sequence Numbers',
'cluster.voting_config_exclusions/10_basic/Throw exception when adding voting config exclusion and specifying both node_ids and node_names',
'cluster.voting_config_exclusions/10_basic/Throw exception when adding voting config exclusion without specifying nodes',
'count/11_basic_with_types/count body without query element',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.RestApiVersion;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -64,6 +65,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(STATUS, response.status().getStatus());
} else {
builder.field(_INDEX, failure.getIndex());
if (builder.getRestApiVersion() == RestApiVersion.V_7) {
builder.field(MapperService.TYPE_FIELD_NAME, MapperService.SINGLE_MAPPING_NAME);
}

builder.field(_ID, failure.getId());
builder.field(STATUS, failure.getStatus().getStatus());
builder.startObject(ERROR);
Expand Down Expand Up @@ -313,6 +318,9 @@ public boolean isAborted() {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(INDEX_FIELD, index);
if (builder.getRestApiVersion() == RestApiVersion.V_7) {
builder.field(MapperService.TYPE_FIELD_NAME, MapperService.SINGLE_MAPPING_NAME);
}
if (id != null) {
builder.field(ID_FIELD, id);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.RestApiVersion;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -398,7 +399,7 @@ public BulkProcessor add(BytesReference data, @Nullable String defaultIndex,
try {
ensureOpen();
bulkRequest.add(data, defaultIndex, null, null, defaultPipeline, null,
true, xContentType);
true, xContentType, RestApiVersion.current());
bulkRequestToExecute = newBulkRequestIfNeeded();
} finally {
lock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.RestApiVersion;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -227,26 +228,26 @@ public BulkRequest add(byte[] data, int from, int length, @Nullable String defau
*/
public BulkRequest add(BytesReference data, @Nullable String defaultIndex,
XContentType xContentType) throws IOException {
return add(data, defaultIndex, null, null, null, null, true, xContentType);
return add(data, defaultIndex, null, null, null, null, true, xContentType, RestApiVersion.current());
}

/**
* Adds a framed data in binary format
*/
public BulkRequest add(BytesReference data, @Nullable String defaultIndex, boolean allowExplicitIndex,
XContentType xContentType) throws IOException {
return add(data, defaultIndex, null, null, null, null, allowExplicitIndex, xContentType);
return add(data, defaultIndex, null, null, null, null, allowExplicitIndex, xContentType, RestApiVersion.current());

}

public BulkRequest add(BytesReference data, @Nullable String defaultIndex,
@Nullable String defaultRouting, @Nullable FetchSourceContext defaultFetchSourceContext,
@Nullable String defaultPipeline, @Nullable Boolean defaultRequireAlias, boolean allowExplicitIndex,
XContentType xContentType) throws IOException {
XContentType xContentType, RestApiVersion restApiVersion) throws IOException {
String routing = valueOrDefault(defaultRouting, globalRouting);
String pipeline = valueOrDefault(defaultPipeline, globalPipeline);
Boolean requireAlias = valueOrDefault(defaultRequireAlias, globalRequireAlias);
new BulkRequestParser(true).parse(data, defaultIndex, routing, defaultFetchSourceContext, pipeline, requireAlias,
new BulkRequestParser(true, restApiVersion).parse(data, defaultIndex, routing, defaultFetchSourceContext, pipeline, requireAlias,
allowExplicitIndex, xContentType, (indexRequest, type) -> internalAdd(indexRequest), this::internalAdd, this::add);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.RestApiVersion;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
Expand All @@ -23,6 +25,7 @@
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.rest.action.document.RestBulkAction;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;

import java.io.IOException;
Expand All @@ -38,6 +41,7 @@
* Helper to parse bulk requests. This should be considered an internal class.
*/
public final class BulkRequestParser {
private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(BulkRequestParser.class);

private static final ParseField INDEX = new ParseField("_index");
private static final ParseField TYPE = new ParseField("_type");
Expand All @@ -55,15 +59,19 @@ public final class BulkRequestParser {
private static final ParseField DYNAMIC_TEMPLATES = new ParseField("dynamic_templates");

// TODO: Remove this parameter once the BulkMonitoring endpoint has been removed
private final boolean errorOnType;
// for CompatibleApi V7 this means to deprecate on type, for V8+ it means to throw an error
private final boolean deprecateOrErrorOnType;
private RestApiVersion restApiVersion;

/**
* Create a new parser.
*
* @param errorOnType whether to allow _type information in the index line; used by BulkMonitoring
* @param deprecateOrErrorOnType whether to allow _type information in the index line; used by BulkMonitoring
* @param restApiVersion
*/
public BulkRequestParser(boolean errorOnType) {
this.errorOnType = errorOnType;
public BulkRequestParser(boolean deprecateOrErrorOnType, RestApiVersion restApiVersion) {
this.deprecateOrErrorOnType = deprecateOrErrorOnType;
this.restApiVersion = restApiVersion;
}

private static int findNextMarker(byte marker, int from, BytesReference data) {
Expand Down Expand Up @@ -114,6 +122,8 @@ public void parse(
// deduplicate duplicate strings parsed for these parameters. While it does not prevent instantiating the duplicate strings, it
// reduces their lifetime to the lifetime of this parse call instead of the lifetime of the full bulk request.
final Map<String, String> stringDeduplicator = new HashMap<>();
boolean typesDeprecationLogged = false;

while (true) {
int nextMarker = findNextMarker(marker, from, data);
if (nextMarker == -1) {
Expand All @@ -122,7 +132,7 @@ public void parse(
line++;

// now parse the action
try (XContentParser parser = createParser(data, xContent, from, nextMarker)) {
try (XContentParser parser = createParser(data, xContent, from, nextMarker, restApiVersion)) {
// move pointers
from = nextMarker + 1;

Expand Down Expand Up @@ -174,9 +184,17 @@ public void parse(
}
index = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
} else if (TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
if (errorOnType) {
if (parser.getRestApiVersion().matches(RestApiVersion.equalTo(RestApiVersion.V_7))) {
// for bigger bulks, deprecation throttling might not be enough
if (deprecateOrErrorOnType && typesDeprecationLogged == false) {
deprecationLogger.compatibleApiWarning("bulk_with_types",
RestBulkAction.TYPES_DEPRECATION_MESSAGE);
typesDeprecationLogged = true;
}
} else if (parser.getRestApiVersion().matches(RestApiVersion.onOrAfter(RestApiVersion.V_8))
&& deprecateOrErrorOnType) {
throw new IllegalArgumentException("Action/metadata line [" + line + "] contains an unknown parameter ["
+ currentFieldName + "]");
+ currentFieldName + "]");
}
type = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
} else if (ID.match(currentFieldName, parser.getDeprecationHandler())) {
Expand Down Expand Up @@ -279,7 +297,7 @@ public void parse(
.setRequireAlias(requireAlias)
.routing(routing);
try (XContentParser sliceParser = createParser(
sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContent)) {
sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContent, restApiVersion)) {
updateRequest.fromXContent(sliceParser);
}
if (fetchSourceContext != null) {
Expand All @@ -299,36 +317,40 @@ public void parse(
}
}

private static XContentParser createParser(BytesReference data, XContent xContent) throws IOException {
private static XContentParser createParser(BytesReference data, XContent xContent, RestApiVersion restApiVersion) throws IOException {
if (data.hasArray()) {
return parseBytesArray(xContent, data, 0, data.length());
return parseBytesArray(xContent, data, 0, data.length(), restApiVersion);
} else {
return xContent.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, data.streamInput());
return xContent.createParserForCompatibility(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE,
data.streamInput(), restApiVersion);
}
}

// Create an efficient parser of the given bytes, trying to directly parse a byte array if possible and falling back to stream wrapping
// otherwise.
private static XContentParser createParser(BytesReference data, XContent xContent, int from, int nextMarker) throws IOException {
private static XContentParser createParser(BytesReference data, XContent xContent, int from, int nextMarker,
RestApiVersion restApiVersion) throws IOException {
if (data.hasArray()) {
return parseBytesArray(xContent, data, from, nextMarker);
return parseBytesArray(xContent, data, from, nextMarker, restApiVersion);
} else {
final int length = nextMarker - from;
final BytesReference slice = data.slice(from, length);
if (slice.hasArray()) {
return parseBytesArray(xContent, slice, 0, length);
return parseBytesArray(xContent, slice, 0, length, restApiVersion);
} else {
// EMPTY is safe here because we never call namedObject
return xContent.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, slice.streamInput());
return xContent.createParserForCompatibility(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE,
slice.streamInput(), restApiVersion);
}
}
}

private static XContentParser parseBytesArray(XContent xContent, BytesReference array, int from, int nextMarker) throws IOException {
private static XContentParser parseBytesArray(XContent xContent, BytesReference array, int from, int nextMarker,
RestApiVersion restApiVersion) throws IOException {
assert array.hasArray();
final int offset = array.arrayOffset();
// EMPTY is safe here because we never call namedObject
return xContent.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, array.array(),
offset + from, nextMarker - from);
return xContent.createParserForCompatibility(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, array.array(),
offset + from, nextMarker - from, restApiVersion);
}
}
Loading