Skip to content
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Changed

### Fixed
- [WLM] add a check to stop workload group deletion having rules ([#19502](https://github.com/opensearch-project/OpenSearch/pull/19502))

### Dependencies
- Bump `org.apache.zookeeper:zookeeper` from 3.9.3 to 3.9.4 ([#19535](https://github.com/opensearch-project/OpenSearch/pull/19535))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,7 @@ public Collection<Object> createComponents(
wlmClusterSettingValuesProvider,
featureType
);
return List.of(refreshMechanism);
return List.of(refreshMechanism, rulePersistenceService, featureType);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public Collection<Object> createComponents(
wlmClusterSettingValuesProvider,
featureType
);
return List.of(refreshMechanism);
return List.of(refreshMechanism, featureType, rulePersistenceService);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,34 @@

package org.opensearch.plugin.wlm.action;

import org.opensearch.ResourceNotFoundException;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.clustermanager.AcknowledgedResponse;
import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.WorkloadGroup;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.plugin.wlm.rule.WorkloadGroupFeatureType;
import org.opensearch.plugin.wlm.service.WorkloadGroupPersistenceService;
import org.opensearch.rule.RulePersistenceService;
import org.opensearch.rule.action.GetRuleRequest;
import org.opensearch.rule.action.GetRuleResponse;
import org.opensearch.rule.autotagging.FeatureType;
import org.opensearch.rule.autotagging.Rule;
import org.opensearch.rule.service.IndexStoredRulePersistenceService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/**
* Transport action for delete WorkloadGroup
Expand All @@ -35,6 +47,8 @@ public class TransportDeleteWorkloadGroupAction extends TransportClusterManagerN
AcknowledgedResponse> {

private final WorkloadGroupPersistenceService workloadGroupPersistenceService;
private final RulePersistenceService rulePersistenceService;
private final FeatureType featureType;

/**
* Constructor for TransportDeleteWorkloadGroupAction
Expand All @@ -45,6 +59,8 @@ public class TransportDeleteWorkloadGroupAction extends TransportClusterManagerN
* @param threadPool - a {@link ThreadPool} object
* @param indexNameExpressionResolver - a {@link IndexNameExpressionResolver} object
* @param workloadGroupPersistenceService - a {@link WorkloadGroupPersistenceService} object
* @param persistenceService - a {@link IndexStoredRulePersistenceService} instance
* @param featureType - workloadManagement feature type
*/
@Inject
public TransportDeleteWorkloadGroupAction(
Expand All @@ -53,7 +69,9 @@ public TransportDeleteWorkloadGroupAction(
ActionFilters actionFilters,
ThreadPool threadPool,
IndexNameExpressionResolver indexNameExpressionResolver,
WorkloadGroupPersistenceService workloadGroupPersistenceService
WorkloadGroupPersistenceService workloadGroupPersistenceService,
IndexStoredRulePersistenceService persistenceService,
WorkloadGroupFeatureType featureType
) {
super(
DeleteWorkloadGroupAction.NAME,
Expand All @@ -65,6 +83,8 @@ public TransportDeleteWorkloadGroupAction(
indexNameExpressionResolver
);
this.workloadGroupPersistenceService = workloadGroupPersistenceService;
this.rulePersistenceService = persistenceService;
this.featureType = featureType;
}

@Override
Expand All @@ -73,12 +93,18 @@ protected void clusterManagerOperation(
ClusterState state,
ActionListener<AcknowledgedResponse> listener
) throws Exception {
workloadGroupPersistenceService.deleteInClusterStateMetadata(request, listener);
threadPool.executor(executor()).submit(() -> {
try {
checkNoAssociatedRulesExist(request, listener, state);
} catch (Exception e) {
listener.onFailure(e);
}
});
}

@Override
protected String executor() {
return ThreadPool.Names.SAME;
return ThreadPool.Names.GET;
}

@Override
Expand All @@ -90,4 +116,48 @@ protected AcknowledgedResponse read(StreamInput in) throws IOException {
protected ClusterBlockException checkBlock(DeleteWorkloadGroupRequest request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}

private void checkNoAssociatedRulesExist(
DeleteWorkloadGroupRequest request,
ActionListener<AcknowledgedResponse> listener,
ClusterState state
) {
Collection<WorkloadGroup> workloadGroups = WorkloadGroupPersistenceService.getFromClusterStateMetadata(request.getName(), state);
if (workloadGroups.isEmpty()) {
throw new ResourceNotFoundException("No WorkloadGroup exists with the provided name: " + request.getName());
}

WorkloadGroup workloadGroup = workloadGroups.iterator().next();
rulePersistenceService.getRule(
new GetRuleRequest(null, Collections.emptyMap(), null, featureType),
new ActionListener<GetRuleResponse>() {
@Override
public void onResponse(GetRuleResponse getRuleResponse) {
List<Rule> associatedRules = getRuleResponse.getRules()
.stream()
.filter(rule -> rule.getFeatureValue().equals(workloadGroup.get_id()))
.toList();

if (!associatedRules.isEmpty()) {
listener.onFailure(
new IllegalStateException(
workloadGroup.getName()
+ " workload group has rules with ids: "
+ associatedRules
+ " ."
+ "Please delete them first otherwise system will be an inconsistent state."
)
);
return;
}
workloadGroupPersistenceService.deleteInClusterStateMetadata(request, listener);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
}
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -236,14 +236,18 @@ protected AcknowledgedResponse newResponse(boolean acknowledged) {
*/
ClusterState deleteWorkloadGroupInClusterState(final String name, final ClusterState currentClusterState) {
final Metadata metadata = currentClusterState.metadata();
final WorkloadGroup workloadGroupToRemove = metadata.workloadGroups()
final WorkloadGroup workloadGroupToRemove = getWorkloadGroup(name, metadata);

return ClusterState.builder(currentClusterState).metadata(Metadata.builder(metadata).remove(workloadGroupToRemove).build()).build();
}

private static WorkloadGroup getWorkloadGroup(String name, Metadata metadata) {
return metadata.workloadGroups()
.values()
.stream()
.filter(workloadGroup -> workloadGroup.getName().equals(name))
.findAny()
.orElseThrow(() -> new ResourceNotFoundException("No WorkloadGroup exists with the provided name: " + name));

return ClusterState.builder(currentClusterState).metadata(Metadata.builder(metadata).remove(workloadGroupToRemove).build()).build();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.function.Supplier;

import static org.opensearch.plugin.wlm.WorkloadManagementPlugin.PRINCIPAL_ATTRIBUTE_NAME;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -187,8 +188,7 @@ public void testCreateComponentsReturnsRefreshMechanism() {
mockRepositoriesServiceSupplier
);

assertEquals(1, components.size());
assertTrue(components.iterator().next() instanceof RefreshBasedSyncMechanism);
assertThat(components.stream().filter(c -> c instanceof RefreshBasedSyncMechanism).count(), equalTo(1L));
}

public void testSetAttributesWithMock() {
Expand Down
Loading
Loading