Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix snapshot restore when an index sort is present ([#20284](https://github.com/opensearch-project/OpenSearch/pull/20284))
- Fix SearchPhaseExecutionException to properly initCause ([#20320](https://github.com/opensearch-project/OpenSearch/pull/20320))
- Fix `cluster.remote.<cluster_alias>.server_name` setting no populating SNI ([#20321](https://github.com/opensearch-project/OpenSearch/pull/20321))
- Fix X-Opaque-Id header propagation (along with other response headers) for streaming Reactor Netty 4 transport ([#20371](https://github.com/opensearch-project/OpenSearch/pull/20371))

### Dependencies
- Bump `com.google.auth:google-auth-library-oauth2-http` from 1.38.0 to 1.41.0 ([#20183](https://github.com/opensearch-project/OpenSearch/pull/20183))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,30 @@ public Map<String, String> getParameters() {
}
}

/**
* Set the portion of an HTTP request to OpenSearch that can be
* manipulated without changing OpenSearch's behavior.
*
* @param options the options to be set.
* @throws NullPointerException if {@code options} is null.
*/
public void setOptions(RequestOptions options) {
Objects.requireNonNull(options, "options cannot be null");
this.options = options;
}

/**
* Set the portion of an HTTP request to OpenSearch that can be
* manipulated without changing OpenSearch's behavior.
*
* @param options the options to be set.
* @throws NullPointerException if {@code options} is null.
*/
public void setOptions(RequestOptions.Builder options) {
Objects.requireNonNull(options, "options cannot be null");
this.options = options.build();
}

/**
* Add a query string parameter.
* @param name the name of the url parameter. Must not be null.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.client;

import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.Message;
Expand Down Expand Up @@ -93,4 +94,29 @@ public List<String> getWarnings() {
.block()
);
}

/**
* Returns a list of all headers returned in the response.
*/
public Header[] getHeaders() {
return publisher.map(Message::getHead)
.onErrorResume(ResponseException.class, e -> Mono.just(e.getResponse().getHttpResponse()))
.map(HttpResponse::getHeaders)
.block();
}

/**
* Returns the value of the first header with a specified name of this message.
* If there is more than one matching header in the message the first element is returned.
* If there is no matching header in the message <code>null</code> is returned.
*
* @param name header name
*/
public String getHeader(String name) {
return publisher.map(Message::getHead)
.onErrorResume(ResponseException.class, e -> Mono.just(e.getResponse().getHttpResponse()))
.mapNotNull(response -> response.getFirstHeader(name))
.mapNotNull(header -> header.getValue())
.block();
}
Comment thread
cwperks marked this conversation as resolved.
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.rest;

import org.opensearch.client.Request;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.Response;
import org.opensearch.client.ResponseException;
import org.opensearch.client.StreamingRequest;
Expand Down Expand Up @@ -48,8 +49,10 @@ public void tearDown() throws Exception {
public void testStreamingRequestNoBatching() throws IOException {
final VirtualTimeScheduler scheduler = VirtualTimeScheduler.create(true);

final Stream<String> stream = IntStream.range(1, 6)
.mapToObj(id -> "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"" + id + "\" } }\n" + "{ \"name\": \"josh\" }\n");
final Stream<String> stream = IntStream.range(1, 6).mapToObj(id -> String.format(Locale.ENGLISH, """
{ "index": { "_index": "test-streaming", "_id": "%d" } }
{ "name": "josh" }
""", id));

final Duration delay = Duration.ofMillis(1);
final StreamingRequest<ByteBuffer> streamingRequest = new StreamingRequest<>(
Expand Down Expand Up @@ -86,9 +89,87 @@ public void testStreamingRequestNoBatching() throws IOException {
assertThat(count, equalTo(5));
}

public void testStreamingRequestOpaqueId() throws IOException {
final VirtualTimeScheduler scheduler = VirtualTimeScheduler.create(true);

final Stream<String> stream = IntStream.range(1, 6).mapToObj(id -> String.format(Locale.ENGLISH, """
{ "index": { "_index": "test-streaming", "_id": "%d" } }
{ "name": "josh" }
""", id));

final RequestOptions options = RequestOptions.DEFAULT.toBuilder().addHeader("X-Opaque-Id", "1").build();

final Duration delay = Duration.ofMillis(1);
final StreamingRequest<ByteBuffer> streamingRequest = new StreamingRequest<>(
"POST",
"/_bulk/stream",
Flux.fromStream(stream).delayElements(delay, scheduler).map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
);
streamingRequest.addParameter("refresh", "true");
streamingRequest.setOptions(options);

final StreamingResponse<ByteBuffer> streamingResponse = client().streamRequest(streamingRequest);
scheduler.advanceTimeBy(delay); /* emit first element */

StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8)))
.expectNextMatches(s -> s.contains("\"result\":\"created\"") && s.contains("\"_id\":\"1\""))
.then(() -> scheduler.advanceTimeBy(delay))
.expectNextMatches(s -> s.contains("\"result\":\"created\"") && s.contains("\"_id\":\"2\""))
.then(() -> scheduler.advanceTimeBy(delay))
.expectNextMatches(s -> s.contains("\"result\":\"created\"") && s.contains("\"_id\":\"3\""))
.then(() -> scheduler.advanceTimeBy(delay))
.expectNextMatches(s -> s.contains("\"result\":\"created\"") && s.contains("\"_id\":\"4\""))
.then(() -> scheduler.advanceTimeBy(delay))
.expectNextMatches(s -> s.contains("\"result\":\"created\"") && s.contains("\"_id\":\"5\""))
.then(() -> scheduler.advanceTimeBy(delay))
.expectComplete()
.verify();

assertThat(streamingResponse.getHeader("X-Opaque-Id"), equalTo("1"));
assertThat(streamingResponse.getStatusLine().getStatusCode(), equalTo(200));
assertThat(streamingResponse.getWarnings(), empty());

final Request request = new Request("GET", "/test-streaming/_count");
final Response response = client().performRequest(request);
final ObjectPath objectPath = ObjectPath.createFromResponse(response);
final Integer count = objectPath.evaluate("count");
assertThat(count, equalTo(5));
}

public void testStreamingRequestOpaqueIdTwice() throws IOException {
final Stream<String> stream = IntStream.range(1, 6).mapToObj(id -> String.format(Locale.ENGLISH, """
{ "index": { "_index": "test-streaming", "_id": "%d" } }
{ "name": "josh" }
""", id));

final RequestOptions options = RequestOptions.DEFAULT.toBuilder()
.addHeader("X-Opaque-Id", "1")
.addHeader("X-Opaque-Id", "2")
Comment thread
cwperks marked this conversation as resolved.
.build();

final StreamingRequest<ByteBuffer> streamingRequest = new StreamingRequest<>(
"POST",
"/_bulk/stream",
Flux.fromStream(stream).map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
);
streamingRequest.addParameter("refresh", "true");
streamingRequest.setOptions(options);

final StreamingResponse<ByteBuffer> streamingResponse = client().streamRequest(streamingRequest);

StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8)))
.expectError(ResponseException.class)
.verify();

assertThat(streamingResponse.getStatusLine().getStatusCode(), equalTo(400));
assertThat(streamingResponse.getWarnings(), empty());
}

public void testStreamingRequestOneBatchBySize() throws IOException, InterruptedException {
final Stream<String> stream = IntStream.range(1, 6)
.mapToObj(id -> "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"" + id + "\" } }\n" + "{ \"name\": \"josh\" }\n");
final Stream<String> stream = IntStream.range(1, 6).mapToObj(id -> String.format(Locale.ENGLISH, """
{ "index": { "_index": "test-streaming", "_id": "%d" } }
{ "name": "josh" }
""", id));

final Duration delay = Duration.ofMillis(1);
final StreamingRequest<ByteBuffer> streamingRequest = new StreamingRequest<>(
Expand Down Expand Up @@ -128,8 +209,10 @@ public void testStreamingRequestOneBatchBySize() throws IOException, Interrupted
}

public void testStreamingRequestManyBatchesBySize() throws IOException {
final Stream<String> stream = IntStream.range(1, 6)
.mapToObj(id -> "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"" + id + "\" } }\n" + "{ \"name\": \"josh\" }\n");
final Stream<String> stream = IntStream.range(1, 6).mapToObj(id -> String.format(Locale.ENGLISH, """
{ "index": { "_index": "test-streaming", "_id": "%d" } }
{ "name": "josh" }
""", id));

final Duration delay = Duration.ofMillis(1);
final StreamingRequest<ByteBuffer> streamingRequest = new StreamingRequest<>(
Expand Down Expand Up @@ -171,8 +254,10 @@ public void testStreamingRequestManyBatchesBySize() throws IOException {
}

public void testStreamingRequestManyBatchesByInterval() throws IOException {
final Stream<String> stream = IntStream.range(1, 6)
.mapToObj(id -> "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"" + id + "\" } }\n" + "{ \"name\": \"josh\" }\n");
final Stream<String> stream = IntStream.range(1, 6).mapToObj(id -> String.format(Locale.ENGLISH, """
{ "index": { "_index": "test-streaming", "_id": "%d" } }
{ "name": "josh" }
""", id));

final Duration delay = Duration.ofMillis(500);
final StreamingRequest<ByteBuffer> streamingRequest = new StreamingRequest<>(
Expand Down Expand Up @@ -214,8 +299,10 @@ public void testStreamingRequestManyBatchesByInterval() throws IOException {
}

public void testStreamingRequestManyBatchesByIntervalAndSize() throws IOException {
final Stream<String> stream = IntStream.range(1, 6)
.mapToObj(id -> "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"" + id + "\" } }\n" + "{ \"name\": \"josh\" }\n");
final Stream<String> stream = IntStream.range(1, 6).mapToObj(id -> String.format(Locale.ENGLISH, """
{ "index": { "_index": "test-streaming", "_id": "%d" } }
{ "name": "josh" }
""", id));

final Duration delay = Duration.ofSeconds(1);
final StreamingRequest<ByteBuffer> streamingRequest = new StreamingRequest<>(
Expand Down Expand Up @@ -248,9 +335,10 @@ public void testStreamingRequestManyBatchesByIntervalAndSize() throws IOExceptio
}

public void testStreamingBadRequest() throws IOException {
final Stream<String> stream = Stream.of(
"{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"1\" } }\n" + "{ \"name\": \"josh\" }\n"
);
final Stream<String> stream = Stream.of("""
{ "index": { "_index": "test-streaming", "_id": "1" } }
{ "name": "josh" }
""");

final StreamingRequest<ByteBuffer> streamingRequest = new StreamingRequest<>(
"POST",
Expand All @@ -272,10 +360,12 @@ public void testStreamingBadRequest() throws IOException {
public void testStreamingBadStream() throws IOException {
final VirtualTimeScheduler scheduler = VirtualTimeScheduler.create(true);

final Stream<String> stream = Stream.of(
"{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"1\" } }\n" + "{ \"name\": \"josh\" }\n",
"{ \"name\": \"josh\" }\n"
);
final Stream<String> stream = Stream.of("""
{ "index": { "_index": "test-streaming", "_id": "1" } }
{ "name": "josh" }
""", """
{ "name": "josh" }
""");

final Duration delay = Duration.ofMillis(1);
final StreamingRequest<ByteBuffer> streamingRequest = new StreamingRequest<>(
Expand All @@ -300,13 +390,10 @@ public void testStreamingBadStream() throws IOException {
}

public void testStreamingLargeDocument() throws IOException {
final Stream<String> stream = Stream.of(
String.format(
Locale.getDefault(),
"{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"1\" } }\n{ \"name\": \"%s\" }\n",
randomAlphaOfLength(7000)
)
);
final Stream<String> stream = Stream.of(String.format(Locale.getDefault(), """
{ "index": { "_index": "test-streaming", "_id": "1" } }
{ "name": "%s" }
""", randomAlphaOfLength(7000)));

final StreamingRequest<ByteBuffer> streamingRequest = new StreamingRequest<>(
"POST",
Expand All @@ -329,7 +416,10 @@ public void testStreamingLargeDocumentThatExceedsChunkSize() throws IOException
final Stream<String> stream = Stream.of(
String.format(
Locale.getDefault(),
"{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"1\" } }\n{ \"name\": \"%s\" }\n",
"""
{ "index": { "_index": "test-streaming", "_id": "1" } }
{ "name": "%s" }
""",
randomAlphaOfLength(9000) /* the default chunk size limit is set 8k */
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,10 @@ public void sendResponse(HttpResponse response, ActionListener<Void> listener) {

@Override
public void prepareResponse(int status, Map<String, List<String>> headers) {
this.response.status(status);
headers.forEach((k, vs) -> vs.forEach(v -> this.response.addHeader(k, v)));
if (this.response.hasSentHeaders() == false) {
this.response.status(status);
headers.forEach((k, vs) -> vs.forEach(v -> this.response.addHeader(k, v)));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,29 @@

import org.opensearch.core.action.ActionListener;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import io.netty.handler.codec.http.HttpContent;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

class ReactorNetty4StreamingResponseProducer implements StreamingHttpContentSender, Publisher<HttpContent> {
// Buffer up to 64 messages, otherwise fail with buffer overflow (IllegalStateException)
private static final int BUFFERED_QUEUE_SIZE = 64;

private final Publisher<HttpContent> sender;
private volatile FluxSink<HttpContent> emitter;
// We choose unbounded queue as the safe buffer here (since it does not support capacity bounds),
// realistically we should only see a single final (last content) response being deferred. The BUFFERED_QUEUE_SIZE
// check takes care of potential overflows.
private final Queue<DelayedHttpContent> queue = new ConcurrentLinkedQueue<>();

// Holds the {@code HttpContent} for deferred delivery
private record DelayedHttpContent(HttpContent content, ActionListener<Void> listener, boolean isLast) {
};

ReactorNetty4StreamingResponseProducer() {
this.sender = Flux.create(emitter -> register(emitter));
Expand All @@ -30,6 +44,20 @@ private void register(FluxSink<HttpContent> emitter) {

@Override
public void send(HttpContent content, ActionListener<Void> listener, boolean isLast) {
// In case when the exception triggers the response **before** the emitter is being
// created, we defer the send till the subscribe call happens.
if (isReady() == false) {
queue.offer(new DelayedHttpContent(content, listener, isLast));
if (queue.size() > BUFFERED_QUEUE_SIZE) {
final IllegalStateException ex = new IllegalStateException(
"The buffered queue size limit is exceeded: " + BUFFERED_QUEUE_SIZE
);
listener.onFailure(ex);
throw ex;
}
return;
}

try {
emitter.next(content);
listener.onResponse(null);
Expand All @@ -45,6 +73,11 @@ public void send(HttpContent content, ActionListener<Void> listener, boolean isL
@Override
public void subscribe(Subscriber<? super HttpContent> s) {
sender.subscribe(s);

DelayedHttpContent content = null;
while ((content = queue.poll()) != null) {
send(content.content(), content.listener(), content.isLast());
}
Comment thread
reta marked this conversation as resolved.
}

@Override
Expand Down
Loading
Loading