Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add `cluster.initial_cluster_manager_nodes` to testClusters OVERRIDABLE_SETTINGS ([#20348](https://github.com/opensearch-project/OpenSearch/pull/20348))
- Add BigInteger support for unsigned_long fields in gRPC transport ([#20346](https://github.com/opensearch-project/OpenSearch/pull/20346))
- Install demo security information when running ./gradlew run -PinstalledPlugins="['opensearch-security']" ([#20372](https://github.com/opensearch-project/OpenSearch/pull/20372))
- Migrate gRPC transport executor from FixedExecutorBuilder to ForkJoinPoolExecutorBuilder for improved performance ([#19685](https://github.com/opensearch-project/OpenSearch/issues/19685))

### Fixed
- Fix bug of warm index: FullFileCachedIndexInput was closed error ([#20055](https://github.com/opensearch-project/OpenSearch/pull/20055))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.opensearch.script.ScriptService;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.FixedExecutorBuilder;
import org.opensearch.threadpool.ForkJoinPoolExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.AuxTransport;
import org.opensearch.transport.client.Client;
Expand Down Expand Up @@ -296,16 +296,16 @@ public List<Setting<?>> getSettings() {
* Returns the executor builders for this plugin's custom thread pools.
* Creates a dedicated thread pool for gRPC request processing that integrates
* with OpenSearch's thread pool monitoring and management system.
* Uses ForkJoinPool for improved performance through work-stealing and better
* load balancing.
*
* @param settings the current settings
* @return executor builders for this plugin's custom thread pools
*/
@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
final int executorCount = SETTING_GRPC_EXECUTOR_COUNT.get(settings);
return List.of(
new FixedExecutorBuilder(settings, GRPC_THREAD_POOL_NAME, executorCount, 1000, "thread_pool." + GRPC_THREAD_POOL_NAME)
);
final int parallelism = SETTING_GRPC_EXECUTOR_COUNT.get(settings);
return List.of(new ForkJoinPoolExecutorBuilder(GRPC_THREAD_POOL_NAME, parallelism, "thread_pool." + GRPC_THREAD_POOL_NAME));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.FixedExecutorBuilder;
import org.opensearch.threadpool.ForkJoinPoolExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.grpc.ssl.NettyGrpcClient;
import org.hamcrest.MatcherAssert;
Expand Down Expand Up @@ -48,7 +48,7 @@ public void setup() {
.put("node.name", "test-node")
.put(Netty4GrpcServerTransport.SETTING_GRPC_EXECUTOR_COUNT.getKey(), 4)
.build();
ExecutorBuilder<?> grpcExecutorBuilder = new FixedExecutorBuilder(settings, "grpc", 4, 1000, "thread_pool.grpc");
ExecutorBuilder<?> grpcExecutorBuilder = new ForkJoinPoolExecutorBuilder("grpc", 4, "thread_pool.grpc");
threadPool = new ThreadPool(settings, grpcExecutorBuilder);

services = List.of();
Expand Down Expand Up @@ -236,7 +236,8 @@ public void testWithCustomExecutorCount() {
ExecutorService executor = transport.getGrpcExecutorForTesting();
assertNotNull("gRPC executor should be created", executor);
// Note: The executor is now managed by OpenSearch's ThreadPool system
// We can't easily verify the thread count as it's encapsulated within OpenSearch's executor implementation
// We can't easily verify the thread count as it's encapsulated within
// OpenSearch's executor implementation

transport.stop();
}
Expand All @@ -253,7 +254,8 @@ public void testDefaultExecutorCount() {
ExecutorService executor = transport.getGrpcExecutorForTesting();
assertNotNull("gRPC executor should be created", executor);
// Note: The executor is now managed by OpenSearch's ThreadPool system
// The actual thread count is configured via the FixedExecutorBuilder in the test setup
// The actual parallelism is configured via the ForkJoinPoolExecutorBuilder in
// the test setup

transport.stop();
}
Expand Down Expand Up @@ -292,7 +294,8 @@ public void testExecutorShutdownOnStop() {
assertFalse("Executor should not be shutdown initially", executor.isShutdown());

transport.stop();
// Note: The executor is managed by OpenSearch's ThreadPool and is not shutdown when transport stops
// Note: The executor is managed by OpenSearch's ThreadPool and is not shutdown
// when transport stops
assertNotNull("Executor should still exist after transport stop", executor);

transport.close();
Expand Down Expand Up @@ -359,7 +362,8 @@ public void testStartFailureTriggersCleanup() {
// Start should fail
expectThrows(Exception.class, transport::start);

// Resources should be cleaned up after failure - the implementation calls doStop() in the finally block
// Resources should be cleaned up after failure - the implementation calls
// doStop() in the finally block
ExecutorService executor = transport.getGrpcExecutorForTesting();
EventLoopGroup bossGroup = transport.getBossEventLoopGroupForTesting();
EventLoopGroup workerGroup = transport.getWorkerEventLoopGroupForTesting();
Expand Down Expand Up @@ -463,7 +467,8 @@ public void testShutdownTimeoutHandling() throws InterruptedException {
// Normal shutdown should work
transport.stop();

// Verify everything is shutdown (except executor which is managed by OpenSearch's ThreadPool)
// Verify everything is shutdown (except executor which is managed by
// OpenSearch's ThreadPool)
assertNotNull("Executor should still exist", executor);
assertTrue("Boss group should be shutdown", bossGroup.isShutdown());
assertTrue("Worker group should be shutdown", workerGroup.isShutdown());
Expand All @@ -478,7 +483,8 @@ public void testResourceCleanupOnClose() {
transport.start();
transport.stop();

// doClose should handle cleanup gracefully even if resources are already shutdown
// doClose should handle cleanup gracefully even if resources are already
// shutdown
transport.close();

// Multiple closes should be safe
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.FixedExecutorBuilder;
import org.opensearch.threadpool.ForkJoinPoolExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.grpc.interceptor.GrpcInterceptorChain;
import org.junit.After;
Expand Down Expand Up @@ -50,7 +50,7 @@ public void setup() {

// Create a ThreadPool with the gRPC executor
Settings settings = Settings.builder().put("node.name", "test-node").put("grpc.netty.executor_count", 4).build();
ExecutorBuilder<?> grpcExecutorBuilder = new FixedExecutorBuilder(settings, "grpc", 4, 1000, "thread_pool.grpc");
ExecutorBuilder<?> grpcExecutorBuilder = new ForkJoinPoolExecutorBuilder("grpc", 4, "thread_pool.grpc");
threadPool = new ThreadPool(settings, grpcExecutorBuilder);
serverInterceptor = new GrpcInterceptorChain(threadPool.getThreadContext(), Collections.emptyList());
}
Expand Down
Loading