Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add NodeResourceUsageStats to ClusterInfo ([#18480](https://github.com/opensearch-project/OpenSearch/issues/18472))
- Introduce SecureHttpTransportParameters experimental API (to complement SecureTransportParameters counterpart) ([#18572](https://github.com/opensearch-project/OpenSearch/issues/18572))
- Create equivalents of JSM's AccessController in the java agent ([#18346](https://github.com/opensearch-project/OpenSearch/issues/18346))
- [WLM] Add WLM mode validation for workload group CRUD requests ([#18652](https://github.com/opensearch-project/OpenSearch/issues/18652))
- Introduced a new cluster-level API to fetch remote store metadata (segments and translogs) for each shard of an index. ([#18257](https://github.com/opensearch-project/OpenSearch/pull/18257))
- Add last index request timestamp columns to the `_cat/indices` API. ([10766](https://github.com/opensearch-project/OpenSearch/issues/10766))
- Introduce a new pull-based ingestion plugin for file-based indexing (for local testing) ([#18591](https://github.com/opensearch-project/OpenSearch/pull/18591))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,18 @@
import org.opensearch.client.Response;
import org.opensearch.client.ResponseException;
import org.opensearch.test.rest.OpenSearchRestTestCase;
import org.junit.Before;

import java.io.IOException;
import java.util.Locale;

public class WorkloadManagementRestIT extends OpenSearchRestTestCase {

@Before
public void enableWlmMode() throws Exception {
setWlmMode("enabled");
}

public void testCreate() throws Exception {
Response response = performOperation("PUT", "_wlm/workload_group", getCreateJson("analytics", "enforced", 0.4, 0.2));
assertEquals(response.getStatusLine().getStatusCode(), 200);
Expand Down Expand Up @@ -129,6 +136,16 @@ public void testCRUD() throws Exception {
performOperation("DELETE", "_wlm/workload_group/users3", null);
}

public void testOperationWhenWlmDisabled() throws Exception {
setWlmMode("disabled");
assertThrows(
ResponseException.class,
() -> performOperation("PUT", "_wlm/workload_group", getCreateJson("analytics", "enforced", 0.4, 0.2))
);
assertThrows(ResponseException.class, () -> performOperation("DELETE", "_wlm/workload_group/analytics4", null));
assertOK(performOperation("GET", "_wlm/workload_group/", null));
}

static String getCreateJson(String name, String resiliencyMode, double cpu, double memory) {
return "{\n"
+ " \"name\": \""
Expand Down Expand Up @@ -171,4 +188,19 @@ Response performOperation(String method, String uriPath, String json) throws IOE
}
return client().performRequest(request);
}

private void setWlmMode(String mode) throws Exception {
String settingJson = String.format(Locale.ROOT, """
{
"persistent": {
"wlm.workload_group.mode": "%s"
}
}
""", mode);

Request request = new Request("PUT", "/_cluster/settings");
request.setJsonEntity(settingJson);
Response response = client().performRequest(request);
assertEquals(200, response.getStatusLine().getStatusCode());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.wlm;

import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.wlm.WlmMode;
import org.opensearch.wlm.WorkloadManagementSettings;

/**
* Central provider for maintaining and supplying the current values of wlm cluster settings.
* This class listens for updates to relevant settings and provides the latest setting values.
*/
public class WlmClusterSettingValuesProvider {

private volatile WlmMode wlmMode;

/**
* Constructor for WlmClusterSettingValuesProvider
* @param settings OpenSearch settings
* @param clusterSettings Cluster settings to register update listener
*/
public WlmClusterSettingValuesProvider(Settings settings, ClusterSettings clusterSettings) {
this.wlmMode = WorkloadManagementSettings.WLM_MODE_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(WorkloadManagementSettings.WLM_MODE_SETTING, this::setWlmMode);
}

/**
* Check if WLM mode is ENABLED
* Throws an IllegalStateException if WLM mode is DISABLED or MONITOR ONLY.
* @param operationDescription A short text describing the operation, e.g. "create workload group".
*/
public void ensureWlmEnabled(String operationDescription) {
if (wlmMode != WlmMode.ENABLED) {
throw new IllegalStateException(
"Cannot "
+ operationDescription
+ " because workload management mode is disabled or monitor_only."
+ "To enable this feature, set [wlm.workload_group.mode] to 'enabled' in cluster settings."
);
}
}

/**
* Set the latest WLM mode.
* @param mode The wlm mode to set
*/
private void setWlmMode(WlmMode mode) {
this.wlmMode = mode;
}

Check warning on line 56 in plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/WlmClusterSettingValuesProvider.java

View check run for this annotation

Codecov / codecov/patch

plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/WlmClusterSettingValuesProvider.java#L55-L56

Added lines #L55 - L56 were not covered by tests

/**
* Get the latest WLM mode.
*/
public WlmMode getWlmMode() {
return wlmMode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public class WorkloadManagementPlugin extends Plugin implements ActionPlugin, Sy
private static FeatureType featureType;
private static RulePersistenceService rulePersistenceService;
private static RuleRoutingService ruleRoutingService;
private WlmClusterSettingValuesProvider wlmClusterSettingValuesProvider;
private AutoTaggingActionFilter autoTaggingActionFilter;

/**
Expand All @@ -112,6 +113,10 @@ public Collection<Object> createComponents(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
wlmClusterSettingValuesProvider = new WlmClusterSettingValuesProvider(
clusterService.getSettings(),
clusterService.getClusterSettings()
);
featureType = new WorkloadGroupFeatureType(new WorkloadGroupFeatureValueValidator(clusterService));
RuleEntityParser parser = new XContentRuleParser(featureType);
AttributeValueStoreFactory attributeValueStoreFactory = new AttributeValueStoreFactory(
Expand All @@ -132,12 +137,10 @@ public Collection<Object> createComponents(
RefreshBasedSyncMechanism refreshMechanism = new RefreshBasedSyncMechanism(
threadPool,
clusterService.getSettings(),
clusterService.getClusterSettings(),
parser,
ruleProcessingService,
featureType,
rulePersistenceService,
new RuleEventClassifier(Collections.emptySet(), ruleProcessingService)
new RuleEventClassifier(Collections.emptySet(), ruleProcessingService),
wlmClusterSettingValuesProvider
);

autoTaggingActionFilter = new AutoTaggingActionFilter(ruleProcessingService, threadPool);
Expand Down Expand Up @@ -181,10 +184,10 @@ public List<RestHandler> getRestHandlers(
Supplier<DiscoveryNodes> nodesInCluster
) {
return List.of(
new RestCreateWorkloadGroupAction(),
new RestCreateWorkloadGroupAction(wlmClusterSettingValuesProvider),
new RestGetWorkloadGroupAction(),
new RestDeleteWorkloadGroupAction(),
new RestUpdateWorkloadGroupAction()
new RestDeleteWorkloadGroupAction(wlmClusterSettingValuesProvider),
new RestUpdateWorkloadGroupAction(wlmClusterSettingValuesProvider)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.plugin.wlm.WlmClusterSettingValuesProvider;
import org.opensearch.plugin.wlm.action.CreateWorkloadGroupAction;
import org.opensearch.plugin.wlm.action.CreateWorkloadGroupRequest;
import org.opensearch.plugin.wlm.action.CreateWorkloadGroupResponse;
Expand All @@ -35,10 +36,15 @@
*/
public class RestCreateWorkloadGroupAction extends BaseRestHandler {

private final WlmClusterSettingValuesProvider nonPluginSettingValuesProvider;

/**
* Constructor for RestCreateWorkloadGroupAction
* @param nonPluginSettingValuesProvider the settings provider to access the current WLM mode
*/
public RestCreateWorkloadGroupAction() {}
public RestCreateWorkloadGroupAction(WlmClusterSettingValuesProvider nonPluginSettingValuesProvider) {
this.nonPluginSettingValuesProvider = nonPluginSettingValuesProvider;
}

@Override
public String getName() {
Expand All @@ -55,6 +61,7 @@

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
nonPluginSettingValuesProvider.ensureWlmEnabled(getName());

Check warning on line 64 in plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rest/RestCreateWorkloadGroupAction.java

View check run for this annotation

Codecov / codecov/patch

plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rest/RestCreateWorkloadGroupAction.java#L64

Added line #L64 was not covered by tests
try (XContentParser parser = request.contentParser()) {
CreateWorkloadGroupRequest createWorkloadGroupRequest = CreateWorkloadGroupRequest.fromXContent(parser);
return channel -> client.execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.plugin.wlm.rest;

import org.opensearch.plugin.wlm.WlmClusterSettingValuesProvider;
import org.opensearch.plugin.wlm.action.DeleteWorkloadGroupAction;
import org.opensearch.plugin.wlm.action.DeleteWorkloadGroupRequest;
import org.opensearch.rest.BaseRestHandler;
Expand All @@ -27,10 +28,15 @@
*/
public class RestDeleteWorkloadGroupAction extends BaseRestHandler {

private final WlmClusterSettingValuesProvider nonPluginSettingValuesProvider;

/**
* Constructor for RestDeleteWorkloadGroupAction
* @param nonPluginSettingValuesProvider the settings provider to access the current WLM mode
*/
public RestDeleteWorkloadGroupAction() {}
public RestDeleteWorkloadGroupAction(WlmClusterSettingValuesProvider nonPluginSettingValuesProvider) {
this.nonPluginSettingValuesProvider = nonPluginSettingValuesProvider;
}

@Override
public String getName() {
Expand All @@ -47,6 +53,7 @@ public List<Route> routes() {

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
nonPluginSettingValuesProvider.ensureWlmEnabled(getName());
DeleteWorkloadGroupRequest deleteWorkloadGroupRequest = new DeleteWorkloadGroupRequest(request.param("name"));
deleteWorkloadGroupRequest.clusterManagerNodeTimeout(
request.paramAsTime("cluster_manager_timeout", deleteWorkloadGroupRequest.clusterManagerNodeTimeout())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.plugin.wlm.WlmClusterSettingValuesProvider;
import org.opensearch.plugin.wlm.action.UpdateWorkloadGroupAction;
import org.opensearch.plugin.wlm.action.UpdateWorkloadGroupRequest;
import org.opensearch.plugin.wlm.action.UpdateWorkloadGroupResponse;
Expand All @@ -35,10 +36,15 @@
*/
public class RestUpdateWorkloadGroupAction extends BaseRestHandler {

private final WlmClusterSettingValuesProvider nonPluginSettingValuesProvider;

/**
* Constructor for RestUpdateWorkloadGroupAction
* @param nonPluginSettingValuesProvider the settings provider to access the current WLM mode
*/
public RestUpdateWorkloadGroupAction() {}
public RestUpdateWorkloadGroupAction(WlmClusterSettingValuesProvider nonPluginSettingValuesProvider) {
this.nonPluginSettingValuesProvider = nonPluginSettingValuesProvider;
}

@Override
public String getName() {
Expand All @@ -55,6 +61,7 @@

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
nonPluginSettingValuesProvider.ensureWlmEnabled(getName());

Check warning on line 64 in plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rest/RestUpdateWorkloadGroupAction.java

View check run for this annotation

Codecov / codecov/patch

plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rest/RestUpdateWorkloadGroupAction.java#L64

Added line #L64 was not covered by tests
try (XContentParser parser = request.contentParser()) {
UpdateWorkloadGroupRequest updateWorkloadGroupRequest = UpdateWorkloadGroupRequest.fromXContent(parser, request.param("name"));
return channel -> client.execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,13 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.plugin.wlm.WlmClusterSettingValuesProvider;
import org.opensearch.plugin.wlm.rule.sync.detect.RuleEvent;
import org.opensearch.plugin.wlm.rule.sync.detect.RuleEventClassifier;
import org.opensearch.rule.InMemoryRuleProcessingService;
import org.opensearch.rule.RuleEntityParser;
import org.opensearch.rule.RulePersistenceService;
import org.opensearch.rule.action.GetRuleRequest;
import org.opensearch.rule.action.GetRuleResponse;
Expand All @@ -28,7 +26,6 @@
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.wlm.WlmMode;
import org.opensearch.wlm.WorkloadManagementSettings;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -65,12 +62,10 @@ public class RefreshBasedSyncMechanism extends AbstractLifecycleComponent {
private final ThreadPool threadPool;
private long refreshInterval;
private volatile Scheduler.Cancellable scheduledFuture;
private final RuleEntityParser parser;
private final InMemoryRuleProcessingService ruleProcessingService;
private final RulePersistenceService rulePersistenceService;
private final RuleEventClassifier ruleEventClassifier;
private final FeatureType featureType;
private WlmMode wlmMode;
private final WlmClusterSettingValuesProvider nonPluginSettingValuesProvider;
// This var keeps the Rules which were present during last run of this service
private Set<Rule> lastRunIndexedRules;
private static final Logger logger = LogManager.getLogger(RefreshBasedSyncMechanism.class);
Expand All @@ -80,41 +75,34 @@ public class RefreshBasedSyncMechanism extends AbstractLifecycleComponent {
*
* @param threadPool
* @param settings
* @param clusterSettings
* @param parser
* @param ruleProcessingService
* @param featureType
* @param rulePersistenceService
* @param ruleEventClassifier
* @param nonPluginSettingValuesProvider
*/
public RefreshBasedSyncMechanism(
ThreadPool threadPool,
Settings settings,
ClusterSettings clusterSettings,
RuleEntityParser parser,
InMemoryRuleProcessingService ruleProcessingService,
FeatureType featureType,
RulePersistenceService rulePersistenceService,
RuleEventClassifier ruleEventClassifier
RuleEventClassifier ruleEventClassifier,
WlmClusterSettingValuesProvider nonPluginSettingValuesProvider
) {
this.threadPool = threadPool;
refreshInterval = RULE_SYNC_REFRESH_INTERVAL_SETTING.get(settings);
this.parser = parser;
this.ruleProcessingService = ruleProcessingService;
this.featureType = featureType;
this.rulePersistenceService = rulePersistenceService;
this.lastRunIndexedRules = new HashSet<>();
this.ruleEventClassifier = ruleEventClassifier;
wlmMode = WorkloadManagementSettings.WLM_MODE_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(WorkloadManagementSettings.WLM_MODE_SETTING, this::setWlmMode);
this.nonPluginSettingValuesProvider = nonPluginSettingValuesProvider;
}

/**
* synchronized check is needed in case two scheduled runs happen concurrently though highly improbable
* but theoretically possible
*/
synchronized void doRun() {
if (wlmMode != WlmMode.ENABLED) {
if (nonPluginSettingValuesProvider.getWlmMode() != WlmMode.ENABLED) {
return;
}

Expand Down Expand Up @@ -161,8 +149,4 @@ protected void doClose() throws IOException {
scheduledFuture.cancel();
}
}

void setWlmMode(WlmMode mode) {
this.wlmMode = mode;
}
}
Loading
Loading