Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public class ExtensionsOrchestrator implements ReportingService<PluginsAndModule
public static final String REQUEST_EXTENSION_LOCAL_NODE = "internal:discovery/localnode";
public static final String REQUEST_EXTENSION_CLUSTER_SETTINGS = "internal:discovery/clustersettings";
public static final String REQUEST_EXTENSION_REGISTER_REST_ACTIONS = "internal:discovery/registerrestactions";
public static final String REQUEST_EXTENSION_REGISTER_TRANSPORT_ACTIONS = "internal:discovery/registertransportactions";
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we having separate action for extensions?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is an open issue: #4334. Lets do a sweep all together in a dedicated PR.

public static final String REQUEST_OPENSEARCH_NAMED_WRITEABLE_REGISTRY = "internal:discovery/namedwriteableregistry";
public static final String REQUEST_OPENSEARCH_PARSE_NAMED_WRITEABLE = "internal:discovery/parsenamedwriteable";
public static final String REQUEST_REST_EXECUTE_ON_EXTENSION_ACTION = "internal:extensions/restexecuteonextensiontaction";
Expand Down Expand Up @@ -186,6 +187,14 @@ private void registerRequestHandler() {
ExtensionRequest::new,
((request, channel, task) -> channel.sendResponse(handleExtensionRequest(request)))
);
transportService.registerRequestHandler(
REQUEST_EXTENSION_REGISTER_TRANSPORT_ACTIONS,
ThreadPool.Names.GENERIC,
false,
false,
RegisterTransportActionsRequest::new,
((request, channel, task) -> channel.sendResponse(handleRegisterTransportActionsRequest(request)))
);
}

@Override
Expand Down Expand Up @@ -308,6 +317,22 @@ public String executor() {
}
}

/**
* Handles a {@link RegisterTransportActionsRequest}.
*
* @param transportActionsRequest The request to handle.
* @return A {@link ExtensionBooleanResponse} indicating success.
* @throws Exception if the request is not handled properly.
*/
TransportResponse handleRegisterTransportActionsRequest(RegisterTransportActionsRequest transportActionsRequest) throws Exception {
/*
* TODO: https://github.com/opensearch-project/opensearch-sdk-java/issues/107
* Register these new Transport Actions with ActionModule
* and add support for NodeClient to recognise these actions when making transport calls.
*/
return new ExtensionBooleanResponse(true);
}

/**
* Handles an {@link ExtensionRequest}.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.extensions;

import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.transport.TransportRequest;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
* Request to register extension Transport actions
*
* @opensearch.internal
*/
public class RegisterTransportActionsRequest extends TransportRequest {
private Map<String, Class> transportActions;

public RegisterTransportActionsRequest(Map<String, Class> transportActions) {
this.transportActions = new HashMap<>(transportActions);
}

public RegisterTransportActionsRequest(StreamInput in) throws IOException {
super(in);
Map<String, Class> actions = new HashMap<>();
int actionCount = in.readVInt();
for (int i = 0; i < actionCount; i++) {
try {
String actionName = in.readString();
Class transportAction = Class.forName(in.readString());
actions.put(actionName, transportAction);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("Could not read transport action");
}
}
this.transportActions = actions;
}

public Map<String, Class> getTransportActions() {
return transportActions;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(this.transportActions.size());
for (Map.Entry<String, Class> action : transportActions.entrySet()) {
out.writeString(action.getKey());
out.writeString(action.getValue().getName());
}
}

@Override
public String toString() {
return "TransportActionsRequest{actions=" + transportActions + "}";
}

@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
RegisterTransportActionsRequest that = (RegisterTransportActionsRequest) obj;
return Objects.equals(transportActions, that.transportActions);
}

@Override
public int hashCode() {
return Objects.hash(transportActions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -442,8 +442,7 @@ public void testRegisterHandler() throws Exception {
);

extensionsOrchestrator.initializeServicesAndRestHandler(restController, mockTransportService, clusterService);
verify(mockTransportService, times(4)).registerRequestHandler(anyString(), anyString(), anyBoolean(), anyBoolean(), any(), any());

verify(mockTransportService, times(5)).registerRequestHandler(anyString(), anyString(), anyBoolean(), anyBoolean(), any(), any());
}

private static class Example implements NamedWriteable {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.extensions;

import org.junit.Before;
import org.opensearch.common.collect.Map;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;

public class RegisterTransportActionsRequestTests extends OpenSearchTestCase {
private RegisterTransportActionsRequest originalRequest;

@Before
public void setup() {
this.originalRequest = new RegisterTransportActionsRequest(Map.of("testAction", Map.class));
}

public void testRegisterTransportActionsRequest() throws IOException {
BytesStreamOutput output = new BytesStreamOutput();
originalRequest.writeTo(output);
StreamInput input = output.bytes().streamInput();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Comment) I realize that close() is a noop here so these don't need to be closed, but some IDE linters will complain that a resource isn't closed. Personal preference of mine. :)

RegisterTransportActionsRequest parsedRequest = new RegisterTransportActionsRequest(input);
assertEquals(parsedRequest.getTransportActions(), originalRequest.getTransportActions());
assertEquals(parsedRequest.getTransportActions().get("testAction"), originalRequest.getTransportActions().get("testAction"));
assertEquals(parsedRequest.getTransportActions().size(), originalRequest.getTransportActions().size());
assertEquals(parsedRequest.hashCode(), originalRequest.hashCode());
assertTrue(originalRequest.equals(parsedRequest));
}

public void testToString() {
assertEquals(originalRequest.toString(), "TransportActionsRequest{actions={testAction=class org.opensearch.common.collect.Map}}");
}
}