Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions src/main/java/org/opensearch/sdk/ExtensionsRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.opensearch.discovery.InitializeExtensionsRequest;
import org.opensearch.extensions.ExtensionActionListenerOnFailureRequest;
import org.opensearch.extensions.DiscoveryExtension;
import org.opensearch.extensions.EnvironmentSettingsRequest;
import org.opensearch.extensions.AddSettingsUpdateConsumerRequest;
import org.opensearch.extensions.UpdateSettingsRequest;
import org.opensearch.extensions.ExtensionsOrchestrator.RequestType;
Expand Down Expand Up @@ -368,24 +367,31 @@ public void sendActionListenerOnFailureRequest(TransportService transportService
}

/**
* Requests the environment setting values from OpenSearch for the corresponding component settings. The result will be handled by a {@link EnvironmentSettingsResponseHandler}.
* Requests the environment settings from OpenSearch. The result will be handled by a {@link EnvironmentSettingsResponseHandler}.
*
* @param componentSettings The component setting that correspond to the values provided by the environment settings
* @param transportService The TransportService defining the connection to OpenSearch.
* @return A Setting object from the OpenSearch Node environment
*/
public void sendEnvironmentSettingsRequest(TransportService transportService, List<Setting<?>> componentSettings) {
public Settings sendEnvironmentSettingsRequest(TransportService transportService) {
logger.info("Sending Environment Settings request to OpenSearch");
EnvironmentSettingsResponseHandler environmentSettingsResponseHandler = new EnvironmentSettingsResponseHandler();
try {
transportService.sendRequest(
opensearchNode,
ExtensionsOrchestrator.REQUEST_EXTENSION_ENVIRONMENT_SETTINGS,
new EnvironmentSettingsRequest(componentSettings),
new ExtensionRequest(ExtensionsOrchestrator.RequestType.REQUEST_EXTENSION_ENVIRONMENT_SETTINGS),
environmentSettingsResponseHandler
);
// Wait on environment settings response
environmentSettingsResponseHandler.awaitResponse();
} catch (InterruptedException e) {
logger.info("Failed to recieve Environment Settings response from OpenSearch", e);
} catch (Exception e) {
logger.info("Failed to send Environment Settings request to OpenSearch", e);
}

// At this point, response handler has read in the environment settings
return environmentSettingsResponseHandler.getEnvironmentSettings();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,48 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.env.EnvironmentSettingsResponse;
import org.opensearch.extensions.ExtensionsOrchestrator;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.settings.Settings;
import org.opensearch.sdk.ExtensionsRunner;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
* This class handles the response from OpenSearch to a {@link ExtensionsRunner#sendEnvironmentSettingsRequest} call.
*/
public class EnvironmentSettingsResponseHandler implements TransportResponseHandler<EnvironmentSettingsResponse> {

private static final Logger logger = LogManager.getLogger(EnvironmentSettingsResponseHandler.class);
private final CountDownLatch inProgressLatch;
private Settings environmentSettings;

/**
* Instantiates a new EnvironmentSettingsResponseHandler with a count down latch and an empty Settings object
*/
public EnvironmentSettingsResponseHandler() {
this.inProgressLatch = new CountDownLatch(1);
this.environmentSettings = Settings.EMPTY;
}

@Override
public void handleResponse(EnvironmentSettingsResponse response) {
logger.info("received {}", response);

// Set environmentSettings from response
this.environmentSettings = response.getEnvironmentSettings();
inProgressLatch.countDown();
}

@Override
public void handleException(TransportException exp) {
logger.info("EnvironmentSettingsRequest failed", exp);
inProgressLatch.countDown();
}

@Override
Expand All @@ -43,4 +63,15 @@ public String executor() {
public EnvironmentSettingsResponse read(StreamInput in) throws IOException {
return new EnvironmentSettingsResponse(in);
}

/**
* Invokes await on the EnvironmentSettingsResponseHandler count down latch
*/
public void awaitResponse() throws InterruptedException {
inProgressLatch.await(ExtensionsOrchestrator.EXTENSION_REQUEST_WAIT_TIMEOUT, TimeUnit.SECONDS);
}

public Settings getEnvironmentSettings() {
return this.environmentSettings;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,9 @@
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.security.Principal;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -250,9 +248,7 @@ public void testActionListenerOnFailureRequest() {

@Test
public void testEnvironmentSettingsRequest() {

List<Setting<?>> componentSettings = new ArrayList<>();
extensionsRunner.sendEnvironmentSettingsRequest(transportService, componentSettings);
extensionsRunner.sendEnvironmentSettingsRequest(transportService);

verify(transportService, times(1)).sendRequest(any(), anyString(), any(), any(EnvironmentSettingsResponseHandler.class));
}
Expand Down