Skip to content

Commit 10ebba5

Browse files
MINOR: Add tests for assignor stability and assignment reuse (#21069)
Add tests for common assignor properties: * Assignors are expected to be stable. Once a balanced assignment has been computed, running the assignor a second time should not change the assignment. * The uniform assignors are expected to re-use the existing assignment maps when member assignments are unchanged. Reviewers: David Jacot <[email protected]>
1 parent 54e629e commit 10ebba5

File tree

4 files changed

+265
-66
lines changed

4 files changed

+265
-66
lines changed
Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.coordinator.group.assignor;
18+
19+
import org.apache.kafka.common.Uuid;
20+
import org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage;
21+
import org.apache.kafka.coordinator.common.runtime.MetadataImageBuilder;
22+
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
23+
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
24+
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignor;
25+
import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType;
26+
import org.apache.kafka.coordinator.group.modern.Assignment;
27+
import org.apache.kafka.coordinator.group.modern.GroupSpecImpl;
28+
import org.apache.kafka.coordinator.group.modern.MemberSubscriptionAndAssignmentImpl;
29+
import org.apache.kafka.coordinator.group.modern.SubscribedTopicDescriberImpl;
30+
import org.apache.kafka.image.MetadataImage;
31+
32+
import java.util.HashMap;
33+
import java.util.LinkedHashMap;
34+
import java.util.List;
35+
import java.util.Map;
36+
import java.util.Optional;
37+
import java.util.Set;
38+
39+
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.assertAssignment;
40+
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.invertedTargetAssignment;
41+
import static org.junit.jupiter.api.Assertions.assertSame;
42+
43+
public class CommonAssignorTests {
44+
private static final Uuid TOPIC_1_UUID = Uuid.randomUuid();
45+
private static final String TOPIC_1_NAME = "topic1";
46+
private static final Uuid TOPIC_2_UUID = Uuid.randomUuid();
47+
private static final String TOPIC_2_NAME = "topic2";
48+
private static final Uuid TOPIC_3_UUID = Uuid.randomUuid();
49+
private static final String TOPIC_3_NAME = "topic3";
50+
private static final String MEMBER_A = "A";
51+
private static final String MEMBER_B = "B";
52+
private static final String MEMBER_C = "C";
53+
54+
/**
55+
* Tests that an assignor reuses the same assignment maps when the assignment is unchanged.
56+
* @param assignor The assignor.
57+
* @param subscriptionType The subscription type.
58+
* @param rackAware Whether to test with rack awareness.
59+
*/
60+
public static void testAssignmentReuse(PartitionAssignor assignor, SubscriptionType subscriptionType, boolean rackAware) {
61+
MetadataImage metadataImage = new MetadataImageBuilder()
62+
.addTopic(TOPIC_1_UUID, TOPIC_1_NAME, 2)
63+
.addTopic(TOPIC_2_UUID, TOPIC_2_NAME, 5)
64+
.addTopic(TOPIC_3_UUID, TOPIC_3_NAME, 7)
65+
.addRacks()
66+
.build();
67+
68+
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
69+
new KRaftCoordinatorMetadataImage(metadataImage)
70+
);
71+
72+
Map<String, MemberSubscriptionAndAssignmentImpl> members = new HashMap<>();
73+
members.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl(
74+
rackAware ? Optional.of("rack1") : Optional.empty(),
75+
Optional.empty(),
76+
Set.of(TOPIC_1_UUID, TOPIC_2_UUID, TOPIC_3_UUID),
77+
Assignment.EMPTY
78+
));
79+
members.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl(
80+
rackAware ? Optional.of("rack2") : Optional.empty(),
81+
Optional.empty(),
82+
Set.of(TOPIC_1_UUID, TOPIC_2_UUID, TOPIC_3_UUID),
83+
Assignment.EMPTY
84+
));
85+
members.put(MEMBER_C, new MemberSubscriptionAndAssignmentImpl(
86+
rackAware ? Optional.of("rack3") : Optional.empty(),
87+
Optional.empty(),
88+
Set.of(TOPIC_1_UUID, TOPIC_2_UUID, TOPIC_3_UUID),
89+
Assignment.EMPTY
90+
));
91+
92+
GroupSpec groupSpec = new GroupSpecImpl(
93+
members,
94+
subscriptionType,
95+
Map.of()
96+
);
97+
98+
GroupAssignment firstAssignment = assignor.assign(
99+
groupSpec,
100+
subscribedTopicMetadata
101+
);
102+
103+
Map<String, MemberSubscriptionAndAssignmentImpl> membersWithAssignment = new LinkedHashMap<>();
104+
for (Map.Entry<String, MemberSubscriptionAndAssignmentImpl> entry : members.entrySet()) {
105+
String memberId = entry.getKey();
106+
MemberSubscriptionAndAssignmentImpl memberSubscriptionAndAssignment = entry.getValue();
107+
membersWithAssignment.put(memberId, new MemberSubscriptionAndAssignmentImpl(
108+
memberSubscriptionAndAssignment.rackId(),
109+
memberSubscriptionAndAssignment.instanceId(),
110+
memberSubscriptionAndAssignment.subscribedTopicIds(),
111+
new Assignment(firstAssignment.members().get(memberId).partitions())
112+
));
113+
}
114+
GroupSpec groupSpecWithAssignment = new GroupSpecImpl(
115+
membersWithAssignment,
116+
subscriptionType,
117+
invertedTargetAssignment(membersWithAssignment)
118+
);
119+
120+
GroupAssignment secondAssignment = assignor.assign(
121+
groupSpecWithAssignment,
122+
subscribedTopicMetadata
123+
);
124+
125+
for (String memberId : members.keySet()) {
126+
assertSame(firstAssignment.members().get(memberId).partitions(), secondAssignment.members().get(memberId).partitions());
127+
}
128+
}
129+
130+
/**
131+
* Tests that an assignor produces the same assignment when the members are iterated in
132+
* different orders.
133+
* @param assignor The assignor.
134+
* @param subscriptionType The subscription type.
135+
* @param rackAware Whether to test with rack awareness.
136+
*/
137+
public static void testReassignmentStickiness(PartitionAssignor assignor, SubscriptionType subscriptionType, boolean rackAware) {
138+
MetadataImage metadataImage = new MetadataImageBuilder()
139+
.addTopic(TOPIC_1_UUID, TOPIC_1_NAME, 2)
140+
.addTopic(TOPIC_2_UUID, TOPIC_2_NAME, 5)
141+
.addTopic(TOPIC_3_UUID, TOPIC_3_NAME, 7)
142+
.addRacks()
143+
.build();
144+
145+
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
146+
new KRaftCoordinatorMetadataImage(metadataImage)
147+
);
148+
149+
Map<String, MemberSubscriptionAndAssignmentImpl> members = new HashMap<>();
150+
members.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl(
151+
Optional.empty(),
152+
Optional.empty(),
153+
Set.of(TOPIC_1_UUID, TOPIC_2_UUID, TOPIC_3_UUID),
154+
Assignment.EMPTY
155+
));
156+
members.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl(
157+
// We want there to be multiple valid assignments, otherwise we aren't really
158+
// testing stickiness. Only give a single member a rack, so that the other members
159+
// are interchangeable.
160+
rackAware ? Optional.of("rack1") : Optional.empty(),
161+
Optional.empty(),
162+
Set.of(TOPIC_1_UUID, TOPIC_2_UUID, TOPIC_3_UUID),
163+
Assignment.EMPTY
164+
));
165+
members.put(MEMBER_C, new MemberSubscriptionAndAssignmentImpl(
166+
Optional.empty(),
167+
Optional.empty(),
168+
Set.of(TOPIC_1_UUID, TOPIC_2_UUID, TOPIC_3_UUID),
169+
Assignment.EMPTY
170+
));
171+
172+
GroupSpec groupSpec = new GroupSpecImpl(
173+
members,
174+
subscriptionType,
175+
Map.of()
176+
);
177+
178+
GroupAssignment firstAssignment = assignor.assign(
179+
groupSpec,
180+
subscribedTopicMetadata
181+
);
182+
183+
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
184+
firstAssignment.members().forEach((memberId, memberAssignment) ->
185+
expectedAssignment.put(memberId, memberAssignment.partitions())
186+
);
187+
188+
// Try running the assignor with the members in different orders. The assignment should be
189+
// the same every time.
190+
List<List<String>> memberIdOrders = List.of(
191+
List.of(MEMBER_A, MEMBER_B, MEMBER_C),
192+
List.of(MEMBER_A, MEMBER_C, MEMBER_B),
193+
List.of(MEMBER_B, MEMBER_A, MEMBER_C),
194+
List.of(MEMBER_B, MEMBER_C, MEMBER_A),
195+
List.of(MEMBER_C, MEMBER_A, MEMBER_B),
196+
List.of(MEMBER_C, MEMBER_B, MEMBER_A)
197+
);
198+
for (List<String> memberIdOrder : memberIdOrders) {
199+
Map<String, MemberSubscriptionAndAssignmentImpl> membersWithAssignment = new LinkedHashMap<>();
200+
for (String memberId : memberIdOrder) {
201+
MemberSubscriptionAndAssignmentImpl memberSubscriptionAndAssignment = members.get(memberId);
202+
membersWithAssignment.put(memberId, new MemberSubscriptionAndAssignmentImpl(
203+
memberSubscriptionAndAssignment.rackId(),
204+
memberSubscriptionAndAssignment.instanceId(),
205+
memberSubscriptionAndAssignment.subscribedTopicIds(),
206+
new Assignment(firstAssignment.members().get(memberId).partitions())
207+
));
208+
}
209+
GroupSpec groupSpecWithAssignment = new GroupSpecImpl(
210+
membersWithAssignment,
211+
subscriptionType,
212+
invertedTargetAssignment(membersWithAssignment)
213+
);
214+
215+
GroupAssignment secondAssignment = assignor.assign(
216+
groupSpecWithAssignment,
217+
subscribedTopicMetadata
218+
);
219+
220+
// The second assignment should be the same as the first
221+
assertAssignment(expectedAssignment, secondAssignment);
222+
}
223+
}
224+
}

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java

