Skip to content
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Adding logic for histogram aggregation using skiplist ([#19130](https://github.com/opensearch-project/OpenSearch/pull/19130))
- Add skip_list param for date, scaled float and token count fields ([#19142](https://github.com/opensearch-project/OpenSearch/pull/19142))
- Implement GRPC MatchPhrase, MultiMatch queries ([#19449](https://github.com/opensearch-project/OpenSearch/pull/19449))
- Optimize gRPC transport thread management for improved throughput ([#19278](https://github.com/opensearch-project/OpenSearch/pull/19278))

### Changed
- Refactor `if-else` chains to use `Java 17 pattern matching switch expressions` ([#18965](https://github.com/opensearch-project/OpenSearch/pull/18965))
Expand Down
14 changes: 13 additions & 1 deletion modules/transport-grpc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ setting 'aux.transport.secure-transport-grpc.port', '9400-9500' //optional
| **grpc.host** | List of addresses the gRPC server will bind to. | `["0.0.0.0"]` | `[]` |
| **grpc.bind_host** | List of addresses to bind the gRPC server to. Can be distinct from publish hosts. | `["0.0.0.0", "::"]` | Value of `grpc.host` |
| **grpc.publish_host** | List of hostnames or IPs published to peers for client connections. | `["thisnode.example.com"]` | Value of `grpc.host` |
| **grpc.netty.worker_count** | Number of Netty worker threads for the gRPC server. Controls concurrency and parallelism. | `2` | Number of processors |
| **grpc.netty.worker_count** | Number of Netty worker threads for the gRPC server. Controls network I/O concurrency. | `2` | Number of processors |
| **grpc.netty.executor_count** | Number of threads in the dedicated fixed-size `grpc` thread pool that processes gRPC service calls. Controls request processing parallelism. | `32` | 2 × Number of processors |
| **grpc.netty.max_concurrent_connection_calls** | Maximum number of simultaneous in-flight requests allowed per client connection. | `200` | `100` |
| **grpc.netty.max_connection_age** | Maximum age a connection is allowed before being gracefully closed. Supports time units like `ms`, `s`, `m`. | `500ms` | Not set (no limit) |
| **grpc.netty.max_connection_idle** | Maximum duration a connection can be idle before being closed. Supports time units like `ms`, `s`, `m`. | `2m` | Not set (no limit) |
Expand All @@ -50,13 +51,24 @@ setting 'grpc.host', '["0.0.0.0"]'
setting 'grpc.bind_host', '["0.0.0.0", "::", "10.0.0.1"]'
setting 'grpc.publish_host', '["thisnode.example.com"]'
setting 'grpc.netty.worker_count', '2'
setting 'grpc.netty.executor_count', '32'
setting 'grpc.netty.max_concurrent_connection_calls', '200'
setting 'grpc.netty.max_connection_age', '500ms'
setting 'grpc.netty.max_connection_idle', '2m'
setting 'grpc.netty.max_msg_size', '10mb'
setting 'grpc.netty.keepalive_timeout', '1s'
```

## Thread Pool Monitoring

The dedicated thread pool used for gRPC request processing is registered as a standard OpenSearch thread pool named `grpc`, controlled by the `grpc.netty.executor_count` setting.

The gRPC thread pool stats can be monitored using:

```bash
curl -X GET "localhost:9200/_nodes/stats/thread_pool?filter_path=nodes.*.thread_pool.grpc"
```

## Testing

### Unit Tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.FixedExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.AuxTransport;
import org.opensearch.transport.client.Client;
Expand All @@ -49,6 +51,7 @@

import static org.opensearch.transport.grpc.Netty4GrpcServerTransport.GRPC_TRANSPORT_SETTING_KEY;
import static org.opensearch.transport.grpc.Netty4GrpcServerTransport.SETTING_GRPC_BIND_HOST;
import static org.opensearch.transport.grpc.Netty4GrpcServerTransport.SETTING_GRPC_EXECUTOR_COUNT;
import static org.opensearch.transport.grpc.Netty4GrpcServerTransport.SETTING_GRPC_HOST;
import static org.opensearch.transport.grpc.Netty4GrpcServerTransport.SETTING_GRPC_KEEPALIVE_TIMEOUT;
import static org.opensearch.transport.grpc.Netty4GrpcServerTransport.SETTING_GRPC_MAX_CONCURRENT_CONNECTION_CALLS;
Expand All @@ -69,6 +72,9 @@ public final class GrpcPlugin extends Plugin implements NetworkPlugin, Extensibl

private static final Logger logger = LogManager.getLogger(GrpcPlugin.class);

/** The name of the gRPC thread pool */
public static final String GRPC_THREAD_POOL_NAME = "grpc";

private Client client;
private final List<QueryBuilderProtoConverter> queryConverters = new ArrayList<>();
private QueryBuilderProtoConverterRegistryImpl queryRegistry;
Expand Down Expand Up @@ -163,7 +169,7 @@ public Map<String, Supplier<AuxTransport>> getAuxTransports(
);
return Collections.singletonMap(
GRPC_TRANSPORT_SETTING_KEY,
() -> new Netty4GrpcServerTransport(settings, grpcServices, networkService)
() -> new Netty4GrpcServerTransport(settings, grpcServices, networkService, threadPool)
);
}

Expand Down Expand Up @@ -206,7 +212,13 @@ public Map<String, Supplier<AuxTransport>> getSecureAuxTransports(
);
return Collections.singletonMap(
GRPC_SECURE_TRANSPORT_SETTING_KEY,
() -> new SecureNetty4GrpcServerTransport(settings, grpcServices, networkService, secureAuxTransportSettingsProvider)
() -> new SecureNetty4GrpcServerTransport(
settings,
grpcServices,
networkService,
threadPool,
secureAuxTransportSettingsProvider
)
);
}

Expand Down Expand Up @@ -235,6 +247,7 @@ public List<Setting<?>> getSettings() {
SETTING_GRPC_PUBLISH_HOST,
SETTING_GRPC_BIND_HOST,
SETTING_GRPC_WORKER_COUNT,
SETTING_GRPC_EXECUTOR_COUNT,
SETTING_GRPC_MAX_CONCURRENT_CONNECTION_CALLS,
SETTING_GRPC_MAX_MSG_SIZE,
SETTING_GRPC_MAX_CONNECTION_AGE,
Expand All @@ -243,6 +256,22 @@ public List<Setting<?>> getSettings() {
);
}

/**
 * Declares the dedicated {@code grpc} thread pool so that OpenSearch creates it
 * alongside its other thread pools and includes it in thread pool monitoring.
 *
 * @param settings the current settings; {@code grpc.netty.executor_count} is read from them to size the pool
 * @return a single-element list holding the builder for the gRPC thread pool
 */
@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
    // Settings for this pool are namespaced under "thread_pool.grpc".
    final String settingsPrefix = "thread_pool." + GRPC_THREAD_POOL_NAME;
    // Fixed pool: thread count comes from the executor-count setting, 1000 is the builder's queue size argument.
    final ExecutorBuilder<?> grpcPoolBuilder = new FixedExecutorBuilder(
        settings,
        GRPC_THREAD_POOL_NAME,
        SETTING_GRPC_EXECUTOR_COUNT.get(settings),
        1000,
        settingsPrefix
    );
    return List.of(grpcPoolBuilder);
}

/**
* Creates components used by the plugin.
* Stores the client for later use in creating gRPC services, and the query registry which registers the types of supported GRPC Search queries.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.AuxTransport;
import org.opensearch.transport.BindTransportException;

Expand All @@ -32,6 +33,7 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
Expand Down Expand Up @@ -118,6 +120,16 @@ public class Netty4GrpcServerTransport extends AuxTransport {
Setting.Property.NodeScope
);

/**
* Configure size of executor thread pool for handling gRPC calls.
*/
public static final Setting<Integer> SETTING_GRPC_EXECUTOR_COUNT = new Setting<>(
"grpc.netty.executor_count",
Comment thread
andrross marked this conversation as resolved.
(s) -> Integer.toString(OpenSearchExecutors.allocatedProcessors(s) * 2),
(s) -> Setting.parseInt(s, 1, "grpc.netty.executor_count"),
Setting.Property.NodeScope
);

/**
* Controls the number of allowed simultaneous in flight requests a single client connection may send.
*/
Expand Down Expand Up @@ -189,10 +201,12 @@ public class Netty4GrpcServerTransport extends AuxTransport {
protected final Settings settings;

private final NetworkService networkService;
private final ThreadPool threadPool;
private final List<BindableService> services;
private final String[] bindHosts;
private final String[] publishHosts;
private final int nettyEventLoopThreads;
private final int executorThreads;
private final long maxInboundMessageSize;
private final long maxConcurrentConnectionCalls;
private final TimeValue maxConnectionAge;
Expand All @@ -202,19 +216,28 @@ public class Netty4GrpcServerTransport extends AuxTransport {
private final List<UnaryOperator<NettyServerBuilder>> serverBuilderConfigs = new ArrayList<>();

private volatile BoundTransportAddress boundAddress;
private volatile EventLoopGroup eventLoopGroup;
private volatile EventLoopGroup bossEventLoopGroup;
private volatile EventLoopGroup workerEventLoopGroup;
private volatile ExecutorService grpcExecutor;

/**
* Creates a new Netty4GrpcServerTransport instance.
* @param settings the configured settings.
* @param services the gRPC compatible services to be registered with the server.
* @param networkService the bind/publish addresses.
* @param threadPool the thread pool for gRPC request processing.
*/
public Netty4GrpcServerTransport(Settings settings, List<BindableService> services, NetworkService networkService) {
public Netty4GrpcServerTransport(
Settings settings,
List<BindableService> services,
NetworkService networkService,
ThreadPool threadPool
) {
logger.debug("Initializing Netty4GrpcServerTransport with settings = {}", settings);
this.settings = Objects.requireNonNull(settings);
this.services = Objects.requireNonNull(services);
this.networkService = Objects.requireNonNull(networkService);
this.threadPool = Objects.requireNonNull(threadPool);
final List<String> grpcBindHost = SETTING_GRPC_BIND_HOST.get(settings);
this.bindHosts = (grpcBindHost.isEmpty() ? NetworkService.GLOBAL_NETWORK_BIND_HOST_SETTING.get(settings) : grpcBindHost).toArray(
Strings.EMPTY_ARRAY
Expand All @@ -224,6 +247,7 @@ public Netty4GrpcServerTransport(Settings settings, List<BindableService> servic
.toArray(Strings.EMPTY_ARRAY);
this.port = SETTING_GRPC_PORT.get(settings);
this.nettyEventLoopThreads = SETTING_GRPC_WORKER_COUNT.get(settings);
this.executorThreads = SETTING_GRPC_EXECUTOR_COUNT.get(settings);
this.maxInboundMessageSize = SETTING_GRPC_MAX_MSG_SIZE.get(settings).getBytes();
this.maxConcurrentConnectionCalls = SETTING_GRPC_MAX_CONCURRENT_CONNECTION_CALLS.get(settings);
this.maxConnectionAge = SETTING_GRPC_MAX_CONNECTION_AGE.get(settings);
Expand All @@ -232,12 +256,22 @@ public Netty4GrpcServerTransport(Settings settings, List<BindableService> servic
this.portSettingKey = SETTING_GRPC_PORT.getKey();
}

/**
* Returns the setting key used to identify this transport type.
* This is the same {@code GRPC_TRANSPORT_SETTING_KEY} constant under which
* {@code GrpcPlugin} registers the supplier for this transport.
*
* @return the gRPC transport setting key
*/
@Override
public String settingKey() {
return GRPC_TRANSPORT_SETTING_KEY;
}

// public for tests
/**
* Returns the bound transport addresses for this gRPC server.
* This method is public for testing purposes.
*
* @return the bound transport address containing all bound addresses and publish address
*/
@Override
public BoundTransportAddress getBoundAddress() {
return this.boundAddress;
Expand All @@ -259,10 +293,16 @@ protected void addServerConfig(UnaryOperator<NettyServerBuilder> configModifier)
protected void doStart() {
boolean success = false;
try {
this.eventLoopGroup = new NioEventLoopGroup(nettyEventLoopThreads, daemonThreadFactory(settings, "grpc_event_loop"));
// Create separate boss and worker event loop groups for better isolation
this.bossEventLoopGroup = new NioEventLoopGroup(1, daemonThreadFactory(settings, "grpc_boss"));
this.workerEventLoopGroup = new NioEventLoopGroup(nettyEventLoopThreads, daemonThreadFactory(settings, "grpc_worker"));

// Use OpenSearch's managed thread pool for gRPC request processing
this.grpcExecutor = threadPool.executor("grpc");

bindServer();
success = true;
logger.info("Started gRPC server on port {}", port);
logger.info("Started gRPC server on port {} with {} executor threads", port, executorThreads);
} finally {
if (!success) {
doStop();
Expand All @@ -289,12 +329,25 @@ protected void doStop() {
}
}
}
if (eventLoopGroup != null) {

// Note: grpcExecutor is managed by OpenSearch's ThreadPool, so we don't shut it down here

// Shutdown event loop groups
if (bossEventLoopGroup != null) {
try {
eventLoopGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).await();
bossEventLoopGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Failed to shut down event loop group");
logger.warn("Failed to shut down boss event loop group");
}
}

if (workerEventLoopGroup != null) {
try {
workerEventLoopGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Failed to shut down worker event loop group");
}
}
}
Expand All @@ -305,7 +358,12 @@ protected void doStop() {
*/
@Override
protected void doClose() {
    // The boss and worker groups are only created in doStart(), so either may still be
    // null if the transport is closed without having been started; guard each close.
    if (bossEventLoopGroup != null) {
        bossEventLoopGroup.close();
    }
    if (workerEventLoopGroup != null) {
        workerEventLoopGroup.close();
    }
    // The gRPC executor is obtained from the ThreadPool and is intentionally not closed here.
}

private void bindServer() {
Expand Down Expand Up @@ -356,9 +414,9 @@ private TransportAddress bindAddress(InetAddress hostAddress, PortsRange portRan
try {
final InetSocketAddress address = new InetSocketAddress(hostAddress, portNumber);
final NettyServerBuilder serverBuilder = NettyServerBuilder.forAddress(address)
.directExecutor()
.bossEventLoopGroup(eventLoopGroup)
.workerEventLoopGroup(eventLoopGroup)
.executor(grpcExecutor)
.bossEventLoopGroup(bossEventLoopGroup)
.workerEventLoopGroup(workerEventLoopGroup)
.maxInboundMessageSize((int) maxInboundMessageSize)
.maxConcurrentCallsPerConnection((int) maxConcurrentConnectionCalls)
.maxConnectionAge(maxConnectionAge.duration(), maxConnectionAge.timeUnit())
Expand Down Expand Up @@ -391,4 +449,17 @@ private TransportAddress bindAddress(InetAddress hostAddress, PortsRange portRan

return addr.get();
}

// Package-private methods for testing
/**
 * Exposes the executor that gRPC service calls are dispatched to, for use by tests.
 *
 * @return the executor taken from the thread pool in {@code doStart()}, or {@code null} if {@code doStart()} has not run yet
 */
ExecutorService getGrpcExecutorForTesting() {
    return this.grpcExecutor;
}

/**
 * Exposes the boss event loop group, for use by tests.
 *
 * @return the boss group created in {@code doStart()}, or {@code null} if {@code doStart()} has not run yet
 */
EventLoopGroup getBossEventLoopGroupForTesting() {
    return this.bossEventLoopGroup;
}

/**
 * Exposes the worker event loop group, for use by tests.
 *
 * @return the worker group created in {@code doStart()}, or {@code null} if {@code doStart()} has not run yet
 */
EventLoopGroup getWorkerEventLoopGroupForTesting() {
    return this.workerEventLoopGroup;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.transport.PortsRange;
import org.opensearch.plugins.SecureAuxTransportSettingsProvider;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.grpc.Netty4GrpcServerTransport;

import javax.net.ssl.SSLContext;
Expand Down Expand Up @@ -73,15 +74,17 @@ public Collection<String> cipherSuites() {
* @param settings the configured settings.
* @param services the gRPC compatible services to be registered with the server.
* @param networkService the bind/publish addresses.
* @param threadPool the thread pool for managing gRPC executor and monitoring.
* @param secureTransportSettingsProvider TLS configuration settings.
*/
public SecureNetty4GrpcServerTransport(
Settings settings,
List<BindableService> services,
NetworkService networkService,
ThreadPool threadPool,
SecureAuxTransportSettingsProvider secureTransportSettingsProvider
) {
super(settings, services, networkService);
super(settings, services, networkService, threadPool);
this.port = SecureNetty4GrpcServerTransport.SETTING_GRPC_SECURE_PORT.get(settings);
this.portSettingKey = SecureNetty4GrpcServerTransport.SETTING_GRPC_SECURE_PORT.getKey();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import static org.opensearch.transport.grpc.Netty4GrpcServerTransport.GRPC_TRANSPORT_SETTING_KEY;
import static org.opensearch.transport.grpc.Netty4GrpcServerTransport.SETTING_GRPC_BIND_HOST;
import static org.opensearch.transport.grpc.Netty4GrpcServerTransport.SETTING_GRPC_EXECUTOR_COUNT;
import static org.opensearch.transport.grpc.Netty4GrpcServerTransport.SETTING_GRPC_HOST;
import static org.opensearch.transport.grpc.Netty4GrpcServerTransport.SETTING_GRPC_KEEPALIVE_TIMEOUT;
import static org.opensearch.transport.grpc.Netty4GrpcServerTransport.SETTING_GRPC_MAX_CONCURRENT_CONNECTION_CALLS;
Expand Down Expand Up @@ -111,6 +112,7 @@ public void testGetSettings() {
assertTrue("SETTING_GRPC_PUBLISH_HOST should be included", settings.contains(SETTING_GRPC_PUBLISH_HOST));
assertTrue("SETTING_GRPC_BIND_HOST should be included", settings.contains(SETTING_GRPC_BIND_HOST));
assertTrue("SETTING_GRPC_WORKER_COUNT should be included", settings.contains(SETTING_GRPC_WORKER_COUNT));
assertTrue("SETTING_GRPC_EXECUTOR_COUNT should be included", settings.contains(SETTING_GRPC_EXECUTOR_COUNT));
assertTrue("SETTING_GRPC_PUBLISH_PORT should be included", settings.contains(SETTING_GRPC_PUBLISH_PORT));
assertTrue(
"SETTING_GRPC_MAX_CONCURRENT_CONNECTION_CALLS should be included",
Expand All @@ -122,7 +124,7 @@ public void testGetSettings() {
assertTrue("SETTING_GRPC_KEEPALIVE_TIMEOUT should be included", settings.contains(SETTING_GRPC_KEEPALIVE_TIMEOUT));

// Verify the number of settings
assertEquals("Should return 12 settings", 12, settings.size());
assertEquals("Should return 13 settings", 13, settings.size());
}

public void testGetQueryUtilsBeforeCreateComponents() {
Expand Down
Loading
Loading