Skip to content

Commit e278611

Browse files
dreamer-89mch2
authored and committed
[Segment Replication] Introduce primary weight factor for primary shards distribution (opensearch-project#6017)
* Add integration test to show shard allocation Signed-off-by: Suraj Singh <surajrider@gmail.com> * Update WeightFunction to consider primary shards for uniform primary distribution Signed-off-by: Suraj Singh <surajrider@gmail.com> * Include primary shard weight for all shard types Signed-off-by: Suraj Singh <surajrider@gmail.com> * Update integration test to show docrep & segrep indices Signed-off-by: Suraj Singh <surajrider@gmail.com> * Add settings updater and update TriConsumer functional interface declaration Signed-off-by: Suraj Singh <surajrider@gmail.com> * Add balance configuration test Signed-off-by: Suraj Singh <surajrider@gmail.com> * Fix failing unit tests and merge conflicts Signed-off-by: Suraj Singh <surajrider@gmail.com> * Rename primary balance factor Signed-off-by: Suraj Singh <surajrider@gmail.com> * Update integration tests to avoid green state timeouts Signed-off-by: Suraj Singh <surajrider@gmail.com> * PR feedback Signed-off-by: Suraj Singh <surajrider@gmail.com> --------- Signed-off-by: Suraj Singh <surajrider@gmail.com>
1 parent 979becb commit e278611

15 files changed

Lines changed: 852 additions & 24 deletions

File tree

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
package org.opensearch.common.collect;
9+
10+
import java.util.Objects;
11+
12+
/**
 * A container for 3 elements, similar to {@link org.opensearch.common.collect.Tuple}.
 * The three references are fixed at construction time; the elements themselves may be {@code null}.
 *
 * @opensearch.internal
 */
public class Triplet<V1, V2, V3> {

    /**
     * Static factory creating a triplet from the three given values.
     *
     * @param v1 first element
     * @param v2 second element
     * @param v3 third element
     * @return a new {@code Triplet} holding the values in the given order
     */
    public static <V1, V2, V3> Triplet<V1, V2, V3> tuple(V1 v1, V2 v2, V3 v3) {
        return new Triplet<>(v1, v2, v3);
    }

    private final V1 v1;
    private final V2 v2;
    private final V3 v3;

    /**
     * Creates a triplet from the three given values.
     *
     * @param v1 first element
     * @param v2 second element
     * @param v3 third element
     */
    public Triplet(V1 v1, V2 v2, V3 v3) {
        this.v1 = v1;
        this.v2 = v2;
        this.v3 = v3;
    }

    /**
     * @return the first element
     */
    public V1 v1() {
        return v1;
    }

    /**
     * @return the second element
     */
    public V2 v2() {
        return v2;
    }

    /**
     * @return the third element
     */
    public V3 v3() {
        return v3;
    }

    /**
     * Two triplets are equal when they are of the exact same class and all three elements are pairwise equal
     * according to {@link Objects#equals(Object, Object)}.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Triplet<?, ?, ?> triplet = (Triplet<?, ?, ?>) o;
        return Objects.equals(v1, triplet.v1) && Objects.equals(v2, triplet.v2) && Objects.equals(v3, triplet.v3);
    }

    @Override
    public int hashCode() {
        return Objects.hash(v1, v2, v3);
    }

    /**
     * Renders the three elements, labelled with this class' own name so that log output is not confused with a
     * two-element tuple.
     */
    @Override
    public String toString() {
        return "Triplet [v1=" + v1 + ", v2=" + v2 + ", v3=" + v3 + "]";
    }
}
Lines changed: 307 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,307 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.indices.replication;
10+
11+
import org.opensearch.cluster.ClusterState;
12+
import org.opensearch.cluster.metadata.IndexMetadata;
13+
import org.opensearch.cluster.routing.RoutingNode;
14+
import org.opensearch.cluster.routing.RoutingNodes;
15+
import org.opensearch.cluster.routing.ShardRouting;
16+
import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
17+
import org.opensearch.common.settings.Settings;
18+
import org.opensearch.index.IndexModule;
19+
import org.opensearch.indices.replication.common.ReplicationType;
20+
import org.opensearch.test.InternalTestCluster;
21+
import org.opensearch.test.OpenSearchIntegTestCase;
22+
23+
import java.util.ArrayList;
24+
import java.util.Formatter;
25+
import java.util.List;
26+
import java.util.Locale;
27+
import java.util.Map;
28+
import java.util.TreeMap;
29+
import java.util.concurrent.TimeUnit;
30+
31+
import static org.opensearch.cluster.routing.ShardRoutingState.STARTED;
32+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
33+
34+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
35+
public class SegmentReplicationAllocationIT extends SegmentReplicationBaseIT {
36+
37+
private void createIndex(String idxName, int shardCount, int replicaCount, boolean isSegRep) {
38+
Settings.Builder builder = Settings.builder()
39+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, shardCount)
40+
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
41+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT);
42+
if (isSegRep) {
43+
builder = builder.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT);
44+
} else {
45+
builder = builder.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT);
46+
}
47+
prepareCreate(idxName, builder).get();
48+
}
49+
50+
/**
51+
* This test verifies primary shard allocation is balanced.
52+
*/
53+
public void testShardAllocation() throws Exception {
54+
internalCluster().startClusterManagerOnlyNode();
55+
final int maxReplicaCount = 2;
56+
final int maxShardCount = 5;
57+
final int nodeCount = randomIntBetween(maxReplicaCount + 1, 10);
58+
final int numberOfIndices = randomIntBetween(5, 10);
59+
60+
final List<String> nodeNames = new ArrayList<>();
61+
logger.info("--> Creating {} nodes", nodeCount);
62+
for (int i = 0; i < nodeCount; i++) {
63+
nodeNames.add(internalCluster().startNode());
64+
}
65+
assertAcked(
66+
client().admin()
67+
.cluster()
68+
.prepareUpdateSettings()
69+
.setPersistentSettings(
70+
Settings.builder().put(BalancedShardsAllocator.PRIMARY_SHARD_BALANCE_FACTOR_SETTING.getKey(), "1.0f")
71+
)
72+
);
73+
74+
int shardCount, replicaCount, totalShardCount = 0, totalReplicaCount = 0;
75+
ShardAllocations shardAllocations = new ShardAllocations();
76+
ClusterState state;
77+
for (int i = 0; i < numberOfIndices; i++) {
78+
shardCount = randomIntBetween(1, maxShardCount);
79+
totalShardCount += shardCount;
80+
replicaCount = randomIntBetween(0, maxReplicaCount);
81+
totalReplicaCount += replicaCount;
82+
createIndex("test" + i, shardCount, replicaCount, i % 2 == 0);
83+
logger.info("--> Creating index {} with shard count {} and replica count {}", "test" + i, shardCount, replicaCount);
84+
assertBusy(() -> ensureGreen(), 60, TimeUnit.SECONDS);
85+
state = client().admin().cluster().prepareState().execute().actionGet().getState();
86+
shardAllocations.printShardDistribution(state);
87+
}
88+
state = client().admin().cluster().prepareState().execute().actionGet().getState();
89+
RoutingNodes nodes = state.getRoutingNodes();
90+
final float avgNumShards = (float) (totalShardCount) / (float) (nodes.size());
91+
final int minAvgNumberOfShards = Math.round(Math.round(Math.floor(avgNumShards - 1.0f)));
92+
final int maxAvgNumberOfShards = Math.round(Math.round(Math.ceil(avgNumShards + 1.0f)));
93+
94+
for (RoutingNode node : nodes) {
95+
assertTrue(node.primaryShardsWithState(STARTED).size() >= minAvgNumberOfShards);
96+
assertTrue(node.primaryShardsWithState(STARTED).size() <= maxAvgNumberOfShards);
97+
}
98+
}
99+
100+
/**
101+
* This test verifies shard allocation with changes to cluster config i.e. node add, removal keeps the primary shard
102+
* allocation balanced.
103+
*/
104+
public void testAllocationWithDisruption() throws Exception {
105+
internalCluster().startClusterManagerOnlyNode();
106+
final int maxReplicaCount = 2;
107+
final int maxShardCount = 5;
108+
final int nodeCount = randomIntBetween(maxReplicaCount + 1, 10);
109+
final int numberOfIndices = randomIntBetween(1, 10);
110+
111+
logger.info("--> Creating {} nodes", nodeCount);
112+
final List<String> nodeNames = new ArrayList<>();
113+
for (int i = 0; i < nodeCount; i++) {
114+
nodeNames.add(internalCluster().startNode());
115+
}
116+
assertAcked(
117+
client().admin()
118+
.cluster()
119+
.prepareUpdateSettings()
120+
.setPersistentSettings(
121+
Settings.builder()
122+
.put(BalancedShardsAllocator.PRIMARY_SHARD_BALANCE_FACTOR_SETTING.getKey(), "1.0f")
123+
.put(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING.getKey(), "0.0f")
124+
.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), "0.0f")
125+
.build()
126+
)
127+
);
128+
129+
int shardCount, replicaCount, totalShardCount = 0, totalReplicaCount = 0;
130+
ShardAllocations shardAllocations = new ShardAllocations();
131+
ClusterState state;
132+
for (int i = 0; i < numberOfIndices; i++) {
133+
shardCount = randomIntBetween(1, maxShardCount);
134+
totalShardCount += shardCount;
135+
replicaCount = randomIntBetween(1, maxReplicaCount);
136+
totalReplicaCount += replicaCount;
137+
logger.info("--> Creating index test{} with primary {} and replica {}", i, shardCount, replicaCount);
138+
createIndex("test" + i, shardCount, replicaCount, i % 2 == 0);
139+
assertBusy(() -> ensureGreen(), 60, TimeUnit.SECONDS);
140+
if (logger.isTraceEnabled()) {
141+
state = client().admin().cluster().prepareState().execute().actionGet().getState();
142+
shardAllocations.printShardDistribution(state);
143+
}
144+
}
145+
state = client().admin().cluster().prepareState().execute().actionGet().getState();
146+
float avgNumShards = (float) (totalShardCount) / (float) (state.getRoutingNodes().size());
147+
int minAvgNumberOfShards = Math.round(Math.round(Math.floor(avgNumShards - 1.0f)));
148+
int maxAvgNumberOfShards = Math.round(Math.round(Math.ceil(avgNumShards + 1.0f)));
149+
150+
for (RoutingNode node : state.getRoutingNodes()) {
151+
assertTrue(node.primaryShardsWithState(STARTED).size() >= minAvgNumberOfShards);
152+
assertTrue(node.primaryShardsWithState(STARTED).size() <= maxAvgNumberOfShards);
153+
}
154+
155+
final int additionalNodeCount = randomIntBetween(1, 5);
156+
logger.info("--> Adding {} nodes", additionalNodeCount);
157+
158+
internalCluster().startNodes(additionalNodeCount);
159+
assertBusy(() -> ensureGreen(), 60, TimeUnit.SECONDS);
160+
state = client().admin().cluster().prepareState().execute().actionGet().getState();
161+
avgNumShards = (float) (totalShardCount) / (float) (state.getRoutingNodes().size());
162+
minAvgNumberOfShards = Math.round(Math.round(Math.floor(avgNumShards - 1.0f)));
163+
maxAvgNumberOfShards = Math.round(Math.round(Math.ceil(avgNumShards + 1.0f)));
164+
shardAllocations.printShardDistribution(state);
165+
for (RoutingNode node : state.getRoutingNodes()) {
166+
assertTrue(node.primaryShardsWithState(STARTED).size() >= minAvgNumberOfShards);
167+
assertTrue(node.primaryShardsWithState(STARTED).size() <= maxAvgNumberOfShards);
168+
}
169+
170+
logger.info("--> Stop one third nodes");
171+
for (int i = 1; i < nodeCount; i += 3) {
172+
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeNames.get(i)));
173+
// give replica a chance to promote as primary before terminating node containing the replica
174+
assertBusy(() -> ensureGreen(), 60, TimeUnit.SECONDS);
175+
}
176+
state = client().admin().cluster().prepareState().execute().actionGet().getState();
177+
avgNumShards = (float) (totalShardCount) / (float) (state.getRoutingNodes().size());
178+
minAvgNumberOfShards = Math.round(Math.round(Math.floor(avgNumShards - 1.0f)));
179+
maxAvgNumberOfShards = Math.round(Math.round(Math.ceil(avgNumShards + 1.0f)));
180+
shardAllocations.printShardDistribution(state);
181+
182+
for (RoutingNode node : state.getRoutingNodes()) {
183+
assertTrue(node.primaryShardsWithState(STARTED).size() >= minAvgNumberOfShards);
184+
assertTrue(node.primaryShardsWithState(STARTED).size() <= maxAvgNumberOfShards);
185+
}
186+
}
187+
188+
/**
189+
* This class is created for debugging purpose to show shard allocation across nodes. It keeps cluster state which
190+
* is used to build the node's shard allocation
191+
*/
192+
private class ShardAllocations {
193+
ClusterState state;
194+
195+
public static final String separator = "===================================================";
196+
public static final String ONE_LINE_RETURN = "\n";
197+
public static final String TWO_LINE_RETURN = "\n\n";
198+
199+
/**
200+
Store shard primary/replica shard count against a node for segrep indices.
201+
String: NodeId
202+
int[]: tuple storing primary shard count in 0th index and replica's in 1
203+
*/
204+
TreeMap<String, int[]> nodeToSegRepCountMap = new TreeMap<>();
205+
/**
206+
Store shard primary/replica shard count against a node for docrep indices.
207+
String: NodeId
208+
int[]: tuple storing primary shard count in 0th index and replica's in 1
209+
*/
210+
TreeMap<String, int[]> nodeToDocRepCountMap = new TreeMap<>();
211+
212+
/**
213+
* Helper map containing NodeName to NodeId
214+
*/
215+
TreeMap<String, String> nameToNodeId = new TreeMap<>();
216+
217+
/*
218+
Unassigned array containing primary at 0, replica at 1
219+
*/
220+
int[] unassigned = new int[2];
221+
222+
int[] totalShards = new int[2];
223+
224+
public final String printShardAllocationWithHeader(int[] docrep, int[] segrep) {
225+
StringBuffer sb = new StringBuffer();
226+
Formatter formatter = new Formatter(sb, Locale.getDefault());
227+
formatter.format("%-20s %-20s %-20s %-20s\n", "P", docrep[0] + segrep[0], docrep[0], segrep[0]);
228+
formatter.format("%-20s %-20s %-20s %-20s\n", "R", docrep[1] + segrep[1], docrep[1], segrep[1]);
229+
return sb.toString();
230+
}
231+
232+
public void reset() {
233+
nodeToSegRepCountMap.clear();
234+
nodeToDocRepCountMap.clear();
235+
nameToNodeId.clear();
236+
totalShards[0] = totalShards[1] = 0;
237+
unassigned[0] = unassigned[1] = 0;
238+
}
239+
240+
public void setState(ClusterState state) {
241+
this.reset();
242+
this.state = state;
243+
buildMap();
244+
}
245+
246+
private void buildMap() {
247+
for (RoutingNode node : state.getRoutingNodes()) {
248+
nameToNodeId.putIfAbsent(node.node().getName(), node.nodeId());
249+
nodeToSegRepCountMap.putIfAbsent(node.nodeId(), new int[] { 0, 0 });
250+
nodeToDocRepCountMap.putIfAbsent(node.nodeId(), new int[] { 0, 0 });
251+
}
252+
for (ShardRouting shardRouting : state.routingTable().allShards()) {
253+
// Fetch shard to update. Initialize local array
254+
if (isIndexSegRep(shardRouting.getIndexName())) {
255+
updateMap(nodeToSegRepCountMap, shardRouting);
256+
} else {
257+
updateMap(nodeToDocRepCountMap, shardRouting);
258+
}
259+
}
260+
}
261+
262+
void updateMap(TreeMap<String, int[]> mapToUpdate, ShardRouting shardRouting) {
263+
int[] shard;
264+
shard = shardRouting.assignedToNode() ? mapToUpdate.get(shardRouting.currentNodeId()) : unassigned;
265+
// Update shard type count
266+
if (shardRouting.primary()) {
267+
shard[0]++;
268+
totalShards[0]++;
269+
} else {
270+
shard[1]++;
271+
totalShards[1]++;
272+
}
273+
// For assigned shards, put back counter
274+
if (shardRouting.assignedToNode()) mapToUpdate.put(shardRouting.currentNodeId(), shard);
275+
}
276+
277+
boolean isIndexSegRep(String indexName) {
278+
return state.metadata()
279+
.index(indexName)
280+
.getSettings()
281+
.get(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.getKey())
282+
.equals(ReplicationType.SEGMENT.toString());
283+
}
284+
285+
@Override
286+
public String toString() {
287+
StringBuffer sb = new StringBuffer();
288+
sb.append(TWO_LINE_RETURN + separator + ONE_LINE_RETURN);
289+
Formatter formatter = new Formatter(sb, Locale.getDefault());
290+
for (Map.Entry<String, String> entry : nameToNodeId.entrySet()) {
291+
String nodeId = nameToNodeId.get(entry.getKey());
292+
formatter.format("%-20s %-20s %-20s %-20s\n", entry.getKey().toUpperCase(Locale.getDefault()), "TOTAL", "DOCREP", "SEGREP");
293+
sb.append(printShardAllocationWithHeader(nodeToDocRepCountMap.get(nodeId), nodeToSegRepCountMap.get(nodeId)));
294+
}
295+
sb.append(ONE_LINE_RETURN);
296+
formatter.format("%-20s %-20s %-20s\n\n", "Unassigned ", unassigned[0], unassigned[1]);
297+
formatter.format("%-20s %-20s %-20s\n\n", "Total Shards", totalShards[0], totalShards[1]);
298+
return sb.toString();
299+
}
300+
301+
public void printShardDistribution(ClusterState state) {
302+
this.setState(state);
303+
logger.info("--> Shard distribution {}", this);
304+
}
305+
}
306+
307+
}

server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,23 @@ public List<ShardRouting> shardsWithState(ShardRoutingState... states) {
329329
return shards;
330330
}
331331

332+
/**
333+
* Determine the primary shards of an index with a specific state
334+
* @param states set of states which should be listed
335+
* @return a list of shards
336+
*/
337+
public List<ShardRouting> primaryShardsWithState(ShardRoutingState... states) {
338+
List<ShardRouting> shards = new ArrayList<>();
339+
for (ShardRouting shardEntry : this) {
340+
for (ShardRoutingState state : states) {
341+
if (shardEntry.state() == state && shardEntry.primary() == true) {
342+
shards.add(shardEntry);
343+
}
344+
}
345+
}
346+
return shards;
347+
}
348+
332349
/**
333350
* Determine the shards of an index with a specific state
334351
* @param index id of the index

0 commit comments

Comments
 (0)