Skip to content

Commit 051fb66

Browse files
authored
KAFKA-19938: Fix combined node log dir format in KafkaClusterTestKit (#21017)
The previous logic did not format all log directories on combined nodes. I was only formatting the metadata log directory which is the expected result, all log directories should be formatted. I stumble on this issue while implementing [KIP-1066](https://cwiki.apache.org/confluence/x/Lg_TEg) Reviewers: PoAn Yang <[email protected]>
1 parent e4795bf commit 051fb66

File tree

3 files changed

+73
-43
lines changed

3 files changed

+73
-43
lines changed

test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@
7575
import java.util.concurrent.Future;
7676
import java.util.concurrent.TimeUnit;
7777
import java.util.concurrent.atomic.AtomicReference;
78-
import java.util.stream.Collectors;
7978

8079
import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG;
8180
import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIRS_CONFIG;
@@ -441,12 +440,13 @@ public void format() throws Exception {
441440
List<Future<?>> futures = new ArrayList<>();
442441
try {
443442
for (ControllerServer controller : controllers.values()) {
444-
futures.add(executorService.submit(() -> formatNode(controller.sharedServer().metaPropsEnsemble(), true)));
443+
futures.add(executorService.submit(() -> formatNode(controller.sharedServer().metaPropsEnsemble())));
445444
}
446445
for (Entry<Integer, BrokerServer> entry : brokers.entrySet()) {
447446
BrokerServer broker = entry.getValue();
448-
futures.add(executorService.submit(() -> formatNode(broker.sharedServer().metaPropsEnsemble(),
449-
!nodes.isCombined(nodes().brokerNodes().get(entry.getKey()).id()))));
447+
if (!nodes.isCombined(nodes().brokerNodes().get(entry.getKey()).id())) {
448+
futures.add(executorService.submit(() -> formatNode(broker.sharedServer().metaPropsEnsemble())));
449+
}
450450
}
451451
for (Future<?> future: futures) {
452452
future.get();
@@ -460,33 +460,22 @@ public void format() throws Exception {
460460
}
461461

462462
private void formatNode(
463-
MetaPropertiesEnsemble ensemble,
464-
boolean writeMetadataDirectory
463+
MetaPropertiesEnsemble ensemble
465464
) {
466465
try {
467466
final var nodeId = ensemble.nodeId().getAsInt();
468467
Formatter formatter = new Formatter();
469468
formatter.setNodeId(nodeId);
470469
formatter.setClusterId(ensemble.clusterId().get());
471-
if (writeMetadataDirectory) {
472-
formatter.setDirectories(ensemble.logDirProps().keySet());
473-
} else {
474-
formatter.setDirectories(ensemble.logDirProps().keySet().stream().
475-
filter(d -> !ensemble.metadataLogDir().get().equals(d)).
476-
collect(Collectors.toSet()));
477-
}
470+
formatter.setDirectories(ensemble.logDirProps().keySet());
478471
if (formatter.directories().isEmpty()) {
479472
return;
480473
}
481474
formatter.setReleaseVersion(nodes.bootstrapMetadata().metadataVersion());
482475
formatter.setUnstableFeatureVersionsEnabled(true);
483476
formatter.setIgnoreFormatted(false);
484477
formatter.setControllerListenerName(controllerListenerName);
485-
if (writeMetadataDirectory) {
486-
formatter.setMetadataLogDirectory(ensemble.metadataLogDir().get());
487-
} else {
488-
formatter.setMetadataLogDirectory(Optional.empty());
489-
}
478+
formatter.setMetadataLogDirectory(ensemble.metadataLogDir().get());
490479
StringBuilder dynamicVotersBuilder = new StringBuilder();
491480
String prefix = "";
492481
if (standalone) {

test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,8 @@ public TestKitNodes build() {
196196
baseDirectory.toFile().getAbsolutePath(),
197197
clusterId,
198198
brokerNodeIds.contains(id),
199-
perServerProperties.getOrDefault(id, Map.of())
199+
perServerProperties.getOrDefault(id, Map.of()),
200+
numDisksPerBroker
200201
);
201202
controllerNodes.put(id, controllerNode);
202203
}
@@ -346,21 +347,36 @@ private static TestKitNode buildControllerNode(int id,
346347
String baseDirectory,
347348
String clusterId,
348349
boolean combined,
349-
Map<String, String> propertyOverrides) {
350+
Map<String, String> propertyOverrides,
351+
int numDisksPerController) {
352+
List<String> logDataDirectories = combined
353+
? IntStream
354+
.range(0, numDisksPerController)
355+
.mapToObj(i -> String.format("combined_%d_%d", id, i))
356+
.map(logDir -> {
357+
if (Paths.get(logDir).isAbsolute()) {
358+
return logDir;
359+
}
360+
return new File(baseDirectory, logDir).getAbsolutePath();
361+
})
362+
.toList()
363+
: List.of(new File(baseDirectory, String.format("controller_%d", id)).getAbsolutePath());
350364
String metadataDirectory = new File(baseDirectory,
351365
combined ? String.format("combined_%d_0", id) : String.format("controller_%d", id)).getAbsolutePath();
352366
MetaPropertiesEnsemble.Copier copier = new MetaPropertiesEnsemble.Copier(MetaPropertiesEnsemble.EMPTY);
353367

354368
copier.setMetaLogDir(Optional.of(metadataDirectory));
355-
copier.setLogDirProps(
356-
metadataDirectory,
357-
new MetaProperties.Builder()
358-
.setVersion(MetaPropertiesVersion.V1)
359-
.setClusterId(clusterId)
360-
.setNodeId(id)
361-
.setDirectoryId(copier.generateValidDirectoryId())
362-
.build()
363-
);
369+
for (String logDir : logDataDirectories) {
370+
copier.setLogDirProps(
371+
logDir,
372+
new MetaProperties.Builder()
373+
.setVersion(MetaPropertiesVersion.V1)
374+
.setClusterId(clusterId)
375+
.setNodeId(id)
376+
.setDirectoryId(copier.generateValidDirectoryId())
377+
.build()
378+
);
379+
}
364380

365381
return new TestKitNode() {
366382
private final MetaPropertiesEnsemble ensemble = copier.copy();

test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/KafkaClusterTestKitTest.java

Lines changed: 39 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,22 @@
1717

1818
package org.apache.kafka.common.test;
1919

20+
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
21+
2022
import org.junit.jupiter.api.Test;
2123
import org.junit.jupiter.params.ParameterizedTest;
24+
import org.junit.jupiter.params.provider.CsvSource;
2225
import org.junit.jupiter.params.provider.ValueSource;
2326

27+
import java.nio.file.Files;
2428
import java.nio.file.Path;
2529
import java.nio.file.Paths;
2630
import java.util.HashMap;
31+
import java.util.HashSet;
2732
import java.util.Map;
2833
import java.util.Set;
2934
import java.util.stream.Collectors;
35+
import java.util.stream.IntStream;
3036

3137
import static org.junit.jupiter.api.Assertions.assertEquals;
3238
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -87,37 +93,56 @@ public void testCreateClusterWithBadPerServerProperties() {
8793
}
8894

8995
@ParameterizedTest
90-
@ValueSource(booleans = {true, false})
91-
public void testCreateClusterAndCloseWithMultipleLogDirs(boolean combined) throws Exception {
96+
@CsvSource({
97+
"true,1,1,2", /* 1 combined node */
98+
"true,5,7,2", /* 5 combined nodes + 2 controllers */
99+
"true,7,5,2", /* 7 combined nodes */
100+
"false,1,1,2", /* 1 broker + 1 controller */
101+
"false,5,7,2", /* 5 brokers + 7 controllers */
102+
"false,7,5,2", /* 7 brokers + 5 controllers */
103+
})
104+
public void testCreateClusterFormatAndCloseWithMultipleLogDirs(boolean combined, int numBrokers, int numControllers, int numDisks) throws Exception {
92105
try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
93106
new TestKitNodes.Builder().
94-
setNumBrokerNodes(5).
95-
setNumDisksPerBroker(2).
107+
setNumBrokerNodes(numBrokers).
108+
setNumDisksPerBroker(numDisks).
96109
setCombined(combined).
97-
setNumControllerNodes(3).build()).build()) {
110+
setNumControllerNodes(numControllers).build()).build()) {
98111

99112
TestKitNodes nodes = cluster.nodes();
100-
assertEquals(5, nodes.brokerNodes().size());
101-
assertEquals(3, nodes.controllerNodes().size());
113+
assertEquals(numBrokers, nodes.brokerNodes().size());
114+
assertEquals(numControllers, nodes.controllerNodes().size());
102115

116+
Set<String> logDirs = new HashSet<>();
103117
nodes.brokerNodes().forEach((brokerId, node) -> {
104-
assertEquals(2, node.logDataDirectories().size());
105-
Set<String> expected = Set.of(String.format("broker_%d_data0", brokerId), String.format("broker_%d_data1", brokerId));
106-
if (nodes.isCombined(node.id())) {
107-
expected = Set.of(String.format("combined_%d_0", brokerId), String.format("combined_%d_1", brokerId));
108-
}
118+
assertEquals(numDisks, node.logDataDirectories().size());
119+
Set<String> expectedDisks = IntStream.range(0, numDisks)
120+
.mapToObj(i -> {
121+
if (nodes.isCombined(node.id())) {
122+
return String.format("combined_%d_%d", brokerId, i);
123+
} else {
124+
return String.format("broker_%d_data%d", brokerId, i);
125+
}
126+
}).collect(Collectors.toSet());
109127
assertEquals(
110-
expected,
128+
expectedDisks,
111129
node.logDataDirectories().stream()
112130
.map(p -> Paths.get(p).getFileName().toString())
113131
.collect(Collectors.toSet())
114132
);
133+
logDirs.addAll(node.logDataDirectories());
115134
});
116135

117136
nodes.controllerNodes().forEach((controllerId, node) -> {
118-
String expected = combined ? String.format("combined_%d_0", controllerId) : String.format("controller_%d", controllerId);
137+
String expected = nodes.isCombined(node.id()) ? String.format("combined_%d_0", controllerId) : String.format("controller_%d", controllerId);
119138
assertEquals(expected, Paths.get(node.metadataDirectory()).getFileName().toString());
139+
logDirs.addAll(node.logDataDirectories());
120140
});
141+
142+
cluster.format();
143+
logDirs.forEach(logDir ->
144+
assertTrue(Files.exists(Paths.get(logDir, MetaPropertiesEnsemble.META_PROPERTIES_NAME)))
145+
);
121146
}
122147
}
123148

0 commit comments

Comments
 (0)