diff --git a/smallrye-reactive-messaging-kafka-test-companion/revapi.json b/smallrye-reactive-messaging-kafka-test-companion/revapi.json index 38e24d82df..62373e215e 100644 --- a/smallrye-reactive-messaging-kafka-test-companion/revapi.json +++ b/smallrye-reactive-messaging-kafka-test-companion/revapi.json @@ -24,7 +24,52 @@ "criticality" : "highlight", "minSeverity" : "POTENTIALLY_BREAKING", "minCriticality" : "documented", - "differences" : [ ] + "differences" : [ { + "code": "java.field.constantValueChanged", + "old": "field io.smallrye.reactive.messaging.kafka.companion.test.KafkaBrokerExtension.KAFKA_VERSION", + "new": "field io.smallrye.reactive.messaging.kafka.companion.test.KafkaBrokerExtension.KAFKA_VERSION", + "justification": "Default kafka version to 4.1.0" + }, + { + "code": "java.field.typeChanged", + "old": "field io.smallrye.reactive.messaging.kafka.companion.test.KafkaBrokerExtension.kafka", + "new": "field io.smallrye.reactive.messaging.kafka.companion.test.KafkaBrokerExtension.kafka", + "justification": "Kafka broker container is no longer restricted to StrimziKafkaContainer" + }, + { + "code": "java.method.visibilityReduced", + "old": "method T io.smallrye.reactive.messaging.kafka.companion.test.KafkaBrokerExtension::configureKafkaContainer(T)", + "new": "method io.smallrye.reactive.messaging.kafka.companion.test.KafkaContainer io.smallrye.reactive.messaging.kafka.companion.test.KafkaBrokerExtension::configureKafkaContainer(io.smallrye.reactive.messaging.kafka.companion.test.KafkaContainer)", + "oldVisibility": "public", + "newVisibility": "private", + "justification": "Kafka broker container is no longer restricted to StrimziKafkaContainer" + }, + { + "code": "java.method.returnTypeChanged", + "old": "method io.strimzi.test.container.StrimziKafkaContainer io.smallrye.reactive.messaging.kafka.companion.test.KafkaBrokerExtension::createKafkaContainer()", + "new": "method org.testcontainers.containers.GenericContainer io.smallrye.reactive.messaging.kafka.companion.test.KafkaBrokerExtension::createKafkaContainer()", + "justification": "Kafka broker container is no longer restricted to StrimziKafkaContainer" + }, + { + "code": "java.method.parameterTypeChanged", + "old": "parameter io.strimzi.test.container.StrimziKafkaContainer io.smallrye.reactive.messaging.kafka.companion.test.KafkaBrokerExtension::restart(===io.strimzi.test.container.StrimziKafkaContainer===, int)", + "new": "parameter org.testcontainers.containers.GenericContainer io.smallrye.reactive.messaging.kafka.companion.test.KafkaBrokerExtension::restart(===org.testcontainers.containers.GenericContainer===, int)", + "parameterIndex": "0", + "justification": "Kafka broker container is no longer restricted to StrimziKafkaContainer" + }, + { + "code": "java.method.returnTypeChanged", + "old": "method io.strimzi.test.container.StrimziKafkaContainer io.smallrye.reactive.messaging.kafka.companion.test.KafkaBrokerExtension::restart(io.strimzi.test.container.StrimziKafkaContainer, int)", + "new": "method org.testcontainers.containers.GenericContainer io.smallrye.reactive.messaging.kafka.companion.test.KafkaBrokerExtension::restart(org.testcontainers.containers.GenericContainer, int)", + "justification": "Kafka broker container is no longer restricted to StrimziKafkaContainer" + }, + { + "code": "java.method.returnTypeChanged", + "old": "method io.strimzi.test.container.StrimziKafkaContainer io.smallrye.reactive.messaging.kafka.companion.test.KafkaBrokerExtension::startKafkaBroker(int)", + "new": "method org.testcontainers.containers.GenericContainer io.smallrye.reactive.messaging.kafka.companion.test.KafkaBrokerExtension::startKafkaBroker(int)", + "justification": "Kafka broker container is no longer restricted to StrimziKafkaContainer" + } + ] } }, { "extension" : "revapi.reporter.json", @@ -43,4 +88,4 @@ "minCriticality" : "documented", "output" : "out" } -} ] \ No newline at end of file +} ] diff --git a/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/test/KafkaBrokerExtension.java b/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/test/KafkaBrokerExtension.java index 984d431b2c..83962c3387 100644 --- a/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/test/KafkaBrokerExtension.java +++ b/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/test/KafkaBrokerExtension.java @@ -19,19 +19,22 @@ import org.jboss.logging.Logger; import org.junit.jupiter.api.extension.*; import org.junit.jupiter.api.extension.ExtensionContext.Store.CloseableResource; +import org.testcontainers.containers.GenericContainer; import org.testcontainers.shaded.org.awaitility.core.ConditionTimeoutException; import io.strimzi.test.container.StrimziKafkaContainer; /** - * Junit extension for creating Strimzi Kafka broker + * Junit extension for creating Kafka broker */ public class KafkaBrokerExtension implements BeforeAllCallback, BeforeEachCallback, ParameterResolver, CloseableResource { public static final Logger LOGGER = Logger.getLogger(KafkaBrokerExtension.class.getName()); - public static final String KAFKA_VERSION = "4.0.0"; + public static final String KAFKA_IMAGE = "apache/kafka"; + public static final String KAFKA_VERSION = "4.2.0-rc4"; + public static final String STRIMZI_VERSION = "4.1.0"; - protected StrimziKafkaContainer kafka; + protected GenericContainer kafka; @Override public void beforeAll(ExtensionContext context) { @@ -50,16 +53,54 @@ public void close() { stopKafkaBroker(); } - public static StrimziKafkaContainer createKafkaContainer() { - return configureKafkaContainer(new StrimziKafkaContainer()); + public static GenericContainer createKafkaContainer() { + String kafkaImage = System.getProperty("kafka-container-image", KAFKA_IMAGE); + String kafkaVersion = System.getProperty("kafka-container-version", KAFKA_VERSION); + String strimziVersion = System.getProperty("strimzi-container-version", STRIMZI_VERSION); + GenericContainer> kafka; + if (kafkaImage.contains("strimzi")) { + kafka = new StrimziKafkaContainer(kafkaImage + ":latest-kafka-" + strimziVersion); + } else { + kafka = new KafkaContainer(kafkaImage + ":" + kafkaVersion); + } + return configureContainer(kafka); } - public static T configureKafkaContainer(T container) { - String kafkaVersion = System.getProperty("kafka-container-version", KAFKA_VERSION); - container.withKafkaVersion(kafkaVersion); + public static > GenericContainer configureContainer(T container) { + if (container instanceof StrimziKafkaContainer s) { + return configureStrimziContainer(s); + } else if (container instanceof KafkaContainer k) { + return configureKafkaContainer(k); + } else { + return container; + } + } + + private static KafkaContainer configureKafkaContainer(KafkaContainer container) { + Map envVars = Map.of( + "KAFKA_LOG_CLEANER_ENABLE", "false", + "KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0", + "KAFKA_UNSTABLE_API_VERSIONS_ENABLE", "true", + "KAFKA_GROUP_COORDINATOR_REBALANCE_PROTOCOLS", "classic,consumer,share", + "KAFKA_GROUP_SHARE_ENABLE", "true", + // "KAFKA_SHARE_COORDINATOR_APPEND_LINGER_MS", "-1", + // a single node topic needs to have 1 as replication factor + "KAFKA_SHARE_COORDINATOR_STATE_TOPIC_REPLICATION_FACTOR", "1"); + return container.withEnv(envVars); + } + + public static StrimziKafkaContainer configureStrimziContainer(StrimziKafkaContainer container) { + String strimziVersion = System.getProperty("strimzi-container-version", + System.getProperty("kafka-container-version", STRIMZI_VERSION)); + container.withKafkaVersion(strimziVersion); Map config = new HashMap<>(); config.put("log.cleaner.enable", "false"); config.put("group.initial.rebalance.delay.ms", "0"); + config.put("unstable.api.versions.enable", "true"); + config.put("group.coordinator.rebalance.protocols", "classic,consumer,share"); + config.put("group.share.enable", "true"); + // a single node topic needs to have 1 as replication factor + config.put("share.coordinator.state.topic.replication.factor", "1"); container.withKafkaConfigurationMap(config); return container; } @@ -67,7 +108,7 @@ public static T configureKafkaContainer(T cont public void startKafkaBroker() { kafka = createKafkaContainer(); kafka.start(); - LOGGER.info("Kafka broker started: " + kafka.getBootstrapServers() + " (" + kafka.getMappedPort(9092) + ")"); + LOGGER.info("Kafka broker started: " + getBootstrapServers(kafka) + " (" + kafka.getMappedPort(9092) + ")"); await().until(() -> kafka.isRunning()); } @@ -80,7 +121,7 @@ public void startKafkaBroker() { * @param gracePeriodInSecond number of seconds to wait before restarting * @return the new broker */ - public static StrimziKafkaContainer restart(StrimziKafkaContainer kafka, int gracePeriodInSecond) { + public static GenericContainer restart(GenericContainer kafka, int gracePeriodInSecond) { int port = kafka.getMappedPort(9092); try { kafka.close(); @@ -93,8 +134,13 @@ public static StrimziKafkaContainer restart(StrimziKafkaContainer kafka, int gra return startKafkaBroker(port); } - public static StrimziKafkaContainer startKafkaBroker(int port) { - StrimziKafkaContainer kafka = createKafkaContainer().withPort(port); + public static GenericContainer startKafkaBroker(int port) { + GenericContainer kafka = createKafkaContainer(); + if (kafka instanceof StrimziKafkaContainer strimzi) { + strimzi.withPort(port); + } else if (kafka instanceof KafkaContainer k) { + k.withPort(port); + } kafka.start(); await().until(kafka::isRunning); return kafka; @@ -132,9 +178,7 @@ public Object resolveParameter(ParameterContext parameterContext, ExtensionConte if (parameterContext.isAnnotated(KafkaBootstrapServers.class)) { ExtensionContext.Store globalStore = extensionContext.getRoot().getStore(GLOBAL); KafkaBrokerExtension extension = (KafkaBrokerExtension) globalStore.get(KafkaBrokerExtension.class); - if (extension.kafka != null) { - return extension.kafka.getBootstrapServers(); - } + return getBootstrapServers(extension.kafka); } return null; } @@ -162,7 +206,7 @@ private void isBrokerHealthy() { await().until(() -> kafka.isRunning()); await().catchUncaughtExceptions().until(() -> { Map config = new HashMap<>(); - config.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); + config.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers(kafka)); config.put(CommonClientConfigs.CLIENT_ID_CONFIG, "broker-healthy-admin"); try (AdminClient admin = AdminClient.create(config)) { Collection nodes = admin.describeCluster().nodes().get(); @@ -171,6 +215,17 @@ private void isBrokerHealthy() { }); } + public static String getBootstrapServers(GenericContainer kafka) { + if (kafka != null) { + if (kafka instanceof StrimziKafkaContainer strimzi) { + return strimzi.getBootstrapServers(); + } else if (kafka instanceof KafkaContainer k) { + return k.getBootstrapServers(); + } + } + return null; + } + @Target({ ElementType.FIELD, ElementType.PARAMETER }) @Retention(RetentionPolicy.RUNTIME) public @interface KafkaBootstrapServers { diff --git a/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/test/KafkaContainer.java b/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/test/KafkaContainer.java new file mode 100644 index 0000000000..cf0b9f2b62 --- /dev/null +++ b/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/test/KafkaContainer.java @@ -0,0 +1,180 @@ +package io.smallrye.reactive.messaging.kafka.companion.test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.containers.wait.strategy.WaitStrategy; +import org.testcontainers.images.builder.Transferable; +import org.testcontainers.utility.DockerImageName; + +import com.github.dockerjava.api.command.InspectContainerResponse; + +/** + * Copied from testcontainers with some modifications + */ +public class KafkaContainer extends GenericContainer { + + private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("apache/kafka"); + + private static final DockerImageName APACHE_KAFKA_NATIVE_IMAGE_NAME = DockerImageName.parse("apache/kafka-native"); + + private static final String DEFAULT_INTERNAL_TOPIC_RF = "1"; + + private static final String DEFAULT_CLUSTER_ID = "4L6g3nShT-eMCtK--X86sw"; + + private static final String PROTOCOL_PREFIX = "TC"; + + static final int KAFKA_PORT = 9092; + + static final String STARTER_SCRIPT = "/tmp/testcontainers_start.sh"; + + static final String[] COMMAND = { + "sh", + "-c", + "while [ ! -f " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT, + }; + + static final WaitStrategy WAIT_STRATEGY = Wait.forLogMessage(".*Transitioning from RECOVERY to RUNNING.*", 1); + + static Map envVars() { + Map envVars = new HashMap<>(); + envVars.put("CLUSTER_ID", DEFAULT_CLUSTER_ID); + + envVars.put( + "KAFKA_LISTENERS", + "PLAINTEXT://0.0.0.0:" + KAFKA_PORT + ",BROKER://0.0.0.0:9093,CONTROLLER://0.0.0.0:9094"); + envVars.put( + "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", + "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT"); + envVars.put("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER"); + envVars.put("KAFKA_PROCESS_ROLES", "broker,controller"); + envVars.put("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER"); + + envVars.put("KAFKA_NODE_ID", "1"); + + String controllerQuorumVoters = String.format("%s@localhost:9094", envVars.get("KAFKA_NODE_ID")); + envVars.put("KAFKA_CONTROLLER_QUORUM_VOTERS", controllerQuorumVoters); + + envVars.put("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF); + envVars.put("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", DEFAULT_INTERNAL_TOPIC_RF); + envVars.put("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF); + envVars.put("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", DEFAULT_INTERNAL_TOPIC_RF); + envVars.put("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + ""); + envVars.put("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0"); + return envVars; + } + + static void resolveListeners(GenericContainer kafkaContainer, Set listenersSuppliers) { + Set listeners = Arrays + .stream(kafkaContainer.getEnvMap().get("KAFKA_LISTENERS").split(",")) + .collect(Collectors.toSet()); + Set listenerSecurityProtocolMap = Arrays + .stream(kafkaContainer.getEnvMap().get("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP").split(",")) + .collect(Collectors.toSet()); + + List listenersToTransform = new ArrayList<>(listenersSuppliers); + for (int i = 0; i < listenersToTransform.size(); i++) { + String protocol = String.format("%s-%d", PROTOCOL_PREFIX, i); + String listener = listenersToTransform.get(i); + String listenerHost = listener.split(":")[0]; + String listenerPort = listener.split(":")[1]; + String listenerProtocol = String.format("%s://%s:%s", protocol, listenerHost, listenerPort); + String protocolMap = String.format("%s:PLAINTEXT", protocol); + listeners.add(listenerProtocol); + listenerSecurityProtocolMap.add(protocolMap); + + String host = listener.split(":")[0]; + kafkaContainer.withNetworkAliases(host); + } + + String kafkaListeners = String.join(",", listeners); + String kafkaListenerSecurityProtocolMap = String.join(",", listenerSecurityProtocolMap); + + kafkaContainer.getEnvMap().put("KAFKA_LISTENERS", kafkaListeners); + kafkaContainer.getEnvMap().put("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", kafkaListenerSecurityProtocolMap); + } + + static List resolveAdvertisedListeners(Set> listenerSuppliers) { + List advertisedListeners = new ArrayList<>(); + List> listenersToTransform = new ArrayList<>(listenerSuppliers); + for (int i = 0; i < listenersToTransform.size(); i++) { + Supplier listenerSupplier = listenersToTransform.get(i); + String protocol = String.format("%s-%d", PROTOCOL_PREFIX, i); + String listener = listenerSupplier.get(); + String listenerProtocol = String.format("%s://%s", protocol, listener); + advertisedListeners.add(listenerProtocol); + } + return advertisedListeners; + } + + private final Set listeners = new LinkedHashSet<>(); + + private final Set> advertisedListeners = new LinkedHashSet<>(); + + public KafkaContainer(String imageName) { + this(DockerImageName.parse(imageName)); + } + + public KafkaContainer(DockerImageName dockerImageName) { + super(dockerImageName); + dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME, APACHE_KAFKA_NATIVE_IMAGE_NAME); + + withExposedPorts(KAFKA_PORT); + withEnv(envVars()); + + withCommand(COMMAND); + waitingFor(WAIT_STRATEGY); + } + + @Override + protected void configure() { + resolveListeners(this, this.listeners); + } + + @Override + protected void containerIsStarting(InspectContainerResponse containerInfo) { + String brokerAdvertisedListener = String.format( + "BROKER://%s:%s", + containerInfo.getConfig().getHostName(), + "9093"); + List advertisedListeners = new ArrayList<>(); + advertisedListeners.add("PLAINTEXT://" + getBootstrapServers()); + advertisedListeners.add(brokerAdvertisedListener); + + advertisedListeners.addAll(resolveAdvertisedListeners(this.advertisedListeners)); + String kafkaAdvertisedListeners = String.join(",", advertisedListeners); + + String command = "#!/bin/bash\n"; + // exporting KAFKA_ADVERTISED_LISTENERS with the container hostname + command += String.format("export KAFKA_ADVERTISED_LISTENERS=%s\n", kafkaAdvertisedListeners); + + command += "/etc/kafka/docker/run \n"; + copyFileToContainer(Transferable.of(command, 0777), STARTER_SCRIPT); + } + + public KafkaContainer withPort(int fixedPort) { + if (fixedPort <= 0) { + throw new IllegalArgumentException("The fixed Kafka port must be greater than 0"); + } + addFixedExposedPort(fixedPort, 9092); + return this; + } + + public KafkaContainer withEnv(Map envVars) { + super.withEnv(envVars); + return this; + } + + public String getBootstrapServers() { + return String.format("%s:%s", getHost(), getMappedPort(KAFKA_PORT)); + } +} diff --git a/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/test/KafkaToxiproxyExtension.java b/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/test/KafkaToxiproxyExtension.java index ce669aee49..096b7537ad 100644 --- a/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/test/KafkaToxiproxyExtension.java +++ b/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/test/KafkaToxiproxyExtension.java @@ -22,7 +22,7 @@ public void beforeAll(ExtensionContext context) { KafkaToxiproxyExtension extension = (KafkaToxiproxyExtension) globalStore.get(KafkaToxiproxyExtension.class); if (extension == null) { LOGGER.info("Starting Kafka broker proxy"); - kafka = configureKafkaContainer(new ProxiedStrimziKafkaContainer()); + kafka = configureContainer(new ProxiedStrimziKafkaContainer()); kafka.setNetwork(Network.newNetwork()); kafka.start(); await().until(() -> kafka.isRunning()); @@ -51,7 +51,7 @@ public Object resolveParameter(ParameterContext parameterContext, ExtensionConte if (extension != null) { if (extension.kafka != null) { if (parameterContext.isAnnotated(KafkaBootstrapServers.class)) { - return extension.kafka.getBootstrapServers(); + return KafkaBrokerExtension.getBootstrapServers(extension.kafka); } if (parameterContext.getParameter().getType().equals(KafkaProxy.class)) { return ((ProxiedStrimziKafkaContainer) extension.kafka).getKafkaProxy(); diff --git a/smallrye-reactive-messaging-kafka-test-companion/src/test/java/io/smallrye/reactive/messaging/kafka/companion/test/KafkaRestartTest.java b/smallrye-reactive-messaging-kafka-test-companion/src/test/java/io/smallrye/reactive/messaging/kafka/companion/test/KafkaRestartTest.java index b3619c4108..cf7f4e4dcb 100644 --- a/smallrye-reactive-messaging-kafka-test-companion/src/test/java/io/smallrye/reactive/messaging/kafka/companion/test/KafkaRestartTest.java +++ b/smallrye-reactive-messaging-kafka-test-companion/src/test/java/io/smallrye/reactive/messaging/kafka/companion/test/KafkaRestartTest.java @@ -5,26 +5,26 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.jupiter.api.Test; +import org.testcontainers.containers.GenericContainer; import io.smallrye.reactive.messaging.kafka.companion.KafkaCompanion; -import io.strimzi.test.container.StrimziKafkaContainer; public class KafkaRestartTest { @Test void testRestartedBroker() { - try (StrimziKafkaContainer kafkaContainer = KafkaBrokerExtension.createKafkaContainer()) { + try (GenericContainer kafkaContainer = KafkaBrokerExtension.createKafkaContainer()) { kafkaContainer.start(); await().until(kafkaContainer::isRunning); - String bootstrapServers = kafkaContainer.getBootstrapServers(); + String bootstrapServers = KafkaBrokerExtension.getBootstrapServers(kafkaContainer); try (KafkaCompanion companion = new KafkaCompanion(bootstrapServers)) { companion.produceStrings() .fromRecords(new ProducerRecord<>("topic", "1")) .awaitCompletion(); - StrimziKafkaContainer restarted = KafkaBrokerExtension.restart(kafkaContainer, 2); + GenericContainer restarted = KafkaBrokerExtension.restart(kafkaContainer, 2); - assertThat(restarted.getBootstrapServers()).isEqualTo(bootstrapServers); + assertThat(KafkaBrokerExtension.getBootstrapServers(restarted)).isEqualTo(bootstrapServers); companion.produceStrings() .fromRecords(new ProducerRecord<>("topic", "1")) diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceTest.java index 8a0a4bf0d9..1581488d56 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceTest.java @@ -32,6 +32,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import org.testcontainers.containers.GenericContainer; import org.testng.Assert; import io.smallrye.common.annotation.Identifier; @@ -51,7 +52,6 @@ import io.smallrye.reactive.messaging.kafka.impl.KafkaSource; import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; -import io.strimzi.test.container.StrimziKafkaContainer; public class KafkaSourceTest extends KafkaCompanionTestBase { @@ -220,17 +220,17 @@ public void testBroadcastWithPartitions() { @Tag(TestTags.SLOW) public void testRetry() { // This test need an individual Kafka container - try (StrimziKafkaContainer kafka = KafkaBrokerExtension.createKafkaContainer()) { + try (GenericContainer kafka = KafkaBrokerExtension.createKafkaContainer()) { kafka.start(); await().until(kafka::isRunning); MapBasedConfig config = newCommonConfigForSource() - .with("bootstrap.servers", kafka.getBootstrapServers()) + .with("bootstrap.servers", KafkaBrokerExtension.getBootstrapServers(kafka)) .with("value.deserializer", IntegerDeserializer.class.getName()) .with("retry", true) .with("retry-attempts", 100) .with("retry-max-wait", 30); - KafkaCompanion kafkaCompanion = new KafkaCompanion(kafka.getBootstrapServers()); + KafkaCompanion kafkaCompanion = new KafkaCompanion(KafkaBrokerExtension.getBootstrapServers(kafka)); KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(config); source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, @@ -246,7 +246,7 @@ public void testRetry() { await().atMost(2, TimeUnit.MINUTES).until(() -> messages1.size() >= 10); try (@SuppressWarnings("unused") - StrimziKafkaContainer container = KafkaBrokerExtension.restart(kafka, 2)) { + GenericContainer container = KafkaBrokerExtension.restart(kafka, 2)) { kafkaCompanion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(topic, i), 10); await().atMost(2, TimeUnit.MINUTES).until(() -> messages1.size() >= 20); diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/BrokerRestartTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/BrokerRestartTest.java index d1eb000bbc..4e5acf49d9 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/BrokerRestartTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/BrokerRestartTest.java @@ -13,6 +13,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import org.testcontainers.containers.GenericContainer; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.helpers.test.AssertSubscriber; @@ -22,7 +23,6 @@ import io.smallrye.reactive.messaging.kafka.impl.KafkaSource; import io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaConsumer; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; -import io.strimzi.test.container.StrimziKafkaContainer; @Tag(TestTags.SLOW) // TODO should not extend ClientTestBase, it uses KafkaBrokerExtension which creates a broker for tests @@ -30,7 +30,7 @@ public class BrokerRestartTest extends ClientTestBase { @Test public void testAcknowledgementUsingThrottledStrategyEvenAfterBrokerRestart() throws Exception { - try (StrimziKafkaContainer kafka = KafkaBrokerExtension.createKafkaContainer()) { + try (GenericContainer kafka = KafkaBrokerExtension.createKafkaContainer()) { kafka.start(); await().until(kafka::isRunning); @@ -44,7 +44,7 @@ public void testAcknowledgementUsingThrottledStrategyEvenAfterBrokerRestart() th CountDownLatch latch = new CountDownLatch(100); subscribe(stream, latch); - try (final StrimziKafkaContainer ignored = restart(kafka, 3)) { + try (final GenericContainer ignored = restart(kafka, 3)) { sendMessages(0, 100); waitForMessages(latch); checkConsumedMessages(0, 100); @@ -55,13 +55,13 @@ public void testAcknowledgementUsingThrottledStrategyEvenAfterBrokerRestart() th @Test public void testResumingPausingWhileBrokerIsDown() throws Exception { - try (StrimziKafkaContainer kafka = KafkaBrokerExtension.createKafkaContainer()) { + try (GenericContainer kafka = KafkaBrokerExtension.createKafkaContainer()) { kafka.start(); await().until(kafka::isRunning); String groupId = UUID.randomUUID().toString(); MapBasedConfig config = createConsumerConfig(groupId) .with("topic", topic) - .with(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()) + .with(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaBrokerExtension.getBootstrapServers(kafka)) .with(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 6000); createSource(config, groupId); source @@ -88,15 +88,15 @@ public void testResumingPausingWhileBrokerIsDown() throws Exception { @Test public void testPausingWhileBrokerIsDown() throws Exception { - try (StrimziKafkaContainer kafka = KafkaBrokerExtension.createKafkaContainer()) { + try (GenericContainer kafka = KafkaBrokerExtension.createKafkaContainer()) { kafka.start(); await().until(kafka::isRunning); Integer port = kafka.getMappedPort(KAFKA_PORT); - sendMessages(0, 10, kafka.getBootstrapServers()); + sendMessages(0, 10, KafkaBrokerExtension.getBootstrapServers(kafka)); String groupId = UUID.randomUUID().toString(); MapBasedConfig config = createConsumerConfig(groupId) .with("topic", topic) - .with(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()) + .with(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaBrokerExtension.getBootstrapServers(kafka)) .with(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 6000); createSource(config, groupId); Multi> stream = source.getStream(); @@ -113,7 +113,7 @@ public void testPausingWhileBrokerIsDown() throws Exception { await().until(() -> subscriber.getItems().size() == 1); await().until(() -> !source.getConsumer().paused().await().indefinitely().isEmpty()); - sendMessages(0, 10, kafka.getBootstrapServers()); + sendMessages(0, 10, KafkaBrokerExtension.getBootstrapServers(kafka)); kafka.stop(); await().until(() -> !kafka.isRunning()); @@ -131,13 +131,13 @@ public void testPausingWhileBrokerIsDown() throws Exception { return last.get() == last.getAndSet(subscriber.getItems().size()); }); - try (StrimziKafkaContainer restarted = KafkaBrokerExtension.startKafkaBroker(port)) { + try (GenericContainer restarted = KafkaBrokerExtension.startKafkaBroker(port)) { await().until(restarted::isRunning); subscriber.request(100); await().until(() -> source.getConsumer().paused().await().indefinitely().isEmpty()); - sendMessages(10, 45, restarted.getBootstrapServers()); + sendMessages(10, 45, KafkaBrokerExtension.getBootstrapServers(restarted)); await().until(() -> subscriber.getItems().size() == 55); } } @@ -146,7 +146,7 @@ public void testPausingWhileBrokerIsDown() throws Exception { @Test public void testWithBrokerRestart() throws Exception { int sendBatchSize = 10; - try (StrimziKafkaContainer kafka = KafkaBrokerExtension.createKafkaContainer()) { + try (GenericContainer kafka = KafkaBrokerExtension.createKafkaContainer()) { kafka.start(); String groupId = UUID.randomUUID().toString(); MapBasedConfig config = createConsumerConfig(groupId) @@ -156,7 +156,7 @@ public void testWithBrokerRestart() throws Exception { CountDownLatch receiveLatch = new CountDownLatch(sendBatchSize * 2); subscribe(source.getStream(), receiveLatch); sendMessages(0, sendBatchSize); - try (StrimziKafkaContainer ignored = restart(kafka, 5)) { + try (GenericContainer ignored = restart(kafka, 5)) { sendMessages(sendBatchSize, sendBatchSize); waitForMessages(receiveLatch); checkConsumedMessages();