Skip to content

Commit a9dff62

Browse files
KAFKA-19955: Fix performance regression in server-side assignors (#21058)
The server-side assignors are designed to re-use the previous assignment maps where possible and clone them for modification lazily. #20097 introduced a bug where the assignments would always be re-wrapped in an unmodifiable map and so we would end up cloning them repeatedly in the assignors when multiple changes need to be made to a member assignment. Fix the bug by removing the unmodifiable map wrapping. Reviewers: David Jacot <[email protected]>
1 parent 917d695 commit a9dff62

File tree

2 files changed

+52
-2
lines changed

2 files changed

+52
-2
lines changed

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/MemberAssignmentImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.apache.kafka.common.Uuid;
2020
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
2121

22-
import java.util.Collections;
2322
import java.util.Map;
2423
import java.util.Objects;
2524
import java.util.Set;
@@ -28,10 +27,12 @@
2827
* The partition assignment for a modern group member.
2928
*
3029
* @param partitions The partitions assigned to this member keyed by topicId.
30+
* The map will not be made immutable, since the server-side assignors rely on
31+
* being able to mutate the map while building new assignments.
3132
*/
3233
public record MemberAssignmentImpl(Map<Uuid, Set<Integer>> partitions) implements MemberAssignment {
3334
public MemberAssignmentImpl {
34-
partitions = Collections.unmodifiableMap(Objects.requireNonNull(partitions));
35+
Objects.requireNonNull(partitions);
3536
}
3637

3738
/**
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.coordinator.group.modern;
18+
19+
import org.apache.kafka.common.Uuid;
20+
21+
import org.junit.jupiter.api.Test;
22+
23+
import java.util.HashMap;
24+
import java.util.HashSet;
25+
import java.util.Set;
26+
27+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
28+
29+
public class MemberAssignmentImplTest {
30+
31+
@Test
32+
public void testPartitionsMutable() {
33+
// We depend on the the map inside MemberAssignmentImpl remaining mutable in the server-side
34+
// assignors, otherwise we end up deep copying the map unnecessarily.
35+
Uuid topicId1 = Uuid.randomUuid();
36+
Uuid topicId2 = Uuid.randomUuid();
37+
38+
HashMap<Uuid, Set<Integer>> partitions = new HashMap<>();
39+
partitions.put(topicId1, new HashSet<>());
40+
41+
MemberAssignmentImpl memberAssignment = new MemberAssignmentImpl(partitions);
42+
43+
// The map should remain mutable.
44+
assertDoesNotThrow(() -> memberAssignment.partitions().put(topicId2, Set.of(3, 4, 5)));
45+
46+
// The sets inside the map should remain mutable.
47+
assertDoesNotThrow(() -> memberAssignment.partitions().get(topicId1).add(3));
48+
}
49+
}

0 commit comments

Comments
 (0)