Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 47 additions & 2 deletions smallrye-reactive-messaging-kafka-test-companion/revapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,52 @@
"criticality" : "highlight",
"minSeverity" : "POTENTIALLY_BREAKING",
"minCriticality" : "documented",
"differences" : [ ]
"differences" : [ {
"code": "java.field.constantValueChanged",
"old": "field io.smallrye.reactive.messaging.kafka.companion.test.KafkaBrokerExtension.KAFKA_VERSION",
"new": "field io.smallrye.reactive.messaging.kafka.companion.test.KafkaBrokerExtension.KAFKA_VERSION",
"justification": "Default kafka version to 4.1.0"
},
{
"code": "java.field.typeChanged",
"old": "field io.smallrye.reactive.messaging.kafka.companion.test.KafkaBrokerExtension.kafka",
"new": "field io.smallrye.reactive.messaging.kafka.companion.test.KafkaBrokerExtension.kafka",
"justification": "Kafka broker container is no longer restricted to StrimziKafkaContainer"
},
{
"code": "java.method.visibilityReduced",
"old": "method <T extends io.strimzi.test.container.StrimziKafkaContainer> T io.smallrye.reactive.messaging.kafka.companion.test.KafkaBrokerExtension::configureKafkaContainer(T)",
"new": "method io.smallrye.reactive.messaging.kafka.companion.test.KafkaContainer io.smallrye.reactive.messaging.kafka.companion.test.KafkaBrokerExtension::configureKafkaContainer(io.smallrye.reactive.messaging.kafka.companion.test.KafkaContainer)",
"oldVisibility": "public",
"newVisibility": "private",
"justification": "Kafka broker container is no longer restricted to StrimziKafkaContainer"
},
{
"code": "java.method.returnTypeChanged",
"old": "method io.strimzi.test.container.StrimziKafkaContainer io.smallrye.reactive.messaging.kafka.companion.test.KafkaBrokerExtension::createKafkaContainer()",
"new": "method org.testcontainers.containers.GenericContainer<?> io.smallrye.reactive.messaging.kafka.companion.test.KafkaBrokerExtension::createKafkaContainer()",
"justification": "Kafka broker container is no longer restricted to StrimziKafkaContainer"
},
{
"code": "java.method.parameterTypeChanged",
"old": "parameter io.strimzi.test.container.StrimziKafkaContainer io.smallrye.reactive.messaging.kafka.companion.test.KafkaBrokerExtension::restart(===io.strimzi.test.container.StrimziKafkaContainer===, int)",
"new": "parameter org.testcontainers.containers.GenericContainer<?> io.smallrye.reactive.messaging.kafka.companion.test.KafkaBrokerExtension::restart(===org.testcontainers.containers.GenericContainer<?>===, int)",
"parameterIndex": "0",
"justification": "Kafka broker container is no longer restricted to StrimziKafkaContainer"
},
{
"code": "java.method.returnTypeChanged",
"old": "method io.strimzi.test.container.StrimziKafkaContainer io.smallrye.reactive.messaging.kafka.companion.test.KafkaBrokerExtension::restart(io.strimzi.test.container.StrimziKafkaContainer, int)",
"new": "method org.testcontainers.containers.GenericContainer<?> io.smallrye.reactive.messaging.kafka.companion.test.KafkaBrokerExtension::restart(org.testcontainers.containers.GenericContainer<?>, int)",
"justification": "Kafka broker container is no longer restricted to StrimziKafkaContainer"
},
{
"code": "java.method.returnTypeChanged",
"old": "method io.strimzi.test.container.StrimziKafkaContainer io.smallrye.reactive.messaging.kafka.companion.test.KafkaBrokerExtension::startKafkaBroker(int)",
"new": "method org.testcontainers.containers.GenericContainer<?> io.smallrye.reactive.messaging.kafka.companion.test.KafkaBrokerExtension::startKafkaBroker(int)",
"justification": "Kafka broker container is no longer restricted to StrimziKafkaContainer"
}
]
}
}, {
"extension" : "revapi.reporter.json",
Expand All @@ -43,4 +88,4 @@
"minCriticality" : "documented",
"output" : "out"
}
} ]
} ]
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,22 @@
import org.jboss.logging.Logger;
import org.junit.jupiter.api.extension.*;
import org.junit.jupiter.api.extension.ExtensionContext.Store.CloseableResource;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.shaded.org.awaitility.core.ConditionTimeoutException;

import io.strimzi.test.container.StrimziKafkaContainer;

