Skip to content

Commit ac5cc66

Browse files
committed
Refactor two-client usage.
Signed-off-by: dblock <dblock@amazon.com>
1 parent 6096034 commit ac5cc66

1 file changed

Lines changed: 20 additions & 112 deletions

File tree

java-client/src/main/java/org/opensearch/client/transport/aws/AwsSdk2Transport.java

Lines changed: 20 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
3939
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
4040
import software.amazon.awssdk.regions.Region;
41+
import software.amazon.awssdk.utils.SdkAutoCloseable;
4142

4243
import javax.annotation.CheckForNull;
4344
import javax.annotation.Nonnull;
@@ -70,8 +71,7 @@ public class AwsSdk2Transport implements OpenSearchTransport {
7071
public static final Integer DEFAULT_REQUEST_COMPRESSION_SIZE = 8192;
7172

7273
private static final byte[] NO_BYTES = new byte[0];
73-
private final SdkHttpClient httpClient;
74-
private final SdkAsyncHttpClient asyncHttpClient;
74+
private final SdkAutoCloseable httpClient;
7575
private final String host;
7676
private final String signingServiceName;
7777
private final Region signingRegion;
@@ -92,78 +92,11 @@ public class AwsSdk2Transport implements OpenSearchTransport {
9292
* compression options, etc.
9393
*/
9494
public AwsSdk2Transport(
95-
@Nonnull SdkHttpClient httpClient,
95+
@Nonnull SdkAutoCloseable httpClient,
9696
@Nonnull String host,
9797
@Nonnull Region signingRegion,
9898
@CheckForNull AwsSdk2TransportOptions options) {
99-
this(httpClient, null, host, "es", signingRegion, options);
100-
}
101-
102-
/**
103-
* Create an {@link OpenSearchTransport} with a synchronous AWS HTTP client.
104-
* <p>
105-
* Note that asynchronous OpenSearch requests sent through this transport will be dispatched
106-
* *synchronously* on the calling thread.
107-
*
108-
* @param httpClient HTTP client to use for OpenSearch requests.
109-
* @param host The fully qualified domain name to connect to.
110-
* @param signingServiceName The AWS signing service name, one of `es` (Amazon OpenSearch) or `aoss` (Amazon OpenSearch Serverless).
111-
* @param signingRegion The AWS region for which requests will be signed. This should typically match the region in `host`.
112-
* @param options Options that apply to all requests. Can be null. Create with
113-
* {@link AwsSdk2TransportOptions#builder()} and use these to specify non-default credentials,
114-
* compression options, etc.
115-
*/
116-
public AwsSdk2Transport(
117-
@Nonnull SdkHttpClient httpClient,
118-
@Nonnull String host,
119-
@Nonnull String signingServiceName,
120-
@Nonnull Region signingRegion,
121-
@CheckForNull AwsSdk2TransportOptions options) {
122-
this(httpClient, null, host, signingServiceName, signingRegion, options);
123-
}
124-
125-
/**
126-
* Create an {@link OpenSearchTransport} with an asynchronous AWS HTTP client
127-
* <p>
128-
* Note that synchronous OpenSearch requests sent through this transport will be dispatched
129-
* using the asynchronous client, but the calling thread will block until they are complete.
130-
*
131-
* @param asyncHttpClient HTTP client to use for OpenSearch requests.
132-
* @param host The target host.
133-
* @param signingRegion The AWS region for which requests will be signed. This should typically match region in `host`.
134-
* @param options Options that apply to all requests. Can be null. Create with
135-
* {@link AwsSdk2TransportOptions#builder()} and use these to specify non-default credentials,
136-
* compression options, etc.
137-
*/
138-
public AwsSdk2Transport(
139-
@Nonnull SdkAsyncHttpClient asyncHttpClient,
140-
@Nonnull String host,
141-
@Nonnull Region signingRegion,
142-
@CheckForNull AwsSdk2TransportOptions options) {
143-
this(null, asyncHttpClient, host, "es", signingRegion, options);
144-
}
145-
146-
/**
147-
* Create an {@link OpenSearchTransport} with an asynchronous AWS HTTP client.
148-
* <p>
149-
* Note that synchronous OpenSearch requests sent through this transport will be dispatched
150-
* using the asynchronous client, but the calling thread will block until they are complete.
151-
*
152-
* @param asyncHttpClient HTTP client to use for OpenSearch requests.
153-
* @param host The target host.
154-
* @param signingServiceName The AWS signing service name, one of `es` (Amazon OpenSearch) or `aoss` (Amazon OpenSearch Serverless).
155-
* @param signingRegion The AWS region for which requests will be signed. This should typically match the region in `host`.
156-
* @param options Options that apply to all requests. Can be null. Create with
157-
* {@link AwsSdk2TransportOptions#builder()} and use these to specify non-default credentials,
158-
* compression options, etc.
159-
*/
160-
public AwsSdk2Transport(
161-
@Nonnull SdkAsyncHttpClient asyncHttpClient,
162-
@Nonnull String host,
163-
@Nonnull String signingServiceName,
164-
@Nonnull Region signingRegion,
165-
@CheckForNull AwsSdk2TransportOptions options) {
166-
this(null, asyncHttpClient, host, signingServiceName, signingRegion, options);
99+
this(httpClient, host, "es", signingRegion, options);
167100
}
168101

169102
/**
@@ -173,30 +106,6 @@ public AwsSdk2Transport(
173106
* will be used for asynchronous HTTP requests.
174107
*
175108
* @param httpClient HTTP client to use for OpenSearch requests.
176-
* @param asyncHttpClient HTTP client to use for synchronous OpenSearch requests.
177-
* @param host The fully qualified domain name to connect to.
178-
* @param signingRegion The AWS region for which requests will be signed. This should typically match the region in `host`.
179-
* @param options Options that apply to all requests. Can be null. Create with
180-
* {@link AwsSdk2TransportOptions#builder()} and use these to specify non-default credentials,
181-
* compression options, etc.
182-
*/
183-
public AwsSdk2Transport(
184-
@CheckForNull SdkHttpClient httpClient,
185-
@CheckForNull SdkAsyncHttpClient asyncHttpClient,
186-
@Nonnull String host,
187-
@Nonnull Region signingRegion,
188-
@CheckForNull AwsSdk2TransportOptions options) {
189-
this(httpClient, asyncHttpClient, host, "es", signingRegion, options);
190-
}
191-
192-
/**
193-
* Create an {@link OpenSearchTransport} with both synchronous and asynchronous AWS HTTP clients.
194-
* <p>
195-
* The synchronous client will be used for synchronous OpenSearch requests, and the asynchronous client
196-
* will be used for asynchronous HTTP requests.
197-
*
198-
* @param httpClient HTTP client to use for OpenSearch requests.
199-
* @param asyncHttpClient HTTP client to use for synchronous OpenSearch requests.
200109
* @param host The fully qualified domain name to connect to.
201110
* @param signingRegion The AWS region for which requests will be signed. This should typically match the region in `host`.
202111
* @param signingServiceName The AWS signing service name, one of `es` (Amazon OpenSearch) or `aoss` (Amazon OpenSearch Serverless).
@@ -205,19 +114,13 @@ public AwsSdk2Transport(
205114
* compression options, etc.
206115
*/
207116
public AwsSdk2Transport(
208-
@CheckForNull SdkHttpClient httpClient,
209-
@CheckForNull SdkAsyncHttpClient asyncHttpClient,
117+
@CheckForNull SdkAutoCloseable httpClient,
210118
@Nonnull String host,
211119
@Nonnull String signingServiceName,
212120
@Nonnull Region signingRegion,
213121
@CheckForNull AwsSdk2TransportOptions options) {
214-
if (httpClient == null && asyncHttpClient == null)
215-
{
216-
throw new IllegalArgumentException("At least one SdkHttpClient or SdkAsyncHttpClient must be provided");
217-
}
218122
Objects.requireNonNull(host, "Target OpenSearch service host must not be null");
219123
this.httpClient = httpClient;
220-
this.asyncHttpClient = asyncHttpClient;
221124
this.host = host;
222125
this.signingServiceName = signingServiceName;
223126
this.signingRegion = signingRegion;
@@ -237,11 +140,11 @@ public <RequestT, ResponseT, ErrorT> ResponseT performRequest(
237140
OpenSearchRequestBodyBuffer requestBody = prepareRequestBody(request, endpoint, options);
238141
SdkHttpFullRequest clientReq = prepareRequest(request, endpoint, options, requestBody);
239142

240-
if (httpClient != null) {
241-
return executeSync(clientReq, endpoint, options);
242-
} else {
143+
if (httpClient instanceof SdkHttpClient) {
144+
return executeSync((SdkHttpClient) httpClient, clientReq, endpoint, options);
145+
} else if (httpClient instanceof SdkAsyncHttpClient) {
243146
try {
244-
return executeAsync(clientReq, requestBody, endpoint, options).get();
147+
return executeAsync((SdkAsyncHttpClient) httpClient, clientReq, requestBody, endpoint, options).get();
245148
} catch (ExecutionException e) {
246149
Throwable cause = e.getCause();
247150
if (cause != null) {
@@ -257,6 +160,8 @@ public <RequestT, ResponseT, ErrorT> ResponseT performRequest(
257160
} catch (InterruptedException e) {
258161
throw new IOException("HttpRequest was interrupted", e);
259162
}
163+
} else {
164+
throw new IOException("invalid httpClient: " + httpClient);
260165
}
261166
}
262167

@@ -269,11 +174,13 @@ public <RequestT, ResponseT, ErrorT> CompletableFuture<ResponseT> performRequest
269174
try {
270175
OpenSearchRequestBodyBuffer requestBody = prepareRequestBody(request, endpoint, options);
271176
SdkHttpFullRequest clientReq = prepareRequest(request, endpoint, options, requestBody);
272-
if (asyncHttpClient != null) {
273-
return executeAsync(clientReq, requestBody, endpoint, options);
274-
} else {
275-
ResponseT result = executeSync(clientReq, endpoint, options);
177+
if (httpClient instanceof SdkAsyncHttpClient) {
178+
return executeAsync((SdkAsyncHttpClient) httpClient, clientReq, requestBody, endpoint, options);
179+
} else if (httpClient instanceof SdkHttpClient) {
180+
ResponseT result = executeSync((SdkHttpClient) httpClient, clientReq, endpoint, options);
276181
return CompletableFuture.completedFuture(result);
182+
} else {
183+
throw new IOException("invalid httpClient: " + httpClient);
277184
}
278185
} catch (Throwable e) {
279186
CompletableFuture<ResponseT> cf = new CompletableFuture<>();
@@ -418,6 +325,7 @@ private void applyOptionsHeaders(SdkHttpFullRequest.Builder builder, TransportOp
418325
}
419326

420327
private <ResponseT> ResponseT executeSync(
328+
SdkHttpClient syncHttpClient,
421329
SdkHttpFullRequest httpRequest,
422330
Endpoint<?, ResponseT, ?> endpoint,
423331
TransportOptions options
@@ -427,7 +335,7 @@ private <ResponseT> ResponseT executeSync(
427335
if (httpRequest.contentStreamProvider().isPresent()) {
428336
executeRequest.contentStreamProvider(httpRequest.contentStreamProvider().get());
429337
}
430-
HttpExecuteResponse executeResponse = httpClient.prepareRequest(executeRequest.build()).call();
338+
HttpExecuteResponse executeResponse = syncHttpClient.prepareRequest(executeRequest.build()).call();
431339
AbortableInputStream bodyStream = null;
432340
try {
433341
bodyStream = executeResponse.responseBody().orElse(null);
@@ -441,6 +349,7 @@ private <ResponseT> ResponseT executeSync(
441349
}
442350

443351
private <ResponseT> CompletableFuture<ResponseT> executeAsync(
352+
SdkAsyncHttpClient asyncHttpClient,
444353
SdkHttpFullRequest httpRequest,
445354
@CheckForNull OpenSearchRequestBodyBuffer requestBody,
446355
Endpoint<?, ResponseT, ?> endpoint,
@@ -543,7 +452,6 @@ private <ResponseT, ErrorT> ResponseT parseResponse(
543452
ResponseT response = (ResponseT) new BooleanResponse(bep.getResult(statusCode));
544453
return response;
545454
} else if (endpoint instanceof JsonEndpoint) {
546-
@SuppressWarnings("unchecked")
547455
JsonEndpoint<?, ResponseT, ?> jsonEndpoint = (JsonEndpoint<?, ResponseT, ?>) endpoint;
548456
// Successful response
549457
ResponseT response = null;

0 commit comments

Comments
 (0)