Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Changed
- Indexed IP field supports `terms_query` with more than 1025 IP masks [#16391](https://github.com/opensearch-project/OpenSearch/pull/16391)
- Make entries for dependencies from server/build.gradle to gradle version catalog ([#16707](https://github.com/opensearch-project/OpenSearch/pull/16707))
- Add listenable TransportRequestHandler in TransportNodesAction ([#15166](https://github.com/opensearch-project/OpenSearch/pull/15166))

### Deprecated
- Performing update operation with default pipeline or final pipeline is deprecated ([#16712](https://github.com/opensearch-project/OpenSearch/pull/16712))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,50 @@ protected TransportNodesAction(
transportService.registerRequestHandler(transportNodeAction, nodeExecutor, nodeRequest, new NodeTransportHandler());
}

/**
* @param actionName action name
* @param threadPool thread-pool
* @param clusterService cluster service
* @param transportService transport service
* @param actionFilters action filters
* @param request node request writer
* @param nodeRequest node request reader
* @param nodeExecutor executor to execute node action on
* @param finalExecutor executor to execute final collection of all responses on
* @param listenableHandler true if the handler should be a listenable handler
* @param nodeResponseClass class of the node responses
*/
protected TransportNodesAction(
String actionName,
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
Writeable.Reader<NodesRequest> request,
Writeable.Reader<NodeRequest> nodeRequest,
String nodeExecutor,
String finalExecutor,
boolean listenableHandler,
Class<NodeResponse> nodeResponseClass
) {
super(actionName, transportService, actionFilters, request);
this.threadPool = threadPool;
this.clusterService = Objects.requireNonNull(clusterService);
this.transportService = Objects.requireNonNull(transportService);
this.nodeResponseClass = Objects.requireNonNull(nodeResponseClass);

this.transportNodeAction = actionName + "[n]";
this.finalExecutor = finalExecutor;
if (listenableHandler) {
transportService.registerRequestHandler(transportNodeAction, nodeExecutor, nodeRequest, new ListenableNodeTransportHandler());
} else {
transportService.registerRequestHandler(transportNodeAction, nodeExecutor, nodeRequest, new NodeTransportHandler());
}
}

/**
* Same as {@link #TransportNodesAction(String, ThreadPool, ClusterService, TransportService, ActionFilters, Writeable.Reader,
* Writeable.Reader, String, String, Class)} but executes final response collection on the transport thread except for when the final
* Writeable.Reader, String, String, boolean, Class)} but executes final response collection on the transport thread except for when the final
* node response is received from the local node, in which case {@code nodeExecutor} is used.
* This constructor should only be used for actions for which the creation of the final response is fast enough to be safely executed
* on a transport thread.
Expand All @@ -144,6 +185,7 @@ protected TransportNodesAction(
nodeRequest,
nodeExecutor,
ThreadPool.Names.SAME,
false,
nodeResponseClass
);
}
Expand Down Expand Up @@ -196,6 +238,8 @@ protected NodesResponse newResponse(NodesRequest request, AtomicReferenceArray<?

protected abstract NodeResponse nodeOperation(NodeRequest request);

protected void nodeOperation(NodeRequest request, ActionListener<NodeResponse> actionListener) {}

protected NodeResponse nodeOperation(NodeRequest request, Task task) {
return nodeOperation(request);
}
Expand Down Expand Up @@ -335,4 +379,14 @@ public void messageReceived(NodeRequest request, TransportChannel channel, Task
}
}

class ListenableNodeTransportHandler implements TransportRequestHandler<NodeRequest> {

@Override
public void messageReceived(NodeRequest request, TransportChannel channel, Task task) {
ActionListener<NodeResponse> listener = ActionListener.wrap(channel::sendResponse, e -> {
TransportChannel.sendErrorResponse(channel, actionName, request, e);
});
nodeOperation(request, listener);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,19 @@
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.indices.IndicesService;
import org.opensearch.node.NodeService;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.transport.CapturingTransport;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportChannel;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportService;
import org.junit.After;
Expand All @@ -74,9 +77,12 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.mockito.ArgumentCaptor;

import static org.opensearch.test.ClusterServiceUtils.createClusterService;
import static org.opensearch.test.ClusterServiceUtils.setState;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

public class TransportNodesActionTests extends OpenSearchTestCase {

Expand Down Expand Up @@ -198,6 +204,28 @@ public void testTransportNodesActionWithDiscoveryNodesReset() {
capturedTransportNodeRequestList.forEach(capturedRequest -> assertNull(capturedRequest.testNodesRequest.concreteNodes()));
}

public void testCreateTransportNodesActionWithListenableHandler() {
TransportNodesAction action = getListenableHandlerTestTransportNodesAction();
assertTrue(
transport.getRequestHandlers()
.getHandler(action.actionName + "[n]")
.getHandler() instanceof TransportNodesAction.ListenableNodeTransportHandler
);
}

public void testMessageReceivedInListenableNodeTransportHandler() throws Exception {
TransportNodesAction action = getListenableHandlerTestTransportNodesAction();
TransportChannel transportChannel = mock(TransportChannel.class);
transport.getRequestHandlers()
.getHandler(action.actionName + "[n]")
.getHandler()
.messageReceived(new TestNodeRequest(), transportChannel, mock(Task.class));
ArgumentCaptor<TestNodeResponse> argCaptor = ArgumentCaptor.forClass(TestNodeResponse.class);
verify(transportChannel).sendResponse(argCaptor.capture());
TestNodeResponse response = argCaptor.getValue();
assertNotNull(response);
}

private <T> List<T> mockList(Supplier<T> supplier, int size) {
List<T> failures = new ArrayList<>(size);
for (int i = 0; i < size; ++i) {
Expand Down Expand Up @@ -290,6 +318,19 @@ public TestTransportNodesAction getTestTransportNodesAction() {
);
}

public TestTransportNodesAction getListenableHandlerTestTransportNodesAction() {
return new TestTransportNodesAction(
THREAD_POOL,
clusterService,
transportService,
new ActionFilters(Collections.emptySet()),
TestNodesRequest::new,
TestNodeRequest::new,
ThreadPool.Names.SAME,
true
);
}

public DataNodesOnlyTransportNodesAction getDataNodesOnlyTransportNodesAction(TransportService transportService) {
return new DataNodesOnlyTransportNodesAction(
THREAD_POOL,
Expand Down Expand Up @@ -335,6 +376,31 @@ private static class TestTransportNodesAction extends TransportNodesAction<
);
}

TestTransportNodesAction(
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
Writeable.Reader<TestNodesRequest> request,
Writeable.Reader<TestNodeRequest> nodeRequest,
String nodeExecutor,
boolean listenableHandler
) {
super(
"indices:admin/test",
threadPool,
clusterService,
transportService,
actionFilters,
request,
nodeRequest,
nodeExecutor,
nodeExecutor,
listenableHandler,
TestNodeResponse.class
);
}

@Override
protected TestNodesResponse newResponse(
TestNodesRequest request,
Expand All @@ -359,6 +425,11 @@ protected TestNodeResponse nodeOperation(TestNodeRequest request) {
return new TestNodeResponse();
}

@Override
protected void nodeOperation(TestNodeRequest request, ActionListener<TestNodeResponse> actionListener) {
actionListener.onResponse(new TestNodeResponse());
}

}

private static class DataNodesOnlyTransportNodesAction extends TestTransportNodesAction {
Expand Down