Lines changed: 14 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.apache.kafka.image.MetadataImage;
3030

3131
import org.junit.jupiter.api.Test;
32+
import org.junit.jupiter.params.ParameterizedTest;
33+
import org.junit.jupiter.params.provider.ValueSource;
3234

3335
import java.util.ArrayList;
3436
import java.util.HashMap;
@@ -61,6 +63,18 @@ public class OptimizedUniformAssignmentBuilderTest {
6163
private final String memberB = "B";
6264
private final String memberC = "C";
6365

66+
@ParameterizedTest
67+
@ValueSource(booleans = {false, true})
68+
public void testAssignmentReuse(boolean rackAware) {
69+
CommonAssignorTests.testAssignmentReuse(assignor, HOMOGENEOUS, rackAware);
70+
}
71+
72+
@ParameterizedTest
73+
@ValueSource(booleans = {false, true})
74+
public void testReassignmentStickiness(boolean rackAware) {
75+
CommonAssignorTests.testReassignmentStickiness(assignor, HOMOGENEOUS, rackAware);
76+
}
77+
6478
@Test
6579
public void testOneMemberNoTopicSubscription() {
6680
MetadataImage metadataImage = new MetadataImageBuilder()
@@ -566,72 +580,6 @@ public void testReassignmentWhenOneSubscriptionRemovedAfterInitialAssignmentWith
566580
checkValidityAndBalance(members, computedAssignment);
567581
}
568582

569-
@Test
570-
public void testReassignmentStickinessWhenAlreadyBalanced() {
571-
MetadataImage metadataImage = new MetadataImageBuilder()
572-
.addTopic(topic1Uuid, topic1Name, 5)
573-
.build();
574-
575-
// A TreeMap ensures that memberA is first in the iteration order.
576-
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
577-
578-
// Two members must have extra partitions. In the previous assignment, they were members A
579-
// and C.
580-
members.put(memberA, new MemberSubscriptionAndAssignmentImpl(
581-
Optional.empty(),
582-
Optional.empty(),
583-
Set.of(topic1Uuid),
584-
new Assignment(mkAssignment(
585-
mkTopicAssignment(topic1Uuid, 0, 3)
586-
))
587-
));
588-
589-
members.put(memberB, new MemberSubscriptionAndAssignmentImpl(
590-
Optional.empty(),
591-
Optional.empty(),
592-
Set.of(topic1Uuid, topic2Uuid),
593-
new Assignment(mkAssignment(
594-
mkTopicAssignment(topic1Uuid, 1)
595-
))
596-
));
597-
598-
members.put(memberC, new MemberSubscriptionAndAssignmentImpl(
599-
Optional.empty(),
600-
Optional.empty(),
601-
Set.of(topic1Uuid, topic2Uuid),
602-
new Assignment(mkAssignment(
603-
mkTopicAssignment(topic1Uuid, 2, 4)
604-
))
605-
));
606-
607-
// Members A and C should keep their partitions.
608-
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
609-
expectedAssignment.put(memberA, mkAssignment(
610-
mkTopicAssignment(topic1Uuid, 0, 3)
611-
));
612-
expectedAssignment.put(memberB, mkAssignment(
613-
mkTopicAssignment(topic1Uuid, 1)
614-
));
615-
expectedAssignment.put(memberC, mkAssignment(
616-
mkTopicAssignment(topic1Uuid, 2, 4)
617-
));
618-
619-
GroupSpec groupSpec = new GroupSpecImpl(
620-
members,
621-
HOMOGENEOUS,
622-
invertedTargetAssignment(members)
623-
);
624-
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(new KRaftCoordinatorMetadataImage(metadataImage));
625-
626-
GroupAssignment computedAssignment = assignor.assign(
627-
groupSpec,
628-
subscribedTopicMetadata
629-
);
630-
631-
assertAssignment(expectedAssignment, computedAssignment);
632-
checkValidityAndBalance(members, computedAssignment);
633-
}
634-
635583
/**
636584
* Verifies that the given assignment is valid with respect to the given subscriptions.
637585
* Validity requirements:

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import org.apache.kafka.image.MetadataImage;
3535

3636
import org.junit.jupiter.api.Test;
37+
import org.junit.jupiter.params.ParameterizedTest;
38+
import org.junit.jupiter.params.provider.CsvSource;
3739

3840
import java.util.HashMap;
3941
import java.util.Map;
@@ -61,6 +63,17 @@ public class RangeAssignorTest {
6163
private final String memberB = "B";
6264
private final String memberC = "C";
6365

66+
@ParameterizedTest
67+
@CsvSource({
68+
"HOMOGENEOUS, false",
69+
"HOMOGENEOUS, true",
70+
"HETEROGENEOUS, false",
71+
"HETEROGENEOUS, true"
72+
})
73+
public void testReassignmentStickiness(SubscriptionType subscriptionType, boolean rackAware) {
74+
CommonAssignorTests.testReassignmentStickiness(assignor, subscriptionType, rackAware);
75+
}
76+
6477
@Test
6578
public void testOneMemberNoTopic() {
6679
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilderTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import org.apache.kafka.image.MetadataImage;
3131

3232
import org.junit.jupiter.api.Test;
33+
import org.junit.jupiter.params.ParameterizedTest;
34+
import org.junit.jupiter.params.provider.ValueSource;
3335

3436
import java.util.ArrayList;
3537
import java.util.Collection;
@@ -89,6 +91,18 @@ public Collection<String> memberIds() {
8991
}
9092
}
9193

94+
@ParameterizedTest
95+
@ValueSource(booleans = {false, true})
96+
public void testAssignmentReuse(boolean rackAware) {
97+
CommonAssignorTests.testAssignmentReuse(assignor, HETEROGENEOUS, rackAware);
98+
}
99+
100+
@ParameterizedTest
101+
@ValueSource(booleans = {false, true})
102+
public void testReassignmentStickiness(boolean rackAware) {
103+
CommonAssignorTests.testReassignmentStickiness(assignor, HETEROGENEOUS, rackAware);
104+
}
105+
92106
@Test
93107
public void testTwoMembersNoTopicSubscription() {
94108
MetadataImage metadataImage = new MetadataImageBuilder()

0 commit comments

Comments
 (0)