Skip to content

Commit 3072fa2

Browse files
imRishNashking94
authored andcommitted
Fail weight update when decommission ongoing and fail decommission when attribute not weighed away (opensearch-project#4839)
* Add checks for decommission before setting weights Signed-off-by: Rishab Nahata <rnnahata@amazon.com>
1 parent d22a252 commit 3072fa2

6 files changed

Lines changed: 263 additions & 0 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
9292
- Refactor Base Action class javadocs to OpenSearch.API ([#4732](https://github.com/opensearch-project/OpenSearch/pull/4732))
9393
- Migrate client transports to Apache HttpClient / Core 5.x ([#4459](https://github.com/opensearch-project/OpenSearch/pull/4459))
9494
- Refactored BalancedAllocator.Balancer to LocalShardsBalancer ([#4761](https://github.com/opensearch-project/OpenSearch/pull/4761))
95+
- Fail weight update when decommission ongoing and fail decommission when attribute not weighed away ([#4839](https://github.com/opensearch-project/OpenSearch/pull/4839))
9596
### Deprecated
9697
### Removed
9798
- Remove deprecated code to add node name into log pattern of log4j property file ([#4568](https://github.com/opensearch-project/OpenSearch/pull/4568))

server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,15 @@
2020
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionAction;
2121
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest;
2222
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionResponse;
23+
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
24+
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse;
2325
import org.opensearch.cluster.ClusterState;
2426
import org.opensearch.cluster.decommission.DecommissionAttribute;
2527
import org.opensearch.cluster.decommission.DecommissionStatus;
28+
import org.opensearch.cluster.decommission.DecommissioningFailedException;
2629
import org.opensearch.cluster.node.DiscoveryNode;
2730
import org.opensearch.cluster.node.DiscoveryNodeRole;
31+
import org.opensearch.cluster.routing.WeightedRouting;
2832
import org.opensearch.cluster.service.ClusterService;
2933
import org.opensearch.common.Priority;
3034
import org.opensearch.common.settings.Settings;
@@ -37,6 +41,7 @@
3741
import java.util.Collections;
3842
import java.util.Iterator;
3943
import java.util.List;
44+
import java.util.Map;
4045
import java.util.concurrent.ExecutionException;
4146

4247
import static org.opensearch.test.NodeRoles.onlyRole;
@@ -102,6 +107,17 @@ public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionEx
102107

103108
ensureStableCluster(6);
104109

110+
logger.info("--> setting shard routing weights for weighted round robin");
111+
Map<String, Double> weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0);
112+
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);
113+
114+
ClusterPutWeightedRoutingResponse weightedRoutingResponse = client().admin()
115+
.cluster()
116+
.prepareWeightedRouting()
117+
.setWeightedRouting(weightedRouting)
118+
.get();
119+
assertTrue(weightedRoutingResponse.isAcknowledged());
120+
105121
logger.info("--> starting decommissioning nodes in zone {}", 'c');
106122
DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "c");
107123
DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute);
@@ -162,4 +178,57 @@ public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionEx
162178
// as by then all nodes should have joined the cluster
163179
ensureStableCluster(6, TimeValue.timeValueMinutes(2));
164180
}
181+
182+
public void testDecommissionFailedWhenAttributeNotWeighedAway() throws Exception {
183+
Settings commonSettings = Settings.builder()
184+
.put("cluster.routing.allocation.awareness.attributes", "zone")
185+
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
186+
.build();
187+
// Start 3 cluster manager eligible nodes
188+
internalCluster().startClusterManagerOnlyNodes(3, Settings.builder().put(commonSettings).build());
189+
// start 3 data nodes
190+
internalCluster().startDataOnlyNodes(3, Settings.builder().put(commonSettings).build());
191+
ensureStableCluster(6);
192+
ClusterHealthResponse health = client().admin()
193+
.cluster()
194+
.prepareHealth()
195+
.setWaitForEvents(Priority.LANGUID)
196+
.setWaitForGreenStatus()
197+
.setWaitForNodes(Integer.toString(6))
198+
.execute()
199+
.actionGet();
200+
assertFalse(health.isTimedOut());
201+
202+
DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "c");
203+
DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute);
204+
assertBusy(() -> {
205+
DecommissioningFailedException ex = expectThrows(
206+
DecommissioningFailedException.class,
207+
() -> client().execute(DecommissionAction.INSTANCE, decommissionRequest).actionGet()
208+
);
209+
assertTrue(
210+
ex.getMessage()
211+
.contains("no weights are set to the attribute. Please set appropriate weights before triggering decommission action")
212+
);
213+
});
214+
215+
logger.info("--> setting shard routing weights for weighted round robin");
216+
Map<String, Double> weights = Map.of("a", 1.0, "b", 1.0, "c", 1.0);
217+
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);
218+
219+
ClusterPutWeightedRoutingResponse weightedRoutingResponse = client().admin()
220+
.cluster()
221+
.prepareWeightedRouting()
222+
.setWeightedRouting(weightedRouting)
223+
.get();
224+
assertTrue(weightedRoutingResponse.isAcknowledged());
225+
226+
assertBusy(() -> {
227+
DecommissioningFailedException ex = expectThrows(
228+
DecommissioningFailedException.class,
229+
() -> client().execute(DecommissionAction.INSTANCE, decommissionRequest).actionGet()
230+
);
231+
assertTrue(ex.getMessage().contains("weight for decommissioned attribute is expected to be [0.0] but found [1.0]"));
232+
});
233+
}
165234
}

server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
import org.opensearch.cluster.ClusterStateUpdateTask;
2121
import org.opensearch.cluster.NotClusterManagerException;
2222
import org.opensearch.cluster.metadata.Metadata;
23+
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
2324
import org.opensearch.cluster.node.DiscoveryNode;
25+
import org.opensearch.cluster.routing.WeightedRouting;
2426
import org.opensearch.cluster.routing.allocation.AllocationService;
2527
import org.opensearch.cluster.service.ClusterService;
2628
import org.opensearch.common.Priority;
@@ -129,6 +131,8 @@ public ClusterState execute(ClusterState currentState) {
129131
DecommissionAttributeMetadata decommissionAttributeMetadata = currentState.metadata().decommissionAttributeMetadata();
130132
// check that request is eligible to proceed
131133
ensureEligibleRequest(decommissionAttributeMetadata, decommissionAttribute);
134+
// ensure attribute is weighed away
135+
ensureToBeDecommissionedAttributeWeighedAway(currentState, decommissionAttribute);
132136
decommissionAttributeMetadata = new DecommissionAttributeMetadata(decommissionAttribute);
133137
logger.info("registering decommission metadata [{}] to execute action", decommissionAttributeMetadata.toString());
134138
return ClusterState.builder(currentState)
@@ -413,6 +417,30 @@ private static void validateAwarenessAttribute(
413417
}
414418
}
415419

420+
private static void ensureToBeDecommissionedAttributeWeighedAway(ClusterState state, DecommissionAttribute decommissionAttribute) {
421+
WeightedRoutingMetadata weightedRoutingMetadata = state.metadata().weightedRoutingMetadata();
422+
if (weightedRoutingMetadata == null) {
423+
throw new DecommissioningFailedException(
424+
decommissionAttribute,
425+
"no weights are set to the attribute. Please set appropriate weights before triggering decommission action"
426+
);
427+
}
428+
WeightedRouting weightedRouting = weightedRoutingMetadata.getWeightedRouting();
429+
if (weightedRouting.attributeName().equals(decommissionAttribute.attributeName()) == false) {
430+
throw new DecommissioningFailedException(
431+
decommissionAttribute,
432+
"no weights are specified to attribute [" + decommissionAttribute.attributeName() + "]"
433+
);
434+
}
435+
Double attributeValueWeight = weightedRouting.weights().get(decommissionAttribute.attributeValue());
436+
if (attributeValueWeight == null || attributeValueWeight.equals(0.0) == false) {
437+
throw new DecommissioningFailedException(
438+
decommissionAttribute,
439+
"weight for decommissioned attribute is expected to be [0.0] but found [" + attributeValueWeight + "]"
440+
);
441+
}
442+
}
443+
416444
private static void ensureEligibleRequest(
417445
DecommissionAttributeMetadata decommissionAttributeMetadata,
418446
DecommissionAttribute requestedDecommissionAttribute

server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import org.opensearch.cluster.ClusterState;
2020
import org.opensearch.cluster.ClusterStateUpdateTask;
2121
import org.opensearch.cluster.ack.ClusterStateUpdateResponse;
22+
import org.opensearch.cluster.decommission.DecommissionAttributeMetadata;
23+
import org.opensearch.cluster.decommission.DecommissionStatus;
2224
import org.opensearch.cluster.metadata.Metadata;
2325
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
2426
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
@@ -68,6 +70,8 @@ public void registerWeightedRoutingMetadata(
6870
clusterService.submitStateUpdateTask("update_weighted_routing", new ClusterStateUpdateTask(Priority.URGENT) {
6971
@Override
7072
public ClusterState execute(ClusterState currentState) {
73+
// verify currently no decommission action is ongoing
74+
ensureNoOngoingDecommissionAction(currentState);
7175
Metadata metadata = currentState.metadata();
7276
Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata());
7377
WeightedRoutingMetadata weightedRoutingMetadata = metadata.custom(WeightedRoutingMetadata.TYPE);
@@ -154,4 +158,15 @@ public void verifyAwarenessAttribute(String attributeName) {
154158
throw validationException;
155159
}
156160
}
161+
162+
public void ensureNoOngoingDecommissionAction(ClusterState state) {
163+
DecommissionAttributeMetadata decommissionAttributeMetadata = state.metadata().decommissionAttributeMetadata();
164+
if (decommissionAttributeMetadata != null && decommissionAttributeMetadata.status().equals(DecommissionStatus.FAILED) == false) {
165+
throw new IllegalStateException(
166+
"a decommission action is ongoing with status ["
167+
+ decommissionAttributeMetadata.status().status()
168+
+ "], cannot update weight during this state"
169+
);
170+
}
171+
}
157172
}

server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,17 @@
2222
import org.opensearch.cluster.ClusterState;
2323
import org.opensearch.cluster.coordination.CoordinationMetadata;
2424
import org.opensearch.cluster.metadata.Metadata;
25+
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
2526
import org.opensearch.cluster.node.DiscoveryNode;
2627
import org.opensearch.cluster.node.DiscoveryNodeRole;
2728
import org.opensearch.cluster.node.DiscoveryNodes;
29+
import org.opensearch.cluster.routing.WeightedRouting;
2830
import org.opensearch.cluster.routing.allocation.AllocationService;
2931
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
3032
import org.opensearch.cluster.service.ClusterService;
3133
import org.opensearch.common.settings.ClusterSettings;
3234
import org.opensearch.common.settings.Settings;
35+
import org.opensearch.test.ClusterServiceUtils;
3336
import org.opensearch.test.OpenSearchTestCase;
3437
import org.opensearch.test.transport.MockTransport;
3538
import org.opensearch.threadpool.TestThreadPool;
@@ -169,6 +172,56 @@ public void onFailure(Exception e) {
169172
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
170173
}
171174

175+
public void testDecommissionNotStartedWithoutWeighingAwayAttribute_1() throws InterruptedException {
176+
Map<String, Double> weights = Map.of("zone_1", 1.0, "zone_2", 1.0, "zone_3", 0.0);
177+
setWeightedRoutingWeights(weights);
178+
final CountDownLatch countDownLatch = new CountDownLatch(1);
179+
DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone_1");
180+
ActionListener<DecommissionResponse> listener = new ActionListener<DecommissionResponse>() {
181+
@Override
182+
public void onResponse(DecommissionResponse decommissionResponse) {
183+
fail("on response shouldn't have been called");
184+
}
185+
186+
@Override
187+
public void onFailure(Exception e) {
188+
assertTrue(e instanceof DecommissioningFailedException);
189+
assertThat(
190+
e.getMessage(),
191+
Matchers.containsString("weight for decommissioned attribute is expected to be [0.0] but found [1.0]")
192+
);
193+
countDownLatch.countDown();
194+
}
195+
};
196+
decommissionService.startDecommissionAction(decommissionAttribute, listener);
197+
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
198+
}
199+
200+
public void testDecommissionNotStartedWithoutWeighingAwayAttribute_2() throws InterruptedException {
201+
final CountDownLatch countDownLatch = new CountDownLatch(1);
202+
DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone_1");
203+
ActionListener<DecommissionResponse> listener = new ActionListener<DecommissionResponse>() {
204+
@Override
205+
public void onResponse(DecommissionResponse decommissionResponse) {
206+
fail("on response shouldn't have been called");
207+
}
208+
209+
@Override
210+
public void onFailure(Exception e) {
211+
assertTrue(e instanceof DecommissioningFailedException);
212+
assertThat(
213+
e.getMessage(),
214+
Matchers.containsString(
215+
"no weights are set to the attribute. Please set appropriate weights before triggering decommission action"
216+
)
217+
);
218+
countDownLatch.countDown();
219+
}
220+
};
221+
decommissionService.startDecommissionAction(decommissionAttribute, listener);
222+
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
223+
}
224+
172225
@SuppressWarnings("unchecked")
173226
public void testDecommissioningFailedWhenAnotherAttributeDecommissioningSuccessful() throws InterruptedException {
174227
final CountDownLatch countDownLatch = new CountDownLatch(1);
@@ -286,6 +339,17 @@ public void onFailure(Exception e) {
286339
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
287340
}
288341

342+
private void setWeightedRoutingWeights(Map<String, Double> weights) {
343+
ClusterState clusterState = clusterService.state();
344+
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);
345+
WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting);
346+
Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata());
347+
metadataBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata);
348+
clusterState = ClusterState.builder(clusterState).metadata(metadataBuilder).build();
349+
ClusterState.Builder builder = ClusterState.builder(clusterState);
350+
ClusterServiceUtils.setState(clusterService, builder);
351+
}
352+
289353
private ClusterState addDataNodes(ClusterState clusterState, String zone, String... nodeIds) {
290354
DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.nodes());
291355
org.opensearch.common.collect.List.of(nodeIds).forEach(nodeId -> nodeBuilder.add(newDataNode(nodeId, singletonMap("zone", zone))));

0 commit comments

Comments
 (0)