diff --git a/CHANGELOG.md b/CHANGELOG.md index e218fbe820996..8afa56fa3e71d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Added public getter method in `SourceFieldMapper` to return excluded field ([#20205](https://github.com/opensearch-project/OpenSearch/pull/20205)) - Relax jar hell check when extended plugins share transitive dependencies ([#20103](https://github.com/opensearch-project/OpenSearch/pull/20103)) - Added public getter method in `SourceFieldMapper` to return included field ([#20290](https://github.com/opensearch-project/OpenSearch/pull/20290)) +- Support for HTTP/3 (server side) ([#20017](https://github.com/opensearch-project/OpenSearch/pull/20017)) ### Changed - Handle custom metadata files in subdirectory-store ([#20157](https://github.com/opensearch-project/OpenSearch/pull/20157)) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 6c090d0de5be4..fa865ee59cd4d 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -149,6 +149,7 @@ netty-codec-dns = { group = "io.netty", name = "netty-codec-dns", version.ref = netty-codec-http = { group = "io.netty", name = "netty-codec-http", version.ref = "netty" } netty-codec-http2 = { group = "io.netty", name = "netty-codec-http2", version.ref = "netty" } netty-codec-http3 = { group = "io.netty", name = "netty-codec-http3", version.ref = "netty" } +netty-codec-native-quic = { group = "io.netty", name = "netty-codec-native-quic", version.ref = "netty" } netty-codec-classes-quic = { group = "io.netty", name = "netty-codec-classes-quic", version.ref = "netty" } netty-common = { group = "io.netty", name = "netty-common", version.ref = "netty" } netty-handler = { group = "io.netty", name = "netty-handler", version.ref = "netty" } diff --git a/modules/transport-netty4/build.gradle b/modules/transport-netty4/build.gradle index ac918a23791f1..0ab3fe86b701f 100644 --- a/modules/transport-netty4/build.gradle +++ b/modules/transport-netty4/build.gradle @@ -63,14 +63,25 @@ dependencies { api "io.netty:netty-codec:${versions.netty}" api "io.netty:netty-codec-http:${versions.netty}" api "io.netty:netty-codec-http2:${versions.netty}" + api "io.netty:netty-codec-http3:${versions.netty}" + api "io.netty:netty-codec-classes-quic:${versions.netty}" + api "io.netty:netty-codec-native-quic:${versions.netty}" api "io.netty:netty-common:${versions.netty}" api "io.netty:netty-handler:${versions.netty}" api "io.netty:netty-resolver:${versions.netty}" api "io.netty:netty-transport:${versions.netty}" api "io.netty:netty-transport-native-unix-common:${versions.netty}" + testFipsRuntimeOnly "org.bouncycastle:bc-fips:${versions.bouncycastle_jce}" testFipsRuntimeOnly "org.bouncycastle:bctls-fips:${versions.bouncycastle_tls}" testFipsRuntimeOnly "org.bouncycastle:bcutil-fips:${versions.bouncycastle_util}" + + // Bundle all supported OSes and Archs + runtimeOnly "io.netty:netty-codec-native-quic:${versions.netty}:linux-x86_64" + runtimeOnly "io.netty:netty-codec-native-quic:${versions.netty}:linux-aarch_64" + runtimeOnly "io.netty:netty-codec-native-quic:${versions.netty}:osx-x86_64" + runtimeOnly "io.netty:netty-codec-native-quic:${versions.netty}:osx-aarch_64" + runtimeOnly "io.netty:netty-codec-native-quic:${versions.netty}:windows-x86_64" } restResources { @@ -205,7 +216,9 @@ thirdPartyAudit { 'io.netty.pkitesting.CertificateBuilder', 'io.netty.pkitesting.CertificateBuilder$Algorithm', - 'io.netty.pkitesting.X509Bundle' + 'io.netty.pkitesting.X509Bundle', + + 'io.netty.channel.epoll.SegmentedDatagramPacket' ) ignoreViolations( diff --git a/modules/transport-netty4/licenses/netty-codec-classes-quic-4.2.9.Final.jar.sha1 b/modules/transport-netty4/licenses/netty-codec-classes-quic-4.2.9.Final.jar.sha1 new file mode 100644 index 0000000000000..8b46ba5c2ee7b --- /dev/null +++ b/modules/transport-netty4/licenses/netty-codec-classes-quic-4.2.9.Final.jar.sha1 @@ -0,0 +1 @@ +8b441d8465077c1ac1ed57bad3f087bc2a84e994 \ No newline at end of file diff --git a/modules/transport-netty4/licenses/netty-codec-http3-4.2.9.Final.jar.sha1 b/modules/transport-netty4/licenses/netty-codec-http3-4.2.9.Final.jar.sha1 new file mode 100644 index 0000000000000..8bc7028b2fb09 --- /dev/null +++ b/modules/transport-netty4/licenses/netty-codec-http3-4.2.9.Final.jar.sha1 @@ -0,0 +1 @@ +e8a593762196738c0265259e5728a275e2d8f1ea \ No newline at end of file diff --git a/modules/transport-netty4/licenses/netty-codec-native-quic-4.2.9.Final-linux-aarch_64.jar.sha1 b/modules/transport-netty4/licenses/netty-codec-native-quic-4.2.9.Final-linux-aarch_64.jar.sha1 new file mode 100644 index 0000000000000..1104459d16a35 --- /dev/null +++ b/modules/transport-netty4/licenses/netty-codec-native-quic-4.2.9.Final-linux-aarch_64.jar.sha1 @@ -0,0 +1 @@ +87ce82aa487b3701a83312b748d2d311b58bbe68 \ No newline at end of file diff --git a/modules/transport-netty4/licenses/netty-codec-native-quic-4.2.9.Final-linux-x86_64.jar.sha1 b/modules/transport-netty4/licenses/netty-codec-native-quic-4.2.9.Final-linux-x86_64.jar.sha1 new file mode 100644 index 0000000000000..792d3a0e7424d --- /dev/null +++ b/modules/transport-netty4/licenses/netty-codec-native-quic-4.2.9.Final-linux-x86_64.jar.sha1 @@ -0,0 +1 @@ +3f0ab9d43f00c8a02ba22702aff482f598863642 \ No newline at end of file diff --git a/modules/transport-netty4/licenses/netty-codec-native-quic-4.2.9.Final-osx-aarch_64.jar.sha1 b/modules/transport-netty4/licenses/netty-codec-native-quic-4.2.9.Final-osx-aarch_64.jar.sha1 new file mode 100644 index 0000000000000..ecf9df0b6d865 --- /dev/null +++ b/modules/transport-netty4/licenses/netty-codec-native-quic-4.2.9.Final-osx-aarch_64.jar.sha1 @@ -0,0 +1 @@ +7725db063980a53420afbced5fb39dbd0eebbd8a \ No newline at end of file diff --git a/modules/transport-netty4/licenses/netty-codec-native-quic-4.2.9.Final-osx-x86_64.jar.sha1 b/modules/transport-netty4/licenses/netty-codec-native-quic-4.2.9.Final-osx-x86_64.jar.sha1 new file mode 100644 index 0000000000000..c626d90d146fb --- /dev/null +++ b/modules/transport-netty4/licenses/netty-codec-native-quic-4.2.9.Final-osx-x86_64.jar.sha1 @@ -0,0 +1 @@ +53eee547f8731254084274d31b82171c51d509fa \ No newline at end of file diff --git a/modules/transport-netty4/licenses/netty-codec-native-quic-4.2.9.Final-windows-x86_64.jar.sha1 b/modules/transport-netty4/licenses/netty-codec-native-quic-4.2.9.Final-windows-x86_64.jar.sha1 new file mode 100644 index 0000000000000..dd7184e852909 --- /dev/null +++ b/modules/transport-netty4/licenses/netty-codec-native-quic-4.2.9.Final-windows-x86_64.jar.sha1 @@ -0,0 +1 @@ +6873dd577760b80b345fa5ab7dfe589ed9c1bbc3 \ No newline at end of file diff --git a/modules/transport-netty4/licenses/netty-codec-native-quic-4.2.9.Final.jar.sha1 b/modules/transport-netty4/licenses/netty-codec-native-quic-4.2.9.Final.jar.sha1 new file mode 100644 index 0000000000000..dac0db01661c6 --- /dev/null +++ b/modules/transport-netty4/licenses/netty-codec-native-quic-4.2.9.Final.jar.sha1 @@ -0,0 +1 @@ +97fda97b8df8f3e8fefa50336e108756b76f8b77 \ No newline at end of file diff --git a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4CompositeHttpServerTransport.java b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4CompositeHttpServerTransport.java new file mode 100644 index 0000000000000..4853bdee208ca --- /dev/null +++ b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4CompositeHttpServerTransport.java @@ -0,0 +1,75 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.netty4; + +import org.opensearch.common.lifecycle.AbstractLifecycleComponent; +import org.opensearch.common.util.io.IOUtils; +import org.opensearch.core.common.transport.BoundTransportAddress; +import org.opensearch.http.AbstractHttpServerTransport; +import org.opensearch.http.HttpInfo; +import org.opensearch.http.HttpServerTransport; +import org.opensearch.http.HttpStats; + +import java.io.IOException; + +public class Netty4CompositeHttpServerTransport extends AbstractLifecycleComponent implements HttpServerTransport { + private final AbstractHttpServerTransport[] transports; + + public Netty4CompositeHttpServerTransport(AbstractHttpServerTransport... transports) { + if (transports == null || transports.length == 0) { + throw new IllegalArgumentException("At least one transport must be provided"); + } + + this.transports = transports; + } + + @Override + public BoundTransportAddress boundAddress() { + return transports[0].boundAddress(); /* both transports bind to the same port but different protocol */ + } + + @Override + public HttpInfo info() { + return transports[0].info(); + } + + @Override + public HttpStats stats() { + long serverOpen = 0L, totalOpen = 0L; + + for (AbstractHttpServerTransport transport : transports) { + final HttpStats stats = transport.stats(); + serverOpen += stats.getServerOpen(); + totalOpen += stats.getTotalOpen(); + } + + return new HttpStats.Builder().serverOpen(serverOpen).totalOpen(totalOpen).build(); + } + + @Override + protected void doStart() { + for (AbstractHttpServerTransport transport : transports) { + transport.start(); + } + } + + @Override + protected void doStop() { + for (AbstractHttpServerTransport transport : transports) { + transport.stop(); + } + } + + @Override + protected void doClose() throws IOException { + for (AbstractHttpServerTransport transport : transports) { + IOUtils.closeWhileHandlingException(transport); + } + } +} diff --git a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4Http3RequestCreator.java b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4Http3RequestCreator.java new file mode 100644 index 0000000000000..5ab5cab15a430 --- /dev/null +++ b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4Http3RequestCreator.java @@ -0,0 +1,81 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.http.netty4; + +import org.opensearch.ExceptionsHelper; +import org.opensearch.core.common.unit.ByteSizeValue; + +import java.util.List; + +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.TooLongHttpLineException; + +@ChannelHandler.Sharable +class Netty4Http3RequestCreator extends MessageToMessageDecoder { + private final ByteSizeValue maxInitialLineLength; + + Netty4Http3RequestCreator(ByteSizeValue maxInitialLineLength) { + this.maxInitialLineLength = maxInitialLineLength; + } + + @Override + protected void decode(ChannelHandlerContext ctx, FullHttpRequest msg, List out) { + if (msg.decoderResult().isFailure()) { + final Throwable cause = msg.decoderResult().cause(); + final Exception nonError; + if (cause instanceof Error) { + ExceptionsHelper.maybeDieOnAnotherThread(cause); + nonError = new Exception(cause); + } else { + nonError = (Exception) cause; + } + out.add(new Netty4HttpRequest(msg.retain(), nonError)); + } else { + // The HTTP/3 implementation in Netty does not validate Request URI length, manually + // applying the validation rules here. + if (msg.uri().length() > maxInitialLineLength.bytesAsInt()) { + out.add( + new Netty4HttpRequest( + msg.retain(), + new TooLongHttpLineException("An HTTP line is larger than " + maxInitialLineLength.bytesAsInt() + " bytes.") + ) + ); + } else { + out.add(new Netty4HttpRequest(msg.retain())); + } + } + } +} diff --git a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4Http3ServerTransport.java b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4Http3ServerTransport.java new file mode 100644 index 0000000000000..295be3887717e --- /dev/null +++ b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4Http3ServerTransport.java @@ -0,0 +1,469 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.netty4; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.ExceptionsHelper; +import org.opensearch.OpenSearchException; +import org.opensearch.common.network.NetworkService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Setting.Property; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.BigArrays; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; +import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.http.AbstractHttpServerTransport; +import org.opensearch.http.HttpChannel; +import org.opensearch.http.HttpHandlingSettings; +import org.opensearch.http.HttpReadTimeoutException; +import org.opensearch.http.HttpServerChannel; +import org.opensearch.http.netty4.http3.SecureQuicTokenHandler; +import org.opensearch.plugins.SecureHttpTransportSettingsProvider; +import org.opensearch.plugins.SecureHttpTransportSettingsProvider.SecureHttpTransportParameters; +import org.opensearch.plugins.TransportExceptionHandler; +import org.opensearch.telemetry.tracing.Tracer; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.NettyAllocator; +import org.opensearch.transport.NettyByteBufSizer; +import org.opensearch.transport.SharedGroupFactory; +import org.opensearch.transport.netty4.Netty4Utils; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLEngine; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.FixedRecvByteBufAllocator; +import io.netty.channel.RecvByteBufAllocator; +import io.netty.channel.socket.nio.NioDatagramChannel; +import io.netty.handler.codec.DecoderException; +import io.netty.handler.codec.compression.Brotli; +import io.netty.handler.codec.compression.CompressionOptions; +import io.netty.handler.codec.compression.DeflateOptions; +import io.netty.handler.codec.compression.GzipOptions; +import io.netty.handler.codec.compression.StandardCompressionOptions; +import io.netty.handler.codec.compression.ZstdEncoder; +import io.netty.handler.codec.http.HttpContentCompressor; +import io.netty.handler.codec.http.HttpContentDecompressor; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http3.Http3; +import io.netty.handler.codec.http3.Http3FrameToHttpObjectCodec; +import io.netty.handler.codec.http3.Http3ServerConnectionHandler; +import io.netty.handler.codec.quic.QuicChannel; +import io.netty.handler.codec.quic.QuicSslContext; +import io.netty.handler.codec.quic.QuicSslContextBuilder; +import io.netty.handler.codec.quic.QuicSslEngine; +import io.netty.handler.codec.quic.QuicStreamChannel; +import io.netty.handler.ssl.ClientAuth; +import io.netty.handler.timeout.ReadTimeoutException; +import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.util.AttributeKey; + +import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_CONNECT_TIMEOUT; +import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CHUNK_SIZE; +import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH; +import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE; +import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_INITIAL_LINE_LENGTH; +import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_READ_TIMEOUT; +import static org.opensearch.http.HttpTransportSettings.SETTING_PIPELINING_MAX_EVENTS; +import static org.opensearch.http.netty4.Netty4HttpServerTransport.SETTING_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS; +import static org.opensearch.http.netty4.Netty4HttpServerTransport.SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_SIZE; + +/** + * The HTTP/3 transport implementations based on Netty 4. + */ +public class Netty4Http3ServerTransport extends AbstractHttpServerTransport { + private static final Logger logger = LogManager.getLogger(Netty4Http3ServerTransport.class); + + /** + * Set the initial maximum data limit for local bidirectional streams (in bytes). + */ + public static final Setting SETTING_H3_MAX_STREAM_LOCAL_LENGTH = Setting.byteSizeSetting( + "h3.max_stream_local_length", + new ByteSizeValue(1000000, ByteSizeUnit.BYTES), + Property.NodeScope + ); + + /** + * Set the initial maximum data limit for remote bidirectional streams (in bytes). + */ + public static final Setting SETTING_H3_MAX_STREAM_REMOTE_LENGTH = Setting.byteSizeSetting( + "h3.max_stream_remote_length", + new ByteSizeValue(1000000, ByteSizeUnit.BYTES), + Property.NodeScope + ); + + /** + * Set the initial maximum stream limit for bidirectional streams. + * + * The HTTP/3 standard expects that each end configures at least 100 + * concurrent bidirectional streams at a time, to avoid reducing performance + * by reducing parallelism. + */ + public static final Setting SETTING_H3_MAX_STREAMS = Setting.longSetting("h3.max_streams", 100L, Property.NodeScope); + + private final ByteSizeValue maxInitialLineLength; + private final ByteSizeValue maxHeaderSize; + private final ByteSizeValue maxChunkSize; + + private final SharedGroupFactory sharedGroupFactory; + private final RecvByteBufAllocator recvByteBufAllocator; + private final int readTimeoutMillis; + private final int connectTimeoutMillis; + + private final int maxCompositeBufferComponents; + private final int pipeliningMaxEvents; + + private volatile Bootstrap bootstrap; + private volatile SharedGroupFactory.SharedGroup sharedGroup; + private final SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider; + private final TransportExceptionHandler exceptionHandler; + + /** + * Creates new HTTP transport implementations based on Netty 4 + * @param settings settings + * @param networkService network service + * @param bigArrays big array allocator + * @param threadPool thread pool instance + * @param xContentRegistry XContent registry instance + * @param dispatcher dispatcher instance + * @param clusterSettings cluster settings + * @param sharedGroupFactory shared group factory + */ + public Netty4Http3ServerTransport( + final Settings settings, + final NetworkService networkService, + final BigArrays bigArrays, + final ThreadPool threadPool, + final NamedXContentRegistry xContentRegistry, + final Dispatcher dispatcher, + final ClusterSettings clusterSettings, + final SharedGroupFactory sharedGroupFactory, + final SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider, + final Tracer tracer + ) { + super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher, clusterSettings, tracer); + Netty4Utils.setAvailableProcessors(OpenSearchExecutors.NODE_PROCESSORS_SETTING.get(settings)); + NettyAllocator.logAllocatorDescriptionIfNeeded(); + this.sharedGroupFactory = sharedGroupFactory; + this.secureHttpTransportSettingsProvider = secureHttpTransportSettingsProvider; + this.exceptionHandler = secureHttpTransportSettingsProvider.buildHttpServerExceptionHandler(settings, this) + .orElse(TransportExceptionHandler.NOOP); + + this.maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.get(settings); + this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings); + this.maxInitialLineLength = SETTING_HTTP_MAX_INITIAL_LINE_LENGTH.get(settings); + this.pipeliningMaxEvents = SETTING_PIPELINING_MAX_EVENTS.get(settings); + + this.maxCompositeBufferComponents = SETTING_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS.get(settings); + + this.readTimeoutMillis = Math.toIntExact(SETTING_HTTP_READ_TIMEOUT.get(settings).getMillis()); + this.connectTimeoutMillis = Math.toIntExact(SETTING_HTTP_CONNECT_TIMEOUT.get(settings).getMillis()); + + ByteSizeValue receivePredictor = SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_SIZE.get(settings); + recvByteBufAllocator = new FixedRecvByteBufAllocator(receivePredictor.bytesAsInt()); + + logger.debug( + "using max_chunk_size[{}], max_header_size[{}], max_initial_line_length[{}], max_content_length[{}], " + + "receive_predictor[{}], max_composite_buffer_components[{}]", + maxChunkSize, + maxHeaderSize, + maxInitialLineLength, + maxContentLength, + receivePredictor, + maxCompositeBufferComponents + ); + } + + public Settings settings() { + return this.settings; + } + + @Override + protected void doStart() { + boolean success = false; + try { + sharedGroup = sharedGroupFactory.getHttpGroup(); + bootstrap = new Bootstrap(); + + bootstrap.group(sharedGroup.getLowLevelGroup()); + + // NettyAllocator will return the channel type designed to work with the configuredAllocator + bootstrap.channel(NioDatagramChannel.class); + + // Set the allocators for both the server channel and the child channels created + bootstrap.option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator(true)); + bootstrap.handler(configureServerChannelHandler()); + bootstrap.option(ChannelOption.RECVBUF_ALLOCATOR, recvByteBufAllocator); + + bindServer(); + success = true; + } finally { + if (success == false) { + doStop(); // otherwise we leak threads since we never moved to started + } + } + } + + @Override + protected HttpServerChannel bind(InetSocketAddress socketAddress) throws Exception { + ChannelFuture future = bootstrap.bind(socketAddress).sync(); + Channel channel = future.channel(); + Netty4HttpServerChannel httpServerChannel = new Netty4HttpServerChannel(channel); + channel.attr(HTTP_SERVER_CHANNEL_KEY).set(httpServerChannel); + return httpServerChannel; + } + + @Override + protected void stopInternal() { + if (sharedGroup != null) { + sharedGroup.shutdown(); + sharedGroup = null; + } + } + + @Override + public void onException(HttpChannel channel, Exception cause0) { + Throwable cause = cause0; + + if (cause0 instanceof DecoderException) { + cause = cause0.getCause(); + } + + exceptionHandler.onError(cause); + logger.error("Exception during establishing a HTTP/3 connection: " + cause, cause); + + if (cause instanceof ReadTimeoutException) { + super.onException(channel, new HttpReadTimeoutException(readTimeoutMillis, cause0)); + } else { + super.onException(channel, cause0); + } + } + + public ChannelHandler configureServerChannelHandler() { + return new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) throws Exception { + final Optional parameters = secureHttpTransportSettingsProvider.parameters(settings); + + final KeyManagerFactory keyManagerFactory = parameters.flatMap(SecureHttpTransportParameters::keyManagerFactory) + .orElseThrow(() -> new OpenSearchException("The KeyManagerFactory instance is not provided")); + + final QuicSslContextBuilder sslContextBuilder = QuicSslContextBuilder.forServer(keyManagerFactory, null); + + parameters.flatMap(SecureHttpTransportParameters::trustManagerFactory).ifPresent(sslContextBuilder::trustManager); + parameters.flatMap(SecureHttpTransportParameters::clientAuth) + .ifPresent(clientAuth -> sslContextBuilder.clientAuth(ClientAuth.valueOf(clientAuth))); + + final QuicSslContext sslContext = sslContextBuilder.applicationProtocols( + io.netty.handler.codec.http3.Http3.supportedApplicationProtocols() + ).build(); + + ch.pipeline().addLast(Http3.newQuicServerCodecBuilder().sslEngineProvider(q -> { + final QuicSslEngine engine = sslContext.newEngine(q.alloc()); + q.attr(HTTP_SERVER_ENGINE_KEY).set(engine); + return engine; + }) + .maxIdleTimeout(connectTimeoutMillis, TimeUnit.MILLISECONDS) + .initialMaxData(SETTING_HTTP_MAX_CONTENT_LENGTH.get(settings).getBytes()) + .initialMaxStreamDataBidirectionalLocal(SETTING_H3_MAX_STREAM_LOCAL_LENGTH.get(settings).getBytes()) + .initialMaxStreamDataBidirectionalRemote(SETTING_H3_MAX_STREAM_REMOTE_LENGTH.get(settings).getBytes()) + .initialMaxStreamsBidirectional(SETTING_H3_MAX_STREAMS.get(settings).longValue()) + .tokenHandler(new SecureQuicTokenHandler()) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(QuicChannel ch) { + // Called for each connection + ch.pipeline() + .addLast( + new Http3ServerConnectionHandler( + new HttpChannelHandler(Netty4Http3ServerTransport.this, handlingSettings) + ) + ); + } + }) + .build()); + } + }; + } + + public static final AttributeKey HTTP_CHANNEL_KEY = AttributeKey.newInstance("opensearch-quic-channel"); + protected static final AttributeKey HTTP_SERVER_ENGINE_KEY = AttributeKey.newInstance("opensearch-quic-server-ssl-engine"); + + protected static final AttributeKey HTTP_SERVER_CHANNEL_KEY = AttributeKey.newInstance( + "opensearch-quic-server-channel" + ); + + protected static class HttpChannelHandler extends ChannelInitializer { + + private final Netty4Http3ServerTransport transport; + private final NettyByteBufSizer byteBufSizer; + private final Netty4Http3RequestCreator requestCreator; + private final Netty4HttpRequestHandler requestHandler; + private final Netty4HttpResponseCreator responseCreator; + private final HttpHandlingSettings handlingSettings; + + protected HttpChannelHandler(final Netty4Http3ServerTransport transport, final HttpHandlingSettings handlingSettings) { + this.transport = transport; + this.handlingSettings = handlingSettings; + this.byteBufSizer = new NettyByteBufSizer(); + this.requestCreator = new Netty4Http3RequestCreator(transport.maxInitialLineLength); + this.requestHandler = new Netty4HttpRequestHandler(transport, HTTP_CHANNEL_KEY); + this.responseCreator = new Netty4HttpResponseCreator(); + } + + public ChannelHandler getRequestHandler() { + return requestHandler; + } + + @Override + protected void initChannel(QuicStreamChannel ch) throws Exception { + final Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(ch, HTTP_SERVER_ENGINE_KEY); + ch.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel); + ch.pipeline().addLast("byte_buf_sizer", byteBufSizer); + ch.pipeline().addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS)); + + configurePipeline(ch); + transport.serverAcceptedChannel(nettyHttpChannel); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + ExceptionsHelper.maybeDieOnAnotherThread(cause); + super.exceptionCaught(ctx, cause); + + final Channel parent = parent(ctx.channel()); + final Channel channel = (parent != null) ? parent : ctx.channel(); + final Netty4HttpServerChannel httpServerChannel = channel.attr(HTTP_SERVER_CHANNEL_KEY).get(); + if (cause instanceof Error) { + transport.onServerException(httpServerChannel, new Exception(cause)); + } else { + transport.onServerException(httpServerChannel, (Exception) cause); + } + } + + protected void configurePipeline(Channel ch) { + ch.pipeline().addLast(new Http3FrameToHttpObjectCodec(true)); + ch.pipeline().addLast("header_verifier", transport.createHeaderVerifier()); + ch.pipeline().addLast("decoder_compress", transport.createDecompressor()); + final HttpObjectAggregator aggregator = new HttpObjectAggregator(handlingSettings.getMaxContentLength()); + aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents); + ch.pipeline().addLast("aggregator", aggregator); + if (handlingSettings.isCompression()) { + ch.pipeline() + .addLast( + "encoder_compress", + new HttpContentCompressor(defaultCompressionOptions(handlingSettings.getCompressionLevel())) + ); + } + ch.pipeline().addLast("request_creator", requestCreator); + ch.pipeline().addLast("response_creator", responseCreator); + ch.pipeline().addLast("pipelining", new Netty4HttpPipeliningHandler(logger, transport.pipeliningMaxEvents)); + ch.pipeline().addLast("handler", requestHandler); + } + + private NioDatagramChannel parent(Channel channel) { + if (channel == null) { + return null; + } else if (channel instanceof NioDatagramChannel ndc) { + return ndc; /* parent server channel */ + } else { + return parent(channel.parent()); + } + } + } + + /** + * Extension point that allows a NetworkPlugin to extend the netty pipeline and inspect headers after request decoding + */ + protected ChannelInboundHandlerAdapter createHeaderVerifier() { + // pass-through + return new ChannelInboundHandlerAdapter(); + } + + /** + * Extension point that allows a NetworkPlugin to override the default netty HttpContentDecompressor and supply a custom decompressor. + * + * Used in instances to conditionally decompress depending on the outcome from header verification + */ + protected ChannelInboundHandlerAdapter createDecompressor() { + return new HttpContentDecompressor(); + } + + /** + * Copy of {@link HttpContentCompressor} default compression options with ZSTD excluded: + * although zstd-jni is on the classpath, {@link ZstdEncoder} requires direct buffers support + * which by default {@link NettyAllocator} does not provide. + * + * @param compressionLevel + * {@code 1} yields the fastest compression and {@code 9} yields the + * best compression. {@code 0} means no compression. The default + * compression level is {@code 6}. + * + * @return default compression options + */ + private static CompressionOptions[] defaultCompressionOptions(int compressionLevel) { + return defaultCompressionOptions(compressionLevel, 15, 8); + } + + /** + * Copy of {@link HttpContentCompressor} default compression options with ZSTD excluded: + * although zstd-jni is on the classpath, {@link ZstdEncoder} requires direct buffers support + * which by default {@link NettyAllocator} does not provide. + * + * @param compressionLevel + * {@code 1} yields the fastest compression and {@code 9} yields the + * best compression. {@code 0} means no compression. The default + * compression level is {@code 6}. + * @param windowBits + * The base two logarithm of the size of the history buffer. The + * value should be in the range {@code 9} to {@code 15} inclusive. + * Larger values result in better compression at the expense of + * memory usage. The default value is {@code 15}. + * @param memLevel + * How much memory should be allocated for the internal compression + * state. {@code 1} uses minimum memory and {@code 9} uses maximum + * memory. Larger values result in better and faster compression + * at the expense of memory usage. The default value is {@code 8} + * + * @return default compression options + */ + private static CompressionOptions[] defaultCompressionOptions(int compressionLevel, int windowBits, int memLevel) { + final List options = new ArrayList(4); + final GzipOptions gzipOptions = StandardCompressionOptions.gzip(compressionLevel, windowBits, memLevel); + final DeflateOptions deflateOptions = StandardCompressionOptions.deflate(compressionLevel, windowBits, memLevel); + + options.add(gzipOptions); + options.add(deflateOptions); + options.add(StandardCompressionOptions.snappy()); + + if (Brotli.isAvailable()) { + options.add(StandardCompressionOptions.brotli()); + } + + return options.toArray(new CompressionOptions[0]); + } + +} diff --git a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpChannel.java b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpChannel.java index 75d30aa9797c0..8e4aed40e330e 100644 --- a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpChannel.java +++ b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpChannel.java @@ -39,26 +39,45 @@ import org.opensearch.http.HttpResponse; import org.opensearch.transport.netty4.Netty4TcpChannel; +import javax.net.ssl.SSLEngine; + +import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.Optional; +import java.util.function.Function; import io.netty.channel.Channel; import io.netty.channel.ChannelPipeline; +import io.netty.util.AttributeKey; public class Netty4HttpChannel implements HttpChannel { + private static final InetSocketAddress NO_SOCKET_ADDRESS = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0); + private static final String CHANNEL_PROPERTY = "channel"; + private static final String SSL_ENGINE_PROPERTY = "ssl_engine"; private final Channel channel; private final CompletableContext closeContext = new CompletableContext<>(); private final ChannelPipeline inboundPipeline; + private final AttributeKey sslEngineKey; Netty4HttpChannel(Channel channel) { - this(channel, null); + this(channel, null, null); + } + + Netty4HttpChannel(Channel channel, AttributeKey sslEngineKey) { + this(channel, null, sslEngineKey); } Netty4HttpChannel(Channel channel, ChannelPipeline inboundPipeline) { + this(channel, inboundPipeline, null); + } + + Netty4HttpChannel(Channel channel, ChannelPipeline inboundPipeline, AttributeKey sslEngineKey) { this.channel = channel; this.inboundPipeline = inboundPipeline; + this.sslEngineKey = sslEngineKey; Netty4TcpChannel.addListener(this.channel.closeFuture(), closeContext); } @@ -69,12 +88,26 @@ public void sendResponse(HttpResponse response, ActionListener listener) { @Override public InetSocketAddress getLocalAddress() { - return (InetSocketAddress) channel.localAddress(); + if (channel.localAddress() instanceof InetSocketAddress isa) { + return isa; + } else { + return getAddressFromParent(channel, Channel::localAddress); + } } @Override public InetSocketAddress getRemoteAddress() { - return (InetSocketAddress) channel.remoteAddress(); + if (channel.remoteAddress() instanceof InetSocketAddress isa) { + return isa; + } else { + final InetSocketAddress address = getAddressFromParent(channel, Channel::remoteAddress); + if (address == null && channel.remoteAddress() != null) { + // In case of QUIC / HTTP3, the datagram channel (parent) may not have address + // populated, but QuicChannelXxx does, returning the placeholder with 0 port here. + return NO_SOCKET_ADDRESS; + } + return address; + } } @Override @@ -107,6 +140,16 @@ public Optional get(String name, Class clazz) { return (Optional) Optional.of(getNettyChannel()); } + if (SSL_ENGINE_PROPERTY.equalsIgnoreCase(name) && clazz.isAssignableFrom(SSLEngine.class)) { + SSLEngine engine = channel.attr(sslEngineKey).get(); + if (engine == null && channel.parent() != null) { + engine = channel.parent().attr(sslEngineKey).get(); + } + if (engine != null) { + return (Optional) Optional.of(engine); + } + } + Object handler = getNettyChannel().pipeline().get(name); if (handler == null && inboundPipeline() != null) { @@ -124,4 +167,22 @@ public Optional get(String name, Class clazz) { public String toString() { return "Netty4HttpChannel{" + "localAddress=" + getLocalAddress() + ", remoteAddress=" + getRemoteAddress() + '}'; } + + /** + * Attempts to extract the {@link InetSocketAddress} from parent channel, since + * some channels, like QuicXxxChannel, do not expose {@link InetSocketAddress} but + * {@link SocketAddress} only + */ + private InetSocketAddress getAddressFromParent(Channel channel, Function socketAddressSupplier) { + final Channel parent = channel.parent(); + if (parent != null) { + if (socketAddressSupplier.apply(parent) instanceof InetSocketAddress isa) { + return isa; + } else { + return getAddressFromParent(parent, socketAddressSupplier); + } + } else { + return null; /* Not connected */ + } + } } diff --git a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpRequest.java b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpRequest.java index b0bbe4b2fdf1a..c32ac37f903bf 100644 --- a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpRequest.java +++ b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpRequest.java @@ -219,6 +219,8 @@ public HttpVersion protocolVersion() { return HttpRequest.HttpVersion.HTTP_1_1; } else if (request.protocolVersion().equals("HTTP/2.0")) { return HttpRequest.HttpVersion.HTTP_2_0; + } else if (request.protocolVersion().equals("HTTP/3.0")) { + return HttpRequest.HttpVersion.HTTP_3_0; } else { throw new IllegalArgumentException("Unexpected http protocol version: " + request.protocolVersion()); } diff --git a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpRequestHandler.java b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpRequestHandler.java index 1f7aaf17d2191..c86cad6701f69 100644 --- a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpRequestHandler.java +++ b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpRequestHandler.java @@ -33,24 +33,28 @@ package org.opensearch.http.netty4; import org.opensearch.ExceptionsHelper; +import org.opensearch.http.AbstractHttpServerTransport; import org.opensearch.http.HttpPipelinedRequest; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.util.AttributeKey; @ChannelHandler.Sharable class Netty4HttpRequestHandler extends SimpleChannelInboundHandler { - private final Netty4HttpServerTransport serverTransport; + private final AbstractHttpServerTransport serverTransport; + private final AttributeKey channelKey; - Netty4HttpRequestHandler(Netty4HttpServerTransport serverTransport) { + Netty4HttpRequestHandler(AbstractHttpServerTransport serverTransport, AttributeKey channelKey) { this.serverTransport = serverTransport; + this.channelKey = channelKey; } @Override protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest httpRequest) { - final Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get(); + final Netty4HttpChannel channel = ctx.channel().attr(channelKey).get(); boolean success = false; try { serverTransport.incomingRequest(httpRequest, channel); @@ -65,7 +69,7 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest http @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ExceptionsHelper.maybeDieOnAnotherThread(cause); - Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get(); + Netty4HttpChannel channel = ctx.channel().attr(channelKey).get(); if (cause instanceof Error) { serverTransport.onException(channel, new Exception(cause)); } else { diff --git a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java index 534a8298a1f5e..becce9018aa41 100644 --- a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java @@ -202,7 +202,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport { /** * Creates new HTTP transport implementations based on Netty 4 - * @param settings seetings + * @param settings settings * @param networkService network service * @param bigArrays big array allocator * @param threadPool thread pool instance @@ -380,7 +380,7 @@ protected HttpChannelHandler(final Netty4HttpServerTransport transport, final Ht this.handlingSettings = handlingSettings; this.byteBufSizer = new NettyByteBufSizer(); this.requestCreator = new Netty4HttpRequestCreator(); - this.requestHandler = new Netty4HttpRequestHandler(transport); + this.requestHandler = new Netty4HttpRequestHandler(transport, HTTP_CHANNEL_KEY); this.responseCreator = new Netty4HttpResponseCreator(); } diff --git a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/http3/Http3Utils.java b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/http3/Http3Utils.java new file mode 100644 index 0000000000000..94c0fbdb44e0d --- /dev/null +++ b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/http3/Http3Utils.java @@ -0,0 +1,41 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.netty4.http3; + +/** + * Adapted from reactor.netty.http.internal.Http3 class + */ +public final class Http3Utils { + static final boolean isHttp3Available; + + static { + boolean http3; + try { + Class.forName("io.netty.handler.codec.http3.Http3"); + http3 = true; + } catch (Throwable t) { + http3 = false; + } + isHttp3Available = http3; + } + + private Http3Utils() { + + } + + /** + * Check if the current runtime supports HTTP/3, by verifying if {@code io.netty:netty-codec-native-quic} is on the classpath. + * + * @return true if {@code io.netty:netty-codec-native-quic} is available + */ + public static boolean isHttp3Available() { + return isHttp3Available; + } + +} diff --git a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/http3/SecureQuicTokenHandler.java b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/http3/SecureQuicTokenHandler.java new file mode 100644 index 0000000000000..8b666fd76ee88 --- /dev/null +++ b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/http3/SecureQuicTokenHandler.java @@ -0,0 +1,111 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.netty4.http3; + +import org.opensearch.common.Randomness; + +import javax.crypto.Mac; +import javax.crypto.spec.SecretKeySpec; + +import java.net.InetSocketAddress; +import java.security.InvalidKeyException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.quic.Quic; +import io.netty.handler.codec.quic.QuicTokenHandler; +import io.netty.util.CharsetUtil; +import io.netty.util.NetUtil; + +/** + * Secure {@link QuicTokenHandler} which uses HMAC_SHA256. + */ +public class SecureQuicTokenHandler implements QuicTokenHandler { + private static final int HMAC_KEY_LEN = 32; + private static final int HMAC_TAG_LEN = 32; + private static final String HMAC_SHA_256 = "HmacSHA256"; + + private static final String SERVER_NAME = "opensearch-netty"; + private static final byte[] SERVER_NAME_BYTES = SERVER_NAME.getBytes(CharsetUtil.US_ASCII); + private static final ByteBuf SERVER_NAME_BUFFER = Unpooled.unreleasableBuffer(Unpooled.wrappedBuffer(SERVER_NAME_BYTES)).asReadOnly(); + + private static final int MAX_TOKEN_LEN = HMAC_TAG_LEN + Quic.MAX_CONN_ID_LEN + NetUtil.LOCALHOST6.getAddress().length + + SERVER_NAME_BYTES.length; + + private final byte[] key; + + public SecureQuicTokenHandler() { + this.key = new byte[HMAC_KEY_LEN]; + Randomness.createSecure().nextBytes(key); + } + + @Override + public boolean writeToken(ByteBuf out, ByteBuf dcid, InetSocketAddress address) { + final byte[] addr = address.getAddress().getAddress(); + final byte[] buffer = new byte[HMAC_TAG_LEN + addr.length + dcid.readableBytes()]; + + System.arraycopy(addr, 0, buffer, HMAC_TAG_LEN, addr.length); + dcid.getBytes(dcid.readerIndex(), buffer, HMAC_TAG_LEN + addr.length, dcid.readableBytes()); + + try { + final Mac mac = Mac.getInstance(HMAC_SHA_256); + mac.init(new SecretKeySpec(key, HMAC_SHA_256)); + mac.update(buffer, HMAC_TAG_LEN, addr.length + dcid.readableBytes()); + System.arraycopy(mac.doFinal(), 0, buffer, 0, HMAC_TAG_LEN); + } catch (final InvalidKeyException | NoSuchAlgorithmException ex) { + return false; + } + + out.writeBytes(SERVER_NAME_BYTES).writeBytes(buffer); + return true; + } + + @Override + public int validateToken(ByteBuf token, InetSocketAddress address) { + final byte[] addr = address.getAddress().getAddress(); + final int minLength = SERVER_NAME_BYTES.length + HMAC_TAG_LEN + addr.length; + if (token.readableBytes() <= minLength) { + return -1; + } + + if (!SERVER_NAME_BUFFER.equals(token.slice(0, SERVER_NAME_BYTES.length))) { + return -1; + } + + final ByteBuf tag = token.slice(SERVER_NAME_BYTES.length, HMAC_TAG_LEN); + final int length = token.readableBytes() - HMAC_TAG_LEN - SERVER_NAME_BYTES.length; + final ByteBuf payload = token.slice(SERVER_NAME_BYTES.length + HMAC_TAG_LEN, length); + try { + final Mac mac = Mac.getInstance(HMAC_SHA_256); + mac.init(new SecretKeySpec(key, HMAC_SHA_256)); + for (int i = 0; i < payload.readableBytes(); ++i) { + mac.update(payload.getByte(payload.readerIndex() + i)); + } + + final byte[] actual = new byte[tag.readableBytes()]; + tag.getBytes(tag.readerIndex(), actual, 0, tag.readableBytes()); + + final byte[] expected = mac.doFinal(); + if (!MessageDigest.isEqual(actual, expected)) { + return -1; + } + } catch (final InvalidKeyException | NoSuchAlgorithmException ex) { + return -1; + } + + return minLength; + } + + @Override + public int maxTokenLength() { + return MAX_TOKEN_LEN; + } +} diff --git a/modules/transport-netty4/src/main/java/org/opensearch/transport/Netty4ModulePlugin.java b/modules/transport-netty4/src/main/java/org/opensearch/transport/Netty4ModulePlugin.java index e2c84ab5d339a..b994cc086b1f1 100644 --- a/modules/transport-netty4/src/main/java/org/opensearch/transport/Netty4ModulePlugin.java +++ b/modules/transport-netty4/src/main/java/org/opensearch/transport/Netty4ModulePlugin.java @@ -45,7 +45,10 @@ import org.opensearch.core.indices.breaker.CircuitBreakerService; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.http.HttpServerTransport; +import org.opensearch.http.netty4.Netty4CompositeHttpServerTransport; +import org.opensearch.http.netty4.Netty4Http3ServerTransport; import org.opensearch.http.netty4.Netty4HttpServerTransport; +import org.opensearch.http.netty4.http3.Http3Utils; import org.opensearch.http.netty4.ssl.SecureNetty4HttpServerTransport; import org.opensearch.plugins.NetworkPlugin; import org.opensearch.plugins.Plugin; @@ -62,6 +65,8 @@ import java.util.Map; import java.util.function.Supplier; +import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_HTTP3_ENABLED; + public class Netty4ModulePlugin extends Plugin implements NetworkPlugin { public static final String NETTY_TRANSPORT_NAME = "netty4"; @@ -164,21 +169,53 @@ public Map> getSecureHttpTransports( SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider, Tracer tracer ) { - return Collections.singletonMap( - NETTY_SECURE_HTTP_TRANSPORT_NAME, - () -> new SecureNetty4HttpServerTransport( - settings, - networkService, - bigArrays, - threadPool, - xContentRegistry, - dispatcher, - clusterSettings, - getSharedGroupFactory(settings), - secureHttpTransportSettingsProvider, - tracer - ) - ); + if (Http3Utils.isHttp3Available() == true && SETTING_HTTP_HTTP3_ENABLED.get(settings).booleanValue() == true) { + return Collections.singletonMap( + NETTY_SECURE_HTTP_TRANSPORT_NAME, + () -> new Netty4CompositeHttpServerTransport( + new SecureNetty4HttpServerTransport( + settings, + networkService, + bigArrays, + threadPool, + xContentRegistry, + dispatcher, + clusterSettings, + getSharedGroupFactory(settings), + secureHttpTransportSettingsProvider, + tracer + ), + new Netty4Http3ServerTransport( + settings, + networkService, + bigArrays, + threadPool, + xContentRegistry, + dispatcher, + clusterSettings, + getSharedGroupFactory(settings), + secureHttpTransportSettingsProvider, + tracer + ) + ) + ); + } else { + return Collections.singletonMap( + NETTY_SECURE_HTTP_TRANSPORT_NAME, + () -> new SecureNetty4HttpServerTransport( + settings, + networkService, + bigArrays, + threadPool, + xContentRegistry, + dispatcher, + clusterSettings, + getSharedGroupFactory(settings), + secureHttpTransportSettingsProvider, + tracer + ) + ); + } } @Override diff --git a/modules/transport-netty4/src/main/java/org/opensearch/transport/NettyAllocator.java b/modules/transport-netty4/src/main/java/org/opensearch/transport/NettyAllocator.java index c291680f4fd5e..878dba01e19af 100644 --- a/modules/transport-netty4/src/main/java/org/opensearch/transport/NettyAllocator.java +++ b/modules/transport-netty4/src/main/java/org/opensearch/transport/NettyAllocator.java @@ -155,6 +155,14 @@ public static void logAllocatorDescriptionIfNeeded() { } } + public static ByteBufAllocator getAllocator(boolean directBuffers) { + if (directBuffers == true && ALLOCATOR instanceof NoDirectBuffers ndb) { + return ndb.delegate; /* Http3/Quic only supports direct buffers */ + } else { + return ALLOCATOR; + } + } + public static ByteBufAllocator getAllocator() { return ALLOCATOR; } diff --git a/modules/transport-netty4/src/main/java/org/opensearch/transport/netty4/ssl/SslUtils.java b/modules/transport-netty4/src/main/java/org/opensearch/transport/netty4/ssl/SslUtils.java index 8b8223da70c08..7d260821c3fd4 100644 --- a/modules/transport-netty4/src/main/java/org/opensearch/transport/netty4/ssl/SslUtils.java +++ b/modules/transport-netty4/src/main/java/org/opensearch/transport/netty4/ssl/SslUtils.java @@ -24,7 +24,7 @@ * @see TLSUtil */ public class SslUtils { - private static final String[] DEFAULT_SSL_PROTOCOLS = { "TLSv1.3", "TLSv1.2", "TLSv1.1" }; + public static final String[] DEFAULT_SSL_PROTOCOLS = { "TLSv1.3", "TLSv1.2", "TLSv1.1" }; private static final int SSL_CONTENT_TYPE_CHANGE_CIPHER_SPEC = 20; private static final int SSL_CONTENT_TYPE_ALERT = 21; diff --git a/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4Http3ServerTransportTests.java b/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4Http3ServerTransportTests.java new file mode 100644 index 0000000000000..67d83a657a351 --- /dev/null +++ b/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4Http3ServerTransportTests.java @@ -0,0 +1,493 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.netty4; + +import org.opensearch.OpenSearchException; +import org.opensearch.common.network.NetworkAddress; +import org.opensearch.common.network.NetworkService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.MockBigArrays; +import org.opensearch.common.util.MockPageCacheRecycler; +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.common.transport.TransportAddress; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.core.indices.breaker.NoneCircuitBreakerService; +import org.opensearch.http.BindHttpException; +import org.opensearch.http.CorsHandler; +import org.opensearch.http.HttpServerTransport; +import org.opensearch.http.HttpTransportSettings; +import org.opensearch.http.NullDispatcher; +import org.opensearch.http.netty4.http3.Http3Utils; +import org.opensearch.plugins.SecureHttpTransportSettingsProvider; +import org.opensearch.plugins.TransportExceptionHandler; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.telemetry.tracing.noop.NoopTracer; +import org.opensearch.test.KeyStoreUtils; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.NettyAllocator; +import org.opensearch.transport.SharedGroupFactory; +import org.opensearch.transport.netty4.ssl.SslUtils; +import org.junit.After; +import org.junit.Before; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLException; +import javax.net.ssl.TrustManagerFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; + +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.TooLongFrameException; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpUtil; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http2.Http2SecurityUtil; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import io.netty.pkitesting.CertificateBuilder.Algorithm; + +import static org.opensearch.core.rest.RestStatus.BAD_REQUEST; +import static org.opensearch.core.rest.RestStatus.OK; +import static org.opensearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_ORIGIN; +import static org.opensearch.http.HttpTransportSettings.SETTING_CORS_ENABLED; +import static org.opensearch.http.TestDispatcherBuilder.dispatcherBuilderWithDefaults; +import static org.opensearch.test.KeyStoreUtils.KEYSTORE_PASSWORD; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.junit.Assume.assumeThat; + +/** + * Tests for the {@link Netty4Http3ServerTransport} class. + */ +public class Netty4Http3ServerTransportTests extends OpenSearchTestCase { + + private NetworkService networkService; + private ThreadPool threadPool; + private MockBigArrays bigArrays; + private ClusterSettings clusterSettings; + private SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider; + + @Before + public void setup() throws Exception { + networkService = new NetworkService(Collections.emptyList()); + threadPool = new TestThreadPool("test"); + bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); + clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + + final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance("PKIX"); + keyManagerFactory.init(KeyStoreUtils.createServerKeyStore(Algorithm.ecp384), KEYSTORE_PASSWORD); + + secureHttpTransportSettingsProvider = new SecureHttpTransportSettingsProvider() { + @Override + public Optional parameters(Settings settings) { + return Optional.of(new SecureHttpTransportParameters() { + @Override + public Optional keyManagerFactory() { + return Optional.of(keyManagerFactory); + } + + @Override + public Optional sslProvider() { + return Optional.empty(); + } + + @Override + public Optional clientAuth() { + return Optional.empty(); + } + + @Override + public Collection protocols() { + return Arrays.asList(SslUtils.DEFAULT_SSL_PROTOCOLS); + } + + @Override + public Collection cipherSuites() { + return Http2SecurityUtil.CIPHERS; + } + + @Override + public Optional trustManagerFactory() { + return Optional.of(InsecureTrustManagerFactory.INSTANCE); + } + }); + } + + @Override + public Optional buildHttpServerExceptionHandler(Settings settings, HttpServerTransport transport) { + return Optional.empty(); + } + + @Override + public Optional buildSecureHttpServerEngine(Settings settings, HttpServerTransport transport) throws SSLException { + final SSLEngine engine = SslContextBuilder.forServer(keyManagerFactory) + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .build() + .newEngine(NettyAllocator.getAllocator()); + return Optional.of(engine); + } + }; + } + + @After + public void shutdown() throws Exception { + if (threadPool != null) { + threadPool.shutdownNow(); + } + threadPool = null; + networkService = null; + bigArrays = null; + clusterSettings = null; + } + + /** + * Test that {@link Netty4Http3ServerTransportTests} supports the "Expect: 100-continue" HTTP header + * @throws InterruptedException if the client communication with the server is interrupted + */ + public void testExpectContinueHeader() throws InterruptedException { + assumeThat("HTTP/3 is not available on this arch/platform", Http3Utils.isHttp3Available(), is(true)); + + final Settings settings = createSettings(); + final int contentLength = randomIntBetween(1, HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH.get(settings).bytesAsInt()); + runExpectHeaderTest(settings, HttpHeaderValues.CONTINUE.toString(), contentLength, HttpResponseStatus.CONTINUE); + } + + /** + * Test that {@link Netty4Http3ServerTransportTests} responds to a + * 100-continue expectation with too large a content-length + * with a 413 status. + * @throws InterruptedException if the client communication with the server is interrupted + */ + public void testExpectContinueHeaderContentLengthTooLong() throws InterruptedException { + assumeThat("HTTP/3 is not available on this arch/platform", Http3Utils.isHttp3Available(), is(true)); + + final String key = HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH.getKey(); + final int maxContentLength = randomIntBetween(1, 104857600); + final Settings settings = createBuilderWithPort().put(key, maxContentLength + "b").build(); + final int contentLength = randomIntBetween(maxContentLength + 1, Integer.MAX_VALUE); + runExpectHeaderTest(settings, HttpHeaderValues.CONTINUE.toString(), contentLength, HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE); + } + + /** + * Test that {@link Netty4Http3ServerTransportTests} responds to an unsupported expectation with a 417 status. + * @throws InterruptedException if the client communication with the server is interrupted + */ + public void testExpectUnsupportedExpectation() throws InterruptedException { + assumeThat("HTTP/3 is not available on this arch/platform", Http3Utils.isHttp3Available(), is(true)); + + final Settings settings = createSettings(); + runExpectHeaderTest(settings, "chocolate=yummy", 0, HttpResponseStatus.EXPECTATION_FAILED); + } + + private void runExpectHeaderTest( + final Settings settings, + final String expectation, + final int contentLength, + final HttpResponseStatus expectedStatus + ) throws InterruptedException { + assumeThat("HTTP/3 is not available on this arch/platform", Http3Utils.isHttp3Available(), is(true)); + + final HttpServerTransport.Dispatcher dispatcher = dispatcherBuilderWithDefaults().withDispatchRequest( + (request, channel, threadContext) -> channel.sendResponse( + new BytesRestResponse(OK, BytesRestResponse.TEXT_CONTENT_TYPE, new BytesArray("done")) + ) + ).build(); + try ( + Netty4Http3ServerTransport transport = new Netty4Http3ServerTransport( + settings, + networkService, + bigArrays, + threadPool, + xContentRegistry(), + dispatcher, + clusterSettings, + new SharedGroupFactory(settings), + secureHttpTransportSettingsProvider, + NoopTracer.INSTANCE + ) + ) { + transport.start(); + final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); + try (Netty4HttpClient client = Netty4HttpClient.http3()) { + final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/"); + request.headers().set(HttpHeaderNames.EXPECT, expectation); + HttpUtil.setContentLength(request, contentLength); + + final FullHttpResponse response = client.send(remoteAddress.address(), request); + try { + assertThat(response.status(), equalTo(expectedStatus)); + if (expectedStatus.equals(HttpResponseStatus.CONTINUE)) { + final FullHttpRequest continuationRequest = new DefaultFullHttpRequest( + HttpVersion.HTTP_1_1, + HttpMethod.POST, + "/", + Unpooled.EMPTY_BUFFER + ); + final FullHttpResponse continuationResponse = client.send(remoteAddress.address(), continuationRequest); + try { + assertThat(continuationResponse.status(), is(HttpResponseStatus.OK)); + assertThat( + new String(ByteBufUtil.getBytes(continuationResponse.content()), StandardCharsets.UTF_8), + is("done") + ); + } finally { + continuationResponse.release(); + } + } + } finally { + response.release(); + } + } + } + } + + public void testBindUnavailableAddress() { + assumeThat("HTTP/3 is not available on this arch/platform", Http3Utils.isHttp3Available(), is(true)); + + final Settings initialSettings = createSettings(); + try ( + Netty4Http3ServerTransport transport = new Netty4Http3ServerTransport( + initialSettings, + networkService, + bigArrays, + threadPool, + xContentRegistry(), + new NullDispatcher(), + clusterSettings, + new SharedGroupFactory(initialSettings), + secureHttpTransportSettingsProvider, + NoopTracer.INSTANCE + ) + ) { + transport.start(); + TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); + Settings settings = Settings.builder() + .put("http.port", remoteAddress.getPort()) + .put("network.host", remoteAddress.getAddress()) + .build(); + try ( + Netty4Http3ServerTransport otherTransport = new Netty4Http3ServerTransport( + settings, + networkService, + bigArrays, + threadPool, + xContentRegistry(), + new NullDispatcher(), + clusterSettings, + new SharedGroupFactory(settings), + secureHttpTransportSettingsProvider, + NoopTracer.INSTANCE + ) + ) { + BindHttpException bindHttpException = expectThrows(BindHttpException.class, otherTransport::start); + assertEquals("Failed to bind to " + NetworkAddress.format(remoteAddress.address()), bindHttpException.getMessage()); + } + } + } + + public void testBadRequest() throws InterruptedException { + assumeThat("HTTP/3 is not available on this arch/platform", Http3Utils.isHttp3Available(), is(true)); + + final AtomicReference causeReference = new AtomicReference<>(); + final HttpServerTransport.Dispatcher dispatcher = dispatcherBuilderWithDefaults().withDispatchBadRequest( + (channel, threadContext, cause) -> { + causeReference.set(cause); + try { + final OpenSearchException e = new OpenSearchException("you sent a bad request and you should feel bad"); + channel.sendResponse(new BytesRestResponse(channel, BAD_REQUEST, e)); + } catch (final IOException e) { + throw new AssertionError(e); + } + } + ).build(); + + final Settings settings; + final int maxInitialLineLength; + final Setting httpMaxInitialLineLengthSetting = HttpTransportSettings.SETTING_HTTP_MAX_INITIAL_LINE_LENGTH; + if (randomBoolean()) { + maxInitialLineLength = httpMaxInitialLineLengthSetting.getDefault(Settings.EMPTY).bytesAsInt(); + settings = createSettings(); + } else { + maxInitialLineLength = randomIntBetween(1, 8192); + settings = createBuilderWithPort().put(httpMaxInitialLineLengthSetting.getKey(), maxInitialLineLength + "b").build(); + } + + try ( + Netty4Http3ServerTransport transport = new Netty4Http3ServerTransport( + settings, + networkService, + bigArrays, + threadPool, + xContentRegistry(), + dispatcher, + clusterSettings, + new SharedGroupFactory(settings), + secureHttpTransportSettingsProvider, + NoopTracer.INSTANCE + ) + ) { + transport.start(); + final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); + + try (Netty4HttpClient client = Netty4HttpClient.http3()) { + final String url = "/" + randomAlphaOfLength(maxInitialLineLength); + final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url); + + final FullHttpResponse response = client.send(remoteAddress.address(), request); + try { + assertThat(response.status(), equalTo(HttpResponseStatus.BAD_REQUEST)); + assertThat( + new String(response.content().array(), StandardCharsets.UTF_8), + containsString("you sent a bad request and you should feel bad") + ); + } finally { + response.release(); + } + } + } + + assertNotNull(causeReference.get()); + assertThat(causeReference.get(), instanceOf(TooLongFrameException.class)); + } + + public void testLargeCompressedResponse() throws InterruptedException { + assumeThat("HTTP/3 is not available on this arch/platform", Http3Utils.isHttp3Available(), is(true)); + + final Settings settings = createSettings(); + final String responseString = randomAlphaOfLength(4 * 1024 * 1024); + final String url = "/thing"; + final HttpServerTransport.Dispatcher dispatcher = dispatcherBuilderWithDefaults().withDispatchRequest( + (request, channel, threadContext) -> { + if (url.equals(request.uri())) { + channel.sendResponse(new BytesRestResponse(OK, responseString)); + } else { + logger.error("--> Unexpected successful uri [{}]", request.uri()); + throw new AssertionError(); + } + } + ).build(); + + try ( + Netty4Http3ServerTransport transport = new Netty4Http3ServerTransport( + settings, + networkService, + bigArrays, + threadPool, + xContentRegistry(), + dispatcher, + clusterSettings, + new SharedGroupFactory(settings), + secureHttpTransportSettingsProvider, + NoopTracer.INSTANCE + ) + ) { + transport.start(); + final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); + + try (Netty4HttpClient client = Netty4HttpClient.http3()) { + DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url); + request.headers().add(HttpHeaderNames.ACCEPT_ENCODING, randomFrom("deflate", "gzip")); + final FullHttpResponse response = client.send(remoteAddress.address(), request); + try { + assertThat(response.status(), equalTo(HttpResponseStatus.OK)); + byte[] bytes = new byte[response.content().readableBytes()]; + response.content().readBytes(bytes); + assertThat(new String(bytes, StandardCharsets.UTF_8), equalTo(responseString)); + } finally { + response.release(); + } + } + } + } + + public void testCorsRequest() throws InterruptedException { + assumeThat("HTTP/3 is not available on this arch/platform", Http3Utils.isHttp3Available(), is(true)); + + final HttpServerTransport.Dispatcher dispatcher = dispatcherBuilderWithDefaults().build(); + final Settings settings = createBuilderWithPort().put(SETTING_CORS_ENABLED.getKey(), true) + .put(SETTING_CORS_ALLOW_ORIGIN.getKey(), "test-cors.org") + .build(); + + try ( + Netty4Http3ServerTransport transport = new Netty4Http3ServerTransport( + settings, + networkService, + bigArrays, + threadPool, + xContentRegistry(), + dispatcher, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + new SharedGroupFactory(settings), + secureHttpTransportSettingsProvider, + NoopTracer.INSTANCE + ) + ) { + transport.start(); + final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); + + // Test pre-flight request + try (Netty4HttpClient client = Netty4HttpClient.http3()) { + final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.OPTIONS, "/"); + request.headers().add(CorsHandler.ORIGIN, "test-cors.org"); + request.headers().add(CorsHandler.ACCESS_CONTROL_REQUEST_METHOD, "POST"); + + final FullHttpResponse response = client.send(remoteAddress.address(), request); + try { + assertThat(response.status(), equalTo(HttpResponseStatus.OK)); + assertThat(response.headers().get(CorsHandler.ACCESS_CONTROL_ALLOW_ORIGIN), equalTo("test-cors.org")); + assertThat(response.headers().get(CorsHandler.VARY), equalTo(CorsHandler.ORIGIN)); + assertTrue(response.headers().contains(CorsHandler.DATE)); + } finally { + response.release(); + } + } + + // Test short-circuited request + try (Netty4HttpClient client = Netty4HttpClient.http3()) { + final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); + request.headers().add(CorsHandler.ORIGIN, "google.com"); + + final FullHttpResponse response = client.send(remoteAddress.address(), request); + try { + assertThat(response.status(), equalTo(HttpResponseStatus.FORBIDDEN)); + } finally { + response.release(); + } + } + } + } + + private Settings createSettings() { + return createBuilderWithPort().build(); + } + + private Settings.Builder createBuilderWithPort() { + return Settings.builder().put(HttpTransportSettings.SETTING_HTTP_PORT.getKey(), getPortRange()); + } +} diff --git a/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4HttpClient.java b/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4HttpClient.java index cf841f2e24b1e..3215a202227af 100644 --- a/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4HttpClient.java +++ b/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4HttpClient.java @@ -34,6 +34,7 @@ import org.opensearch.common.TriFunction; import org.opensearch.common.collect.Tuple; +import org.opensearch.common.settings.Settings; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.tasks.Task; @@ -52,7 +53,9 @@ import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandler; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -61,7 +64,9 @@ import io.netty.channel.ChannelPromise; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioDatagramChannel; import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; @@ -86,12 +91,24 @@ import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandler; import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandlerBuilder; import io.netty.handler.codec.http2.InboundHttp2ToHttpAdapterBuilder; +import io.netty.handler.codec.http3.Http3; +import io.netty.handler.codec.http3.Http3ClientConnectionHandler; +import io.netty.handler.codec.http3.Http3FrameToHttpObjectCodec; +import io.netty.handler.codec.http3.Http3RequestStreamInitializer; +import io.netty.handler.codec.quic.QuicChannel; +import io.netty.handler.codec.quic.QuicSslContext; +import io.netty.handler.codec.quic.QuicSslContextBuilder; +import io.netty.handler.codec.quic.QuicStreamChannel; import io.netty.handler.ssl.ClientAuth; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.netty.util.AttributeKey; +import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH; +import static org.opensearch.http.netty4.Netty4Http3ServerTransport.SETTING_H3_MAX_STREAMS; +import static org.opensearch.http.netty4.Netty4Http3ServerTransport.SETTING_H3_MAX_STREAM_LOCAL_LENGTH; +import static org.opensearch.http.netty4.Netty4Http3ServerTransport.SETTING_H3_MAX_STREAM_REMOTE_LENGTH; import static io.netty.handler.codec.http.HttpHeaderNames.HOST; import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; import static org.junit.Assert.fail; @@ -118,12 +135,12 @@ static Collection returnOpaqueIds(Collection responses } private final Bootstrap clientBootstrap; - private final TriFunction, Boolean, AwaitableChannelInitializer> handlerFactory; + private final TriFunction, Boolean, AwaitableChannelInitializer> handlerFactory; private final boolean secure; Netty4HttpClient( Bootstrap clientBootstrap, - TriFunction, Boolean, AwaitableChannelInitializer> handlerFactory, + TriFunction, Boolean, AwaitableChannelInitializer> handlerFactory, boolean secure ) { this.clientBootstrap = clientBootstrap; @@ -161,13 +178,23 @@ public static Netty4HttpClient http2() { ); } + public static Netty4HttpClient http3() { + return new Netty4HttpClient( + new Bootstrap().channel(NioDatagramChannel.class) + .option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator(true)) + .group(new NioEventLoopGroup(1)), + CountDownLatchHandlerHttp3::new, + true + ); + } + public List get(SocketAddress remoteAddress, String... uris) throws InterruptedException { List requests = new ArrayList<>(uris.length); for (int i = 0; i < uris.length; i++) { final HttpRequest httpRequest = new DefaultFullHttpRequest(HTTP_1_1, HttpMethod.GET, uris[i]); httpRequest.headers().add(HOST, "localhost"); httpRequest.headers().add("X-Opaque-ID", String.valueOf(i)); - httpRequest.headers().add(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), secure ? "http" : "https"); + httpRequest.headers().add(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), secure ? "https" : "http"); requests.add(httpRequest); } return sendRequests(remoteAddress, requests); @@ -179,6 +206,8 @@ public final Collection post(SocketAddress remoteAddress, List } public final FullHttpResponse send(SocketAddress remoteAddress, FullHttpRequest httpRequest) throws InterruptedException { + httpRequest.headers().add(HOST, "localhost"); + httpRequest.headers().add(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), secure ? "https" : "http"); List responses = sendRequests(remoteAddress, Collections.singleton(httpRequest)); assert responses.size() == 1 : "expected 1 and only 1 http response"; return responses.get(0); @@ -202,7 +231,7 @@ private List processRequestsWithBody( request.headers().add(HttpHeaderNames.HOST, "localhost"); request.headers().add(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes()); request.headers().add(HttpHeaderNames.CONTENT_TYPE, "application/json"); - request.headers().add(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), "http"); + request.headers().add(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), secure ? "https" : "http"); request.headers().add("X-Opaque-ID", String.valueOf(i)); requests.add(request); } @@ -214,20 +243,29 @@ private synchronized List sendRequests(final SocketAddress rem final CountDownLatch latch = new CountDownLatch(requests.size()); final List content = Collections.synchronizedList(new ArrayList<>(requests.size())); - final AwaitableChannelInitializer handler = handlerFactory.apply(latch, content, secure); + final AwaitableChannelInitializer handler = handlerFactory.apply(latch, content, secure); clientBootstrap.handler(handler); ChannelFuture channelFuture = null; try { + if (handler.supportManyRequests() == false && requests.size() > 1) { + throw new IllegalStateException("The handler supports only one request / response mode"); + } + channelFuture = clientBootstrap.connect(remoteAddress); channelFuture.sync(); handler.await(); - for (HttpRequest request : requests) { - channelFuture.channel().writeAndFlush(request); - } - if (latch.await(30L, TimeUnit.SECONDS) == false) { - fail("Failed to get all expected responses."); + final Channel channel = handler.prepare(clientBootstrap, channelFuture.channel()); + try { + for (HttpRequest request : requests) { + channel.writeAndFlush(request); + } + if (latch.await(30L, TimeUnit.SECONDS) == false) { + fail("Failed to get all expected responses."); + } + } finally { + channel.close().awaitUninterruptibly(); } } finally { @@ -247,7 +285,7 @@ public void close() { /** * helper factory which adds returned data to a list and uses a count down latch to decide when done */ - private static class CountDownLatchHandlerHttp extends AwaitableChannelInitializer { + private static class CountDownLatchHandlerHttp extends AwaitableChannelInitializer { private final CountDownLatch latch; private final Collection content; @@ -302,16 +340,24 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E * The channel initializer with the ability to await for initialization to be completed * */ - private static abstract class AwaitableChannelInitializer extends ChannelInitializer { + private static abstract class AwaitableChannelInitializer extends ChannelInitializer { void await() { // do nothing } + + Channel prepare(Bootstrap clientBootstrap, Channel channel) throws InterruptedException { + return channel; + } + + boolean supportManyRequests() { + return true; + } } /** * helper factory which adds returned data to a list and uses a count down latch to decide when done */ - private static class CountDownLatchHandlerHttp2 extends AwaitableChannelInitializer { + private static class CountDownLatchHandlerHttp2 extends AwaitableChannelInitializer { private final CountDownLatch latch; private final Collection content; @@ -439,4 +485,78 @@ protected void channelRead0(ChannelHandlerContext ctx, Http2Settings msg) throws } } + /** + * helper factory which adds returned data to a list and uses a count down latch to decide when done + */ + private static class CountDownLatchHandlerHttp3 extends AwaitableChannelInitializer { + private final CountDownLatch latch; + private final Collection content; + + CountDownLatchHandlerHttp3(final CountDownLatch latch, final Collection content, final boolean secure) { + this.latch = latch; + this.content = content; + } + + @Override + protected void initChannel(DatagramChannel ch) { + final QuicSslContext context = QuicSslContextBuilder.forClient() + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .applicationProtocols(Http3.supportedApplicationProtocols()) + .build(); + + final ChannelHandler quicClientCodec = Http3.newQuicClientCodecBuilder() + .sslContext(context) + .initialMaxData(SETTING_HTTP_MAX_CONTENT_LENGTH.get(Settings.EMPTY).getBytes()) + .initialMaxStreamDataBidirectionalLocal(SETTING_H3_MAX_STREAM_LOCAL_LENGTH.get(Settings.EMPTY).getBytes()) + .initialMaxStreamDataBidirectionalRemote(SETTING_H3_MAX_STREAM_REMOTE_LENGTH.get(Settings.EMPTY).getBytes()) + .initialMaxStreamsBidirectional(SETTING_H3_MAX_STREAMS.get(Settings.EMPTY).longValue()) + .build(); + + ch.pipeline().addLast(quicClientCodec); + } + + @Override + Channel prepare(Bootstrap clientBootstrap, Channel channel) throws InterruptedException { + final QuicChannel quicChannel = QuicChannel.newBootstrap(channel) + .handler(new Http3ClientConnectionHandler()) + .remoteAddress(channel.remoteAddress()) + .connect() + .sync() + .getNow(); + quicChannel.closeFuture().addListener(f -> channel.close()); + + return Http3.newRequestStream(quicChannel, new Http3RequestStreamInitializer() { + @Override + protected void initRequestStream(QuicStreamChannel ch) { + final int maxContentLength = new ByteSizeValue(100, ByteSizeUnit.MB).bytesAsInt(); + ch.pipeline().addLast(new Http3FrameToHttpObjectCodec(false)); + ch.pipeline().addLast(new HttpContentDecompressor()); + ch.pipeline().addLast(new HttpObjectAggregator(maxContentLength)); + ch.pipeline().addLast(new SimpleChannelInboundHandler() { + @Override + protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) { + if (msg instanceof FullHttpResponse ht) { + // We copy the buffer manually to avoid a huge allocation on a pooled allocator. We have + // a test that tracks huge allocations, so we want to avoid them in this test code. + ByteBuf newContent = Unpooled.copiedBuffer(ht.content()); + content.add(ht.replace(newContent)); + latch.countDown(); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + super.exceptionCaught(ctx, cause); + latch.countDown(); + } + }); + } + }).sync().getNow(); + } + + @Override + boolean supportManyRequests() { + return false; + } + } } diff --git a/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4HttpServerTransportTests.java b/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4HttpServerTransportTests.java index 3869118a0da55..97b40b1fc8fd2 100644 --- a/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4HttpServerTransportTests.java +++ b/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4HttpServerTransportTests.java @@ -326,6 +326,7 @@ public void testBadRequest() throws InterruptedException { } public void testLargeCompressedResponse() throws InterruptedException { + final Settings settings = createSettings(); final String responseString = randomAlphaOfLength(4 * 1024 * 1024); final String url = "/thing"; final HttpServerTransport.Dispatcher dispatcher = dispatcherBuilderWithDefaults().withDispatchRequest( @@ -340,14 +341,14 @@ public void testLargeCompressedResponse() throws InterruptedException { ).build(); try ( Netty4HttpServerTransport transport = new Netty4HttpServerTransport( - Settings.EMPTY, + settings, networkService, bigArrays, threadPool, xContentRegistry(), dispatcher, clusterSettings, - new SharedGroupFactory(Settings.EMPTY), + new SharedGroupFactory(settings), NoopTracer.INSTANCE ) ) { diff --git a/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/ssl/SecureNetty4HttpServerTransportTests.java b/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/ssl/SecureNetty4HttpServerTransportTests.java index dd71d8e75b486..44f47ea9102c6 100644 --- a/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/ssl/SecureNetty4HttpServerTransportTests.java +++ b/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/ssl/SecureNetty4HttpServerTransportTests.java @@ -374,6 +374,7 @@ public void testBadRequest() throws InterruptedException { } public void testLargeCompressedResponse() throws InterruptedException { + final Settings settings = createSettings(); final String responseString = randomAlphaOfLength(4 * 1024 * 1024); final String url = "/thing"; final HttpServerTransport.Dispatcher dispatcher = dispatcherBuilderWithDefaults().withDispatchRequest( @@ -388,14 +389,14 @@ public void testLargeCompressedResponse() throws InterruptedException { ).build(); try ( SecureNetty4HttpServerTransport transport = new SecureNetty4HttpServerTransport( - Settings.EMPTY, + settings, networkService, bigArrays, threadPool, xContentRegistry(), dispatcher, clusterSettings, - new SharedGroupFactory(Settings.EMPTY), + new SharedGroupFactory(settings), secureHttpTransportSettingsProvider, NoopTracer.INSTANCE ) diff --git a/plugins/repository-azure/build.gradle b/plugins/repository-azure/build.gradle index dd5a5cf89a3fc..00f84af68e815 100644 --- a/plugins/repository-azure/build.gradle +++ b/plugins/repository-azure/build.gradle @@ -146,24 +146,8 @@ thirdPartyAudit { 'io.netty.channel.uring.IoUringIoHandler', 'io.netty.channel.uring.IoUringServerSocketChannel', 'io.netty.channel.uring.IoUringSocketChannel', - 'io.netty.handler.codec.quic.Quic', - 'io.netty.handler.codec.quic.QuicChannel', - 'io.netty.handler.codec.quic.QuicChannelBootstrap', - 'io.netty.handler.codec.quic.QuicClientCodecBuilder', - 'io.netty.handler.codec.quic.QuicServerCodecBuilder', - 'io.netty.handler.codec.quic.QuicSslContext', - 'io.netty.handler.codec.quic.QuicSslContextBuilder', - 'io.netty.handler.codec.quic.QuicSslEngine', - 'io.netty.handler.codec.quic.QuicStreamChannel', - 'io.netty.handler.codec.quic.QuicStreamChannelBootstrap', - 'io.netty.handler.codec.quic.QuicTokenHandler', 'io.netty.handler.codec.haproxy.HAProxyMessage', 'io.netty.handler.codec.haproxy.HAProxyMessageDecoder', - 'io.netty.handler.codec.http3.Http3', - 'io.netty.handler.codec.http3.Http3ClientConnectionHandler', - 'io.netty.handler.codec.http3.Http3DataFrame', - 'io.netty.handler.codec.http3.Http3Headers', - 'io.netty.handler.codec.http3.Http3HeadersFrame', 'javax.activation.DataHandler', 'javax.activation.DataSource', 'javax.xml.bind.JAXBElement', diff --git a/plugins/transport-reactor-netty4/build.gradle b/plugins/transport-reactor-netty4/build.gradle index b2ae6acdfd946..ceb9faee43e1c 100644 --- a/plugins/transport-reactor-netty4/build.gradle +++ b/plugins/transport-reactor-netty4/build.gradle @@ -36,6 +36,14 @@ dependencies { api libs.bundles.reactornetty api libs.netty.codec.http3 api libs.netty.codec.classes.quic + api libs.netty.codec.native.quic + + // Bundle all supported OSes and Archs + runtimeOnly "io.netty:netty-codec-native-quic:${versions.netty}:linux-x86_64" + runtimeOnly "io.netty:netty-codec-native-quic:${versions.netty}:linux-aarch_64" + runtimeOnly "io.netty:netty-codec-native-quic:${versions.netty}:osx-x86_64" + runtimeOnly "io.netty:netty-codec-native-quic:${versions.netty}:osx-aarch_64" + runtimeOnly "io.netty:netty-codec-native-quic:${versions.netty}:windows-x86_64" testImplementation libs.log4jslf4jimpl javaRestTestImplementation libs.reactor.test diff --git a/plugins/transport-reactor-netty4/licenses/netty-codec-native-quic-4.2.9.Final-linux-aarch_64.jar.sha1 b/plugins/transport-reactor-netty4/licenses/netty-codec-native-quic-4.2.9.Final-linux-aarch_64.jar.sha1 new file mode 100644 index 0000000000000..1104459d16a35 --- /dev/null +++ b/plugins/transport-reactor-netty4/licenses/netty-codec-native-quic-4.2.9.Final-linux-aarch_64.jar.sha1 @@ -0,0 +1 @@ +87ce82aa487b3701a83312b748d2d311b58bbe68 \ No newline at end of file diff --git a/plugins/transport-reactor-netty4/licenses/netty-codec-native-quic-4.2.9.Final-linux-x86_64.jar.sha1 b/plugins/transport-reactor-netty4/licenses/netty-codec-native-quic-4.2.9.Final-linux-x86_64.jar.sha1 new file mode 100644 index 0000000000000..792d3a0e7424d --- /dev/null +++ b/plugins/transport-reactor-netty4/licenses/netty-codec-native-quic-4.2.9.Final-linux-x86_64.jar.sha1 @@ -0,0 +1 @@ +3f0ab9d43f00c8a02ba22702aff482f598863642 \ No newline at end of file diff --git a/plugins/transport-reactor-netty4/licenses/netty-codec-native-quic-4.2.9.Final-osx-aarch_64.jar.sha1 b/plugins/transport-reactor-netty4/licenses/netty-codec-native-quic-4.2.9.Final-osx-aarch_64.jar.sha1 new file mode 100644 index 0000000000000..ecf9df0b6d865 --- /dev/null +++ b/plugins/transport-reactor-netty4/licenses/netty-codec-native-quic-4.2.9.Final-osx-aarch_64.jar.sha1 @@ -0,0 +1 @@ +7725db063980a53420afbced5fb39dbd0eebbd8a \ No newline at end of file diff --git a/plugins/transport-reactor-netty4/licenses/netty-codec-native-quic-4.2.9.Final-osx-x86_64.jar.sha1 b/plugins/transport-reactor-netty4/licenses/netty-codec-native-quic-4.2.9.Final-osx-x86_64.jar.sha1 new file mode 100644 index 0000000000000..c626d90d146fb --- /dev/null +++ b/plugins/transport-reactor-netty4/licenses/netty-codec-native-quic-4.2.9.Final-osx-x86_64.jar.sha1 @@ -0,0 +1 @@ +53eee547f8731254084274d31b82171c51d509fa \ No newline at end of file diff --git a/plugins/transport-reactor-netty4/licenses/netty-codec-native-quic-4.2.9.Final-windows-x86_64.jar.sha1 b/plugins/transport-reactor-netty4/licenses/netty-codec-native-quic-4.2.9.Final-windows-x86_64.jar.sha1 new file mode 100644 index 0000000000000..dd7184e852909 --- /dev/null +++ b/plugins/transport-reactor-netty4/licenses/netty-codec-native-quic-4.2.9.Final-windows-x86_64.jar.sha1 @@ -0,0 +1 @@ +6873dd577760b80b345fa5ab7dfe589ed9c1bbc3 \ No newline at end of file diff --git a/plugins/transport-reactor-netty4/licenses/netty-codec-native-quic-4.2.9.Final.jar.sha1 b/plugins/transport-reactor-netty4/licenses/netty-codec-native-quic-4.2.9.Final.jar.sha1 new file mode 100644 index 0000000000000..dac0db01661c6 --- /dev/null +++ b/plugins/transport-reactor-netty4/licenses/netty-codec-native-quic-4.2.9.Final.jar.sha1 @@ -0,0 +1 @@ +97fda97b8df8f3e8fefa50336e108756b76f8b77 \ No newline at end of file diff --git a/plugins/transport-reactor-netty4/src/internalClusterTest/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpRequestSizeLimitIT.java b/plugins/transport-reactor-netty4/src/internalClusterTest/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpRequestSizeLimitIT.java index 833d60375a2bd..5485da8581e35 100644 --- a/plugins/transport-reactor-netty4/src/internalClusterTest/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpRequestSizeLimitIT.java +++ b/plugins/transport-reactor-netty4/src/internalClusterTest/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpRequestSizeLimitIT.java @@ -101,7 +101,7 @@ public void testLimitsInFlightRequests() throws Exception { HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class); TransportAddress transportAddress = randomFrom(httpServerTransport.boundAddress().boundAddresses()); - try (ReactorHttpClient nettyHttpClient = ReactorHttpClient.create(false)) { + try (ReactorHttpClient nettyHttpClient = ReactorHttpClient.create(false, Settings.EMPTY)) { final Collection singleResponse = nettyHttpClient.post(transportAddress.address(), requests.subList(0, 1)); try { assertThat(singleResponse, hasSize(1)); @@ -131,7 +131,7 @@ public void testDoesNotLimitExcludedRequests() throws Exception { HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class); TransportAddress transportAddress = randomFrom(httpServerTransport.boundAddress().boundAddresses()); - try (ReactorHttpClient nettyHttpClient = ReactorHttpClient.create(false)) { + try (ReactorHttpClient nettyHttpClient = ReactorHttpClient.create(false, Settings.EMPTY)) { final Collection responses = nettyHttpClient.put(transportAddress.address(), requestUris); try { assertThat(responses, hasSize(requestUris.size())); diff --git a/plugins/transport-reactor-netty4/src/internalClusterTest/java/org/opensearch/http/reactor/netty4/ReactorNetty4PipeliningIT.java b/plugins/transport-reactor-netty4/src/internalClusterTest/java/org/opensearch/http/reactor/netty4/ReactorNetty4PipeliningIT.java index c0e43de06f6ff..46c30557b3c5a 100644 --- a/plugins/transport-reactor-netty4/src/internalClusterTest/java/org/opensearch/http/reactor/netty4/ReactorNetty4PipeliningIT.java +++ b/plugins/transport-reactor-netty4/src/internalClusterTest/java/org/opensearch/http/reactor/netty4/ReactorNetty4PipeliningIT.java @@ -14,6 +14,7 @@ package org.opensearch.http.reactor.netty4; import org.opensearch.OpenSearchReactorNetty4IntegTestCase; +import org.opensearch.common.settings.Settings; import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.http.HttpServerTransport; import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; @@ -43,7 +44,7 @@ public void testThatNettyHttpServerSupportsPipelining() throws Exception { TransportAddress[] boundAddresses = httpServerTransport.boundAddress().boundAddresses(); TransportAddress transportAddress = randomFrom(boundAddresses); - try (ReactorHttpClient client = ReactorHttpClient.create()) { + try (ReactorHttpClient client = ReactorHttpClient.create(Settings.EMPTY)) { Collection responses = client.get(transportAddress.address(), true, requests); try { assertThat(responses, hasSize(5)); diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4CompositeHttpServerChannel.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4CompositeHttpServerChannel.java new file mode 100644 index 0000000000000..9e77f703c63d5 --- /dev/null +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4CompositeHttpServerChannel.java @@ -0,0 +1,94 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.reactor.netty4; + +import org.opensearch.common.concurrent.CompletableContext; +import org.opensearch.core.action.ActionListener; +import org.opensearch.http.HttpServerChannel; +import org.opensearch.transport.reactor.netty4.Netty4Utils; + +import java.lang.reflect.UndeclaredThrowableException; +import java.net.InetSocketAddress; +import java.util.concurrent.CompletableFuture; + +import io.netty.channel.Channel; + +class ReactorNetty4CompositeHttpServerChannel implements HttpServerChannel { + private final Channel[] channels; + private final CompletableContext[] closeContexts; + + @SuppressWarnings({ "unchecked", "rawtypes" }) + ReactorNetty4CompositeHttpServerChannel(Channel... channels) { + if (channels == null || channels.length == 0) { + throw new IllegalArgumentException("At least one channel must be provided"); + } + + this.channels = channels; + this.closeContexts = new CompletableContext[channels.length]; + for (int i = 0; i < channels.length; ++i) { + closeContexts[i] = new CompletableContext<>(); + Netty4Utils.addListener(this.channels[i].closeFuture(), closeContexts[i]); + } + } + + @Override + public InetSocketAddress getLocalAddress() { + return (InetSocketAddress) channels[0].localAddress(); + } + + @Override + public void addCloseListener(ActionListener listener) { + @SuppressWarnings({ "unchecked", "rawtypes" }) + final CompletableFuture[] futures = new CompletableFuture[closeContexts.length]; + for (int i = 0; i < closeContexts.length; ++i) { + final CompletableFuture future = new CompletableFuture<>(); + closeContexts[i].addListener((v, t) -> { + if (t == null) { + future.complete(v); + } else { + future.completeExceptionally(t); + } + }); + futures[i] = future; + } + + // Wait for all contexts to be closed + CompletableFuture.allOf(futures).whenComplete((v, t) -> { + if (t == null) { + listener.onResponse(v); + } else if (t instanceof Exception ex) { + listener.onFailure(ex); + } else { + listener.onFailure(new UndeclaredThrowableException(t)); + } + }); + } + + @Override + public boolean isOpen() { + for (int i = 0; i < channels.length; ++i) { + if (channels[i].isOpen() == false) { + return false; + } + } + return true; + } + + @Override + public void close() { + for (int i = 0; i < channels.length; ++i) { + channels[i].close(); + } + } + + @Override + public String toString() { + return "ReactorNetty4CompositeHttpServerChannel{localAddress=" + getLocalAddress() + "}"; + } +} diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpRequest.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpRequest.java index 10f06ad91b78d..bf1bcd63b7138 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpRequest.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpRequest.java @@ -158,6 +158,8 @@ public HttpVersion protocolVersion() { return HttpRequest.HttpVersion.HTTP_1_1; } else if (protocol.equals("HTTP/2.0")) { return HttpRequest.HttpVersion.HTTP_2_0; + } else if (protocol.equals("HTTP/3.0")) { + return HttpRequest.HttpVersion.HTTP_3_0; } else { throw new IllegalArgumentException("Unexpected http protocol version: " + protocol); } diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java index 78e5edf48cc57..0685f625b4b4e 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java @@ -26,6 +26,7 @@ import org.opensearch.http.HttpChannel; import org.opensearch.http.HttpReadTimeoutException; import org.opensearch.http.HttpServerChannel; +import org.opensearch.http.reactor.netty4.http3.Http3Utils; import org.opensearch.http.reactor.netty4.ssl.SslUtils; import org.opensearch.plugins.SecureHttpTransportSettingsProvider; import org.opensearch.plugins.SecureHttpTransportSettingsProvider.SecureHttpTransportParameters; @@ -45,12 +46,14 @@ import java.util.Optional; import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; import io.netty.channel.ChannelOption; import io.netty.channel.socket.nio.NioChannelOption; import io.netty.handler.codec.http.DefaultLastHttpContent; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.quic.QuicSslContextBuilder; import io.netty.handler.ssl.ApplicationProtocolConfig; import io.netty.handler.ssl.ApplicationProtocolNames; import io.netty.handler.ssl.ClientAuth; @@ -62,6 +65,7 @@ import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; +import reactor.netty.DisposableChannel; import reactor.netty.DisposableServer; import reactor.netty.http.HttpProtocol; import reactor.netty.http.server.HttpServer; @@ -69,6 +73,7 @@ import reactor.netty.http.server.HttpServerResponse; import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_CONNECT_TIMEOUT; +import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_HTTP3_ENABLED; import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CHUNK_SIZE; import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH; import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE; @@ -103,6 +108,33 @@ public class ReactorNetty4HttpServerTransport extends AbstractHttpServerTranspor Property.NodeScope ); + /** + * Set the initial maximum data limit for local bidirectional streams (in bytes). + */ + public static final Setting SETTING_H3_MAX_STREAM_LOCAL_LENGTH = Setting.byteSizeSetting( + "h3.max_stream_local_length", + new ByteSizeValue(1000000, ByteSizeUnit.BYTES), + Property.NodeScope + ); + + /** + * Set the initial maximum data limit for remote bidirectional streams (in bytes). + */ + public static final Setting SETTING_H3_MAX_STREAM_REMOTE_LENGTH = Setting.byteSizeSetting( + "h3.max_stream_remote_length", + new ByteSizeValue(1000000, ByteSizeUnit.BYTES), + Property.NodeScope + ); + + /** + * Set the initial maximum stream limit for bidirectional streams. + * + * The HTTP/3 standard expects that each end configures at least 100 + * concurrent bidirectional streams at a time, to avoid reducing performance + * by reducing parallelism. + */ + public static final Setting SETTING_H3_MAX_STREAMS = Setting.longSetting("h3.max_streams", 100L, Property.NodeScope); + /** * The number of Reactor Netty HTTP workers */ @@ -239,7 +271,7 @@ public ReactorNetty4HttpServerTransport( */ @Override protected HttpServerChannel bind(InetSocketAddress socketAddress) throws Exception { - final HttpServer server = configure( + final HttpServer http11or2 = configureHttp11orHttp2( HttpServer.create() .httpFormDecoder(builder -> builder.scheduler(scheduler)) .idleTimeout(Duration.ofMillis(connectTimeoutMillis)) @@ -258,11 +290,85 @@ protected HttpServerChannel bind(InetSocketAddress socketAddress) throws Excepti .handle((req, res) -> incomingRequest(req, res)) ); - disposableServer = server.bindNow(); - return new ReactorNetty4HttpServerChannel(disposableServer.channel()); + // The HTTP/3 server binds to the same port as HTTP/2 or HTTP/1.1 since those are + // different protocols (UDP, TCP) + final Optional http3Opt = configureHttp3(socketAddress).map(HttpServer::bindNow); + if (http3Opt.isEmpty()) { + disposableServer = http11or2.bindNow(); + return new ReactorNetty4HttpServerChannel(disposableServer.channel()); + } else { + final DisposableServer http3Server = http3Opt.get(); + final DisposableServer http11or2Server = http11or2.bindNow(); + + disposableServer = new DisposableServer() { + @Override + public Channel channel() { + throw new UnsupportedOperationException("The channel() operation is not supported"); + } + + @Override + public void disposeNow() { + disposeQuietly(http3Server); + disposeQuietly(http11or2Server); + } + }; + + return new ReactorNetty4CompositeHttpServerChannel(http11or2Server.channel(), http3Server.channel()); + } } - private HttpServer configure(final HttpServer server) throws Exception { + private Optional configureHttp3(InetSocketAddress socketAddress) throws Exception { + // Configure SSL context if available + if (secureHttpTransportSettingsProvider != null) { + final Optional parameters = secureHttpTransportSettingsProvider.parameters(settings); + + final KeyManagerFactory keyManagerFactory = parameters.flatMap(SecureHttpTransportParameters::keyManagerFactory) + .orElseThrow(() -> new OpenSearchException("The KeyManagerFactory instance is not provided")); + + if (Http3Utils.isHttp3Available() && SETTING_HTTP_HTTP3_ENABLED.get(settings).booleanValue() == true) { + final QuicSslContextBuilder sslContextBuilder = QuicSslContextBuilder.forServer(keyManagerFactory, null); + + parameters.flatMap(SecureHttpTransportParameters::trustManagerFactory).ifPresent(sslContextBuilder::trustManager); + parameters.flatMap(SecureHttpTransportParameters::clientAuth) + .ifPresent(clientAuth -> sslContextBuilder.clientAuth(ClientAuth.valueOf(clientAuth))); + + final SslContext sslContext = sslContextBuilder.applicationProtocols( + io.netty.handler.codec.http3.Http3.supportedApplicationProtocols() + ).build(); + + return Optional.of( + HttpServer.create() + .httpFormDecoder(builder -> builder.scheduler(scheduler)) + .idleTimeout(Duration.ofMillis(connectTimeoutMillis)) + .readTimeout(Duration.ofMillis(readTimeoutMillis)) + .runOn(sharedGroup.getLowLevelGroup()) + .bindAddress(() -> socketAddress) + .compress(true) + .httpRequestDecoder( + spec -> spec.maxChunkSize(maxChunkSize.bytesAsInt()) + .h2cMaxContentLength(h2cMaxContentLength.bytesAsInt()) + .maxHeaderSize(maxHeaderSize.bytesAsInt()) + .maxInitialLineLength(maxInitialLineLength.bytesAsInt()) + .allowPartialChunks(false) + ) + .handle((req, res) -> incomingRequest(req, res)) + .http3Settings( + spec -> spec.idleTimeout(Duration.ofMillis(connectTimeoutMillis)) + .maxData(SETTING_HTTP_MAX_CONTENT_LENGTH.get(settings).getBytes()) + .maxStreamDataBidirectionalLocal(SETTING_H3_MAX_STREAM_LOCAL_LENGTH.get(settings).getBytes()) + .maxStreamDataBidirectionalRemote(SETTING_H3_MAX_STREAM_REMOTE_LENGTH.get(settings).getBytes()) + .maxStreamsBidirectional(SETTING_H3_MAX_STREAMS.get(settings).longValue()) + ) + .secure(spec -> spec.sslContext(sslContext)) + .protocol(HttpProtocol.HTTP3) + ); + } + } + + return Optional.empty(); + } + + private HttpServer configureHttp11orHttp2(final HttpServer server) throws Exception { HttpServer configured = server.childOption(ChannelOption.TCP_NODELAY, SETTING_HTTP_TCP_NO_DELAY.get(settings)) .childOption(ChannelOption.SO_KEEPALIVE, SETTING_HTTP_TCP_KEEP_ALIVE.get(settings)); @@ -433,7 +539,7 @@ protected void stopInternal() { } if (disposableServer != null) { - disposableServer.disposeNow(); + disposeQuietly(disposableServer); disposableServer = null; } } @@ -474,4 +580,16 @@ public void onException(HttpChannel channel, Exception cause) { } } + /** + * Quietly disposes the channel. + */ + private static void disposeQuietly(final DisposableChannel disposable) { + try { + if (disposable != null) { + disposable.disposeNow(); + } + } catch (final RuntimeException ex) { + // Do nothing + } + } } diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/http3/Http3Utils.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/http3/Http3Utils.java new file mode 100644 index 0000000000000..e548417f39c82 --- /dev/null +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/http3/Http3Utils.java @@ -0,0 +1,41 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.reactor.netty4.http3; + +/** + * Adapted from reactor.netty.http.internal.Http3 class + */ +public final class Http3Utils { + static final boolean isHttp3Available; + + static { + boolean http3; + try { + Class.forName("io.netty.handler.codec.http3.Http3"); + http3 = true; + } catch (Throwable t) { + http3 = false; + } + isHttp3Available = http3; + } + + private Http3Utils() { + + } + + /** + * Check if the current runtime supports HTTP/3, by verifying if {@code io.netty:netty-codec-native-quic} is on the classpath. + * + * @return true if {@code io.netty:netty-codec-native-quic} is available + */ + public static boolean isHttp3Available() { + return isHttp3Available; + } + +} diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/http3/package-info.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/http3/package-info.java new file mode 100644 index 0000000000000..ed3b7477bc99e --- /dev/null +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/http3/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * HTTP/3 supporting utility classes + */ +package org.opensearch.http.reactor.netty4.http3; diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/ReactorNetty4Plugin.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/ReactorNetty4Plugin.java index 90ed1fe729d3a..d938b40b6abab 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/ReactorNetty4Plugin.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/ReactorNetty4Plugin.java @@ -57,7 +57,12 @@ public ReactorNetty4Plugin() {} */ @Override public List> getSettings() { - return Arrays.asList(ReactorNetty4HttpServerTransport.SETTING_H2C_MAX_CONTENT_LENGTH); + return Arrays.asList( + ReactorNetty4HttpServerTransport.SETTING_H2C_MAX_CONTENT_LENGTH, + ReactorNetty4HttpServerTransport.SETTING_H3_MAX_STREAM_LOCAL_LENGTH, + ReactorNetty4HttpServerTransport.SETTING_H3_MAX_STREAM_REMOTE_LENGTH, + ReactorNetty4HttpServerTransport.SETTING_H3_MAX_STREAMS + ); } /** diff --git a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/AbstractReactorNetty4HttpServerTransportStreamingTests.java b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/AbstractReactorNetty4HttpServerTransportStreamingTests.java new file mode 100644 index 0000000000000..d248d6df73834 --- /dev/null +++ b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/AbstractReactorNetty4HttpServerTransportStreamingTests.java @@ -0,0 +1,97 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.reactor.netty4; + +import org.opensearch.common.lease.Releasable; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.common.xcontent.support.XContentHttpChunk; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.http.HttpServerTransport; +import org.opensearch.rest.RestChannel; +import org.opensearch.rest.RestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.StreamingRestChannel; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.client.node.NodeClient; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.time.Duration; +import java.util.Optional; +import java.util.function.Function; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import static org.opensearch.http.TestDispatcherBuilder.dispatcherBuilderWithDefaults; +import static org.hamcrest.CoreMatchers.instanceOf; + +public abstract class AbstractReactorNetty4HttpServerTransportStreamingTests extends OpenSearchTestCase { + private static final Function XCONTENT_CONVERTER = str -> new ToXContent() { + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + return builder.startObject().field("doc", str).endObject(); + } + }; + + protected HttpServerTransport.Dispatcher createStreamingDispatcher(ThreadPool threadPool, String url, String responseString) { + return dispatcherBuilderWithDefaults().withDispatchHandler((uri, rawPath, method, params) -> Optional.of(new RestHandler() { + @Override + public boolean supportsStreaming() { + return true; + } + + @Override + public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception { + logger.error("--> Unexpected request [{}]", request.uri()); + throw new AssertionError(); + } + })).withDispatchRequest((request, channel, threadContext) -> { + if (url.equals(request.uri())) { + assertThat(channel, instanceOf(StreamingRestChannel.class)); + final StreamingRestChannel streamingChannel = (StreamingRestChannel) channel; + + // Await at most 5 seconds till channel is ready for writing the response stream, fail otherwise + final Mono ready = Mono.fromRunnable(() -> { + while (!streamingChannel.isWritable()) { + Thread.onSpinWait(); + } + }).timeout(Duration.ofSeconds(5)); + + threadPool.executor(ThreadPool.Names.WRITE).execute(() -> Flux.concat(Flux.fromArray(newChunks(responseString)).map(e -> { + try (XContentBuilder builder = channel.newBuilder(XContentType.JSON, true)) { + return XContentHttpChunk.from(e.toXContent(builder, ToXContent.EMPTY_PARAMS)); + } catch (final IOException ex) { + throw new UncheckedIOException(ex); + } + }), Mono.just(XContentHttpChunk.last())).delaySubscription(ready).subscribe(streamingChannel::sendChunk, null, () -> { + if (channel.bytesOutput() instanceof Releasable) { + ((Releasable) channel.bytesOutput()).close(); + } + })); + } else { + logger.error("--> Unexpected successful uri [{}]", request.uri()); + throw new AssertionError(); + } + }).build(); + } + + protected static ToXContent[] newChunks(final String responseString) { + final ToXContent[] chunks = new ToXContent[responseString.length() / 16]; + + for (int chunk = 0; chunk < responseString.length(); chunk += 16) { + chunks[chunk / 16] = XCONTENT_CONVERTER.apply(responseString.substring(chunk, chunk + 16)); + } + + return chunks; + } + +} diff --git a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorHttpClient.java b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorHttpClient.java index edd5bc97368dd..1754e21f52106 100644 --- a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorHttpClient.java +++ b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorHttpClient.java @@ -14,9 +14,11 @@ package org.opensearch.http.reactor.netty4; import org.opensearch.common.collect.Tuple; +import org.opensearch.common.settings.Settings; import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.http.reactor.netty4.http3.Http3Utils; import org.opensearch.tasks.Task; import org.opensearch.test.OpenSearchTestCase; @@ -55,11 +57,17 @@ import reactor.core.publisher.ParallelFlux; import reactor.netty.http.Http11SslContextSpec; import reactor.netty.http.Http2SslContextSpec; +import reactor.netty.http.Http3SslContextSpec; import reactor.netty.http.HttpProtocol; import reactor.netty.http.client.HttpClient; import reactor.netty.http.client.PrematureCloseException; import reactor.util.retry.Retry; +import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_HTTP3_ENABLED; +import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH; +import static org.opensearch.http.reactor.netty4.ReactorNetty4HttpServerTransport.SETTING_H3_MAX_STREAMS; +import static org.opensearch.http.reactor.netty4.ReactorNetty4HttpServerTransport.SETTING_H3_MAX_STREAM_LOCAL_LENGTH; +import static org.opensearch.http.reactor.netty4.ReactorNetty4HttpServerTransport.SETTING_H3_MAX_STREAM_REMOTE_LENGTH; import static io.netty.handler.codec.http.HttpHeaderNames.HOST; import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; @@ -69,7 +77,8 @@ public class ReactorHttpClient implements Closeable { private final boolean compression; private final boolean secure; - private final boolean useHttp11Only; + private final HttpProtocol protocol; + private final Settings settings; static Collection returnHttpResponseBodies(Collection responses) { List list = new ArrayList<>(responses.size()); @@ -87,22 +96,23 @@ static Collection returnOpaqueIds(Collection responses return list; } - public ReactorHttpClient(boolean compression, boolean secure) { + public ReactorHttpClient(boolean compression, boolean secure, Settings settings) { this.compression = compression; this.secure = secure; - this.useHttp11Only = OpenSearchTestCase.randomBoolean(); + this.protocol = randomProtocol(secure, settings); + this.settings = settings; } - public static ReactorHttpClient create() { - return create(true); + public static ReactorHttpClient create(Settings settings) { + return create(true, settings); } - public static ReactorHttpClient create(boolean compression) { - return new ReactorHttpClient(compression, false); + public static ReactorHttpClient create(boolean compression, Settings settings) { + return new ReactorHttpClient(compression, false, settings); } - public static ReactorHttpClient https() { - return new ReactorHttpClient(true, true); + public static ReactorHttpClient https(Settings settings) { + return new ReactorHttpClient(true, true, settings); } public List get(InetSocketAddress remoteAddress, String... uris) throws InterruptedException { @@ -272,27 +282,46 @@ private HttpClient createClient(final InetSocketAddress remoteAddress, final Nio .runOn(eventLoopGroup) .host(remoteAddress.getHostString()) .port(remoteAddress.getPort()) - .compress(compression) - .protocol(HttpProtocol.H2, HttpProtocol.HTTP11); + .compress(compression); if (secure) { - return client.protocol( - useHttp11Only ? new HttpProtocol[] { HttpProtocol.HTTP11 } : new HttpProtocol[] { HttpProtocol.HTTP11, HttpProtocol.H2 } - ) - .secure( - spec -> spec.sslContext( - useHttp11Only - /* switch between HTTP 1.1/HTTP 2 randomly, both are supported */ - ? Http11SslContextSpec.forClient() + if (protocol == HttpProtocol.HTTP11) { + return client.protocol(protocol) + .secure( + spec -> spec.sslContext( + Http11SslContextSpec.forClient() .configure(s -> s.clientAuth(ClientAuth.NONE).trustManager(InsecureTrustManagerFactory.INSTANCE)) - : Http2SslContextSpec.forClient() + ).handshakeTimeout(Duration.ofSeconds(30)) + ); + } else if (protocol == HttpProtocol.H2) { + return client.protocol(new HttpProtocol[] { HttpProtocol.HTTP11, HttpProtocol.H2 }) + .secure( + spec -> spec.sslContext( + Http2SslContextSpec.forClient() .configure(s -> s.clientAuth(ClientAuth.NONE).trustManager(InsecureTrustManagerFactory.INSTANCE)) - ).handshakeTimeout(Duration.ofSeconds(30)) - ); + ).handshakeTimeout(Duration.ofSeconds(30)) + ); + } else { + return client.protocol(protocol) + .secure( + spec -> spec.sslContext( + Http3SslContextSpec.forClient().configure(s -> s.trustManager(InsecureTrustManagerFactory.INSTANCE)) + ).handshakeTimeout(Duration.ofSeconds(30)) + ) + .http3Settings( + spec -> spec.idleTimeout(Duration.ofSeconds(5)) + .maxData(SETTING_HTTP_MAX_CONTENT_LENGTH.get(settings).getBytes()) + .maxStreamDataBidirectionalLocal(SETTING_H3_MAX_STREAM_LOCAL_LENGTH.get(settings).getBytes()) + .maxStreamDataBidirectionalRemote(SETTING_H3_MAX_STREAM_REMOTE_LENGTH.get(settings).getBytes()) + .maxStreamsBidirectional(SETTING_H3_MAX_STREAMS.get(settings).longValue()) + ); + } } else { - return client.protocol( - useHttp11Only ? new HttpProtocol[] { HttpProtocol.HTTP11 } : new HttpProtocol[] { HttpProtocol.HTTP11, HttpProtocol.H2C } - ); + if (protocol == HttpProtocol.HTTP11) { + return client.protocol(protocol); + } else { + return client.protocol(new HttpProtocol[] { HttpProtocol.HTTP11, HttpProtocol.H2C }); + } } } @@ -302,6 +331,23 @@ public void close() { } public boolean useHttp11only() { - return useHttp11Only; + return protocol == HttpProtocol.HTTP11; + } + + private static HttpProtocol randomProtocol(boolean secure, Settings settings) { + HttpProtocol[] values = null; + + if (secure) { + if (Http3Utils.isHttp3Available() && SETTING_HTTP_HTTP3_ENABLED.get(settings).booleanValue() == true) { + values = new HttpProtocol[] { HttpProtocol.HTTP11, HttpProtocol.H2, HttpProtocol.HTTP3 }; + } else { + values = new HttpProtocol[] { HttpProtocol.HTTP11, HttpProtocol.H2 }; + } + } else { + values = new HttpProtocol[] { HttpProtocol.HTTP11, HttpProtocol.H2C }; + } + + return values[OpenSearchTestCase.randomInt(values.length - 1)]; } + } diff --git a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4BadRequestTests.java b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4BadRequestTests.java index 101fdd3c3b839..94306a7335330 100644 --- a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4BadRequestTests.java +++ b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4BadRequestTests.java @@ -89,7 +89,7 @@ public void testBadParameterEncoding() throws Exception { httpServerTransport.start(); final TransportAddress transportAddress = randomFrom(httpServerTransport.boundAddress().boundAddresses()); - try (ReactorHttpClient nettyHttpClient = ReactorHttpClient.create()) { + try (ReactorHttpClient nettyHttpClient = ReactorHttpClient.create(settings)) { final List responses = nettyHttpClient.get(transportAddress.address(), "/_cluster/settings?pretty=%"); try { diff --git a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportStreamingTests.java b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportStreamingTests.java index f4d7a44c17c37..0ee927298d286 100644 --- a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportStreamingTests.java +++ b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportStreamingTests.java @@ -8,28 +8,20 @@ package org.opensearch.http.reactor.netty4; -import org.opensearch.common.lease.Releasable; import org.opensearch.common.network.NetworkService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.MockBigArrays; import org.opensearch.common.util.MockPageCacheRecycler; import org.opensearch.common.xcontent.XContentType; -import org.opensearch.common.xcontent.support.XContentHttpChunk; import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.indices.breaker.NoneCircuitBreakerService; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.http.HttpServerTransport; -import org.opensearch.rest.RestChannel; -import org.opensearch.rest.RestHandler; -import org.opensearch.rest.RestRequest; -import org.opensearch.rest.StreamingRestChannel; import org.opensearch.telemetry.tracing.noop.NoopTracer; -import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; -import org.opensearch.transport.client.node.NodeClient; import org.opensearch.transport.reactor.SharedGroupFactory; import org.junit.After; import org.junit.Before; @@ -37,11 +29,8 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.nio.charset.StandardCharsets; -import java.time.Duration; import java.util.Arrays; import java.util.Collections; -import java.util.Optional; -import java.util.function.Function; import java.util.stream.Collectors; import io.netty.handler.codec.http.DefaultHttpRequest; @@ -50,24 +39,13 @@ import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import static org.opensearch.http.TestDispatcherBuilder.dispatcherBuilderWithDefaults; -import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.Matchers.equalTo; /** * Tests for the {@link ReactorNetty4HttpServerTransport} class with streaming support. */ -public class ReactorNetty4HttpServerTransportStreamingTests extends OpenSearchTestCase { - private static final Function XCONTENT_CONVERTER = str -> new ToXContent() { - @Override - public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { - return builder.startObject().field("doc", str).endObject(); - } - }; - +public class ReactorNetty4HttpServerTransportStreamingTests extends AbstractReactorNetty4HttpServerTransportStreamingTests { private NetworkService networkService; private ThreadPool threadPool; private MockBigArrays bigArrays; @@ -97,7 +75,7 @@ public void testRequestResponseStreaming() throws InterruptedException { final String url = "/stream/"; final ToXContent[] chunks = newChunks(responseString); - final HttpServerTransport.Dispatcher dispatcher = createStreamingDispatcher(url, responseString); + final HttpServerTransport.Dispatcher dispatcher = createStreamingDispatcher(threadPool, url, responseString); try ( ReactorNetty4HttpServerTransport transport = new ReactorNetty4HttpServerTransport( @@ -115,7 +93,7 @@ public void testRequestResponseStreaming() throws InterruptedException { transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); - try (ReactorHttpClient client = ReactorHttpClient.create(false)) { + try (ReactorHttpClient client = ReactorHttpClient.create(false, Settings.EMPTY)) { HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url); final FullHttpResponse response = client.stream(remoteAddress.address(), request, Arrays.stream(chunks)); try { @@ -141,7 +119,7 @@ public void testConnectionsGettingClosedForStreamingRequests() throws Interrupte final String url = "/stream/"; final ToXContent[] chunks = newChunks(responseString); - final HttpServerTransport.Dispatcher dispatcher = createStreamingDispatcher(url, responseString); + final HttpServerTransport.Dispatcher dispatcher = createStreamingDispatcher(threadPool, url, responseString); try ( ReactorNetty4HttpServerTransport transport = new ReactorNetty4HttpServerTransport( @@ -155,7 +133,7 @@ public void testConnectionsGettingClosedForStreamingRequests() throws Interrupte new SharedGroupFactory(Settings.EMPTY), NoopTracer.INSTANCE ); - ReactorHttpClient client = ReactorHttpClient.create(false) + ReactorHttpClient client = ReactorHttpClient.create(false, Settings.EMPTY) ) { transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); @@ -174,56 +152,4 @@ public void testConnectionsGettingClosedForStreamingRequests() throws Interrupte assertThat(transport.stats().getTotalOpen(), equalTo(numRequests)); } } - - private HttpServerTransport.Dispatcher createStreamingDispatcher(String url, String responseString) { - return dispatcherBuilderWithDefaults().withDispatchHandler((uri, rawPath, method, params) -> Optional.of(new RestHandler() { - @Override - public boolean supportsStreaming() { - return true; - } - - @Override - public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception { - logger.error("--> Unexpected request [{}]", request.uri()); - throw new AssertionError(); - } - })).withDispatchRequest((request, channel, threadContext) -> { - if (url.equals(request.uri())) { - assertThat(channel, instanceOf(StreamingRestChannel.class)); - final StreamingRestChannel streamingChannel = (StreamingRestChannel) channel; - - // Await at most 5 seconds till channel is ready for writing the response stream, fail otherwise - final Mono ready = Mono.fromRunnable(() -> { - while (!streamingChannel.isWritable()) { - Thread.onSpinWait(); - } - }).timeout(Duration.ofSeconds(5)); - - threadPool.executor(ThreadPool.Names.WRITE).execute(() -> Flux.concat(Flux.fromArray(newChunks(responseString)).map(e -> { - try (XContentBuilder builder = channel.newBuilder(XContentType.JSON, true)) { - return XContentHttpChunk.from(e.toXContent(builder, ToXContent.EMPTY_PARAMS)); - } catch (final IOException ex) { - throw new UncheckedIOException(ex); - } - }), Mono.just(XContentHttpChunk.last())).delaySubscription(ready).subscribe(streamingChannel::sendChunk, null, () -> { - if (channel.bytesOutput() instanceof Releasable) { - ((Releasable) channel.bytesOutput()).close(); - } - })); - } else { - logger.error("--> Unexpected successful uri [{}]", request.uri()); - throw new AssertionError(); - } - }).build(); - } - - private static ToXContent[] newChunks(final String responseString) { - final ToXContent[] chunks = new ToXContent[responseString.length() / 16]; - - for (int chunk = 0; chunk < responseString.length(); chunk += 16) { - chunks[chunk / 16] = XCONTENT_CONVERTER.apply(responseString.substring(chunk, chunk + 16)); - } - - return chunks; - } } diff --git a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportTests.java b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportTests.java index 75780e21f12cc..5181956cc58ea 100644 --- a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportTests.java +++ b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportTests.java @@ -189,7 +189,7 @@ private void runExpectHeaderTest( ) { transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); - try (ReactorHttpClient client = ReactorHttpClient.create()) { + try (ReactorHttpClient client = ReactorHttpClient.create(settings)) { final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/"); request.headers().set(HttpHeaderNames.EXPECT, expectation); HttpUtil.setContentLength(request, contentLength); @@ -281,7 +281,7 @@ public void testBadRequest() throws InterruptedException { transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); - try (ReactorHttpClient client = ReactorHttpClient.create()) { + try (ReactorHttpClient client = ReactorHttpClient.create(settings)) { final String url = "/" + randomAlphaOfLength(maxInitialLineLength); final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url); @@ -320,7 +320,7 @@ public void testDispatchFailed() throws InterruptedException { transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); - try (ReactorHttpClient client = ReactorHttpClient.create()) { + try (ReactorHttpClient client = ReactorHttpClient.create(settings)) { final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); final FullHttpResponse response = client.send(remoteAddress.address(), request); @@ -335,6 +335,7 @@ public void testDispatchFailed() throws InterruptedException { } public void testLargeCompressedResponse() throws InterruptedException { + final Settings settings = createSettings(); final String responseString = randomAlphaOfLength(4 * 1024 * 1024); final String url = "/thing/"; final HttpServerTransport.Dispatcher dispatcher = dispatcherBuilderWithDefaults().withDispatchRequest( @@ -350,21 +351,21 @@ public void testLargeCompressedResponse() throws InterruptedException { try ( ReactorNetty4HttpServerTransport transport = new ReactorNetty4HttpServerTransport( - Settings.EMPTY, + settings, networkService, bigArrays, threadPool, xContentRegistry(), dispatcher, clusterSettings, - new SharedGroupFactory(Settings.EMPTY), + new SharedGroupFactory(settings), NoopTracer.INSTANCE ) ) { transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); - try (ReactorHttpClient client = ReactorHttpClient.create()) { + try (ReactorHttpClient client = ReactorHttpClient.create(settings)) { DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url); request.headers().add(HttpHeaderNames.ACCEPT_ENCODING, randomFrom("deflate", "gzip")); long numOfHugeAllocations = getHugeAllocationCount(); @@ -420,7 +421,7 @@ public void testConnectionsGettingClosed() throws InterruptedException { new SharedGroupFactory(Settings.EMPTY), NoopTracer.INSTANCE ); - ReactorHttpClient client = ReactorHttpClient.create() + ReactorHttpClient client = ReactorHttpClient.create(Settings.EMPTY) ) { transport.start(); TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); @@ -465,7 +466,7 @@ public void testCorsRequest() throws InterruptedException { final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); // Test pre-flight request - try (ReactorHttpClient client = ReactorHttpClient.create()) { + try (ReactorHttpClient client = ReactorHttpClient.create(settings)) { final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.OPTIONS, "/"); request.headers().add(CorsHandler.ORIGIN, "test-cors.org"); request.headers().add(CorsHandler.ACCESS_CONTROL_REQUEST_METHOD, "POST"); @@ -482,7 +483,7 @@ public void testCorsRequest() throws InterruptedException { } // Test short-circuited request - try (ReactorHttpClient client = ReactorHttpClient.create()) { + try (ReactorHttpClient client = ReactorHttpClient.create(settings)) { final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); request.headers().add(CorsHandler.ORIGIN, "google.com"); diff --git a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportStreamingTests.java b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportStreamingTests.java new file mode 100644 index 0000000000000..0e19e221177e2 --- /dev/null +++ b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportStreamingTests.java @@ -0,0 +1,248 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.reactor.netty4.ssl; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + +import org.opensearch.common.network.NetworkService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.MockBigArrays; +import org.opensearch.common.util.MockPageCacheRecycler; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.common.transport.TransportAddress; +import org.opensearch.core.indices.breaker.NoneCircuitBreakerService; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.http.HttpServerTransport; +import org.opensearch.http.HttpTransportSettings; +import org.opensearch.http.reactor.netty4.AbstractReactorNetty4HttpServerTransportStreamingTests; +import org.opensearch.http.reactor.netty4.ReactorHttpClient; +import org.opensearch.http.reactor.netty4.ReactorNetty4HttpServerTransport; +import org.opensearch.plugins.SecureHttpTransportSettingsProvider; +import org.opensearch.plugins.TransportExceptionHandler; +import org.opensearch.telemetry.tracing.noop.NoopTracer; +import org.opensearch.test.BouncyCastleThreadFilter; +import org.opensearch.test.KeyStoreUtils; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.NettyAllocator; +import org.opensearch.transport.reactor.SharedGroupFactory; +import org.junit.After; +import org.junit.Before; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLException; +import javax.net.ssl.TrustManagerFactory; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Optional; +import java.util.stream.Collectors; + +import io.netty.handler.codec.http.DefaultHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http2.Http2SecurityUtil; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import io.netty.pkitesting.CertificateBuilder.Algorithm; + +import static org.opensearch.test.KeyStoreUtils.KEYSTORE_PASSWORD; +import static org.hamcrest.Matchers.equalTo; + +/** + * Tests for the secure {@link ReactorNetty4HttpServerTransport} class. + */ +@ThreadLeakFilters(filters = BouncyCastleThreadFilter.class) +public class SecureReactorNetty4HttpServerTransportStreamingTests extends AbstractReactorNetty4HttpServerTransportStreamingTests { + private NetworkService networkService; + private ThreadPool threadPool; + private MockBigArrays bigArrays; + private ClusterSettings clusterSettings; + private SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider; + + @Before + public void setup() throws Exception { + networkService = new NetworkService(Collections.emptyList()); + threadPool = new TestThreadPool("test"); + bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); + clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + + var keyManagerFactory = KeyManagerFactory.getInstance("PKIX"); + keyManagerFactory.init(KeyStoreUtils.createServerKeyStore(Algorithm.ecp384), KEYSTORE_PASSWORD); + + secureHttpTransportSettingsProvider = new SecureHttpTransportSettingsProvider() { + @Override + public Optional parameters(Settings settings) { + return Optional.of(new SecureHttpTransportParameters() { + @Override + public Optional keyManagerFactory() { + return Optional.of(keyManagerFactory); + } + + @Override + public Optional sslProvider() { + return Optional.empty(); + } + + @Override + public Optional clientAuth() { + return Optional.empty(); + } + + @Override + public Collection protocols() { + return Arrays.asList(SslUtils.DEFAULT_SSL_PROTOCOLS); + } + + @Override + public Collection cipherSuites() { + return Http2SecurityUtil.CIPHERS; + } + + @Override + public Optional trustManagerFactory() { + return Optional.of(InsecureTrustManagerFactory.INSTANCE); + } + }); + } + + @Override + public Optional buildHttpServerExceptionHandler(Settings settings, HttpServerTransport transport) { + return Optional.empty(); + } + + @Override + public Optional buildSecureHttpServerEngine(Settings settings, HttpServerTransport transport) throws SSLException { + SSLEngine engine = SslContextBuilder.forServer(keyManagerFactory) + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .build() + .newEngine(NettyAllocator.getAllocator()); + return Optional.of(engine); + } + }; + } + + @After + public void shutdown() { + if (threadPool != null) { + threadPool.shutdownNow(); + } + threadPool = null; + networkService = null; + bigArrays = null; + clusterSettings = null; + } + + public void testRequestResponseStreaming() throws InterruptedException { + final String responseString = randomAlphaOfLength(4 * 1024); + final String url = "/stream/"; + + final Settings settings = createSettings(); + final ToXContent[] chunks = newChunks(responseString); + final HttpServerTransport.Dispatcher dispatcher = createStreamingDispatcher(threadPool, url, responseString); + + try ( + ReactorNetty4HttpServerTransport transport = new ReactorNetty4HttpServerTransport( + settings, + networkService, + bigArrays, + threadPool, + xContentRegistry(), + dispatcher, + clusterSettings, + new SharedGroupFactory(settings), + secureHttpTransportSettingsProvider, + NoopTracer.INSTANCE + ) + ) { + transport.start(); + final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); + + try (ReactorHttpClient client = ReactorHttpClient.https(settings)) { + HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url); + final FullHttpResponse response = client.stream(remoteAddress.address(), request, Arrays.stream(chunks)); + try { + assertThat(response.status(), equalTo(HttpResponseStatus.OK)); + byte[] bytes = new byte[response.content().readableBytes()]; + response.content().readBytes(bytes); + assertThat(new String(bytes, StandardCharsets.UTF_8), equalTo(Arrays.stream(newChunks(responseString)).map(s -> { + try (XContentBuilder builder = XContentType.JSON.contentBuilder()) { + return s.toXContent(builder, ToXContent.EMPTY_PARAMS).toString(); + } catch (final IOException ex) { + throw new UncheckedIOException(ex); + } + }).collect(Collectors.joining("\r\n", "", "\r\n")))); + } finally { + response.release(); + } + } + } + } + + public void testConnectionsGettingClosedForStreamingRequests() throws InterruptedException { + final String responseString = randomAlphaOfLength(4 * 1024); + final String url = "/stream/"; + + final Settings settings = createSettings(); + final ToXContent[] chunks = newChunks(responseString); + final HttpServerTransport.Dispatcher dispatcher = createStreamingDispatcher(threadPool, url, responseString); + + try ( + ReactorNetty4HttpServerTransport transport = new ReactorNetty4HttpServerTransport( + settings, + networkService, + bigArrays, + threadPool, + xContentRegistry(), + dispatcher, + clusterSettings, + new SharedGroupFactory(settings), + secureHttpTransportSettingsProvider, + NoopTracer.INSTANCE + ); + ReactorHttpClient client = ReactorHttpClient.https(settings) + ) { + transport.start(); + final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); + HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url); + long numRequests = randomLongBetween(5L, 15L); + for (int i = 0; i < numRequests; i++) { + logger.info("Sending request {}/{}", i + 1, numRequests); + final FullHttpResponse response = client.stream(remoteAddress.address(), request, Arrays.stream(chunks)); + try { + assertThat(response.status(), equalTo(HttpResponseStatus.OK)); + } finally { + response.release(); + } + } + assertThat(transport.stats().getServerOpen(), equalTo(0L)); + assertThat(transport.stats().getTotalOpen(), equalTo(numRequests)); + } + } + + private Settings createSettings() { + return createBuilderWithPort().build(); + } + + private Settings.Builder createBuilderWithPort() { + return Settings.builder() + .put(HttpTransportSettings.SETTING_HTTP_PORT.getKey(), getPortRange()) + .put(HttpTransportSettings.SETTING_HTTP_HTTP3_ENABLED.getKey(), randomBoolean()); + } +} diff --git a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java index 9d0a837e1004e..349ef4d1bafe9 100644 --- a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java +++ b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java @@ -84,6 +84,7 @@ import io.netty.handler.codec.http2.Http2SecurityUtil; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import io.netty.pkitesting.CertificateBuilder.Algorithm; import static org.opensearch.core.rest.RestStatus.OK; import static org.opensearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_ORIGIN; @@ -113,7 +114,7 @@ public void setup() throws Exception { clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); var keyManagerFactory = KeyManagerFactory.getInstance("PKIX"); - keyManagerFactory.init(KeyStoreUtils.createServerKeyStore(), KEYSTORE_PASSWORD); + keyManagerFactory.init(KeyStoreUtils.createServerKeyStore(Algorithm.ecp384), KEYSTORE_PASSWORD); secureHttpTransportSettingsProvider = new SecureHttpTransportSettingsProvider() { @Override @@ -158,15 +159,11 @@ public Optional buildHttpServerExceptionHandler(Setti @Override public Optional buildSecureHttpServerEngine(Settings settings, HttpServerTransport transport) throws SSLException { - try { - SSLEngine engine = SslContextBuilder.forServer(keyManagerFactory) - .trustManager(InsecureTrustManagerFactory.INSTANCE) - .build() - .newEngine(NettyAllocator.getAllocator()); - return Optional.of(engine); - } catch (final Exception ex) { - throw new SSLException(ex); - } + SSLEngine engine = SslContextBuilder.forServer(keyManagerFactory) + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .build() + .newEngine(NettyAllocator.getAllocator()); + return Optional.of(engine); } }; } @@ -241,7 +238,7 @@ private void runExpectHeaderTest(final Settings settings, final String expectati ) { transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); - try (ReactorHttpClient client = ReactorHttpClient.https()) { + try (ReactorHttpClient client = ReactorHttpClient.https(settings)) { final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/"); request.headers().set(HttpHeaderNames.EXPECT, expectation); HttpUtil.setContentLength(request, contentLength); @@ -270,7 +267,7 @@ public void testBindUnavailableAddress() { xContentRegistry(), new NullDispatcher(), clusterSettings, - new SharedGroupFactory(Settings.EMPTY), + new SharedGroupFactory(initialSettings), secureHttpTransportSettingsProvider, NoopTracer.INSTANCE ) @@ -280,6 +277,10 @@ public void testBindUnavailableAddress() { Settings settings = Settings.builder() .put("http.port", remoteAddress.getPort()) .put("network.host", remoteAddress.getAddress()) + .put( + HttpTransportSettings.SETTING_HTTP_HTTP3_ENABLED.getKey(), + HttpTransportSettings.SETTING_HTTP_HTTP3_ENABLED.get(initialSettings) + ) .build(); try ( ReactorNetty4HttpServerTransport otherTransport = new ReactorNetty4HttpServerTransport( @@ -332,7 +333,7 @@ public void testBadRequest() throws InterruptedException { transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); - try (ReactorHttpClient client = ReactorHttpClient.https()) { + try (ReactorHttpClient client = ReactorHttpClient.https(settings)) { final String url = "/" + randomAlphaOfLength(maxInitialLineLength); final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url); @@ -372,7 +373,7 @@ public void testDispatchFailed() throws InterruptedException { transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); - try (ReactorHttpClient client = ReactorHttpClient.https()) { + try (ReactorHttpClient client = ReactorHttpClient.https(settings)) { final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); final FullHttpResponse response = client.send(remoteAddress.address(), request); @@ -387,6 +388,7 @@ public void testDispatchFailed() throws InterruptedException { } public void testLargeCompressedResponse() throws InterruptedException { + final Settings settings = createSettings(); final String responseString = randomAlphaOfLength(4 * 1024 * 1024); final String url = "/thing/"; final HttpServerTransport.Dispatcher dispatcher = dispatcherBuilderWithDefaults().withDispatchRequest( @@ -402,14 +404,14 @@ public void testLargeCompressedResponse() throws InterruptedException { try ( ReactorNetty4HttpServerTransport transport = new ReactorNetty4HttpServerTransport( - Settings.EMPTY, + settings, networkService, bigArrays, threadPool, xContentRegistry(), dispatcher, clusterSettings, - new SharedGroupFactory(Settings.EMPTY), + new SharedGroupFactory(settings), secureHttpTransportSettingsProvider, NoopTracer.INSTANCE ) @@ -417,7 +419,7 @@ public void testLargeCompressedResponse() throws InterruptedException { transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); - try (ReactorHttpClient client = ReactorHttpClient.https()) { + try (ReactorHttpClient client = ReactorHttpClient.https(settings)) { DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url); request.headers().add(HttpHeaderNames.ACCEPT_ENCODING, randomFrom("deflate", "gzip")); long numOfHugeAllocations = getHugeAllocationCount(); @@ -471,7 +473,7 @@ public void testCorsRequest() throws InterruptedException { final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); // Test pre-flight request - try (ReactorHttpClient client = ReactorHttpClient.https()) { + try (ReactorHttpClient client = ReactorHttpClient.https(settings)) { final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.OPTIONS, "/"); request.headers().add(CorsHandler.ORIGIN, "test-cors.org"); request.headers().add(CorsHandler.ACCESS_CONTROL_REQUEST_METHOD, "POST"); @@ -488,7 +490,7 @@ public void testCorsRequest() throws InterruptedException { } // Test short-circuited request - try (ReactorHttpClient client = ReactorHttpClient.https()) { + try (ReactorHttpClient client = ReactorHttpClient.https(settings)) { final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); request.headers().add(CorsHandler.ORIGIN, "google.com"); @@ -556,6 +558,8 @@ private Settings createSettings() { } private Settings.Builder createBuilderWithPort() { - return Settings.builder().put(HttpTransportSettings.SETTING_HTTP_PORT.getKey(), getPortRange()); + return Settings.builder() + .put(HttpTransportSettings.SETTING_HTTP_PORT.getKey(), getPortRange()) + .put(HttpTransportSettings.SETTING_HTTP_HTTP3_ENABLED.getKey(), randomBoolean()); } } diff --git a/server/src/main/java/org/opensearch/bootstrap/Security.java b/server/src/main/java/org/opensearch/bootstrap/Security.java index a8652dffa6910..90b623094fff1 100644 --- a/server/src/main/java/org/opensearch/bootstrap/Security.java +++ b/server/src/main/java/org/opensearch/bootstrap/Security.java @@ -129,7 +129,9 @@ */ @SuppressWarnings("removal") final class Security { - private static final Pattern CODEBASE_JAR_WITH_CLASSIFIER = Pattern.compile("^(.+)-\\d+\\.\\d+[^-]*.*?[-]?([^-]+)?\\.jar$"); + private static final Pattern CODEBASE_JAR_WITH_CLASSIFIER = Pattern.compile( + "^(.+)-\\d+\\.\\d+[^-]*.*?[-]?((?:linux-|windows-|osx-)?[^-]+)?\\.jar$" + ); /** no instantiation */ private Security() {} diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 59e039f12e6bf..55720b9a5bde8 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -425,6 +425,7 @@ public void apply(Settings value, Settings current, Settings previous) { HttpTransportSettings.SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE, HttpTransportSettings.SETTING_HTTP_TRACE_LOG_INCLUDE, HttpTransportSettings.SETTING_HTTP_TRACE_LOG_EXCLUDE, + HttpTransportSettings.SETTING_HTTP_HTTP3_ENABLED, HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING, HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, diff --git a/server/src/main/java/org/opensearch/http/HttpChannel.java b/server/src/main/java/org/opensearch/http/HttpChannel.java index 7048f08faff9f..4ebb3a870b8d6 100644 --- a/server/src/main/java/org/opensearch/http/HttpChannel.java +++ b/server/src/main/java/org/opensearch/http/HttpChannel.java @@ -32,6 +32,7 @@ package org.opensearch.http; +import org.opensearch.common.Nullable; import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.network.CloseableChannel; import org.opensearch.core.action.ActionListener; @@ -66,6 +67,7 @@ default void handleException(Exception ex) {} * * @return the local address of this channel. */ + @Nullable InetSocketAddress getLocalAddress(); /** @@ -73,6 +75,7 @@ default void handleException(Exception ex) {} * * @return the remote address of this channel. */ + @Nullable InetSocketAddress getRemoteAddress(); /** diff --git a/server/src/main/java/org/opensearch/http/HttpRequest.java b/server/src/main/java/org/opensearch/http/HttpRequest.java index 56e6276811377..4fefed54e8f27 100644 --- a/server/src/main/java/org/opensearch/http/HttpRequest.java +++ b/server/src/main/java/org/opensearch/http/HttpRequest.java @@ -60,7 +60,8 @@ public interface HttpRequest { enum HttpVersion { HTTP_1_0, HTTP_1_1, - HTTP_2_0 + HTTP_2_0, + HTTP_3_0 } /** diff --git a/server/src/main/java/org/opensearch/http/HttpTransportSettings.java b/server/src/main/java/org/opensearch/http/HttpTransportSettings.java index 05c0a28157cb4..2405798009764 100644 --- a/server/src/main/java/org/opensearch/http/HttpTransportSettings.java +++ b/server/src/main/java/org/opensearch/http/HttpTransportSettings.java @@ -259,5 +259,13 @@ public final class HttpTransportSettings { Setting.Property.NodeScope ); + // Enable HTTP/3 protocol if supported by the operating system and architecture + // The HTTP/3 transport is still experimental and should be used with caution. + public static final Setting SETTING_HTTP_HTTP3_ENABLED = Setting.boolSetting( + "http.protocol.http3.enabled", + false, + Property.NodeScope + ); + private HttpTransportSettings() {} } diff --git a/test/framework/src/main/java/org/opensearch/test/KeyStoreUtils.java b/test/framework/src/main/java/org/opensearch/test/KeyStoreUtils.java index 4ca11d724468f..0ca4eb09c377d 100644 --- a/test/framework/src/main/java/org/opensearch/test/KeyStoreUtils.java +++ b/test/framework/src/main/java/org/opensearch/test/KeyStoreUtils.java @@ -48,7 +48,11 @@ public static String inferStoreType(String filePath) { } public static KeyStore createServerKeyStore() throws Exception { - var serverCred = generateCert(); + return createServerKeyStore(Algorithm.rsa2048); + } + + public static KeyStore createServerKeyStore(Algorithm algorithm) throws Exception { + var serverCred = generateCert(algorithm); var keyStore = KeyStore.getInstance(FipsMode.CHECK.isFipsEnabled() ? "BCFKS" : "JKS"); keyStore.load(null, null); keyStore.setKeyEntry( @@ -60,7 +64,7 @@ public static KeyStore createServerKeyStore() throws Exception { return keyStore; } - private static X509Bundle generateCert() throws Exception { + private static X509Bundle generateCert(Algorithm algorithm) throws Exception { final Locale locale = Locale.getDefault(); try { Locale.setDefault(LocaleUtil.EN_Locale); @@ -68,7 +72,7 @@ private static X509Bundle generateCert() throws Exception { // reference: https://csrc.nist.gov/projects/cryptographic-module-validation-program/certificate/4943 return new CertificateBuilder().subject("CN=Test CA Certificate") .setIsCertificateAuthority(true) - .algorithm(Algorithm.rsa2048) + .algorithm(algorithm) .provider(new BouncyCastleFipsProvider()) .buildSelfSigned(); } finally {