Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 36 additions & 8 deletions DESIGN.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@ During the bootstrap of OpenSearch node, it class loads all the code under `~/pl
![](Docs/Extensions.png)

Extensions are independent processes which are built using `opensearch-sdk-java`. They communicate with OpenSearch via [transport](https://github.com/opensearch-project/OpenSearch/tree/main/modules/transport-netty4) protocol which today is used to communicate between OpenSearch nodes.

Extensions are designed to extend features via transport APIs which are exposed using extension points of OpenSearch.

### Discovery
Extensions are discovered and configured via `extensions.yml`, same way we currently have `plugin-descriptor.properties` which is read by OpenSearch during the node bootstrap. `ExtensionsOrchestrator` reads through the config file at `~/extensions` and registers extensions within OpenSearch.

Extensions are discovered and configured via `extensions.yml`, the same way we currently have `plugin-descriptor.properties` which is read by OpenSearch during the node bootstrap. `ExtensionsOrchestrator` reads through the config file at `~/extensions` and registers extensions within OpenSearch.

Here is an example extension configuration `extensions.yml`:

```
Expand All @@ -51,16 +54,41 @@ extensions:
opensearchVersion: '3.0.0' // OpenSearch compatibility
```


### Communication
As we are running extensions on the port defined in the `extensions.yml`, the communication between OpenSearch and Extensions happens using a ServerSocket which binds the port and the host address. OpenSearch will initialize the extensions during the bootstrap by making a request to all the extensions running on different ports and thus creating a medium for the future requests.

### OpenSearch SDK Java
Currently, plugins relies on extension points to communicate with OpenSearch. To turn plugins into extensions, all the extension points should be converted into Transport APIs which will be present in the SDK. Plugins need to integrate SDK, call those APIs, and later SDK will take care of the communication and the required attributes from OpenSearch.
Extensions will use a ServerSocket which binds them listen on a host address and port defined in their configuration file. Each type of incoming request will invoke code from an associated handler.

OpenSearch will have its own configuration file, presently `extensions.yml`, matching these addresses and ports. On startup, the ExtensionsOrchestrator will use the node's TransportService to communicate its requests to each extension, with the first request initializing the extension and validating the host and port.

Immediately following initialization, each extension will establish a connection to OpenSearch on its own transport service, and send its REST API (a list of methods and URIs to which it will respond). These will be registered with the RestController.

When OpenSearch receives a registered method and URI, it will send the request to the Extension. The extension will appropriately handle the request, using the API to determine which Action to execute.

### OpenSearch SDK for Java

Currently, plugins rely on extension points to communicate with OpenSearch. These are represented as Actions. To turn plugins into extensions, the Extension must assemble a list of all methods and URIs to communicate to OpenSearch, where they will be registered; upon receiving a matching request from a user these will be forwarded back to the Extension and the Extension will further need to handle these registered methods and URIs with an appropriate Action.

### Extension Walk Through

1. Extensions are started up and must be running before OpenSearch is started. (In the future, there will be a facility to refresh the extension list during operation and handle network communication interruptions.)

2. OpenSearch is started. During its bootstrap, the `ExtensionsOrchestrator` is initialized, reading a list of extensions present in `extensions.yml`.

3. The Node bootstrapping OpenSearch sends its `RestController`, `TransportService`, and `ClusterService` objects to the `ExtensionsOrchestrator` which initializes a `RestActionsRequestHandler` object. This completes the `ExtensionsOrchestrator` initialization.

4. The `ExtensionsOrchestrator` iterates over its configured list of extensions, sending an initialization request to each one, tracking those that respond, and initializing the `ExtensionNamedWriteableRegistry`.

5. After each Extension responds to the initialization request, it sends its REST API, a list of methods and URIs.

6. The `RestActionsRequestHandler` registers these method/URI combinations in the `RestController` as the `routes()` that extension will handle. This step relies on a globally unique identifier for the extension which users will use in REST requests, presently the Extension's `uniqueId`.

At a later time:

7. Users send REST requests to OpenSearch.

8. If the requests match the registered path/URI and `routes()` of an extension, the `RestRequest` is forwarded to the Extension, and the user receives an ACCEPTED (202) response.

### Settings
Walking through a similar example as plugin above, after extension registration is done, extension makes an API call to register custom settings to OpenSearch.
`ExtensionsOrchestrator` receives the requests, forwards it to `SettingsModule` to register a new setting and wala, the user is now able to toggle the setting via `_settings` Rest API.
9. Upon receipt of the `RestRequest`, the extension matches it to the appropriate Action and executes it.

## FAQ

Expand Down
32 changes: 31 additions & 1 deletion src/main/java/org/opensearch/sdk/ExtensionsRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.io.stream.NamedWriteableRegistryParseRequest;
import org.opensearch.extensions.OpenSearchRequest;
import org.opensearch.extensions.rest.RegisterRestActionsRequest;
import org.opensearch.extensions.rest.RestExecuteOnExtensionRequest;
import org.opensearch.extensions.rest.RestExecuteOnExtensionResponse;
import org.opensearch.common.network.NetworkModule;
import org.opensearch.common.network.NetworkService;
import org.opensearch.common.settings.Settings;
Expand All @@ -27,7 +30,6 @@
import org.opensearch.discovery.InitializeExtensionsResponse;
import org.opensearch.extensions.ExtensionRequest;
import org.opensearch.extensions.ExtensionsOrchestrator;
import org.opensearch.extensions.RegisterRestActionsRequest;
import org.opensearch.index.IndicesModuleRequest;
import org.opensearch.index.IndicesModuleResponse;
import org.opensearch.indices.IndicesModule;
Expand Down Expand Up @@ -183,6 +185,24 @@ ExtensionBooleanResponse handleIndicesModuleNameRequest(IndicesModuleRequest ind
return indicesModuleNameResponse;
}

/**
* Handles a request from OpenSearch to execute a REST request on the extension.
*
* @param request The REST request to execute
* @return A response acknowledging the request.
*/
RestExecuteOnExtensionResponse handleRestExecuteOnExtensionRequest(RestExecuteOnExtensionRequest request) {
String message = "The extension would have just executed " + request.getMethod() + " " + request.getUri();
// TODO: logic matching the method/URI which came from the extension API to the appropriate
// action class, e.g., see AD plugin's actions in org.opensearch.ad.rest package.
// This should probably be stored locally in a map before we send the register API requests to OpenSearch.
// Tricky part is how to match up API names (in text) with actions (Class Names). Could use reflection
// or just register like the existing plugin code does. TBD future code!
// For now we just log (locally) and respond to OpenSearch that we received enough info to correlate such an action
logger.info(message);
return new RestExecuteOnExtensionResponse(message);
}

/**
* Initializes a Netty4Transport object. This object will be wrapped in a {@link TransportService} object.
*
Expand Down Expand Up @@ -308,6 +328,7 @@ public void startTransportService(TransportService transportService) {
((request, channel, task) -> channel.sendResponse(handleIndicesModuleRequest(request, transportService)))

);

transportService.registerRequestHandler(
ExtensionsOrchestrator.INDICES_EXTENSION_NAME_ACTION_NAME,
ThreadPool.Names.GENERIC,
Expand All @@ -317,6 +338,15 @@ public void startTransportService(TransportService transportService) {
((request, channel, task) -> channel.sendResponse(handleIndicesModuleNameRequest(request)))
);

transportService.registerRequestHandler(
ExtensionsOrchestrator.REQUEST_REST_EXECUTE_ON_EXTENSION_ACTION,
ThreadPool.Names.GENERIC,
false,
false,
RestExecuteOnExtensionRequest::new,
((request, channel, task) -> channel.sendResponse(handleRestExecuteOnExtensionRequest(request)))
);

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.extensions.RegisterRestActionsResponse;
import org.opensearch.extensions.rest.RegisterRestActionsResponse;
import org.opensearch.sdk.ExtensionsRunner;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
Expand All @@ -27,7 +27,7 @@ public class RegisterRestActionsResponseHandler implements TransportResponseHand

@Override
public void handleResponse(RegisterRestActionsResponse response) {
logger.info("received {}", response);
logger.info("received {}", response.getResponse());
}

@Override
Expand Down
14 changes: 13 additions & 1 deletion src/test/java/org/opensearch/sdk/TestExtensionsRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
import org.opensearch.extensions.DiscoveryExtension;
import org.opensearch.extensions.ExtensionsOrchestrator.OpenSearchRequestType;
import org.opensearch.extensions.OpenSearchRequest;
import org.opensearch.extensions.rest.RestExecuteOnExtensionRequest;
import org.opensearch.extensions.rest.RestExecuteOnExtensionResponse;
import org.opensearch.rest.RestRequest.Method;
import org.opensearch.sdk.handlers.ClusterSettingsResponseHandler;
import org.opensearch.sdk.handlers.ClusterStateResponseHandler;
import org.opensearch.sdk.handlers.LocalNodeResponseHandler;
Expand Down Expand Up @@ -94,7 +97,7 @@ public void testTransportServiceAcceptedIncomingRequests() {
public void testRegisterRequestHandler() {

extensionsRunner.startTransportService(transportService);
verify(transportService, times(5)).registerRequestHandler(anyString(), anyString(), anyBoolean(), anyBoolean(), any(), any());
verify(transportService, times(6)).registerRequestHandler(anyString(), anyString(), anyBoolean(), anyBoolean(), any(), any());
}

@Test
Expand Down Expand Up @@ -141,6 +144,15 @@ public void testHandleOpenSearchRequest() throws Exception {
// Add additional OpenSearch request handler tests here for each default extension point
}

@Test
public void testHandleRestExecuteOnExtensionRequest() throws Exception {

RestExecuteOnExtensionRequest request = new RestExecuteOnExtensionRequest(Method.GET, "/foo");
RestExecuteOnExtensionResponse response = extensionsRunner.handleRestExecuteOnExtensionRequest(request);
assertTrue(response.getResponse().contains("GET"));
assertTrue(response.getResponse().contains("/foo"));
}

@Test
public void testClusterStateRequest() {

Expand Down