-
Notifications
You must be signed in to change notification settings - Fork 2.5k
[Feature/extensions] Adding Transport Actions support for extensions #4598
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
a4c164e
1e60cf0
e1e3b02
5bc91ed
3dc6f53
2bb61d1
201d7be
9fd0fd2
a82bffa
8fb2911
f7747a1
1dc24ee
8bdb469
ba8b56d
6b59cd0
02841db
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,6 +27,7 @@ | |
| import org.opensearch.Version; | ||
| import org.opensearch.action.admin.cluster.node.info.PluginsAndModules; | ||
| import org.opensearch.action.admin.cluster.state.ClusterStateResponse; | ||
| import org.opensearch.client.node.NodeClient; | ||
| import org.opensearch.cluster.ClusterSettingsResponse; | ||
| import org.opensearch.cluster.LocalNodeResponse; | ||
| import org.opensearch.cluster.node.DiscoveryNode; | ||
|
|
@@ -39,6 +40,10 @@ | |
| import org.opensearch.discovery.InitializeExtensionsRequest; | ||
| import org.opensearch.discovery.InitializeExtensionsResponse; | ||
| import org.opensearch.extensions.ExtensionsSettings.Extension; | ||
| import org.opensearch.extensions.action.ExtensionActionRequest; | ||
| import org.opensearch.extensions.action.ExtensionActionResponse; | ||
| import org.opensearch.extensions.action.ExtensionTransportActionsHandler; | ||
| import org.opensearch.extensions.action.TransportActionRequestFromExtension; | ||
| import org.opensearch.extensions.rest.RegisterRestActionsRequest; | ||
| import org.opensearch.extensions.rest.RestActionsRequestHandler; | ||
| import org.opensearch.extensions.settings.CustomSettingsRequestHandler; | ||
|
|
@@ -83,6 +88,9 @@ public class ExtensionsOrchestrator implements ReportingService<PluginsAndModule | |
| public static final String REQUEST_OPENSEARCH_PARSE_NAMED_WRITEABLE = "internal:discovery/parsenamedwriteable"; | ||
| public static final String REQUEST_EXTENSION_ACTION_LISTENER_ON_FAILURE = "internal:extensions/actionlisteneronfailure"; | ||
| public static final String REQUEST_REST_EXECUTE_ON_EXTENSION_ACTION = "internal:extensions/restexecuteonextensiontaction"; | ||
| public static final String REQUEST_EXTENSION_HANDLE_TRANSPORT_ACTION = "internal:extensions/handle-transportaction"; | ||
| public static final String TRANSPORT_ACTION_REQUEST_FROM_EXTENSION = "internal:extensions/request-transportaction-from-extension"; | ||
| public static final int EXTENSION_REQUEST_WAIT_TIMEOUT = 10; | ||
|
|
||
| private static final Logger logger = LogManager.getLogger(ExtensionsOrchestrator.class); | ||
|
|
||
|
|
@@ -113,6 +121,7 @@ public static enum OpenSearchRequestType { | |
| } | ||
|
|
||
| private final Path extensionsPath; | ||
| ExtensionTransportActionsHandler extensionTransportActionsHandler; | ||
| // A list of initialized extensions, a subset of the values of map below which includes all extensions | ||
| List<DiscoveryExtension> extensionsInitializedList; | ||
| // A map of extension uniqueId to full extension details used for node transport here and in the RestActionsRequestHandler | ||
|
|
@@ -126,6 +135,7 @@ public static enum OpenSearchRequestType { | |
| ExtensionActionListenerHandler listenerHandler; | ||
| EnvironmentSettingsRequestHandler environmentSettingsRequestHandler; | ||
| AddSettingsUpdateConsumerRequestHandler addSettingsUpdateConsumerRequestHandler; | ||
| NodeClient client; | ||
|
|
||
| /** | ||
| * Instantiate a new ExtensionsOrchestrator object to handle requests and responses from extensions. This is called during Node bootstrap. | ||
|
|
@@ -137,12 +147,15 @@ public static enum OpenSearchRequestType { | |
| public ExtensionsOrchestrator(Settings settings, Path extensionsPath) throws IOException { | ||
| logger.info("ExtensionsOrchestrator initialized"); | ||
| this.extensionsPath = extensionsPath; | ||
| this.transportService = null; | ||
| this.listener = new ExtensionActionListener(); | ||
| this.extensionsInitializedList = new ArrayList<DiscoveryExtension>(); | ||
| this.extensionIdMap = new HashMap<String, DiscoveryExtension>(); | ||
| // will be initialized in initializeServicesAndRestHandler which is called after the Node is initialized | ||
| this.transportService = null; | ||
| this.clusterService = null; | ||
| this.namedWriteableRegistry = null; | ||
| this.listener = new ExtensionActionListener(); | ||
| this.client = null; | ||
| this.extensionTransportActionsHandler = null; | ||
|
|
||
| /* | ||
| * Now Discover extensions | ||
|
|
@@ -160,13 +173,15 @@ public ExtensionsOrchestrator(Settings settings, Path extensionsPath) throws IOE | |
| * @param transportService The Node's transport service. | ||
| * @param clusterService The Node's cluster service. | ||
| * @param initialEnvironmentSettings The finalized view of settings for the Environment | ||
| * @param client The client used to make transport requests | ||
| */ | ||
| public void initializeServicesAndRestHandler( | ||
| RestController restController, | ||
| SettingsModule settingsModule, | ||
| TransportService transportService, | ||
| ClusterService clusterService, | ||
| Settings initialEnvironmentSettings | ||
| Settings initialEnvironmentSettings, | ||
| NodeClient client | ||
| ) { | ||
| this.restActionsRequestHandler = new RestActionsRequestHandler(restController, extensionIdMap, transportService); | ||
| this.listenerHandler = new ExtensionActionListenerHandler(listener); | ||
|
|
@@ -179,9 +194,20 @@ public void initializeServicesAndRestHandler( | |
| transportService, | ||
| REQUEST_EXTENSION_UPDATE_SETTINGS | ||
| ); | ||
| this.client = client; | ||
| this.extensionTransportActionsHandler = new ExtensionTransportActionsHandler(extensionIdMap, transportService, client); | ||
| registerRequestHandler(); | ||
| } | ||
|
|
||
| /** | ||
| * Handles Transport Request from {@link org.opensearch.extensions.action.ExtensionTransportAction} which was invoked by an extension via {@link ExtensionTransportActionsHandler}. | ||
| * | ||
| * @param request which was sent by an extension. | ||
| */ | ||
| public ExtensionActionResponse handleTransportRequest(ExtensionActionRequest request) throws InterruptedException { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, if you like that better.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll leave it as |
||
| return extensionTransportActionsHandler.sendTransportRequestToExtension(request); | ||
| } | ||
|
|
||
| private void registerRequestHandler() { | ||
| transportService.registerRequestHandler( | ||
| REQUEST_EXTENSION_REGISTER_REST_ACTIONS, | ||
|
|
@@ -255,7 +281,19 @@ private void registerRequestHandler() { | |
| false, | ||
| false, | ||
| RegisterTransportActionsRequest::new, | ||
| ((request, channel, task) -> channel.sendResponse(handleRegisterTransportActionsRequest(request))) | ||
| ((request, channel, task) -> channel.sendResponse( | ||
| extensionTransportActionsHandler.handleRegisterTransportActionsRequest(request) | ||
| )) | ||
| ); | ||
| transportService.registerRequestHandler( | ||
| TRANSPORT_ACTION_REQUEST_FROM_EXTENSION, | ||
| ThreadPool.Names.GENERIC, | ||
| false, | ||
| false, | ||
| TransportActionRequestFromExtension::new, | ||
| ((request, channel, task) -> channel.sendResponse( | ||
| extensionTransportActionsHandler.handleTransportActionRequestFromExtension(request) | ||
| )) | ||
| ); | ||
| } | ||
|
|
||
|
|
@@ -373,28 +411,12 @@ public String executor() { | |
| new InitializeExtensionsRequest(transportService.getLocalNode(), extension), | ||
| extensionResponseHandler | ||
| ); | ||
| inProgressLatch.await(100, TimeUnit.SECONDS); | ||
| inProgressLatch.await(EXTENSION_REQUEST_WAIT_TIMEOUT, TimeUnit.SECONDS); | ||
| } catch (Exception e) { | ||
| logger.error(e.toString()); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Handles a {@link RegisterTransportActionsRequest}. | ||
| * | ||
| * @param transportActionsRequest The request to handle. | ||
| * @return A {@link ExtensionBooleanResponse} indicating success. | ||
| * @throws Exception if the request is not handled properly. | ||
| */ | ||
| TransportResponse handleRegisterTransportActionsRequest(RegisterTransportActionsRequest transportActionsRequest) throws Exception { | ||
| /* | ||
| * TODO: https://github.com/opensearch-project/opensearch-sdk-java/issues/107 | ||
| * Register these new Transport Actions with ActionModule | ||
| * and add support for NodeClient to recognise these actions when making transport calls. | ||
| */ | ||
| return new ExtensionBooleanResponse(true); | ||
| } | ||
|
|
||
| /** | ||
| * Handles an {@link ExtensionRequest}. | ||
| * | ||
|
|
@@ -483,7 +505,7 @@ public void beforeIndexRemoved( | |
| /* | ||
| * Making async synchronous for now. | ||
| */ | ||
| inProgressIndexNameLatch.await(100, TimeUnit.SECONDS); | ||
| inProgressIndexNameLatch.await(EXTENSION_REQUEST_WAIT_TIMEOUT, TimeUnit.SECONDS); | ||
| logger.info("Received ack response from Extension"); | ||
| } catch (Exception e) { | ||
| logger.error(e.toString()); | ||
|
|
@@ -517,7 +539,7 @@ public String executor() { | |
| /* | ||
| * Making asynchronous for now. | ||
| */ | ||
| inProgressLatch.await(100, TimeUnit.SECONDS); | ||
| inProgressLatch.await(EXTENSION_REQUEST_WAIT_TIMEOUT, TimeUnit.SECONDS); | ||
| logger.info("Received response from Extension"); | ||
| } catch (Exception e) { | ||
| logger.error(e.toString()); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,76 @@ | ||
| /* | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| * | ||
| * The OpenSearch Contributors require contributions made to | ||
| * this file be licensed under the Apache-2.0 license or a | ||
| * compatible open source license. | ||
| */ | ||
|
|
||
| package org.opensearch.extensions.action; | ||
|
|
||
| import org.opensearch.action.ActionRequest; | ||
| import org.opensearch.action.ActionRequestValidationException; | ||
| import org.opensearch.common.io.stream.StreamInput; | ||
| import org.opensearch.common.io.stream.StreamOutput; | ||
|
|
||
| import java.io.IOException; | ||
|
|
||
| /** | ||
| * This class translates Extension transport request to ActionRequest | ||
| * which is internally used to make transport action call. | ||
| * | ||
| * @opensearch.internal | ||
| */ | ||
| public class ExtensionActionRequest extends ActionRequest { | ||
| /** | ||
| * action is the transport action intended to be invoked which is registered by an extension via {@link ExtensionTransportActionsHandler}. | ||
| */ | ||
| private final String action; | ||
| /** | ||
| * requestBytes is the raw bytes being transported between extensions. | ||
| */ | ||
| private final byte[] requestBytes; | ||
|
|
||
| /** | ||
| * ExtensionActionRequest constructor. | ||
| * | ||
| * @param action is the transport action intended to be invoked which is registered by an extension via {@link ExtensionTransportActionsHandler}. | ||
| * @param requestBytes is the raw bytes being transported between extensions. | ||
| */ | ||
| public ExtensionActionRequest(String action, byte[] requestBytes) { | ||
saratvemulapalli marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| this.action = action; | ||
| this.requestBytes = requestBytes; | ||
| } | ||
|
|
||
| /** | ||
| * ExtensionActionRequest constructor from {@link StreamInput}. | ||
| * | ||
| * @param in bytes stream input used to de-serialize the message. | ||
| * @throws IOException when message de-serialization fails. | ||
| */ | ||
| ExtensionActionRequest(StreamInput in) throws IOException { | ||
| super(in); | ||
| action = in.readString(); | ||
| requestBytes = in.readByteArray(); | ||
| } | ||
|
|
||
| public String getAction() { | ||
| return action; | ||
| } | ||
|
|
||
| public byte[] getRequestBytes() { | ||
| return requestBytes; | ||
| } | ||
|
|
||
| @Override | ||
| public void writeTo(StreamOutput out) throws IOException { | ||
| super.writeTo(out); | ||
| out.writeString(action); | ||
| out.writeByteArray(requestBytes); | ||
| } | ||
|
|
||
| @Override | ||
| public ActionRequestValidationException validate() { | ||
| return null; | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.