diff --git a/CHANGELOG.md b/CHANGELOG.md index 53cb18c04dde1..b7a9cb628126f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add ingestion management APIs for pause, resume and get ingestion state ([#17631](https://github.com/opensearch-project/OpenSearch/pull/17631)) - [Security Manager Replacement] Enhance Java Agent to intercept System::exit ([#17746](https://github.com/opensearch-project/OpenSearch/pull/17746)) - Support AutoExpand for SearchReplica ([#17741](https://github.com/opensearch-project/OpenSearch/pull/17741)) +- Add TLS enabled SecureNetty4GrpcServerTransport ([#17406](https://github.com/opensearch-project/OpenSearch/pull/17406)) ### Changed - Migrate BC libs to their FIPS counterparts ([#14912](https://github.com/opensearch-project/OpenSearch/pull/14912)) diff --git a/plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/FlightService.java b/plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/FlightService.java index 7735fc3df73e0..fdcbbf43d75bf 100644 --- a/plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/FlightService.java +++ b/plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/FlightService.java @@ -134,6 +134,7 @@ public StreamManager getStreamManager() { * Retrieves the bound address of the FlightService. * @return The BoundTransportAddress instance. */ + @Override public BoundTransportAddress getBoundAddress() { return serverComponents.getBoundAddress(); } diff --git a/plugins/transport-grpc/README.md b/plugins/transport-grpc/README.md new file mode 100644 index 0000000000000..59c9bc94205b5 --- /dev/null +++ b/plugins/transport-grpc/README.md @@ -0,0 +1,42 @@ +# transport-grpc + +An auxiliary transport which runs in parallel to the REST API. +The `transport-grpc` plugin initializes a new client/server transport implementing a gRPC protocol on Netty4. + +Enable this transport with: + +``` +setting 'aux.transport.types', '[experimental-transport-grpc]' +setting 'aux.transport.experimental-transport-grpc.port', '9400-9500' //optional +``` + +For the secure transport: + +``` +setting 'aux.transport.types', '[experimental-secure-transport-grpc]' +setting 'aux.transport.experimental-secure-transport-grpc.port', '9400-9500' //optional +``` + +Other settings are agnostic as to the gRPC transport type: + +``` +setting 'grpc.publish_port', '9400' +setting 'grpc.host', '["0.0.0.0"]' +setting 'grpc.bind_host', '["0.0.0.0", "::", "10.0.0.1"]' +setting 'grpc.publish_host', '["thisnode.example.com"]' +setting 'grpc.netty.worker_count', '2' +``` + +## Testing + +### Unit Tests + +``` +./gradlew :plugins:transport-grpc:test +``` + +### Integration Tests + +``` +./gradlew :plugins:transport-grpc:internalClusterTest +``` diff --git a/plugins/transport-grpc/build.gradle b/plugins/transport-grpc/build.gradle index 5c6bc8efe1098..86d1f3a51bd44 100644 --- a/plugins/transport-grpc/build.gradle +++ b/plugins/transport-grpc/build.gradle @@ -1,5 +1,3 @@ -import org.gradle.api.attributes.java.TargetJvmEnvironment - /* * SPDX-License-Identifier: Apache-2.0 * @@ -8,11 +6,21 @@ import org.gradle.api.attributes.java.TargetJvmEnvironment * compatible open source license. */ +apply plugin: 'opensearch.testclusters' +apply plugin: 'opensearch.internal-cluster-test' + opensearchplugin { description = 'gRPC based transport implementation' classname = 'org.opensearch.transport.grpc.GrpcPlugin' } +testClusters { + integTest { + plugin(project.path) + setting 'aux.transport.types', '[experimental-transport-grpc]' + } +} + dependencies { compileOnly "com.google.code.findbugs:jsr305:3.0.2" runtimeOnly "com.google.guava:guava:${versions.guava}" @@ -27,6 +35,7 @@ dependencies { implementation "io.grpc:grpc-stub:${versions.grpc}" implementation "io.grpc:grpc-util:${versions.grpc}" implementation "io.perfmark:perfmark-api:0.26.0" + testImplementation project(':test:framework') } tasks.named("dependencyLicenses").configure { diff --git a/plugins/transport-grpc/src/internalClusterTest/java/org/opensearch/transport/grpc/Netty4GrpcServerTransportIT.java b/plugins/transport-grpc/src/internalClusterTest/java/org/opensearch/transport/grpc/Netty4GrpcServerTransportIT.java new file mode 100644 index 0000000000000..2284a335b06db --- /dev/null +++ b/plugins/transport-grpc/src/internalClusterTest/java/org/opensearch/transport/grpc/Netty4GrpcServerTransportIT.java @@ -0,0 +1,59 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.transport.grpc; + +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; +import org.opensearch.cluster.health.ClusterHealthStatus; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.transport.TransportAddress; +import org.opensearch.plugins.Plugin; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import io.grpc.health.v1.HealthCheckResponse; + +import static org.opensearch.plugins.NetworkPlugin.AuxTransport.AUX_TRANSPORT_TYPES_KEY; +import static org.opensearch.transport.grpc.Netty4GrpcServerTransport.GRPC_TRANSPORT_SETTING_KEY; + +public class Netty4GrpcServerTransportIT extends OpenSearchIntegTestCase { + + private TransportAddress randomNetty4GrpcServerTransportAddr() { + List addresses = new ArrayList<>(); + for (Netty4GrpcServerTransport transport : internalCluster().getInstances(Netty4GrpcServerTransport.class)) { + TransportAddress tAddr = new TransportAddress(transport.getBoundAddress().publishAddress().address()); + addresses.add(tAddr); + } + return randomFrom(addresses); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(AUX_TRANSPORT_TYPES_KEY, GRPC_TRANSPORT_SETTING_KEY).build(); + } + + @Override + protected Collection> nodePlugins() { + return Collections.singleton(GrpcPlugin.class); + } + + public void testStartGrpcTransportClusterHealth() throws Exception { + // REST api cluster health + ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().get(); + assertEquals(ClusterHealthStatus.GREEN, healthResponse.getStatus()); + + // gRPC transport service health + try (NettyGrpcClient client = new NettyGrpcClient.Builder().setAddress(randomNetty4GrpcServerTransportAddr()).build()) { + assertEquals(client.checkHealth(), HealthCheckResponse.ServingStatus.SERVING); + } + } +} diff --git a/plugins/transport-grpc/src/internalClusterTest/java/org/opensearch/transport/grpc/SecureNetty4GrpcServerTransportIT.java b/plugins/transport-grpc/src/internalClusterTest/java/org/opensearch/transport/grpc/SecureNetty4GrpcServerTransportIT.java new file mode 100644 index 0000000000000..a2ae0d9aebcf2 --- /dev/null +++ b/plugins/transport-grpc/src/internalClusterTest/java/org/opensearch/transport/grpc/SecureNetty4GrpcServerTransportIT.java @@ -0,0 +1,222 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.transport.grpc; + +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; +import org.opensearch.cluster.health.ClusterHealthStatus; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.transport.TransportAddress; +import org.opensearch.plugins.NetworkPlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.plugins.SecureAuxTransportSettingsProvider; +import org.opensearch.plugins.SecureHttpTransportSettingsProvider; +import org.opensearch.plugins.SecureSettingsFactory; +import org.opensearch.plugins.SecureTransportSettingsProvider; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.transport.grpc.ssl.SecureNetty4GrpcServerTransport; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Optional; + +import io.grpc.health.v1.HealthCheckResponse; + +import static org.opensearch.plugins.NetworkPlugin.AuxTransport.AUX_TRANSPORT_TYPES_KEY; +import static org.opensearch.transport.grpc.SecureSettingsHelpers.getServerClientAuthNone; +import static org.opensearch.transport.grpc.SecureSettingsHelpers.getServerClientAuthOptional; +import static org.opensearch.transport.grpc.SecureSettingsHelpers.getServerClientAuthRequired; +import static org.opensearch.transport.grpc.ssl.SecureNetty4GrpcServerTransport.GRPC_SECURE_TRANSPORT_SETTING_KEY; + +public abstract class SecureNetty4GrpcServerTransportIT extends OpenSearchIntegTestCase { + + public static class MockSecurityPlugin extends Plugin implements NetworkPlugin { + public MockSecurityPlugin() {} + + static class MockSecureSettingsFactory implements SecureSettingsFactory { + @Override + public Optional getSecureTransportSettingsProvider(Settings settings) { + return Optional.empty(); + } + + @Override + public Optional getSecureHttpTransportSettingsProvider(Settings settings) { + return Optional.empty(); + } + + @Override + public Optional getSecureAuxTransportSettingsProvider(Settings settings) { + return Optional.empty(); + } + } + } + + protected TransportAddress randomNetty4GrpcServerTransportAddr() { + List addresses = new ArrayList<>(); + for (SecureNetty4GrpcServerTransport transport : internalCluster().getInstances(SecureNetty4GrpcServerTransport.class)) { + TransportAddress tAddr = new TransportAddress(transport.getBoundAddress().publishAddress().address()); + addresses.add(tAddr); + } + return randomFrom(addresses); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(AUX_TRANSPORT_TYPES_KEY, GRPC_SECURE_TRANSPORT_SETTING_KEY) + .build(); + } + + @Override + protected Collection> nodePlugins() { + return List.of(GrpcPlugin.class, MockSecurityPlugin.class); + } + + private SecureSettingsHelpers.ConnectExceptions tryConnectClient(NettyGrpcClient client) { + try { + HealthCheckResponse.ServingStatus status = client.checkHealth(); + if (status == HealthCheckResponse.ServingStatus.SERVING) { + return SecureSettingsHelpers.ConnectExceptions.NONE; + } else { + throw new RuntimeException("Illegal state - unexpected server status: " + status.toString()); + } + } catch (Exception e) { + return SecureSettingsHelpers.ConnectExceptions.get(e); + } + } + + protected SecureSettingsHelpers.ConnectExceptions plaintextClientConnect() throws Exception { + try (NettyGrpcClient client = new NettyGrpcClient.Builder().setAddress(randomNetty4GrpcServerTransportAddr()).build()) { + return tryConnectClient(client); + } + } + + protected SecureSettingsHelpers.ConnectExceptions insecureClientConnect() throws Exception { + try ( + NettyGrpcClient client = new NettyGrpcClient.Builder().setAddress(randomNetty4GrpcServerTransportAddr()).insecure(true).build() + ) { + return tryConnectClient(client); + } + } + + protected SecureSettingsHelpers.ConnectExceptions trustedCertClientConnect() throws Exception { + try ( + NettyGrpcClient client = new NettyGrpcClient.Builder().setAddress(randomNetty4GrpcServerTransportAddr()) + .clientAuth(true) + .build() + ) { + return tryConnectClient(client); + } + } + + public void testClusterHealth() { + ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().get(); + assertEquals(ClusterHealthStatus.GREEN, healthResponse.getStatus()); + } + + public static class SecureNetty4GrpcServerTransportNoAuthIT extends SecureNetty4GrpcServerTransportIT { + public static class NoAuthMockSecurityPlugin extends MockSecurityPlugin { + public NoAuthMockSecurityPlugin() {} + + @Override + public Optional getSecureSettingFactory(Settings settings) { + return Optional.of(new MockSecureSettingsFactory() { + @Override + public Optional getSecureAuxTransportSettingsProvider(Settings settings) { + return Optional.of(getServerClientAuthNone()); + } + }); + } + } + + @Override + protected Collection> nodePlugins() { + return List.of(GrpcPlugin.class, NoAuthMockSecurityPlugin.class); + } + + public void testPlaintextClientConnect() throws Exception { + assertEquals(plaintextClientConnect(), SecureSettingsHelpers.ConnectExceptions.UNAVAILABLE); + } + + public void testInsecureClientConnect() throws Exception { + assertEquals(insecureClientConnect(), SecureSettingsHelpers.ConnectExceptions.NONE); + } + + public void testTrustedClientConnect() throws Exception { + assertEquals(trustedCertClientConnect(), SecureSettingsHelpers.ConnectExceptions.NONE); + } + } + + public static class SecureNetty4GrpcServerTransportOptionalAuthIT extends SecureNetty4GrpcServerTransportIT { + public static class OptAuthMockSecurityPlugin extends MockSecurityPlugin { + public OptAuthMockSecurityPlugin() {} + + @Override + public Optional getSecureSettingFactory(Settings settings) { + return Optional.of(new MockSecureSettingsFactory() { + @Override + public Optional getSecureAuxTransportSettingsProvider(Settings settings) { + return Optional.of(getServerClientAuthOptional()); + } + }); + } + } + + @Override + protected Collection> nodePlugins() { + return List.of(GrpcPlugin.class, OptAuthMockSecurityPlugin.class); + } + + public void testPlaintextClientConnect() throws Exception { + assertEquals(plaintextClientConnect(), SecureSettingsHelpers.ConnectExceptions.UNAVAILABLE); + } + + public void testInsecureClientConnect() throws Exception { + assertEquals(insecureClientConnect(), SecureSettingsHelpers.ConnectExceptions.NONE); + } + + public void testTrustedClientConnect() throws Exception { + assertEquals(trustedCertClientConnect(), SecureSettingsHelpers.ConnectExceptions.NONE); + } + } + + public static class SecureNetty4GrpcServerTransportRequiredAuthIT extends SecureNetty4GrpcServerTransportIT { + public static class RequireAuthMockSecurityPlugin extends MockSecurityPlugin { + public RequireAuthMockSecurityPlugin() {} + + @Override + public Optional getSecureSettingFactory(Settings settings) { + return Optional.of(new MockSecureSettingsFactory() { + @Override + public Optional getSecureAuxTransportSettingsProvider(Settings settings) { + return Optional.of(getServerClientAuthRequired()); + } + }); + } + } + + @Override + protected Collection> nodePlugins() { + return List.of(GrpcPlugin.class, RequireAuthMockSecurityPlugin.class); + } + + public void testPlaintextClientConnect() throws Exception { + assertEquals(plaintextClientConnect(), SecureSettingsHelpers.ConnectExceptions.UNAVAILABLE); + } + + public void testInsecureClientConnect() throws Exception { + assertEquals(insecureClientConnect(), SecureSettingsHelpers.ConnectExceptions.BAD_CERT); + } + + public void testTrustedClientConnect() throws Exception { + assertEquals(trustedCertClientConnect(), SecureSettingsHelpers.ConnectExceptions.NONE); + } + } +} diff --git a/plugins/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java b/plugins/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java index 7f02983010f98..ac726f4f890b5 100644 --- a/plugins/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java +++ b/plugins/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java @@ -14,8 +14,10 @@ import org.opensearch.core.indices.breaker.CircuitBreakerService; import org.opensearch.plugins.NetworkPlugin; import org.opensearch.plugins.Plugin; +import org.opensearch.plugins.SecureAuxTransportSettingsProvider; import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.grpc.ssl.SecureNetty4GrpcServerTransport; import java.util.Collections; import java.util.List; @@ -29,6 +31,8 @@ import static org.opensearch.transport.grpc.Netty4GrpcServerTransport.SETTING_GRPC_PUBLISH_HOST; import static org.opensearch.transport.grpc.Netty4GrpcServerTransport.SETTING_GRPC_PUBLISH_PORT; import static org.opensearch.transport.grpc.Netty4GrpcServerTransport.SETTING_GRPC_WORKER_COUNT; +import static org.opensearch.transport.grpc.ssl.SecureNetty4GrpcServerTransport.GRPC_SECURE_TRANSPORT_SETTING_KEY; +import static org.opensearch.transport.grpc.ssl.SecureNetty4GrpcServerTransport.SETTING_GRPC_SECURE_PORT; /** * Main class for the gRPC plugin. @@ -55,10 +59,27 @@ public Map> getAuxTransports( ); } + @Override + public Map> getSecureAuxTransports( + Settings settings, + ThreadPool threadPool, + CircuitBreakerService circuitBreakerService, + NetworkService networkService, + ClusterSettings clusterSettings, + SecureAuxTransportSettingsProvider secureAuxTransportSettingsProvider, + Tracer tracer + ) { + return Collections.singletonMap( + GRPC_SECURE_TRANSPORT_SETTING_KEY, + () -> new SecureNetty4GrpcServerTransport(settings, Collections.emptyList(), networkService, secureAuxTransportSettingsProvider) + ); + } + @Override public List> getSettings() { return List.of( SETTING_GRPC_PORT, + SETTING_GRPC_SECURE_PORT, SETTING_GRPC_HOST, SETTING_GRPC_PUBLISH_HOST, SETTING_GRPC_BIND_HOST, diff --git a/plugins/transport-grpc/src/main/java/org/opensearch/transport/grpc/Netty4GrpcServerTransport.java b/plugins/transport-grpc/src/main/java/org/opensearch/transport/grpc/Netty4GrpcServerTransport.java index 1fb6a0bca03ea..81da6ba32b011 100644 --- a/plugins/transport-grpc/src/main/java/org/opensearch/transport/grpc/Netty4GrpcServerTransport.java +++ b/plugins/transport-grpc/src/main/java/org/opensearch/transport/grpc/Netty4GrpcServerTransport.java @@ -32,9 +32,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import java.util.function.UnaryOperator; import io.grpc.BindableService; -import io.grpc.InsecureServerCredentials; import io.grpc.Server; import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; import io.grpc.netty.shaded.io.netty.channel.EventLoopGroup; @@ -115,14 +115,29 @@ public class Netty4GrpcServerTransport extends NetworkPlugin.AuxTransport { Setting.Property.NodeScope ); - private final Settings settings; + /** + * Port range on which servers bind. + */ + protected PortsRange port; + + /** + * Port settings are set using the transport type, in this case GRPC_TRANSPORT_SETTING_KEY. + * Child classes have distinct transport type keys and need to override these settings. + */ + protected String portSettingKey; + + /** + * Settings. + */ + protected final Settings settings; + private final NetworkService networkService; private final List services; - private final CopyOnWriteArrayList servers = new CopyOnWriteArrayList<>(); private final String[] bindHosts; private final String[] publishHosts; - private final PortsRange port; private final int nettyEventLoopThreads; + private final CopyOnWriteArrayList servers = new CopyOnWriteArrayList<>(); + private final List> serverBuilderConfigs = new ArrayList<>(); private volatile BoundTransportAddress boundAddress; private volatile EventLoopGroup eventLoopGroup; @@ -150,12 +165,23 @@ public Netty4GrpcServerTransport(Settings settings, List servic this.port = SETTING_GRPC_PORT.get(settings); this.nettyEventLoopThreads = SETTING_GRPC_WORKER_COUNT.get(settings); + this.portSettingKey = SETTING_GRPC_PORT.getKey(); } - BoundTransportAddress boundAddress() { + // public for tests + @Override + public BoundTransportAddress getBoundAddress() { return this.boundAddress; } + /** + * Inject a NettyServerBuilder configuration to be applied at server bind and start. + * @param configModifier builder configuration to set. + */ + protected void addServerConfig(UnaryOperator configModifier) { + serverBuilderConfigs.add(configModifier); + } + @Override protected void doStart() { boolean success = false; @@ -198,7 +224,7 @@ protected void doStop() { @Override protected void doClose() { - + eventLoopGroup.close(); } private void bindServer() { @@ -230,7 +256,7 @@ private void bindServer() { + publishInetAddress + "). " + "Please specify a unique port by setting " - + SETTING_GRPC_PORT.getKey() + + portSettingKey + " or " + SETTING_GRPC_PUBLISH_PORT.getKey() ); @@ -249,13 +275,18 @@ private TransportAddress bindAddress(InetAddress hostAddress, PortsRange portRan try { final InetSocketAddress address = new InetSocketAddress(hostAddress, portNumber); - final NettyServerBuilder serverBuilder = NettyServerBuilder.forAddress(address, InsecureServerCredentials.create()) + final NettyServerBuilder serverBuilder = NettyServerBuilder.forAddress(address) + .directExecutor() .bossEventLoopGroup(eventLoopGroup) .workerEventLoopGroup(eventLoopGroup) .channelType(NioServerSocketChannel.class) .addService(new HealthStatusManager().getHealthService()) .addService(ProtoReflectionService.newInstance()); + for (UnaryOperator op : serverBuilderConfigs) { + op.apply(serverBuilder); + } + services.forEach(serverBuilder::addService); Server srv = serverBuilder.build().start(); diff --git a/plugins/transport-grpc/src/main/java/org/opensearch/transport/grpc/ssl/SecureAuxTransportSslContext.java b/plugins/transport-grpc/src/main/java/org/opensearch/transport/grpc/ssl/SecureAuxTransportSslContext.java new file mode 100644 index 0000000000000..aa5595d5adb24 --- /dev/null +++ b/plugins/transport-grpc/src/main/java/org/opensearch/transport/grpc/ssl/SecureAuxTransportSslContext.java @@ -0,0 +1,195 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.transport.grpc.ssl; + +import org.opensearch.OpenSearchSecurityException; +import org.opensearch.plugins.SecureAuxTransportSettingsProvider; + +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLException; +import javax.net.ssl.SSLSessionContext; + +import java.util.List; +import java.util.Optional; + +import io.grpc.netty.shaded.io.netty.buffer.ByteBufAllocator; +import io.grpc.netty.shaded.io.netty.handler.ssl.ApplicationProtocolConfig; +import io.grpc.netty.shaded.io.netty.handler.ssl.ApplicationProtocolNames; +import io.grpc.netty.shaded.io.netty.handler.ssl.ApplicationProtocolNegotiator; +import io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth; +import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; +import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder; +import io.grpc.netty.shaded.io.netty.handler.ssl.SslProvider; + +import static io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth.NONE; +import static io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth.OPTIONAL; +import static io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth.REQUIRE; +import static io.grpc.netty.shaded.io.netty.handler.ssl.SslProvider.JDK; +import static io.grpc.netty.shaded.io.netty.handler.ssl.SslProvider.OPENSSL; +import static io.grpc.netty.shaded.io.netty.handler.ssl.SslProvider.OPENSSL_REFCNT; + +/** + * An io.grpc.SslContext which builds and delegates functionality to an internal delegate. + * As this ssl context is provided for aux transports it operates in server mode always. + * TODO: Currently a light SslContext wrapper - hot swap functionality will be added here. + */ +public class SecureAuxTransportSslContext extends SslContext { + private final SslContext sslContext; + + /** + * Simple client auth string to enum conversion helper. + * @param clientAuthStr client auth as string. + * @return ClientAuth enum. + */ + public static ClientAuth clientAuthHelper(String clientAuthStr) { + switch (clientAuthStr) { + case "NONE" -> { + return NONE; + } + case "OPTIONAL" -> { + return OPTIONAL; + } + case "REQUIRE" -> { + return REQUIRE; + } + default -> throw new OpenSearchSecurityException("unsupported client auth: " + clientAuthStr); + } + } + + /** + * Simple ssl provider string to enum conversion helper. + * @param providerStr provider as string. + * @return provider enum. + */ + public static SslProvider providerHelper(String providerStr) { + switch (providerStr) { + case "JDK" -> { + return JDK; + } + case "OPENSSL" -> { + return OPENSSL; + } + case "OPENSSL_REFCNT" -> { + return OPENSSL_REFCNT; + } + default -> throw new OpenSearchSecurityException("unsupported ssl provider: " + providerStr); + } + } + + /** + * Initializes a new SecureAuxTransportSslContext. + * @param provider source of SecureAuxTransportParameters required to build an SslContext. + */ + public SecureAuxTransportSslContext(SecureAuxTransportSettingsProvider provider) { + Optional params = provider.parameters(); + if (params.isEmpty()) { + throw new OpenSearchSecurityException("Aux transport ssl context failed to initialize. Secure settings provider not found."); + } + try { + this.sslContext = buildContext(params.get()); + } catch (SSLException e) { + throw new OpenSearchSecurityException("Unable to build io.grpc.SslContext from secure settings", e); + } + } + + /** + * @param p fields necessary to construct an SslContext. + * @return new SslContext. + */ + private SslContext buildContext(SecureAuxTransportSettingsProvider.SecureAuxTransportParameters p) throws SSLException { + if (p.keyManagerFactory().isEmpty()) { + throw new OpenSearchSecurityException("Aux transport ssl context failed to initialize. No keystore provided."); + } + if (p.sslProvider().isEmpty()) { + throw new OpenSearchSecurityException("Aux transport ssl context failed to initialize. Ssl provider not found."); + } + if (p.clientAuth().isEmpty()) { + throw new OpenSearchSecurityException("Aux transport ssl context failed to initialize. No client auth mode configured."); + } + if (p.trustManagerFactory().isEmpty()) { + throw new OpenSearchSecurityException("Aux transport ssl context failed to initialize. No truststore provided."); + } + SslContextBuilder builder = SslContextBuilder.forServer(p.keyManagerFactory().get()) + .sslProvider(providerHelper(p.sslProvider().get())) + .protocols(p.protocols()) + .ciphers(p.cipherSuites()); + builder.applicationProtocolConfig( + new ApplicationProtocolConfig( + ApplicationProtocolConfig.Protocol.ALPN, + ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE, + ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT, + ApplicationProtocolNames.HTTP_2 + ) + ); + builder.clientAuth(clientAuthHelper(p.clientAuth().get())); + builder.trustManager(p.trustManagerFactory().get()); + return builder.build(); + } + + /* + Mirror the io.grpc.netty.shaded.io.netty.handler.ssl API with our delegate. + Note sslContext is volatile and active connections may fail if a hot swap occurs. + */ + + /** + * Create a new SSLEngine instance to handle TLS for a connection. + * @param byteBufAllocator netty allocator. + * @return new SSLEngine instance. + */ + @Override + public SSLEngine newEngine(ByteBufAllocator byteBufAllocator) { + return sslContext.newEngine(byteBufAllocator); + } + + /** + * Create a new SSLEngine instance to handle TLS for a connection. + * @param byteBufAllocator netty allocator. + * @param s host hint. + * @param i port hint. + * @return new SSLEngine instance. + */ + @Override + public SSLEngine newEngine(ByteBufAllocator byteBufAllocator, String s, int i) { + return sslContext.newEngine(byteBufAllocator, s, i); + } + + /** + * @return server only context - always false. + */ + @Override + public boolean isClient() { + return false; + } + + /** + * @return supported cipher suites. + */ + @Override + public List cipherSuites() { + return sslContext.cipherSuites(); + } + + /** + * Deprecated. + * @return HTTP2 requires "h2" be specified in ALPN. + */ + @Deprecated + @Override + public ApplicationProtocolNegotiator applicationProtocolNegotiator() { + return sslContext.applicationProtocolNegotiator(); + } + + /** + * @return session context. + */ + @Override + public SSLSessionContext sessionContext() { + return sslContext.sessionContext(); + } +} diff --git a/plugins/transport-grpc/src/main/java/org/opensearch/transport/grpc/ssl/SecureNetty4GrpcServerTransport.java b/plugins/transport-grpc/src/main/java/org/opensearch/transport/grpc/ssl/SecureNetty4GrpcServerTransport.java new file mode 100644 index 0000000000000..b39ad2fee02ca --- /dev/null +++ b/plugins/transport-grpc/src/main/java/org/opensearch/transport/grpc/ssl/SecureNetty4GrpcServerTransport.java @@ -0,0 +1,63 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.transport.grpc.ssl; + +import org.opensearch.common.network.NetworkService; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.transport.PortsRange; +import org.opensearch.plugins.SecureAuxTransportSettingsProvider; +import org.opensearch.transport.grpc.Netty4GrpcServerTransport; + +import java.util.List; + +import io.grpc.BindableService; +import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; +import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; + +/** + * Netty4GrpcServerTransport with TLS enabled. + * Security settings injected through a SecureAuxTransportSettingsProvider. + */ +public class SecureNetty4GrpcServerTransport extends Netty4GrpcServerTransport { + /** + * Type key to select secure transport. + */ + public static final String GRPC_SECURE_TRANSPORT_SETTING_KEY = "experimental-secure-transport-grpc"; + + /** + * Distinct port setting required as it depends on transport type key. + */ + public static final Setting SETTING_GRPC_SECURE_PORT = AUX_TRANSPORT_PORT.getConcreteSettingForNamespace( + GRPC_SECURE_TRANSPORT_SETTING_KEY + ); + + private final SslContext sslContext; + + /** + * Creates a new SecureNetty4GrpcServerTransport instance and inject a SecureAuxTransportSslContext + * into the NettyServerBuilder config to enable TLS on the server. + * @param settings the configured settings. + * @param services the gRPC compatible services to be registered with the server. + * @param networkService the bind/publish addresses. + * @param secureTransportSettingsProvider TLS configuration settings. + */ + public SecureNetty4GrpcServerTransport( + Settings settings, + List services, + NetworkService networkService, + SecureAuxTransportSettingsProvider secureTransportSettingsProvider + ) { + super(settings, services, networkService); + this.port = SecureNetty4GrpcServerTransport.SETTING_GRPC_SECURE_PORT.get(settings); + this.portSettingKey = SecureNetty4GrpcServerTransport.SETTING_GRPC_SECURE_PORT.getKey(); + this.sslContext = new SecureAuxTransportSslContext(secureTransportSettingsProvider); + this.addServerConfig((NettyServerBuilder builder) -> builder.sslContext(this.sslContext)); + } +} diff --git a/plugins/transport-grpc/src/main/java/org/opensearch/transport/grpc/ssl/package-info.java b/plugins/transport-grpc/src/main/java/org/opensearch/transport/grpc/ssl/package-info.java new file mode 100644 index 0000000000000..bffc3e762a0f4 --- /dev/null +++ b/plugins/transport-grpc/src/main/java/org/opensearch/transport/grpc/ssl/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * gRPC transport for OpenSearch implementing TLS. + */ +package org.opensearch.transport.grpc.ssl; diff --git a/plugins/transport-grpc/src/test/java/org/opensearch/transport/grpc/Netty4GrpcServerTransportTests.java b/plugins/transport-grpc/src/test/java/org/opensearch/transport/grpc/Netty4GrpcServerTransportTests.java index 8cf44eebb293e..83a9af87f006e 100644 --- a/plugins/transport-grpc/src/test/java/org/opensearch/transport/grpc/Netty4GrpcServerTransportTests.java +++ b/plugins/transport-grpc/src/test/java/org/opensearch/transport/grpc/Netty4GrpcServerTransportTests.java @@ -10,6 +10,7 @@ import org.opensearch.common.network.NetworkService; import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.test.OpenSearchTestCase; import org.hamcrest.MatcherAssert; import org.junit.Before; @@ -17,12 +18,12 @@ import java.util.List; import io.grpc.BindableService; +import io.grpc.health.v1.HealthCheckResponse; import static org.hamcrest.Matchers.emptyArray; import static org.hamcrest.Matchers.not; public class Netty4GrpcServerTransportTests extends OpenSearchTestCase { - private NetworkService networkService; private List services; @@ -32,18 +33,42 @@ public void setup() { services = List.of(); } - public void test() { + private static Settings createSettings() { + return Settings.builder().put(Netty4GrpcServerTransport.SETTING_GRPC_PORT.getKey(), getPortRange()).build(); + } + + public void testGrpcTransportStartStop() { try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(createSettings(), services, networkService)) { transport.start(); + MatcherAssert.assertThat(transport.getBoundAddress().boundAddresses(), not(emptyArray())); + assertNotNull(transport.getBoundAddress().publishAddress().address()); + transport.stop(); + } + } - MatcherAssert.assertThat(transport.boundAddress().boundAddresses(), not(emptyArray())); - assertNotNull(transport.boundAddress().publishAddress().address()); - + public void testGrpcTransportHealthcheck() { + try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(createSettings(), services, networkService)) { + transport.start(); + final TransportAddress remoteAddress = randomFrom(transport.getBoundAddress().boundAddresses()); + try (NettyGrpcClient client = new NettyGrpcClient.Builder().setAddress(remoteAddress).build()) { + assertEquals(client.checkHealth(), HealthCheckResponse.ServingStatus.SERVING); + } transport.stop(); + } catch (Exception e) { + throw new RuntimeException(e); } } - private static Settings createSettings() { - return Settings.builder().put(Netty4GrpcServerTransport.SETTING_GRPC_PORT.getKey(), getPortRange()).build(); + public void testGrpcTransportListServices() { + try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(createSettings(), services, networkService)) { + transport.start(); + final TransportAddress remoteAddress = randomFrom(transport.getBoundAddress().boundAddresses()); + try (NettyGrpcClient client = new NettyGrpcClient.Builder().setAddress(remoteAddress).build()) { + assertTrue(client.listServices().get().size() > 1); + } + transport.stop(); + } catch (Exception e) { + throw new RuntimeException(e); + } } } diff --git a/plugins/transport-grpc/src/test/java/org/opensearch/transport/grpc/NettyGrpcClient.java b/plugins/transport-grpc/src/test/java/org/opensearch/transport/grpc/NettyGrpcClient.java new file mode 100644 index 0000000000000..52c103510b84c --- /dev/null +++ b/plugins/transport-grpc/src/test/java/org/opensearch/transport/grpc/NettyGrpcClient.java @@ -0,0 +1,168 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.transport.grpc; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.core.common.transport.TransportAddress; + +import javax.net.ssl.SSLException; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import io.grpc.ManagedChannel; +import io.grpc.health.v1.HealthCheckRequest; +import io.grpc.health.v1.HealthCheckResponse; +import io.grpc.health.v1.HealthGrpc; +import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; +import io.grpc.netty.shaded.io.netty.handler.ssl.ApplicationProtocolConfig; +import io.grpc.netty.shaded.io.netty.handler.ssl.ApplicationProtocolNames; +import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder; +import io.grpc.netty.shaded.io.netty.handler.ssl.SslProvider; +import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import io.grpc.reflection.v1alpha.ServerReflectionGrpc; +import io.grpc.reflection.v1alpha.ServerReflectionRequest; +import io.grpc.reflection.v1alpha.ServerReflectionResponse; +import io.grpc.reflection.v1alpha.ServiceResponse; +import io.grpc.stub.StreamObserver; + +import static org.opensearch.transport.grpc.SecureSettingsHelpers.CLIENT_KEYSTORE; +import static org.opensearch.transport.grpc.SecureSettingsHelpers.getTestKeyManagerFactory; +import static io.grpc.internal.GrpcUtil.NOOP_PROXY_DETECTOR; + +public class NettyGrpcClient implements AutoCloseable { + private static final Logger logger = LogManager.getLogger(NettyGrpcClient.class); + private final ManagedChannel channel; + private final HealthGrpc.HealthBlockingStub healthStub; + private final ServerReflectionGrpc.ServerReflectionStub reflectionStub; + + public NettyGrpcClient(NettyChannelBuilder channelBuilder) { + channel = channelBuilder.build(); + healthStub = HealthGrpc.newBlockingStub(channel); + reflectionStub = ServerReflectionGrpc.newStub(channel); + } + + public void shutdown() throws InterruptedException { + channel.shutdown(); + if (!channel.awaitTermination(5, TimeUnit.SECONDS)) { + channel.shutdownNow(); // forced shutdown + if (!channel.awaitTermination(5, TimeUnit.SECONDS)) { + logger.warn("Unable to shutdown the managed channel gracefully"); + } + } + } + + @Override + public void close() throws Exception { + shutdown(); + } + + /** + * List available gRPC services available on server. + * Note: ProtoReflectionService only implements a streaming interface and has no blocking stub. + * @return services registered on the server. + */ + public CompletableFuture> listServices() { + CompletableFuture> respServices = new CompletableFuture<>(); + + StreamObserver responseObserver = new StreamObserver<>() { + final List services = new ArrayList<>(); + + @Override + public void onNext(ServerReflectionResponse response) { + if (response.hasListServicesResponse()) { + services.addAll(response.getListServicesResponse().getServiceList()); + } + } + + @Override + public void onError(Throwable t) { + respServices.completeExceptionally(t); + throw new RuntimeException(t); + } + + @Override + public void onCompleted() { + respServices.complete(services); + } + }; + + StreamObserver requestObserver = reflectionStub.serverReflectionInfo(responseObserver); + requestObserver.onNext(ServerReflectionRequest.newBuilder().setListServices("").build()); + requestObserver.onCompleted(); + return respServices; + } + + /** + * Request server status. + * @return HealthCheckResponse.ServingStatus. + */ + public HealthCheckResponse.ServingStatus checkHealth() { + return healthStub.check(HealthCheckRequest.newBuilder().build()).getStatus(); + } + + public static class Builder { + private Boolean clientAuth = false; + private Boolean insecure = false; + private TransportAddress addr; + + private static final ApplicationProtocolConfig CLIENT_ALPN = new ApplicationProtocolConfig( + ApplicationProtocolConfig.Protocol.ALPN, + ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE, + ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT, + ApplicationProtocolNames.HTTP_2 + ); + + Builder() {} + + public NettyGrpcClient build() throws SSLException { + NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(addr.getAddress(), addr.getPort()) + .proxyDetector(NOOP_PROXY_DETECTOR); + + if (clientAuth || insecure) { + SslContextBuilder builder = SslContextBuilder.forClient(); + builder.sslProvider(SslProvider.JDK); + builder.applicationProtocolConfig(CLIENT_ALPN); + if (clientAuth) { + builder.keyManager(getTestKeyManagerFactory(CLIENT_KEYSTORE)); + } + builder.trustManager(InsecureTrustManagerFactory.INSTANCE); + channelBuilder.sslContext(builder.build()); + } else { + channelBuilder.usePlaintext(); + } + + return new NettyGrpcClient(channelBuilder); + } + + public Builder setAddress(TransportAddress addr) { + this.addr = addr; + return this; + } + + /** + * Enable clientAuth - load client keystore. + */ + public Builder clientAuth(boolean enable) { + this.clientAuth = enable; + return this; + } + + /** + * Enable insecure TLS client. + */ + public Builder insecure(boolean enable) { + this.insecure = enable; + return this; + } + } +} diff --git a/plugins/transport-grpc/src/test/java/org/opensearch/transport/grpc/SecureNetty4GrpcServerTransportTests.java b/plugins/transport-grpc/src/test/java/org/opensearch/transport/grpc/SecureNetty4GrpcServerTransportTests.java new file mode 100644 index 0000000000000..dc9e33bee203d --- /dev/null +++ b/plugins/transport-grpc/src/test/java/org/opensearch/transport/grpc/SecureNetty4GrpcServerTransportTests.java @@ -0,0 +1,157 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.transport.grpc; + +import org.opensearch.common.network.NetworkService; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.transport.TransportAddress; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.transport.grpc.ssl.SecureNetty4GrpcServerTransport; +import org.junit.After; +import org.junit.Before; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import io.grpc.BindableService; +import io.grpc.StatusRuntimeException; +import io.grpc.health.v1.HealthCheckResponse; + +import static org.opensearch.transport.grpc.SecureSettingsHelpers.ConnectExceptions.BAD_CERT; +import static org.opensearch.transport.grpc.SecureSettingsHelpers.getServerClientAuthNone; +import static org.opensearch.transport.grpc.SecureSettingsHelpers.getServerClientAuthOptional; +import static org.opensearch.transport.grpc.SecureSettingsHelpers.getServerClientAuthRequired; + +public class SecureNetty4GrpcServerTransportTests extends OpenSearchTestCase { + private NetworkService networkService; + private final List services = new ArrayList<>(); + + static Settings createSettings() { + return Settings.builder().put(SecureNetty4GrpcServerTransport.SETTING_GRPC_PORT.getKey(), getPortRange()).build(); + } + + @Before + public void setup() { + networkService = new NetworkService(Collections.emptyList()); + } + + @After + public void shutdown() { + networkService = null; + } + + public void testGrpcSecureTransportStartStop() { + try ( + SecureNetty4GrpcServerTransport transport = new SecureNetty4GrpcServerTransport( + createSettings(), + services, + networkService, + getServerClientAuthNone() + ) + ) { + transport.start(); + assertTrue(transport.getBoundAddress().boundAddresses().length > 0); + assertNotNull(transport.getBoundAddress().publishAddress().address()); + transport.stop(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void testGrpcInsecureAuthTLS() { + try ( + SecureNetty4GrpcServerTransport transport = new SecureNetty4GrpcServerTransport( + createSettings(), + services, + networkService, + getServerClientAuthNone() + ) + ) { + transport.start(); + assertTrue(transport.getBoundAddress().boundAddresses().length > 0); + assertNotNull(transport.getBoundAddress().publishAddress().address()); + final TransportAddress remoteAddress = randomFrom(transport.getBoundAddress().boundAddresses()); + + // Client without cert + NettyGrpcClient client = new NettyGrpcClient.Builder().setAddress(remoteAddress).insecure(true).build(); + assertEquals(client.checkHealth(), HealthCheckResponse.ServingStatus.SERVING); + client.close(); + + transport.stop(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void testGrpcOptionalAuthTLS() { + try ( + SecureNetty4GrpcServerTransport transport = new SecureNetty4GrpcServerTransport( + createSettings(), + services, + networkService, + getServerClientAuthOptional() + ) + ) { + transport.start(); + assertTrue(transport.getBoundAddress().boundAddresses().length > 0); + assertNotNull(transport.getBoundAddress().publishAddress().address()); + final TransportAddress remoteAddress = randomFrom(transport.getBoundAddress().boundAddresses()); + + // Client without cert + NettyGrpcClient hasNoCertClient = new NettyGrpcClient.Builder().setAddress(remoteAddress).insecure(true).build(); + assertEquals(hasNoCertClient.checkHealth(), HealthCheckResponse.ServingStatus.SERVING); + hasNoCertClient.close(); + + // Client with trusted cert + NettyGrpcClient hasTrustedCertClient = new NettyGrpcClient.Builder().setAddress(remoteAddress).clientAuth(true).build(); + assertEquals(hasTrustedCertClient.checkHealth(), HealthCheckResponse.ServingStatus.SERVING); + hasTrustedCertClient.close(); + + transport.stop(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void testGrpcRequiredAuthTLS() { + try ( + SecureNetty4GrpcServerTransport transport = new SecureNetty4GrpcServerTransport( + createSettings(), + services, + networkService, + getServerClientAuthRequired() + ) + ) { + transport.start(); + assertTrue(transport.getBoundAddress().boundAddresses().length > 0); + assertNotNull(transport.getBoundAddress().publishAddress().address()); + final TransportAddress remoteAddress = randomFrom(transport.getBoundAddress().boundAddresses()); + + // Client without cert + NettyGrpcClient hasNoCertClient = new NettyGrpcClient.Builder().setAddress(remoteAddress).insecure(true).build(); + assertThrows(StatusRuntimeException.class, hasNoCertClient::checkHealth); + try { + hasNoCertClient.checkHealth(); + } catch (Exception e) { + assertEquals(SecureSettingsHelpers.ConnectExceptions.get(e), BAD_CERT); + } + hasNoCertClient.close(); + + // Client with trusted cert + NettyGrpcClient hasTrustedCertClient = new NettyGrpcClient.Builder().setAddress(remoteAddress).clientAuth(true).build(); + assertEquals(hasTrustedCertClient.checkHealth(), HealthCheckResponse.ServingStatus.SERVING); + hasTrustedCertClient.close(); + + transport.stop(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } +} diff --git a/plugins/transport-grpc/src/test/java/org/opensearch/transport/grpc/SecureSettingsHelpers.java b/plugins/transport-grpc/src/test/java/org/opensearch/transport/grpc/SecureSettingsHelpers.java new file mode 100644 index 0000000000000..d4e058d2a3a14 --- /dev/null +++ b/plugins/transport-grpc/src/test/java/org/opensearch/transport/grpc/SecureSettingsHelpers.java @@ -0,0 +1,174 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.transport.grpc; + +import org.opensearch.plugins.SecureAuxTransportSettingsProvider; +import org.opensearch.transport.grpc.ssl.SecureNetty4GrpcServerTransport; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.TrustManagerFactory; + +import java.io.IOException; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.UnrecoverableKeyException; +import java.security.cert.CertificateException; +import java.util.Collection; +import java.util.List; +import java.util.Locale; +import java.util.Optional; + +import io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth; +import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory; + +public class SecureSettingsHelpers { + private static final String PROVIDER = "JDK"; // only guaranteed provider + private static final String TEST_PASS = "password"; // used for all keystores + + static final String SERVER_KEYSTORE = "/netty4-server-secure.jks"; + static final String CLIENT_KEYSTORE = "/netty4-client-secure.jks"; + static final String[] DEFAULT_SSL_PROTOCOLS = { "TLSv1.3", "TLSv1.2", "TLSv1.1" }; + static final String[] DEFAULT_CIPHERS = { + "TLS_AES_128_GCM_SHA256", + "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256", + "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256" }; + + /** + * Exception messages for various types of TLS client/server connection failure. + * We would like to check to ensure a connection fails in the way we expect. + * However, depending on the default JDK provider exceptions may differ slightly, + * so we allow a couple different error messages for each possible error. + */ + protected enum ConnectExceptions { + NONE(List.of("Connection succeeded")), + UNAVAILABLE(List.of("Network closed for unknown reason")), + BAD_CERT(List.of("bad_certificate", "certificate_required")); + + List msgList = null; + + ConnectExceptions(List exceptionMsg) { + this.msgList = exceptionMsg; + } + + private static boolean containsSubstring(List substrings, String str) { + for (String sub : substrings) { + if (str.contains(sub)) { + return true; + } + } + return false; + } + + static ConnectExceptions get(Throwable e) { + if (e.getMessage() != null && containsSubstring(UNAVAILABLE.msgList, e.getMessage())) { + return UNAVAILABLE; + } + if (e.getMessage() != null && containsSubstring(BAD_CERT.msgList, e.getMessage())) { + return BAD_CERT; + } + if (e.getCause() != null) { + return get(e.getCause()); + } + throw new RuntimeException("Unexpected exception", e); + } + } + + static KeyManagerFactory getTestKeyManagerFactory(String keystorePath) { + KeyManagerFactory keyManagerFactory; + try { + final KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType()); + keyStore.load(SecureNetty4GrpcServerTransport.class.getResourceAsStream(keystorePath), TEST_PASS.toCharArray()); + keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + keyManagerFactory.init(keyStore, TEST_PASS.toCharArray()); + } catch (UnrecoverableKeyException | CertificateException | KeyStoreException | IOException | NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } + return keyManagerFactory; + } + + static TrustManagerFactory getTestTrustManagerFactory(String keystorePath) { + try { + final KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType()); + trustStore.load(SecureNetty4GrpcServerTransport.class.getResourceAsStream(keystorePath), TEST_PASS.toCharArray()); + TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + trustManagerFactory.init(trustStore); + return trustManagerFactory; + } catch (KeyStoreException | CertificateException | NoSuchAlgorithmException | IOException e) { + throw new RuntimeException(e); + } + } + + static SecureAuxTransportSettingsProvider getSecureSettingsProvider( + String clientAuth, + KeyManagerFactory keyMngerFactory, + TrustManagerFactory trustMngerFactory + ) { + return new SecureAuxTransportSettingsProvider() { + @Override + public Optional parameters() { + return Optional.of(new SecureAuxTransportSettingsProvider.SecureAuxTransportParameters() { + @Override + public Optional sslProvider() { + return Optional.of(PROVIDER); + } + + @Override + public Optional clientAuth() { + return Optional.of(clientAuth); + } + + @Override + public Collection protocols() { + return List.of(DEFAULT_SSL_PROTOCOLS); + } + + @Override + public Collection cipherSuites() { + return List.of(DEFAULT_CIPHERS); + } + + @Override + public Optional keyManagerFactory() { + return Optional.of(keyMngerFactory); + } + + @Override + public Optional trustManagerFactory() { + return Optional.of(trustMngerFactory); + } + }); + } + }; + } + + static SecureAuxTransportSettingsProvider getServerClientAuthRequired() { + return getSecureSettingsProvider( + ClientAuth.REQUIRE.name().toUpperCase(Locale.ROOT), + getTestKeyManagerFactory(SERVER_KEYSTORE), + getTestTrustManagerFactory(CLIENT_KEYSTORE) + ); + } + + static SecureAuxTransportSettingsProvider getServerClientAuthOptional() { + return getSecureSettingsProvider( + ClientAuth.OPTIONAL.name().toUpperCase(Locale.ROOT), + getTestKeyManagerFactory(SERVER_KEYSTORE), + getTestTrustManagerFactory(CLIENT_KEYSTORE) + ); + } + + static SecureAuxTransportSettingsProvider getServerClientAuthNone() { + return getSecureSettingsProvider( + ClientAuth.NONE.name().toUpperCase(Locale.ROOT), + getTestKeyManagerFactory(SERVER_KEYSTORE), + InsecureTrustManagerFactory.INSTANCE + ); + } +} diff --git a/plugins/transport-grpc/src/test/resources/README.txt b/plugins/transport-grpc/src/test/resources/README.txt new file mode 100644 index 0000000000000..d2315aea07404 --- /dev/null +++ b/plugins/transport-grpc/src/test/resources/README.txt @@ -0,0 +1,26 @@ +#!/usr/bin/env bash +# +# This is README describes how the certificates in this directory were created. +# This file can also be executed as a script +# + +# 1. Create server & client certificate key + +openssl req -x509 -sha256 -newkey rsa:2048 -keyout server.key -out server.crt -days 8192 -nodes +openssl req -x509 -sha256 -newkey rsa:2048 -keyout client.key -out client.crt -days 8192 -nodes + +# 2. Export the certificates in pkcs12 format + +openssl pkcs12 -export -in server.crt -inkey server.key -out server.p12 -name netty4-server-secure -password pass:password +openssl pkcs12 -export -in client.crt -inkey client.key -out client.p12 -name netty4-client-secure -password pass:password + +# 3. Import the certificate into JDK keystore (PKCS12 type) + +keytool -importkeystore -srcstorepass password -destkeystore netty4-server-secure.jks -srckeystore server.p12 -srcstoretype PKCS12 -alias netty4-server-secure -deststorepass password +keytool -importkeystore -srcstorepass password -destkeystore netty4-client-secure.jks -srckeystore client.p12 -srcstoretype PKCS12 -alias netty4-client-secure -deststorepass password + +# 4. Clean up - Clean up pkcs12 keystores and private keys +rm client.key +rm client.p12 +rm server.key +rm server.p12 diff --git a/plugins/transport-grpc/src/test/resources/netty4-client-secure.jks b/plugins/transport-grpc/src/test/resources/netty4-client-secure.jks new file mode 100644 index 0000000000000..3497de56fc956 Binary files /dev/null and b/plugins/transport-grpc/src/test/resources/netty4-client-secure.jks differ diff --git a/plugins/transport-grpc/src/test/resources/netty4-server-secure.jks b/plugins/transport-grpc/src/test/resources/netty4-server-secure.jks new file mode 100644 index 0000000000000..5e7d09ded52d0 Binary files /dev/null and b/plugins/transport-grpc/src/test/resources/netty4-server-secure.jks differ diff --git a/server/src/main/java/org/opensearch/common/network/NetworkModule.java b/server/src/main/java/org/opensearch/common/network/NetworkModule.java index 5d55fb52c323d..222f344437ef7 100644 --- a/server/src/main/java/org/opensearch/common/network/NetworkModule.java +++ b/server/src/main/java/org/opensearch/common/network/NetworkModule.java @@ -55,6 +55,7 @@ import org.opensearch.http.HttpServerTransport; import org.opensearch.index.shard.PrimaryReplicaSyncer.ResyncTask; import org.opensearch.plugins.NetworkPlugin; +import org.opensearch.plugins.SecureAuxTransportSettingsProvider; import org.opensearch.plugins.SecureHttpTransportSettingsProvider; import org.opensearch.plugins.SecureSettingsFactory; import org.opensearch.plugins.SecureTransportSettingsProvider; @@ -210,6 +211,18 @@ public NetworkModule( ); } + final Collection secureAuxTransportSettingsProviders = secureSettingsFactories.stream() + .map(p -> p.getSecureAuxTransportSettingsProvider(settings)) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toList()); + + if (secureAuxTransportSettingsProviders.size() > 1) { + throw new IllegalArgumentException( + "there is more than one secure auxiliary transport settings provider: " + secureAuxTransportSettingsProviders + ); + } + for (NetworkPlugin plugin : plugins) { Map> httpTransportFactory = plugin.getHttpTransports( settings, @@ -274,6 +287,24 @@ public NetworkModule( } } + // Register any secure auxiliary transports if available + if (secureAuxTransportSettingsProviders.isEmpty() == false) { + final SecureAuxTransportSettingsProvider secureSettingProvider = secureAuxTransportSettingsProviders.iterator().next(); + + final Map> secureAuxTransportFactory = plugin.getSecureAuxTransports( + settings, + threadPool, + circuitBreakerService, + networkService, + clusterSettings, + secureSettingProvider, + tracer + ); + for (Map.Entry> entry : secureAuxTransportFactory.entrySet()) { + registerAuxTransport(entry.getKey(), entry.getValue()); + } + } + // Register any secure transports if available if (secureTransportSettingsProviders.isEmpty() == false) { final SecureTransportSettingsProvider secureSettingProvider = secureTransportSettingsProviders.iterator().next(); diff --git a/server/src/main/java/org/opensearch/plugins/NetworkPlugin.java b/server/src/main/java/org/opensearch/plugins/NetworkPlugin.java index 4442189373c93..b294c64e5cdce 100644 --- a/server/src/main/java/org/opensearch/plugins/NetworkPlugin.java +++ b/server/src/main/java/org/opensearch/plugins/NetworkPlugin.java @@ -42,6 +42,7 @@ import org.opensearch.common.util.PageCacheRecycler; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.common.transport.BoundTransportAddress; import org.opensearch.core.indices.breaker.CircuitBreakerService; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.http.HttpServerTransport; @@ -75,6 +76,7 @@ public interface NetworkPlugin { * bootstrap. To allow pluggable AuxTransports access to configurable port ranges we require the port range be provided * through an {@link org.opensearch.common.settings.Setting.AffixSetting} of the form 'AUX_SETTINGS_PREFIX.{aux-transport-key}.ports'. */ + @ExperimentalApi abstract class AuxTransport extends AbstractLifecycleComponent { public static final String AUX_SETTINGS_PREFIX = "aux.transport."; public static final String AUX_TRANSPORT_TYPES_KEY = AUX_SETTINGS_PREFIX + "types"; @@ -91,6 +93,9 @@ abstract class AuxTransport extends AbstractLifecycleComponent { Function.identity(), Setting.Property.NodeScope ); + + // public for tests + public abstract BoundTransportAddress getBoundAddress(); } /** @@ -159,6 +164,23 @@ default Map> getHttpTransports( return Collections.emptyMap(); } + /** + * Returns a map of secure {@link AuxTransport} suppliers. + * See {@link org.opensearch.plugins.NetworkPlugin.AuxTransport#AUX_TRANSPORT_TYPES_SETTING} to configure a specific implementation. + */ + @ExperimentalApi + default Map> getSecureAuxTransports( + Settings settings, + ThreadPool threadPool, + CircuitBreakerService circuitBreakerService, + NetworkService networkService, + ClusterSettings clusterSettings, + SecureAuxTransportSettingsProvider secureAuxTransportSettingsProvider, + Tracer tracer + ) { + return Collections.emptyMap(); + } + /** * Returns a map of secure {@link Transport} suppliers. * See {@link org.opensearch.common.network.NetworkModule#TRANSPORT_TYPE_KEY} to configure a specific implementation. diff --git a/server/src/main/java/org/opensearch/plugins/SecureAuxTransportSettingsProvider.java b/server/src/main/java/org/opensearch/plugins/SecureAuxTransportSettingsProvider.java new file mode 100644 index 0000000000000..d554479ca746e --- /dev/null +++ b/server/src/main/java/org/opensearch/plugins/SecureAuxTransportSettingsProvider.java @@ -0,0 +1,55 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugins; + +import org.opensearch.common.annotation.ExperimentalApi; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.TrustManagerFactory; + +import java.util.Collection; +import java.util.Optional; + +/** + * A security settings provider for auxiliary transports. + * As auxiliary transports are pluggable security params are provided in a generic way for transports + * to construct a ssl context for their particular transport implementation. + * @opensearch.experimental + */ +@ExperimentalApi +public interface SecureAuxTransportSettingsProvider { + /** + * Parameters that can be provided by the {@link SecureAuxTransportSettingsProvider}. + * Includes all fields required to build a ssl context. + * Note that these fields are dynamic. + * A new {@link SecureAuxTransportSettingsProvider.SecureAuxTransportParameters} may be returned on each call. + * @return an instance of {@link SecureAuxTransportSettingsProvider.SecureAuxTransportParameters} + */ + default Optional parameters() { + return Optional.empty(); + } + + /** + * Contains params required to construct a generic ssl context. + */ + @ExperimentalApi + interface SecureAuxTransportParameters { + Optional keyManagerFactory(); + + Optional sslProvider(); + + Optional clientAuth(); + + Collection protocols(); + + Collection cipherSuites(); + + Optional trustManagerFactory(); + } +} diff --git a/server/src/main/java/org/opensearch/plugins/SecureSettingsFactory.java b/server/src/main/java/org/opensearch/plugins/SecureSettingsFactory.java index ec2276ecc62ef..0fdf4b6927eb0 100644 --- a/server/src/main/java/org/opensearch/plugins/SecureSettingsFactory.java +++ b/server/src/main/java/org/opensearch/plugins/SecureSettingsFactory.java @@ -33,4 +33,11 @@ public interface SecureSettingsFactory { * @return optionally, the instance of the {@link SecureHttpTransportSettingsProvider} */ Optional getSecureHttpTransportSettingsProvider(Settings settings); + + /** + * Creates (or provides pre-created) instance of the {@link SecureAuxTransportSettingsProvider} + * @param settings settings + * @return optionally, the instance of the {@link SecureAuxTransportSettingsProvider} + */ + Optional getSecureAuxTransportSettingsProvider(Settings settings); } diff --git a/server/src/main/java/org/opensearch/transport/TransportAdapterProvider.java b/server/src/main/java/org/opensearch/transport/TransportAdapterProvider.java index 36dbd5a699b40..7e39445b1699c 100644 --- a/server/src/main/java/org/opensearch/transport/TransportAdapterProvider.java +++ b/server/src/main/java/org/opensearch/transport/TransportAdapterProvider.java @@ -32,7 +32,7 @@ public interface TransportAdapterProvider { * Provides a new transport adapter of required transport adapter class and transport instance. * @param transport adapter class * @param settings settings - * @param transport HTTP transport instance + * @param transport transport instance * @param adapterClass required transport adapter class * @return the non-empty {@link Optional} if the transport adapter could be created, empty one otherwise */ diff --git a/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java b/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java index 447377e372e61..c07fa0e183c00 100644 --- a/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java +++ b/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java @@ -47,6 +47,7 @@ import org.opensearch.http.HttpStats; import org.opensearch.http.NullDispatcher; import org.opensearch.plugins.NetworkPlugin; +import org.opensearch.plugins.SecureAuxTransportSettingsProvider; import org.opensearch.plugins.SecureHttpTransportSettingsProvider; import org.opensearch.plugins.SecureSettingsFactory; import org.opensearch.plugins.SecureTransportSettingsProvider; @@ -130,6 +131,12 @@ public Optional buildHttpServerExceptionHandler( } }); } + + @Override + public Optional getSecureAuxTransportSettingsProvider(Settings settings) { + return Optional.of(new SecureAuxTransportSettingsProvider() { + }); + } }; }