diff --git a/CHANGELOG.md b/CHANGELOG.md index fb4d2bafb9ee7..0b0d26445924a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add `cluster.initial_cluster_manager_nodes` to testClusters OVERRIDABLE_SETTINGS ([#20348](https://github.com/opensearch-project/OpenSearch/pull/20348)) - Add BigInteger support for unsigned_long fields in gRPC transport ([#20346](https://github.com/opensearch-project/OpenSearch/pull/20346)) - Install demo security information when running ./gradlew run -PinstalledPlugins="['opensearch-security']" ([#20372](https://github.com/opensearch-project/OpenSearch/pull/20372)) +- Migrate gRPC transport executor from FixedExecutorBuilder to ForkJoinPoolExecutorBuilder for improved performance ([#19685](https://github.com/opensearch-project/OpenSearch/issues/19685)) ### Fixed - Fix bug of warm index: FullFileCachedIndexInput was closed error ([#20055](https://github.com/opensearch-project/OpenSearch/pull/20055)) diff --git a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java index 33cf26ba6c3b6..78e9fb2b736ff 100644 --- a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java +++ b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java @@ -28,7 +28,7 @@ import org.opensearch.script.ScriptService; import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.threadpool.ExecutorBuilder; -import org.opensearch.threadpool.FixedExecutorBuilder; +import org.opensearch.threadpool.ForkJoinPoolExecutorBuilder; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.AuxTransport; import org.opensearch.transport.client.Client; @@ -296,16 +296,16 @@ public List> getSettings() { * Returns the executor builders for this plugin's custom thread pools. * Creates a dedicated thread pool for gRPC request processing that integrates * with OpenSearch's thread pool monitoring and management system. + * Uses ForkJoinPool for improved performance through work-stealing and better + * load balancing. * * @param settings the current settings * @return executor builders for this plugin's custom thread pools */ @Override public List> getExecutorBuilders(Settings settings) { - final int executorCount = SETTING_GRPC_EXECUTOR_COUNT.get(settings); - return List.of( - new FixedExecutorBuilder(settings, GRPC_THREAD_POOL_NAME, executorCount, 1000, "thread_pool." + GRPC_THREAD_POOL_NAME) - ); + final int parallelism = SETTING_GRPC_EXECUTOR_COUNT.get(settings); + return List.of(new ForkJoinPoolExecutorBuilder(GRPC_THREAD_POOL_NAME, parallelism, "thread_pool." + GRPC_THREAD_POOL_NAME)); } /** diff --git a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/Netty4GrpcServerTransportTests.java b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/Netty4GrpcServerTransportTests.java index fd3feb8a38fa3..477ed939b02c0 100644 --- a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/Netty4GrpcServerTransportTests.java +++ b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/Netty4GrpcServerTransportTests.java @@ -16,7 +16,7 @@ import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.ExecutorBuilder; -import org.opensearch.threadpool.FixedExecutorBuilder; +import org.opensearch.threadpool.ForkJoinPoolExecutorBuilder; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.grpc.ssl.NettyGrpcClient; import org.hamcrest.MatcherAssert; @@ -48,7 +48,7 @@ public void setup() { .put("node.name", "test-node") .put(Netty4GrpcServerTransport.SETTING_GRPC_EXECUTOR_COUNT.getKey(), 4) .build(); - ExecutorBuilder grpcExecutorBuilder = new FixedExecutorBuilder(settings, "grpc", 4, 1000, "thread_pool.grpc"); + ExecutorBuilder grpcExecutorBuilder = new ForkJoinPoolExecutorBuilder("grpc", 4, "thread_pool.grpc"); threadPool = new ThreadPool(settings, grpcExecutorBuilder); services = List.of(); @@ -236,7 +236,8 @@ public void testWithCustomExecutorCount() { ExecutorService executor = transport.getGrpcExecutorForTesting(); assertNotNull("gRPC executor should be created", executor); // Note: The executor is now managed by OpenSearch's ThreadPool system - // We can't easily verify the thread count as it's encapsulated within OpenSearch's executor implementation + // We can't easily verify the thread count as it's encapsulated within + // OpenSearch's executor implementation transport.stop(); } @@ -253,7 +254,8 @@ public void testDefaultExecutorCount() { ExecutorService executor = transport.getGrpcExecutorForTesting(); assertNotNull("gRPC executor should be created", executor); // Note: The executor is now managed by OpenSearch's ThreadPool system - // The actual thread count is configured via the FixedExecutorBuilder in the test setup + // The actual parallelism is configured via the ForkJoinPoolExecutorBuilder in + // the test setup transport.stop(); } @@ -292,7 +294,8 @@ public void testExecutorShutdownOnStop() { assertFalse("Executor should not be shutdown initially", executor.isShutdown()); transport.stop(); - // Note: The executor is managed by OpenSearch's ThreadPool and is not shutdown when transport stops + // Note: The executor is managed by OpenSearch's ThreadPool and is not shutdown + // when transport stops assertNotNull("Executor should still exist after transport stop", executor); transport.close(); @@ -359,7 +362,8 @@ public void testStartFailureTriggersCleanup() { // Start should fail expectThrows(Exception.class, transport::start); - // Resources should be cleaned up after failure - the implementation calls doStop() in the finally block + // Resources should be cleaned up after failure - the implementation calls + // doStop() in the finally block ExecutorService executor = transport.getGrpcExecutorForTesting(); EventLoopGroup bossGroup = transport.getBossEventLoopGroupForTesting(); EventLoopGroup workerGroup = transport.getWorkerEventLoopGroupForTesting(); @@ -463,7 +467,8 @@ public void testShutdownTimeoutHandling() throws InterruptedException { // Normal shutdown should work transport.stop(); - // Verify everything is shutdown (except executor which is managed by OpenSearch's ThreadPool) + // Verify everything is shutdown (except executor which is managed by + // OpenSearch's ThreadPool) assertNotNull("Executor should still exist", executor); assertTrue("Boss group should be shutdown", bossGroup.isShutdown()); assertTrue("Worker group should be shutdown", workerGroup.isShutdown()); @@ -478,7 +483,8 @@ public void testResourceCleanupOnClose() { transport.start(); transport.stop(); - // doClose should handle cleanup gracefully even if resources are already shutdown + // doClose should handle cleanup gracefully even if resources are already + // shutdown transport.close(); // Multiple closes should be safe diff --git a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/ssl/SecureNetty4GrpcServerTransportTests.java b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/ssl/SecureNetty4GrpcServerTransportTests.java index aeb59e247dcb2..a9d051954c1b0 100644 --- a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/ssl/SecureNetty4GrpcServerTransportTests.java +++ b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/ssl/SecureNetty4GrpcServerTransportTests.java @@ -13,7 +13,7 @@ import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.ExecutorBuilder; -import org.opensearch.threadpool.FixedExecutorBuilder; +import org.opensearch.threadpool.ForkJoinPoolExecutorBuilder; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.grpc.interceptor.GrpcInterceptorChain; import org.junit.After; @@ -50,7 +50,7 @@ public void setup() { // Create a ThreadPool with the gRPC executor Settings settings = Settings.builder().put("node.name", "test-node").put("grpc.netty.executor_count", 4).build(); - ExecutorBuilder grpcExecutorBuilder = new FixedExecutorBuilder(settings, "grpc", 4, 1000, "thread_pool.grpc"); + ExecutorBuilder grpcExecutorBuilder = new ForkJoinPoolExecutorBuilder("grpc", 4, "thread_pool.grpc"); threadPool = new ThreadPool(settings, grpcExecutorBuilder); serverInterceptor = new GrpcInterceptorChain(threadPool.getThreadContext(), Collections.emptyList()); }