/**
* Junit extension for creating Strimzi Kafka broker
* Junit extension for creating Kafka broker
*/
public class KafkaBrokerExtension implements BeforeAllCallback, BeforeEachCallback, ParameterResolver, CloseableResource {
public static final Logger LOGGER = Logger.getLogger(KafkaBrokerExtension.class.getName());

public static final String KAFKA_VERSION = "4.0.0";
public static final String KAFKA_IMAGE = "apache/kafka";
public static final String KAFKA_VERSION = "4.2.0-rc4";
public static final String STRIMZI_VERSION = "4.1.0";

protected StrimziKafkaContainer kafka;
protected GenericContainer<?> kafka;

@Override
public void beforeAll(ExtensionContext context) {
Expand All @@ -50,24 +53,62 @@ public void close() {
stopKafkaBroker();
}

public static StrimziKafkaContainer createKafkaContainer() {
return configureKafkaContainer(new StrimziKafkaContainer());
/**
 * Creates the Kafka broker container, choosing the container flavour from the
 * configured image name.
 * <p>
 * The image, Apache Kafka tag and Strimzi tag are overridable via the
 * {@code kafka-container-image}, {@code kafka-container-version} and
 * {@code strimzi-container-version} system properties respectively. An image
 * name containing {@code strimzi} yields a {@link StrimziKafkaContainer};
 * anything else yields the local {@code KafkaContainer}.
 *
 * @return a configured, not-yet-started broker container
 */
public static GenericContainer<?> createKafkaContainer() {
    String image = System.getProperty("kafka-container-image", KAFKA_IMAGE);
    String apacheTag = System.getProperty("kafka-container-version", KAFKA_VERSION);
    String strimziTag = System.getProperty("strimzi-container-version", STRIMZI_VERSION);
    GenericContainer<? extends GenericContainer<?>> container;
    if (!image.contains("strimzi")) {
        // apache/kafka or apache/kafka-native style image
        container = new KafkaContainer(image + ":" + apacheTag);
    } else {
        // Strimzi images are tagged latest-kafka-<kafka version>
        container = new StrimziKafkaContainer(image + ":latest-kafka-" + strimziTag);
    }
    return configureContainer(container);
}

public static <T extends StrimziKafkaContainer> T configureKafkaContainer(T container) {
String kafkaVersion = System.getProperty("kafka-container-version", KAFKA_VERSION);
container.withKafkaVersion(kafkaVersion);
/**
 * Applies the broker configuration that matches the concrete container type.
 * Containers of an unrecognised type are returned unchanged.
 *
 * @param container the broker container to configure
 * @return the same container instance, configured when its type is known
 */
public static <T extends GenericContainer<?>> GenericContainer<?> configureContainer(T container) {
    if (container instanceof StrimziKafkaContainer strimzi) {
        return configureStrimziContainer(strimzi);
    }
    if (container instanceof KafkaContainer apache) {
        return configureKafkaContainer(apache);
    }
    return container;
}

/**
 * Configures an apache/kafka container with the broker settings used by the
 * test companion (share groups enabled, no rebalance delay, RF 1 for the
 * share-coordinator state topic on this single-node broker).
 */
private static KafkaContainer configureKafkaContainer(KafkaContainer container) {
    Map<String, String> env = new HashMap<>();
    env.put("KAFKA_LOG_CLEANER_ENABLE", "false");
    env.put("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0");
    env.put("KAFKA_UNSTABLE_API_VERSIONS_ENABLE", "true");
    env.put("KAFKA_GROUP_COORDINATOR_REBALANCE_PROTOCOLS", "classic,consumer,share");
    env.put("KAFKA_GROUP_SHARE_ENABLE", "true");
    // "KAFKA_SHARE_COORDINATOR_APPEND_LINGER_MS", "-1" deliberately not set
    // a single node topic needs to have 1 as replication factor
    env.put("KAFKA_SHARE_COORDINATOR_STATE_TOPIC_REPLICATION_FACTOR", "1");
    return container.withEnv(env);
}

/**
 * Configures a Strimzi Kafka container with the broker settings used by the
 * test companion.
 * <p>
 * The Kafka version is taken from {@code strimzi-container-version}, falling
 * back to {@code kafka-container-version}, then to {@link #STRIMZI_VERSION}.
 */
public static StrimziKafkaContainer configureStrimziContainer(StrimziKafkaContainer container) {
    String fallbackVersion = System.getProperty("kafka-container-version", STRIMZI_VERSION);
    String kafkaVersion = System.getProperty("strimzi-container-version", fallbackVersion);
    container.withKafkaVersion(kafkaVersion);
    Map<String, String> brokerConfig = new HashMap<>();
    brokerConfig.put("log.cleaner.enable", "false");
    brokerConfig.put("group.initial.rebalance.delay.ms", "0");
    brokerConfig.put("unstable.api.versions.enable", "true");
    brokerConfig.put("group.coordinator.rebalance.protocols", "classic,consumer,share");
    brokerConfig.put("group.share.enable", "true");
    // a single node topic needs to have 1 as replication factor
    brokerConfig.put("share.coordinator.state.topic.replication.factor", "1");
    container.withKafkaConfigurationMap(brokerConfig);
    return container;
}

public void startKafkaBroker() {
kafka = createKafkaContainer();
kafka.start();
LOGGER.info("Kafka broker started: " + kafka.getBootstrapServers() + " (" + kafka.getMappedPort(9092) + ")");
LOGGER.info("Kafka broker started: " + getBootstrapServers(kafka) + " (" + kafka.getMappedPort(9092) + ")");
await().until(() -> kafka.isRunning());
}

Expand All @@ -80,7 +121,7 @@ public void startKafkaBroker() {
* @param gracePeriodInSecond number of seconds to wait before restarting
* @return the new broker
*/
public static StrimziKafkaContainer restart(StrimziKafkaContainer kafka, int gracePeriodInSecond) {
public static GenericContainer<?> restart(GenericContainer<?> kafka, int gracePeriodInSecond) {
int port = kafka.getMappedPort(9092);
try {
kafka.close();
Expand All @@ -93,8 +134,13 @@ public static StrimziKafkaContainer restart(StrimziKafkaContainer kafka, int gra
return startKafkaBroker(port);
}

public static StrimziKafkaContainer startKafkaBroker(int port) {
StrimziKafkaContainer kafka = createKafkaContainer().withPort(port);
public static GenericContainer<?> startKafkaBroker(int port) {
GenericContainer<?> kafka = createKafkaContainer();
if (kafka instanceof StrimziKafkaContainer strimzi) {
strimzi.withPort(port);
} else if (kafka instanceof KafkaContainer k) {
k.withPort(port);
}
kafka.start();
await().until(kafka::isRunning);
return kafka;
Expand Down Expand Up @@ -132,9 +178,7 @@ public Object resolveParameter(ParameterContext parameterContext, ExtensionConte
if (parameterContext.isAnnotated(KafkaBootstrapServers.class)) {
ExtensionContext.Store globalStore = extensionContext.getRoot().getStore(GLOBAL);
KafkaBrokerExtension extension = (KafkaBrokerExtension) globalStore.get(KafkaBrokerExtension.class);
if (extension.kafka != null) {
return extension.kafka.getBootstrapServers();
}
return getBootstrapServers(extension.kafka);
}
return null;
}
Expand Down Expand Up @@ -162,7 +206,7 @@ private void isBrokerHealthy() {
await().until(() -> kafka.isRunning());
await().catchUncaughtExceptions().until(() -> {
Map<String, Object> config = new HashMap<>();
config.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
config.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers(kafka));
config.put(CommonClientConfigs.CLIENT_ID_CONFIG, "broker-healthy-admin");
try (AdminClient admin = AdminClient.create(config)) {
Collection<Node> nodes = admin.describeCluster().nodes().get();
Expand All @@ -171,6 +215,17 @@ private void isBrokerHealthy() {
});
}

/**
 * Resolves the bootstrap servers address of the given broker container.
 *
 * @param kafka the broker container, may be {@code null}
 * @return the bootstrap servers address, or {@code null} when the container is
 *         {@code null} or of an unknown type
 */
public static String getBootstrapServers(GenericContainer<?> kafka) {
    // instanceof is null-safe, so no separate null check is required
    if (kafka instanceof StrimziKafkaContainer strimzi) {
        return strimzi.getBootstrapServers();
    }
    if (kafka instanceof KafkaContainer container) {
        return container.getBootstrapServers();
    }
    return null;
}

@Target({ ElementType.FIELD, ElementType.PARAMETER })
@Retention(RetentionPolicy.RUNTIME)
public @interface KafkaBootstrapServers {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package io.smallrye.reactive.messaging.kafka.companion.test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.containers.wait.strategy.WaitStrategy;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.DockerImageName;

import com.github.dockerjava.api.command.InspectContainerResponse;

/**
* Copied from testcontainers with some modifications
*/
public class KafkaContainer extends GenericContainer<KafkaContainer> {

    // Image names accepted by the DockerImageName compatibility assertion in the constructor.
    private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("apache/kafka");

    private static final DockerImageName APACHE_KAFKA_NATIVE_IMAGE_NAME = DockerImageName.parse("apache/kafka-native");

    // Replication factor for internal topics; 1 because this is a single-node broker.
    private static final String DEFAULT_INTERNAL_TOPIC_RF = "1";

    // Fixed KRaft cluster id so every container formats storage identically.
    private static final String DEFAULT_CLUSTER_ID = "4L6g3nShT-eMCtK--X86sw";

    // Name prefix for generated listeners (TC-0, TC-1, ...), mapped to PLAINTEXT.
    private static final String PROTOCOL_PREFIX = "TC";

    static final int KAFKA_PORT = 9092;

    // Script uploaded by containerIsStarting(); the container command waits for it.
    static final String STARTER_SCRIPT = "/tmp/testcontainers_start.sh";

    // Poll until the starter script exists, then execute it. This defers broker
    // startup until the mapped host port is known, so advertised listeners can
    // include the real host:port.
    static final String[] COMMAND = {
            "sh",
            "-c",
            "while [ ! -f " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT,
    };

    // The broker logs this once KRaft recovery has completed.
    static final WaitStrategy WAIT_STRATEGY = Wait.forLogMessage(".*Transitioning from RECOVERY to RUNNING.*", 1);

    /**
     * Builds the base environment for a single-node combined broker/controller
     * (KRaft) with PLAINTEXT listeners on 9092 (client), 9093 (inter-broker)
     * and 9094 (controller).
     */
    static Map<String, String> envVars() {
        Map<String, String> envVars = new HashMap<>();
        envVars.put("CLUSTER_ID", DEFAULT_CLUSTER_ID);

        envVars.put(
                "KAFKA_LISTENERS",
                "PLAINTEXT://0.0.0.0:" + KAFKA_PORT + ",BROKER://0.0.0.0:9093,CONTROLLER://0.0.0.0:9094");
        envVars.put(
                "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
                "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT");
        envVars.put("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");
        envVars.put("KAFKA_PROCESS_ROLES", "broker,controller");
        envVars.put("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER");

        envVars.put("KAFKA_NODE_ID", "1");

        // The node votes for itself: single-member controller quorum on port 9094.
        String controllerQuorumVoters = String.format("%s@localhost:9094", envVars.get("KAFKA_NODE_ID"));
        envVars.put("KAFKA_CONTROLLER_QUORUM_VOTERS", controllerQuorumVoters);

        envVars.put("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
        envVars.put("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", DEFAULT_INTERNAL_TOPIC_RF);
        envVars.put("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
        envVars.put("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", DEFAULT_INTERNAL_TOPIC_RF);
        // Effectively disable time/size based log flushing; rely on OS flushing.
        envVars.put("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "");
        envVars.put("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0");
        return envVars;
    }

    /**
     * Merges the extra host:port listeners into KAFKA_LISTENERS and
     * KAFKA_LISTENER_SECURITY_PROTOCOL_MAP, each under a generated TC-i
     * protocol name mapped to PLAINTEXT, and registers each listener host as a
     * network alias of the container.
     */
    static void resolveListeners(GenericContainer<?> kafkaContainer, Set<String> listenersSuppliers) {
        Set<String> listeners = Arrays
                .stream(kafkaContainer.getEnvMap().get("KAFKA_LISTENERS").split(","))
                .collect(Collectors.toSet());
        Set<String> listenerSecurityProtocolMap = Arrays
                .stream(kafkaContainer.getEnvMap().get("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP").split(","))
                .collect(Collectors.toSet());

        List<String> listenersToTransform = new ArrayList<>(listenersSuppliers);
        for (int i = 0; i < listenersToTransform.size(); i++) {
            // Each extra listener gets its own protocol name: TC-0, TC-1, ...
            String protocol = String.format("%s-%d", PROTOCOL_PREFIX, i);
            String listener = listenersToTransform.get(i);
            String listenerHost = listener.split(":")[0];
            String listenerPort = listener.split(":")[1];
            String listenerProtocol = String.format("%s://%s:%s", protocol, listenerHost, listenerPort);
            String protocolMap = String.format("%s:PLAINTEXT", protocol);
            listeners.add(listenerProtocol);
            listenerSecurityProtocolMap.add(protocolMap);

            // Make the listener host resolvable on the container network.
            String host = listener.split(":")[0];
            kafkaContainer.withNetworkAliases(host);
        }

        String kafkaListeners = String.join(",", listeners);
        String kafkaListenerSecurityProtocolMap = String.join(",", listenerSecurityProtocolMap);

        // Mutates the env map in place; must run before the container starts.
        kafkaContainer.getEnvMap().put("KAFKA_LISTENERS", kafkaListeners);
        kafkaContainer.getEnvMap().put("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", kafkaListenerSecurityProtocolMap);
    }

    /**
     * Evaluates each advertised-listener supplier (suppliers because addresses
     * may only be known at start time) and prefixes it with its generated TC-i
     * protocol name, in the same order used by {@code resolveListeners}.
     */
    static List<String> resolveAdvertisedListeners(Set<Supplier<String>> listenerSuppliers) {
        List<String> advertisedListeners = new ArrayList<>();
        List<Supplier<String>> listenersToTransform = new ArrayList<>(listenerSuppliers);
        for (int i = 0; i < listenersToTransform.size(); i++) {
            Supplier<String> listenerSupplier = listenersToTransform.get(i);
            String protocol = String.format("%s-%d", PROTOCOL_PREFIX, i);
            String listener = listenerSupplier.get();
            String listenerProtocol = String.format("%s://%s", protocol, listener);
            advertisedListeners.add(listenerProtocol);
        }
        return advertisedListeners;
    }

    // NOTE(review): neither set is ever populated in this trimmed copy — the
    // upstream testcontainers withListener(...) API was not carried over, so
    // resolveListeners/resolveAdvertisedListeners currently see empty sets.
    // Confirm whether extra-listener support is intentionally dropped.
    private final Set<String> listeners = new LinkedHashSet<>();

    private final Set<Supplier<String>> advertisedListeners = new LinkedHashSet<>();

    public KafkaContainer(String imageName) {
        this(DockerImageName.parse(imageName));
    }

    public KafkaContainer(DockerImageName dockerImageName) {
        super(dockerImageName);
        // Only apache/kafka and apache/kafka-native images are supported.
        dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME, APACHE_KAFKA_NATIVE_IMAGE_NAME);

        withExposedPorts(KAFKA_PORT);
        withEnv(envVars());

        withCommand(COMMAND);
        waitingFor(WAIT_STRATEGY);
    }

    // Called by testcontainers just before container creation.
    @Override
    protected void configure() {
        resolveListeners(this, this.listeners);
    }

    // Called once the container exists and ports are mapped: compute the final
    // advertised listeners (which need the mapped host port) and upload the
    // starter script that the blocked container command is waiting on.
    @Override
    protected void containerIsStarting(InspectContainerResponse containerInfo) {
        String brokerAdvertisedListener = String.format(
                "BROKER://%s:%s",
                containerInfo.getConfig().getHostName(),
                "9093");
        List<String> advertisedListeners = new ArrayList<>();
        advertisedListeners.add("PLAINTEXT://" + getBootstrapServers());
        advertisedListeners.add(brokerAdvertisedListener);

        advertisedListeners.addAll(resolveAdvertisedListeners(this.advertisedListeners));
        String kafkaAdvertisedListeners = String.join(",", advertisedListeners);

        String command = "#!/bin/bash\n";
        // exporting KAFKA_ADVERTISED_LISTENERS with the container hostname
        command += String.format("export KAFKA_ADVERTISED_LISTENERS=%s\n", kafkaAdvertisedListeners);

        command += "/etc/kafka/docker/run \n";
        // 0777: the script must be executable inside the container.
        copyFileToContainer(Transferable.of(command, 0777), STARTER_SCRIPT);
    }

    /**
     * Binds the broker client port 9092 to a fixed host port, used when a
     * broker must come back on the same address after a restart.
     *
     * @param fixedPort the host port to bind; must be positive
     * @return this container
     */
    public KafkaContainer withPort(int fixedPort) {
        if (fixedPort <= 0) {
            throw new IllegalArgumentException("The fixed Kafka port must be greater than 0");
        }
        addFixedExposedPort(fixedPort, 9092);
        return this;
    }

    // NOTE(review): this appears to re-declare the inherited
    // GenericContainer.withEnv(Map) with a covariant return type; if so it is
    // an override and should carry @Override — confirm against the
    // testcontainers version in use.
    public KafkaContainer withEnv(Map<String, String> envVars) {
        super.withEnv(envVars);
        return this;
    }

    /**
     * Returns the client bootstrap address as host:mappedPort for the
     * PLAINTEXT listener. Only meaningful once the container is started.
     */
    public String getBootstrapServers() {
        return String.format("%s:%s", getHost(), getMappedPort(KAFKA_PORT));
    }
}
Loading