Skip to content

Commit 6b27244

Browse files
committed
[BUG] Cannot communicate with http2 when reactor-netty is enabled
Signed-off-by: Andriy Redko <drreta@gmail.com>
1 parent bfbd3c4 commit 6b27244

6 files changed

Lines changed: 126 additions & 77 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3232
### Fixed
3333
- Add task cancellation checks in aggregators ([#18426](https://github.com/opensearch-project/OpenSearch/pull/18426))
3434
- Fix concurrent timings in profiler ([#18540](https://github.com/opensearch-project/OpenSearch/pull/18540))
35+
- Cannot communicate with HTTP/2 when reactor-netty is enabled ([#18599](https://github.com/opensearch-project/OpenSearch/pull/18599))
3536

3637
### Security
3738

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java

Lines changed: 40 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.http.reactor.netty4;
1010

11+
import org.opensearch.OpenSearchException;
1112
import org.opensearch.common.Nullable;
1213
import org.opensearch.common.network.NetworkService;
1314
import org.opensearch.common.settings.ClusterSettings;
@@ -27,35 +28,35 @@
2728
import org.opensearch.http.HttpServerChannel;
2829
import org.opensearch.http.reactor.netty4.ssl.SslUtils;
2930
import org.opensearch.plugins.SecureHttpTransportSettingsProvider;
31+
import org.opensearch.plugins.SecureHttpTransportSettingsProvider.SecureHttpTransportParameters;
3032
import org.opensearch.rest.RestHandler;
3133
import org.opensearch.rest.RestRequest.Method;
3234
import org.opensearch.telemetry.tracing.Tracer;
3335
import org.opensearch.threadpool.ThreadPool;
3436
import org.opensearch.transport.reactor.SharedGroupFactory;
3537
import org.opensearch.transport.reactor.netty4.Netty4Utils;
3638

37-
import javax.net.ssl.SSLEngine;
38-
import javax.net.ssl.SSLException;
39-
import javax.net.ssl.SSLSessionContext;
39+
import javax.net.ssl.KeyManagerFactory;
4040

4141
import java.net.InetSocketAddress;
4242
import java.net.SocketOption;
4343
import java.time.Duration;
4444
import java.util.Arrays;
45-
import java.util.List;
4645
import java.util.Optional;
4746

4847
import io.netty.buffer.ByteBuf;
49-
import io.netty.buffer.ByteBufAllocator;
5048
import io.netty.channel.ChannelOption;
5149
import io.netty.channel.socket.nio.NioChannelOption;
5250
import io.netty.handler.codec.http.DefaultLastHttpContent;
5351
import io.netty.handler.codec.http.FullHttpResponse;
5452
import io.netty.handler.codec.http.HttpContent;
55-
import io.netty.handler.ssl.ApplicationProtocolNegotiator;
53+
import io.netty.handler.codec.http.HttpResponseStatus;
54+
import io.netty.handler.ssl.ApplicationProtocolConfig;
55+
import io.netty.handler.ssl.ApplicationProtocolNames;
5656
import io.netty.handler.ssl.SslContext;
57+
import io.netty.handler.ssl.SslContextBuilder;
58+
import io.netty.handler.ssl.SupportedCipherSuiteFilter;
5759
import io.netty.handler.timeout.ReadTimeoutException;
58-
import io.netty.util.ReferenceCountUtil;
5960
import org.reactivestreams.Publisher;
6061
import reactor.core.publisher.Mono;
6162
import reactor.core.scheduler.Scheduler;
@@ -306,59 +307,33 @@ private HttpServer configure(final HttpServer server) throws Exception {
306307

307308
// Configure SSL context if available
308309
if (secureHttpTransportSettingsProvider != null) {
309-
final SSLEngine engine = secureHttpTransportSettingsProvider.buildSecureHttpServerEngine(settings, this)
310-
.orElseGet(SslUtils::createDefaultServerSSLEngine);
311-
312-
try {
313-
final List<String> cipherSuites = Arrays.asList(engine.getEnabledCipherSuites());
314-
final List<String> applicationProtocols = Arrays.asList(engine.getSSLParameters().getApplicationProtocols());
315-
316-
configured = configured.secure(spec -> spec.sslContext(new SslContext() {
317-
@Override
318-
public SSLSessionContext sessionContext() {
319-
throw new UnsupportedOperationException(); /* server only, should never be called */
320-
}
321-
322-
@Override
323-
public SSLEngine newEngine(ByteBufAllocator alloc, String peerHost, int peerPort) {
324-
throw new UnsupportedOperationException(); /* server only, should never be called */
325-
}
326-
327-
@Override
328-
public SSLEngine newEngine(ByteBufAllocator alloc) {
329-
try {
330-
return secureHttpTransportSettingsProvider.buildSecureHttpServerEngine(
331-
settings,
332-
ReactorNetty4HttpServerTransport.this
333-
).orElseGet(SslUtils::createDefaultServerSSLEngine);
334-
} catch (final SSLException ex) {
335-
throw new UnsupportedOperationException("Unable to create SSLEngine", ex);
336-
}
337-
}
338-
339-
@Override
340-
public boolean isClient() {
341-
return false; /* server only */
342-
}
343-
344-
@Override
345-
public List<String> cipherSuites() {
346-
return cipherSuites;
347-
}
310+
final Optional<SecureHttpTransportParameters> parameters = secureHttpTransportSettingsProvider.parameters(settings);
311+
312+
final KeyManagerFactory keyManagerFactory = parameters.flatMap(SecureHttpTransportParameters::keyManagerFactory)
313+
.orElseThrow(() -> new OpenSearchException("The KeyManagerFactory instance is not provided"));
314+
315+
final SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(keyManagerFactory);
316+
parameters.flatMap(SecureHttpTransportParameters::trustManagerFactory).ifPresent(sslContextBuilder::trustManager);
317+
parameters.map(SecureHttpTransportParameters::cipherSuites)
318+
.ifPresent(ciphers -> sslContextBuilder.ciphers(ciphers, SupportedCipherSuiteFilter.INSTANCE));
319+
320+
final SslContext sslContext = sslContextBuilder.protocols(
321+
parameters.map(SecureHttpTransportParameters::protocols).orElseGet(() -> Arrays.asList(SslUtils.DEFAULT_SSL_PROTOCOLS))
322+
)
323+
.applicationProtocolConfig(
324+
new ApplicationProtocolConfig(
325+
ApplicationProtocolConfig.Protocol.ALPN,
326+
// NO_ADVERTISE is currently the only mode supported by both OpenSsl and JDK providers.
327+
ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE,
328+
// ACCEPT is currently the only mode supported by both OpenSsl and JDK providers.
329+
ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT,
330+
ApplicationProtocolNames.HTTP_2,
331+
ApplicationProtocolNames.HTTP_1_1
332+
)
333+
)
334+
.build();
348335

349-
@Override
350-
public ApplicationProtocolNegotiator applicationProtocolNegotiator() {
351-
return new ApplicationProtocolNegotiator() {
352-
@Override
353-
public List<String> protocols() {
354-
return applicationProtocols;
355-
}
356-
};
357-
}
358-
}).build()).protocol(HttpProtocol.HTTP11, HttpProtocol.H2);
359-
} finally {
360-
ReferenceCountUtil.release(engine);
361-
}
336+
configured = configured.secure(spec -> spec.sslContext(sslContext)).protocol(HttpProtocol.HTTP11, HttpProtocol.H2);
362337
} else {
363338
configured = configured.protocol(HttpProtocol.HTTP11, HttpProtocol.H2C);
364339
}
@@ -373,6 +348,12 @@ public List<String> protocols() {
373348
* @return response publisher
374349
*/
375350
protected Publisher<Void> incomingRequest(HttpServerRequest request, HttpServerResponse response) {
351+
// At least now, Reactor Netty does not respect maxInitialLineLength setting for HTTP/2 (but
352+
// does respect it for H2C and HTTP/1.1)
353+
if (request.uri().length() > maxInitialLineLength.bytesAsInt()) {
354+
return response.status(HttpResponseStatus.REQUEST_URI_TOO_LONG).send();
355+
}
356+
376357
final Method method = HttpConversionUtil.convertMethod(request.method());
377358
final Optional<RestHandler> dispatchHandlerOpt = dispatcher.dispatchHandler(
378359
request.uri(),

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ssl/SslUtils.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@
2121
* Helper class for creating default SSL engines
2222
*/
2323
public class SslUtils {
24-
private static final String[] DEFAULT_SSL_PROTOCOLS = { "TLSv1.3", "TLSv1.2", "TLSv1.1" };
24+
/**
25+
* Default support TLS protocols
26+
*/
27+
public static final String[] DEFAULT_SSL_PROTOCOLS = { "TLSv1.3", "TLSv1.2", "TLSv1.1" };
2528

2629
private SslUtils() {}
2730

plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorHttpClient.java

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import reactor.core.publisher.ParallelFlux;
5555
import reactor.netty.http.Http11SslContextSpec;
5656
import reactor.netty.http.Http2SslContextSpec;
57+
import reactor.netty.http.HttpProtocol;
5758
import reactor.netty.http.client.HttpClient;
5859

5960
import static io.netty.handler.codec.http.HttpHeaderNames.HOST;
@@ -65,6 +66,7 @@
6566
public class ReactorHttpClient implements Closeable {
6667
private final boolean compression;
6768
private final boolean secure;
69+
private final boolean useHttp11Only;
6870

6971
static Collection<String> returnHttpResponseBodies(Collection<FullHttpResponse> responses) {
7072
List<String> list = new ArrayList<>(responses.size());
@@ -85,6 +87,7 @@ static Collection<String> returnOpaqueIds(Collection<FullHttpResponse> responses
8587
public ReactorHttpClient(boolean compression, boolean secure) {
8688
this.compression = compression;
8789
this.secure = secure;
90+
this.useHttp11Only = OpenSearchTestCase.randomBoolean();
8891
}
8992

9093
public static ReactorHttpClient create() {
@@ -265,25 +268,36 @@ private HttpClient createClient(final InetSocketAddress remoteAddress, final Nio
265268
.runOn(eventLoopGroup)
266269
.host(remoteAddress.getHostString())
267270
.port(remoteAddress.getPort())
268-
.compress(compression);
271+
.compress(compression)
272+
.protocol(HttpProtocol.H2, HttpProtocol.HTTP11);
269273

270274
if (secure) {
271-
return client.secure(
272-
spec -> spec.sslContext(
273-
OpenSearchTestCase.randomBoolean()
274-
/* switch between HTTP 1.1/HTTP 2 randomly, both are supported */ ? Http11SslContextSpec.forClient()
275-
.configure(s -> s.clientAuth(ClientAuth.NONE).trustManager(InsecureTrustManagerFactory.INSTANCE))
276-
: Http2SslContextSpec.forClient()
277-
.configure(s -> s.clientAuth(ClientAuth.NONE).trustManager(InsecureTrustManagerFactory.INSTANCE))
278-
)
275+
return client.protocol(
276+
useHttp11Only ? new HttpProtocol[] { HttpProtocol.HTTP11 } : new HttpProtocol[] { HttpProtocol.HTTP11, HttpProtocol.H2 }
277+
)
278+
.secure(
279+
spec -> spec.sslContext(
280+
useHttp11Only
281+
/* switch between HTTP 1.1/HTTP 2 randomly, both are supported */
282+
? Http11SslContextSpec.forClient()
283+
.configure(s -> s.clientAuth(ClientAuth.NONE).trustManager(InsecureTrustManagerFactory.INSTANCE))
284+
: Http2SslContextSpec.forClient()
285+
.configure(s -> s.clientAuth(ClientAuth.NONE).trustManager(InsecureTrustManagerFactory.INSTANCE))
286+
)
287+
);
288+
} else {
289+
return client.protocol(
290+
useHttp11Only ? new HttpProtocol[] { HttpProtocol.HTTP11 } : new HttpProtocol[] { HttpProtocol.HTTP11, HttpProtocol.H2C }
279291
);
280292
}
281-
282-
return client;
283293
}
284294

285295
@Override
286296
public void close() {
287297

288298
}
299+
300+
public boolean useHttp11only() {
301+
return useHttp11Only;
302+
}
289303
}

plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportTests.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,8 +208,13 @@ public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext,
208208
final HttpContent continuationRequest = new DefaultHttpContent(Unpooled.EMPTY_BUFFER);
209209
final FullHttpResponse continuationResponse = client.send(remoteAddress.address(), request, continuationRequest);
210210
try {
211-
assertThat(continuationResponse.status(), is(HttpResponseStatus.OK));
212-
assertThat(new String(ByteBufUtil.getBytes(continuationResponse.content()), StandardCharsets.UTF_8), is("done"));
211+
if (expectedStatus == HttpResponseStatus.EXPECTATION_FAILED && client.useHttp11only() == false) {
212+
assertThat(continuationResponse.status(), is(HttpResponseStatus.EXPECTATION_FAILED));
213+
assertThat(new String(ByteBufUtil.getBytes(continuationResponse.content()), StandardCharsets.UTF_8), is(""));
214+
} else {
215+
assertThat(continuationResponse.status(), is(HttpResponseStatus.OK));
216+
assertThat(new String(ByteBufUtil.getBytes(continuationResponse.content()), StandardCharsets.UTF_8), is("done"));
217+
}
213218
} finally {
214219
continuationResponse.release();
215220
}

plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,11 @@
4848
import javax.net.ssl.KeyManagerFactory;
4949
import javax.net.ssl.SSLEngine;
5050
import javax.net.ssl.SSLException;
51+
import javax.net.ssl.TrustManagerFactory;
5152

5253
import java.nio.charset.StandardCharsets;
54+
import java.util.Arrays;
55+
import java.util.Collection;
5356
import java.util.Collections;
5457
import java.util.Optional;
5558
import java.util.concurrent.CountDownLatch;
@@ -80,6 +83,7 @@
8083
import io.netty.handler.codec.http.HttpResponseStatus;
8184
import io.netty.handler.codec.http.HttpUtil;
8285
import io.netty.handler.codec.http.HttpVersion;
86+
import io.netty.handler.codec.http2.Http2SecurityUtil;
8387
import io.netty.handler.ssl.SslContextBuilder;
8488
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
8589

@@ -108,7 +112,45 @@ public void setup() throws Exception {
108112
bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
109113
clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
110114

115+
var keyManagerFactory = KeyManagerFactory.getInstance("PKIX");
116+
keyManagerFactory.init(KeyStoreUtils.createServerKeyStore(), KEYSTORE_PASSWORD);
117+
111118
secureHttpTransportSettingsProvider = new SecureHttpTransportSettingsProvider() {
119+
@Override
120+
public Optional<SecureHttpTransportParameters> parameters(Settings settings) {
121+
return Optional.of(new SecureHttpTransportParameters() {
122+
@Override
123+
public Optional<KeyManagerFactory> keyManagerFactory() {
124+
return Optional.of(keyManagerFactory);
125+
}
126+
127+
@Override
128+
public Optional<String> sslProvider() {
129+
return Optional.empty();
130+
}
131+
132+
@Override
133+
public Optional<String> clientAuth() {
134+
return Optional.empty();
135+
}
136+
137+
@Override
138+
public Collection<String> protocols() {
139+
return Arrays.asList(SslUtils.DEFAULT_SSL_PROTOCOLS);
140+
}
141+
142+
@Override
143+
public Collection<String> cipherSuites() {
144+
return Http2SecurityUtil.CIPHERS;
145+
}
146+
147+
@Override
148+
public Optional<TrustManagerFactory> trustManagerFactory() {
149+
return Optional.of(InsecureTrustManagerFactory.INSTANCE);
150+
}
151+
});
152+
}
153+
112154
@Override
113155
public Optional<TransportExceptionHandler> buildHttpServerExceptionHandler(Settings settings, HttpServerTransport transport) {
114156
return Optional.empty();
@@ -117,8 +159,6 @@ public Optional<TransportExceptionHandler> buildHttpServerExceptionHandler(Setti
117159
@Override
118160
public Optional<SSLEngine> buildSecureHttpServerEngine(Settings settings, HttpServerTransport transport) throws SSLException {
119161
try {
120-
var keyManagerFactory = KeyManagerFactory.getInstance("PKIX");
121-
keyManagerFactory.init(KeyStoreUtils.createServerKeyStore(), KEYSTORE_PASSWORD);
122162
SSLEngine engine = SslContextBuilder.forServer(keyManagerFactory)
123163
.trustManager(InsecureTrustManagerFactory.INSTANCE)
124164
.build()
@@ -221,8 +261,13 @@ public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext,
221261
final HttpContent continuationRequest = new DefaultHttpContent(Unpooled.EMPTY_BUFFER);
222262
final FullHttpResponse continuationResponse = client.send(remoteAddress.address(), request, continuationRequest);
223263
try {
224-
assertThat(continuationResponse.status(), is(HttpResponseStatus.OK));
225-
assertThat(new String(ByteBufUtil.getBytes(continuationResponse.content()), StandardCharsets.UTF_8), is("done"));
264+
if (expectedStatus == HttpResponseStatus.EXPECTATION_FAILED && client.useHttp11only() == true) {
265+
assertThat(continuationResponse.status(), is(HttpResponseStatus.EXPECTATION_FAILED));
266+
assertThat(new String(ByteBufUtil.getBytes(continuationResponse.content()), StandardCharsets.UTF_8), is(""));
267+
} else {
268+
assertThat(continuationResponse.status(), is(HttpResponseStatus.OK));
269+
assertThat(new String(ByteBufUtil.getBytes(continuationResponse.content()), StandardCharsets.UTF_8), is("done"));
270+
}
226271
} finally {
227272
continuationResponse.release();
228273
}

0 commit comments

Comments
 (0)