Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Adding create component extension point support for AD ([#4517](https://github.com/opensearch-project/OpenSearch/pull/4517))
- Add getSettings support for AD([#4519](https://github.com/opensearch-project/OpenSearch/pull/4519))
- Fixed javadoc warning for build failure([#4581](https://github.com/opensearch-project/OpenSearch/pull/4581))
- Added transport actions support for extensions ([#4598](https://github.com/opensearch-project/OpenSearch/pull/4598/))
- Pass REST params and content to extensions ([#4633](https://github.com/opensearch-project/OpenSearch/pull/4633))

## [2.x]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,8 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.extensions.action.ExtensionProxyAction;
import org.opensearch.extensions.action.ExtensionTransportAction;
import org.opensearch.index.seqno.RetentionLeaseActions;
import org.opensearch.indices.SystemIndices;
import org.opensearch.indices.breaker.CircuitBreakerService;
Expand Down Expand Up @@ -696,6 +698,9 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
// Remote Store
actions.register(RestoreRemoteStoreAction.INSTANCE, TransportRestoreRemoteStoreAction.class);

// ExtensionProxyAction
actions.register(ExtensionProxyAction.INSTANCE, ExtensionTransportAction.class);

// Decommission actions
actions.register(DecommissionAction.INSTANCE, TransportDecommissionAction.class);
actions.register(GetDecommissionStateAction.INSTANCE, TransportGetDecommissionStateAction.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
/**
* Generic string response indicating the status of some previous request sent to the SDK
*
* @opensearch.internal
* @opensearch.api
*/
public class ExtensionStringResponse extends TransportResponse {
private String response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.opensearch.Version;
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.ClusterSettingsResponse;
import org.opensearch.cluster.LocalNodeResponse;
import org.opensearch.cluster.node.DiscoveryNode;
Expand All @@ -39,6 +40,10 @@
import org.opensearch.discovery.InitializeExtensionsRequest;
import org.opensearch.discovery.InitializeExtensionsResponse;
import org.opensearch.extensions.ExtensionsSettings.Extension;
import org.opensearch.extensions.action.ExtensionActionRequest;
import org.opensearch.extensions.action.ExtensionActionResponse;
import org.opensearch.extensions.action.ExtensionTransportActionsHandler;
import org.opensearch.extensions.action.TransportActionRequestFromExtension;
import org.opensearch.extensions.rest.RegisterRestActionsRequest;
import org.opensearch.extensions.rest.RestActionsRequestHandler;
import org.opensearch.extensions.settings.CustomSettingsRequestHandler;
Expand Down Expand Up @@ -83,6 +88,9 @@ public class ExtensionsOrchestrator implements ReportingService<PluginsAndModule
public static final String REQUEST_OPENSEARCH_PARSE_NAMED_WRITEABLE = "internal:discovery/parsenamedwriteable";
public static final String REQUEST_EXTENSION_ACTION_LISTENER_ON_FAILURE = "internal:extensions/actionlisteneronfailure";
public static final String REQUEST_REST_EXECUTE_ON_EXTENSION_ACTION = "internal:extensions/restexecuteonextensiontaction";
public static final String REQUEST_EXTENSION_HANDLE_TRANSPORT_ACTION = "internal:extensions/handle-transportaction";
public static final String TRANSPORT_ACTION_REQUEST_FROM_EXTENSION = "internal:extensions/request-transportaction-from-extension";
public static final int EXTENSION_REQUEST_WAIT_TIMEOUT = 10;

private static final Logger logger = LogManager.getLogger(ExtensionsOrchestrator.class);

Expand Down Expand Up @@ -113,6 +121,7 @@ public static enum OpenSearchRequestType {
}

private final Path extensionsPath;
ExtensionTransportActionsHandler extensionTransportActionsHandler;
// A list of initialized extensions, a subset of the values of map below which includes all extensions
List<DiscoveryExtension> extensionsInitializedList;
// A map of extension uniqueId to full extension details used for node transport here and in the RestActionsRequestHandler
Expand All @@ -126,6 +135,7 @@ public static enum OpenSearchRequestType {
ExtensionActionListenerHandler listenerHandler;
EnvironmentSettingsRequestHandler environmentSettingsRequestHandler;
AddSettingsUpdateConsumerRequestHandler addSettingsUpdateConsumerRequestHandler;
NodeClient client;

/**
* Instantiate a new ExtensionsOrchestrator object to handle requests and responses from extensions. This is called during Node bootstrap.
Expand All @@ -137,12 +147,15 @@ public static enum OpenSearchRequestType {
public ExtensionsOrchestrator(Settings settings, Path extensionsPath) throws IOException {
logger.info("ExtensionsOrchestrator initialized");
this.extensionsPath = extensionsPath;
this.transportService = null;
this.listener = new ExtensionActionListener();
this.extensionsInitializedList = new ArrayList<DiscoveryExtension>();
this.extensionIdMap = new HashMap<String, DiscoveryExtension>();
// will be initialized in initializeServicesAndRestHandler which is called after the Node is initialized
this.transportService = null;
this.clusterService = null;
this.namedWriteableRegistry = null;
this.listener = new ExtensionActionListener();
this.client = null;
this.extensionTransportActionsHandler = null;

/*
* Now Discover extensions
Expand All @@ -160,13 +173,15 @@ public ExtensionsOrchestrator(Settings settings, Path extensionsPath) throws IOE
* @param transportService The Node's transport service.
* @param clusterService The Node's cluster service.
* @param initialEnvironmentSettings The finalized view of settings for the Environment
* @param client The client used to make transport requests
*/
public void initializeServicesAndRestHandler(
RestController restController,
SettingsModule settingsModule,
TransportService transportService,
ClusterService clusterService,
Settings initialEnvironmentSettings
Settings initialEnvironmentSettings,
NodeClient client
) {
this.restActionsRequestHandler = new RestActionsRequestHandler(restController, extensionIdMap, transportService);
this.listenerHandler = new ExtensionActionListenerHandler(listener);
Expand All @@ -179,9 +194,20 @@ public void initializeServicesAndRestHandler(
transportService,
REQUEST_EXTENSION_UPDATE_SETTINGS
);
this.client = client;
this.extensionTransportActionsHandler = new ExtensionTransportActionsHandler(extensionIdMap, transportService, client);
registerRequestHandler();
}

/**
* Handles Transport Request from {@link org.opensearch.extensions.action.ExtensionTransportAction} which was invoked by an extension via {@link ExtensionTransportActionsHandler}.
*
* @param request which was sent by an extension.
*/
public ExtensionActionResponse handleTransportRequest(ExtensionActionRequest request) throws InterruptedException {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handleActionRequest?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, if you like that better.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like handleTransportAction to differentiate it from handleRestAction etc.

Copy link
Copy Markdown
Member Author

@saratvemulapalli saratvemulapalli Oct 6, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll leave it as handleTransportAction for now. @owaiskazi19 let us know if you strongly disagree :)

return extensionTransportActionsHandler.sendTransportRequestToExtension(request);
}

private void registerRequestHandler() {
transportService.registerRequestHandler(
REQUEST_EXTENSION_REGISTER_REST_ACTIONS,
Expand Down Expand Up @@ -255,7 +281,19 @@ private void registerRequestHandler() {
false,
false,
RegisterTransportActionsRequest::new,
((request, channel, task) -> channel.sendResponse(handleRegisterTransportActionsRequest(request)))
((request, channel, task) -> channel.sendResponse(
extensionTransportActionsHandler.handleRegisterTransportActionsRequest(request)
))
);
transportService.registerRequestHandler(
TRANSPORT_ACTION_REQUEST_FROM_EXTENSION,
ThreadPool.Names.GENERIC,
false,
false,
TransportActionRequestFromExtension::new,
((request, channel, task) -> channel.sendResponse(
extensionTransportActionsHandler.handleTransportActionRequestFromExtension(request)
))
);
}

Expand Down Expand Up @@ -373,28 +411,12 @@ public String executor() {
new InitializeExtensionsRequest(transportService.getLocalNode(), extension),
extensionResponseHandler
);
inProgressLatch.await(100, TimeUnit.SECONDS);
inProgressLatch.await(EXTENSION_REQUEST_WAIT_TIMEOUT, TimeUnit.SECONDS);
} catch (Exception e) {
logger.error(e.toString());
}
}

/**
* Handles a {@link RegisterTransportActionsRequest}.
*
* @param transportActionsRequest The request to handle.
* @return A {@link ExtensionBooleanResponse} indicating success.
* @throws Exception if the request is not handled properly.
*/
TransportResponse handleRegisterTransportActionsRequest(RegisterTransportActionsRequest transportActionsRequest) throws Exception {
/*
* TODO: https://github.com/opensearch-project/opensearch-sdk-java/issues/107
* Register these new Transport Actions with ActionModule
* and add support for NodeClient to recognise these actions when making transport calls.
*/
return new ExtensionBooleanResponse(true);
}

/**
* Handles an {@link ExtensionRequest}.
*
Expand Down Expand Up @@ -483,7 +505,7 @@ public void beforeIndexRemoved(
/*
* Making async synchronous for now.
*/
inProgressIndexNameLatch.await(100, TimeUnit.SECONDS);
inProgressIndexNameLatch.await(EXTENSION_REQUEST_WAIT_TIMEOUT, TimeUnit.SECONDS);
logger.info("Received ack response from Extension");
} catch (Exception e) {
logger.error(e.toString());
Expand Down Expand Up @@ -517,7 +539,7 @@ public String executor() {
/*
* Making asynchronous for now.
*/
inProgressLatch.await(100, TimeUnit.SECONDS);
inProgressLatch.await(EXTENSION_REQUEST_WAIT_TIMEOUT, TimeUnit.SECONDS);
logger.info("Received response from Extension");
} catch (Exception e) {
logger.error(e.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,17 @@
* @opensearch.internal
*/
public class RegisterTransportActionsRequest extends TransportRequest {
private String uniqueId;
private Map<String, Class> transportActions;

public RegisterTransportActionsRequest(Map<String, Class> transportActions) {
public RegisterTransportActionsRequest(String uniqueId, Map<String, Class> transportActions) {
this.uniqueId = uniqueId;
this.transportActions = new HashMap<>(transportActions);
}

public RegisterTransportActionsRequest(StreamInput in) throws IOException {
super(in);
this.uniqueId = in.readString();
Map<String, Class> actions = new HashMap<>();
int actionCount = in.readVInt();
for (int i = 0; i < actionCount; i++) {
Expand All @@ -45,13 +48,18 @@ public RegisterTransportActionsRequest(StreamInput in) throws IOException {
this.transportActions = actions;
}

public String getUniqueId() {
return uniqueId;
}

public Map<String, Class> getTransportActions() {
return transportActions;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(uniqueId);
out.writeVInt(this.transportActions.size());
for (Map.Entry<String, Class> action : transportActions.entrySet()) {
out.writeString(action.getKey());
Expand All @@ -61,19 +69,19 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public String toString() {
return "TransportActionsRequest{actions=" + transportActions + "}";
return "TransportActionsRequest{uniqueId=" + uniqueId + ", actions=" + transportActions + "}";
}

@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
RegisterTransportActionsRequest that = (RegisterTransportActionsRequest) obj;
return Objects.equals(transportActions, that.transportActions);
return Objects.equals(uniqueId, that.uniqueId) && Objects.equals(transportActions, that.transportActions);
}

@Override
public int hashCode() {
return Objects.hash(transportActions);
return Objects.hash(uniqueId, transportActions);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.extensions.action;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;

import java.io.IOException;

/**
* This class translates Extension transport request to ActionRequest
* which is internally used to make transport action call.
*
* @opensearch.internal
*/
public class ExtensionActionRequest extends ActionRequest {
/**
* action is the transport action intended to be invoked which is registered by an extension via {@link ExtensionTransportActionsHandler}.
*/
private final String action;
/**
* requestBytes is the raw bytes being transported between extensions.
*/
private final byte[] requestBytes;

/**
* ExtensionActionRequest constructor.
*
* @param action is the transport action intended to be invoked which is registered by an extension via {@link ExtensionTransportActionsHandler}.
* @param requestBytes is the raw bytes being transported between extensions.
*/
public ExtensionActionRequest(String action, byte[] requestBytes) {
this.action = action;
this.requestBytes = requestBytes;
}

/**
* ExtensionActionRequest constructor from {@link StreamInput}.
*
* @param in bytes stream input used to de-serialize the message.
* @throws IOException when message de-serialization fails.
*/
ExtensionActionRequest(StreamInput in) throws IOException {
super(in);
action = in.readString();
requestBytes = in.readByteArray();
}

public String getAction() {
return action;
}

public byte[] getRequestBytes() {
return requestBytes;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(action);
out.writeByteArray(requestBytes);
}

@Override
public ActionRequestValidationException validate() {
return null;
}
}
Loading