-
Notifications
You must be signed in to change notification settings - Fork 221
Refactor two-client usage. #333
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,6 +38,7 @@ | |
| import software.amazon.awssdk.http.async.AsyncExecuteRequest; | ||
| import software.amazon.awssdk.http.async.SdkAsyncHttpClient; | ||
| import software.amazon.awssdk.regions.Region; | ||
| import software.amazon.awssdk.utils.SdkAutoCloseable; | ||
|
|
||
| import javax.annotation.CheckForNull; | ||
| import javax.annotation.Nonnull; | ||
|
|
@@ -70,154 +71,103 @@ public class AwsSdk2Transport implements OpenSearchTransport { | |
| public static final Integer DEFAULT_REQUEST_COMPRESSION_SIZE = 8192; | ||
|
|
||
| private static final byte[] NO_BYTES = new byte[0]; | ||
| private final SdkHttpClient httpClient; | ||
| private final SdkAsyncHttpClient asyncHttpClient; | ||
| private final SdkAutoCloseable httpClient; | ||
| private final String host; | ||
| private final String signingServiceName; | ||
| private final Region signingRegion; | ||
| private final JsonpMapper defaultMapper; | ||
| private final AwsSdk2TransportOptions transportOptions; | ||
|
|
||
| /** | ||
| * Create an {@link OpenSearchTransport} with a synchronous AWS HTTP client. | ||
| * Create an {@link OpenSearchTransport} with an asynchronous AWS HTTP client. | ||
| * <p> | ||
| * Note that asynchronous OpenSearch requests sent through this transport will be dispatched | ||
| * *synchronously* on the calling thread. | ||
| * | ||
| * @param httpClient HTTP client to use for OpenSearch requests. | ||
| * @param asyncHttpClient Asynchronous HTTP client to use for OpenSearch requests. | ||
| * @param host The fully qualified domain name to connect to. | ||
| * @param signingRegion The AWS region for which requests will be signed. This should typically match the region in `host`. | ||
| * @param options Options that apply to all requests. Can be null. Create with | ||
| * {@link AwsSdk2TransportOptions#builder()} and use these to specify non-default credentials, | ||
| * compression options, etc. | ||
| */ | ||
| public AwsSdk2Transport( | ||
| @Nonnull SdkHttpClient httpClient, | ||
| @CheckForNull SdkAsyncHttpClient asyncHttpClient, | ||
| @Nonnull String host, | ||
| @Nonnull Region signingRegion, | ||
| @CheckForNull AwsSdk2TransportOptions options) { | ||
| this(httpClient, null, host, "es", signingRegion, options); | ||
| this(asyncHttpClient, host, "es", signingRegion, options); | ||
| } | ||
|
|
||
| /** | ||
| * Create an {@link OpenSearchTransport} with a synchronous AWS HTTP client. | ||
| * <p> | ||
| * Note that asynchronous OpenSearch requests sent through this transport will be dispatched | ||
| * *synchronously* on the calling thread. | ||
| * | ||
| * @param httpClient HTTP client to use for OpenSearch requests. | ||
| * @param syncHttpClient Synchronous HTTP client to use for OpenSearch requests. | ||
| * @param host The fully qualified domain name to connect to. | ||
| * @param signingServiceName The AWS signing service name, one of `es` (Amazon OpenSearch) or `aoss` (Amazon OpenSearch Serverless). | ||
| * @param signingRegion The AWS region for which requests will be signed. This should typically match the region in `host`. | ||
| * @param options Options that apply to all requests. Can be null. Create with | ||
| * {@link AwsSdk2TransportOptions#builder()} and use these to specify non-default credentials, | ||
| * compression options, etc. | ||
| */ | ||
| public AwsSdk2Transport( | ||
| @Nonnull SdkHttpClient httpClient, | ||
| @CheckForNull SdkHttpClient syncHttpClient, | ||
| @Nonnull String host, | ||
| @Nonnull String signingServiceName, | ||
| @Nonnull Region signingRegion, | ||
| @CheckForNull AwsSdk2TransportOptions options) { | ||
| this(httpClient, null, host, signingServiceName, signingRegion, options); | ||
| } | ||
|
|
||
| /** | ||
| * Create an {@link OpenSearchTransport} with an asynchronous AWS HTTP client | ||
| * <p> | ||
| * Note that synchronous OpenSearch requests sent through this transport will be dispatched | ||
| * using the asynchronous client, but the calling thread will block until they are complete. | ||
| * | ||
| * @param asyncHttpClient HTTP client to use for OpenSearch requests. | ||
| * @param host The target host. | ||
| * @param signingRegion The AWS region for which requests will be signed. This should typically match region in `host`. | ||
| * @param options Options that apply to all requests. Can be null. Create with | ||
| * {@link AwsSdk2TransportOptions#builder()} and use these to specify non-default credentials, | ||
| * compression options, etc. | ||
| */ | ||
| public AwsSdk2Transport( | ||
| @Nonnull SdkAsyncHttpClient asyncHttpClient, | ||
| @Nonnull String host, | ||
| @Nonnull Region signingRegion, | ||
| @CheckForNull AwsSdk2TransportOptions options) { | ||
| this(null, asyncHttpClient, host, "es", signingRegion, options); | ||
| this(syncHttpClient, host, "es", signingRegion, options); | ||
| } | ||
|
|
||
| /** | ||
| * Create an {@link OpenSearchTransport} with an asynchronous AWS HTTP client. | ||
| * <p> | ||
| * Note that synchronous OpenSearch requests sent through this transport will be dispatched | ||
| * using the asynchronous client, but the calling thread will block until they are complete. | ||
| * Note that asynchronous OpenSearch requests sent through this transport will be dispatched | ||
| * *synchronously* on the calling thread. | ||
| * | ||
| * @param asyncHttpClient HTTP client to use for OpenSearch requests. | ||
| * @param host The target host. | ||
| * @param signingServiceName The AWS signing service name, one of `es` (Amazon OpenSearch) or `aoss` (Amazon OpenSearch Serverless). | ||
| * @param asyncHttpClient Asynchronous HTTP client to use for OpenSearch requests. | ||
| * @param host The fully qualified domain name to connect to. | ||
| * @param signingRegion The AWS region for which requests will be signed. This should typically match the region in `host`. | ||
| * @param signingServiceName The AWS signing service name, one of `es` (Amazon OpenSearch) or `aoss` (Amazon OpenSearch Serverless). | ||
| * @param options Options that apply to all requests. Can be null. Create with | ||
| * {@link AwsSdk2TransportOptions#builder()} and use these to specify non-default credentials, | ||
| * compression options, etc. | ||
| */ | ||
| public AwsSdk2Transport( | ||
| @Nonnull SdkAsyncHttpClient asyncHttpClient, | ||
| @CheckForNull SdkAsyncHttpClient asyncHttpClient, | ||
| @Nonnull String host, | ||
| @Nonnull String signingServiceName, | ||
| @Nonnull Region signingRegion, | ||
| @CheckForNull AwsSdk2TransportOptions options) { | ||
| this(null, asyncHttpClient, host, signingServiceName, signingRegion, options); | ||
| this((SdkAutoCloseable) asyncHttpClient, host, signingServiceName, signingRegion, options); | ||
| } | ||
|
|
||
| /** | ||
| * Create an {@link OpenSearchTransport} with both synchronous and asynchronous AWS HTTP clients. | ||
| * <p> | ||
| * The synchronous client will be used for synchronous OpenSearch requests, and the asynchronous client | ||
| * will be used for asynchronous HTTP requests. | ||
| * Create an {@link OpenSearchTransport} with a synchronous AWS HTTP client. | ||
| * | ||
| * @param httpClient HTTP client to use for OpenSearch requests. | ||
| * @param asyncHttpClient HTTP client to use for synchronous OpenSearch requests. | ||
| * @param syncHttpClient Synchronous HTTP client to use for OpenSearch requests. | ||
| * @param host The fully qualified domain name to connect to. | ||
| * @param signingRegion The AWS region for which requests will be signed. This should typically match the region in `host`. | ||
| * @param signingServiceName The AWS signing service name, one of `es` (Amazon OpenSearch) or `aoss` (Amazon OpenSearch Serverless). | ||
| * @param options Options that apply to all requests. Can be null. Create with | ||
| * {@link AwsSdk2TransportOptions#builder()} and use these to specify non-default credentials, | ||
| * compression options, etc. | ||
| */ | ||
| public AwsSdk2Transport( | ||
| @CheckForNull SdkHttpClient httpClient, | ||
| @CheckForNull SdkAsyncHttpClient asyncHttpClient, | ||
| @CheckForNull SdkHttpClient syncHttpClient, | ||
| @Nonnull String host, | ||
| @Nonnull String signingServiceName, | ||
| @Nonnull Region signingRegion, | ||
| @CheckForNull AwsSdk2TransportOptions options) { | ||
| this(httpClient, asyncHttpClient, host, "es", signingRegion, options); | ||
| this((SdkAutoCloseable) syncHttpClient, host, signingServiceName, signingRegion, options); | ||
| } | ||
|
|
||
| /** | ||
| * Create an {@link OpenSearchTransport} with both synchronous and asynchronous AWS HTTP clients. | ||
| * <p> | ||
| * The synchronous client will be used for synchronous OpenSearch requests, and the asynchronous client | ||
| * will be used for asynchronous HTTP requests. | ||
| * | ||
| * @param httpClient HTTP client to use for OpenSearch requests. | ||
| * @param asyncHttpClient HTTP client to use for synchronous OpenSearch requests. | ||
| * @param host The fully qualified domain name to connect to. | ||
| * @param signingRegion The AWS region for which requests will be signed. This should typically match the region in `host`. | ||
| * @param signingServiceName The AWS signing service name, one of `es` (Amazon OpenSearch) or `aoss` (Amazon OpenSearch Serverless). | ||
| * @param options Options that apply to all requests. Can be null. Create with | ||
| * {@link AwsSdk2TransportOptions#builder()} and use these to specify non-default credentials, | ||
| * compression options, etc. | ||
| */ | ||
| public AwsSdk2Transport( | ||
| @CheckForNull SdkHttpClient httpClient, | ||
| @CheckForNull SdkAsyncHttpClient asyncHttpClient, | ||
| private AwsSdk2Transport( | ||
| @CheckForNull SdkAutoCloseable httpClient, | ||
| @Nonnull String host, | ||
| @Nonnull String signingServiceName, | ||
| @Nonnull Region signingRegion, | ||
| @CheckForNull AwsSdk2TransportOptions options) { | ||
| if (httpClient == null && asyncHttpClient == null) | ||
| { | ||
| throw new IllegalArgumentException("At least one SdkHttpClient or SdkAsyncHttpClient must be provided"); | ||
| } | ||
| Objects.requireNonNull(host, "Target OpenSearch service host must not be null"); | ||
| this.httpClient = httpClient; | ||
| this.asyncHttpClient = asyncHttpClient; | ||
| this.host = host; | ||
| this.signingServiceName = signingServiceName; | ||
| this.signingRegion = signingRegion; | ||
|
|
@@ -237,11 +187,11 @@ public <RequestT, ResponseT, ErrorT> ResponseT performRequest( | |
| OpenSearchRequestBodyBuffer requestBody = prepareRequestBody(request, endpoint, options); | ||
| SdkHttpFullRequest clientReq = prepareRequest(request, endpoint, options, requestBody); | ||
|
|
||
| if (httpClient != null) { | ||
| return executeSync(clientReq, endpoint, options); | ||
| } else { | ||
| if (httpClient instanceof SdkHttpClient) { | ||
| return executeSync((SdkHttpClient) httpClient, clientReq, endpoint, options); | ||
| } else if (httpClient instanceof SdkAsyncHttpClient) { | ||
| try { | ||
| return executeAsync(clientReq, requestBody, endpoint, options).get(); | ||
| return executeAsync((SdkAsyncHttpClient) httpClient, clientReq, requestBody, endpoint, options).get(); | ||
| } catch (ExecutionException e) { | ||
| Throwable cause = e.getCause(); | ||
| if (cause != null) { | ||
|
|
@@ -257,6 +207,8 @@ public <RequestT, ResponseT, ErrorT> ResponseT performRequest( | |
| } catch (InterruptedException e) { | ||
| throw new IOException("HttpRequest was interrupted", e); | ||
| } | ||
| } else { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Java is not smart enough to evaluate all paths and conclude that this code can never be reached. If you remove it, there will be an error that the method must return a value, and I don't want a catch all |
||
| throw new IOException("invalid httpClient: " + httpClient); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -269,11 +221,13 @@ public <RequestT, ResponseT, ErrorT> CompletableFuture<ResponseT> performRequest | |
| try { | ||
| OpenSearchRequestBodyBuffer requestBody = prepareRequestBody(request, endpoint, options); | ||
| SdkHttpFullRequest clientReq = prepareRequest(request, endpoint, options, requestBody); | ||
| if (asyncHttpClient != null) { | ||
| return executeAsync(clientReq, requestBody, endpoint, options); | ||
| } else { | ||
| ResponseT result = executeSync(clientReq, endpoint, options); | ||
| if (httpClient instanceof SdkAsyncHttpClient) { | ||
| return executeAsync((SdkAsyncHttpClient) httpClient, clientReq, requestBody, endpoint, options); | ||
| } else if (httpClient instanceof SdkHttpClient) { | ||
| ResponseT result = executeSync((SdkHttpClient) httpClient, clientReq, endpoint, options); | ||
| return CompletableFuture.completedFuture(result); | ||
| } else { | ||
| throw new IOException("invalid httpClient: " + httpClient); | ||
| } | ||
| } catch (Throwable e) { | ||
| CompletableFuture<ResponseT> cf = new CompletableFuture<>(); | ||
|
|
@@ -418,6 +372,7 @@ private void applyOptionsHeaders(SdkHttpFullRequest.Builder builder, TransportOp | |
| } | ||
|
|
||
| private <ResponseT> ResponseT executeSync( | ||
| SdkHttpClient syncHttpClient, | ||
| SdkHttpFullRequest httpRequest, | ||
| Endpoint<?, ResponseT, ?> endpoint, | ||
| TransportOptions options | ||
|
|
@@ -427,7 +382,7 @@ private <ResponseT> ResponseT executeSync( | |
| if (httpRequest.contentStreamProvider().isPresent()) { | ||
| executeRequest.contentStreamProvider(httpRequest.contentStreamProvider().get()); | ||
| } | ||
| HttpExecuteResponse executeResponse = httpClient.prepareRequest(executeRequest.build()).call(); | ||
| HttpExecuteResponse executeResponse = syncHttpClient.prepareRequest(executeRequest.build()).call(); | ||
| AbortableInputStream bodyStream = null; | ||
| try { | ||
| bodyStream = executeResponse.responseBody().orElse(null); | ||
|
|
@@ -441,6 +396,7 @@ private <ResponseT> ResponseT executeSync( | |
| } | ||
|
|
||
| private <ResponseT> CompletableFuture<ResponseT> executeAsync( | ||
| SdkAsyncHttpClient asyncHttpClient, | ||
| SdkHttpFullRequest httpRequest, | ||
| @CheckForNull OpenSearchRequestBodyBuffer requestBody, | ||
| Endpoint<?, ResponseT, ?> endpoint, | ||
|
|
@@ -543,7 +499,6 @@ private <ResponseT, ErrorT> ResponseT parseResponse( | |
| ResponseT response = (ResponseT) new BooleanResponse(bep.getResult(statusCode)); | ||
| return response; | ||
| } else if (endpoint instanceof JsonEndpoint) { | ||
| @SuppressWarnings("unchecked") | ||
| JsonEndpoint<?, ResponseT, ?> jsonEndpoint = (JsonEndpoint<?, ResponseT, ?>) endpoint; | ||
| // Successful response | ||
| ResponseT response = null; | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.