IngestFromKafkaIT.java
@@ -8,12 +8,6 @@

package org.opensearch.plugin.kafka;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.opensearch.action.admin.cluster.node.info.NodeInfo;
import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
@@ -22,40 +16,24 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.query.RangeQueryBuilder;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.PluginInfo;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Assert;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

import static org.hamcrest.Matchers.is;
import static org.awaitility.Awaitility.await;

/**
* Integration test for Kafka ingestion
*/
@ThreadLeakFilters(filters = TestContainerWatchdogThreadLeakFilter.class)
public class IngestFromKafkaIT extends OpenSearchIntegTestCase {
static final String topicName = "test";

private KafkaContainer kafka;

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(KafkaPlugin.class);
}

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class IngestFromKafkaIT extends KafkaIngestionBaseIT {
/**
* Test that the ingestion-kafka-plugin is installed
*/
@@ -75,128 +53,86 @@ public void testPluginsAreInstalled() {
}

public void testKafkaIngestion() {
try {
setupKafka();
// create an index with ingestion source from kafka
createIndex(
"test",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("ingestion_source.type", "kafka")
.put("ingestion_source.pointer.init.reset", "earliest")
.put("ingestion_source.param.topic", "test")
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
.build(),
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
);
produceData("1", "name1", "24");
produceData("2", "name2", "20");

createIndex(
"test",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("ingestion_source.type", "kafka")
.put("ingestion_source.pointer.init.reset", "earliest")
.put("ingestion_source.param.topic", "test")
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
.put("index.replication.type", "SEGMENT")
.build(),
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
);

RangeQueryBuilder query = new RangeQueryBuilder("age").gte(21);
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
refresh("test");
SearchResponse response = client().prepareSearch("test").setQuery(query).get();
assertThat(response.getHits().getTotalHits().value(), is(1L));
});
} finally {
stopKafka();
}
RangeQueryBuilder query = new RangeQueryBuilder("age").gte(21);
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
refresh("test");
SearchResponse response = client().prepareSearch("test").setQuery(query).get();
assertThat(response.getHits().getTotalHits().value(), is(1L));
});
}

public void testKafkaIngestion_RewindByTimeStamp() {
try {
setupKafka();
// create an index with ingestion source from kafka
createIndex(
"test_rewind_by_timestamp",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("ingestion_source.type", "kafka")
.put("ingestion_source.pointer.init.reset", "rewind_by_timestamp")
// 1739459500000 is the timestamp of the first message
// 1739459800000 is the timestamp of the second message
// by resetting to 1739459600000, only the second message will be ingested
.put("ingestion_source.pointer.init.reset.value", "1739459600000")
.put("ingestion_source.param.topic", "test")
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
.put("ingestion_source.param.auto.offset.reset", "latest")
.build(),
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
);
produceData("1", "name1", "24", 1739459500000L);
produceData("2", "name2", "20", 1739459800000L);

// create an index with ingestion source from kafka
createIndex(
"test_rewind_by_timestamp",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("ingestion_source.type", "kafka")
.put("ingestion_source.pointer.init.reset", "rewind_by_timestamp")
// 1739459500000 is the timestamp of the first message
// 1739459800000 is the timestamp of the second message
// by resetting to 1739459600000, only the second message will be ingested
.put("ingestion_source.pointer.init.reset.value", "1739459600000")
.put("ingestion_source.param.topic", "test")
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
.put("ingestion_source.param.auto.offset.reset", "latest")
.build(),
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
);

RangeQueryBuilder query = new RangeQueryBuilder("age").gte(0);
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
refresh("test_rewind_by_timestamp");
SearchResponse response = client().prepareSearch("test_rewind_by_timestamp").setQuery(query).get();
assertThat(response.getHits().getTotalHits().value(), is(1L));
});
} finally {
stopKafka();
}
RangeQueryBuilder query = new RangeQueryBuilder("age").gte(0);
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
refresh("test_rewind_by_timestamp");
SearchResponse response = client().prepareSearch("test_rewind_by_timestamp").setQuery(query).get();
assertThat(response.getHits().getTotalHits().value(), is(1L));
});
}

public void testKafkaIngestion_RewindByOffset() {
try {
setupKafka();
// create an index with ingestion source from kafka
createIndex(
"test_rewind_by_offset",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("ingestion_source.type", "kafka")
.put("ingestion_source.pointer.init.reset", "rewind_by_offset")
.put("ingestion_source.pointer.init.reset.value", "1")
.put("ingestion_source.param.topic", "test")
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
.put("ingestion_source.param.auto.offset.reset", "latest")
.build(),
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
);

RangeQueryBuilder query = new RangeQueryBuilder("age").gte(0);
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
refresh("test_rewind_by_offset");
SearchResponse response = client().prepareSearch("test_rewind_by_offset").setQuery(query).get();
assertThat(response.getHits().getTotalHits().value(), is(1L));
});
} finally {
stopKafka();
}
}

private void setupKafka() {
kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
// disable topic auto creation
.withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false");
kafka.start();
prepareKafkaData();
}

private void stopKafka() {
if (kafka != null) {
kafka.stop();
}
}

private void prepareKafkaData() {
String bootstrapServers = kafka.getBootstrapServers();
KafkaUtils.createTopic(topicName, 1, bootstrapServers);
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
producer.send(
new ProducerRecord<>(topicName, null, 1739459500000L, "null", "{\"_id\":\"1\",\"_source\":{\"name\":\"bob\", \"age\": 24}}")
);
producer.send(
new ProducerRecord<>(
topicName,
null,
1739459800000L,
"null",
"{\"_id\":\"2\", \"_op_type:\":\"index\",\"_source\":{\"name\":\"alice\", \"age\": 20}}"
)
produceData("1", "name1", "24");
produceData("2", "name2", "20");
// create an index with ingestion source from kafka
createIndex(
"test_rewind_by_offset",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("ingestion_source.type", "kafka")
.put("ingestion_source.pointer.init.reset", "rewind_by_offset")
.put("ingestion_source.pointer.init.reset.value", "1")
.put("ingestion_source.param.topic", "test")
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
.put("ingestion_source.param.auto.offset.reset", "latest")
.build(),
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
);
producer.close();

RangeQueryBuilder query = new RangeQueryBuilder("age").gte(0);
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
refresh("test_rewind_by_offset");
SearchResponse response = client().prepareSearch("test_rewind_by_offset").setQuery(query).get();
assertThat(response.getHits().getTotalHits().value(), is(1L));
});
}
}
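
A note on the rewind_by_timestamp tests above: the reset value is an epoch-millisecond timestamp, and ingestion starts from the first message at or after it. Below is a minimal sketch of that lookup using the plain Kafka consumer API. It is illustrative only, not the plugin's internal code; it assumes a reachable broker and the single-partition "test" topic these tests create.

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class RewindByTimestampSketch {
    // Resolves the earliest offset whose message timestamp is >= timestampMillis,
    // which is the semantics the rewind_by_timestamp tests rely on.
    public static long resolveStartOffset(String bootstrapServers, long timestampMillis) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition partition = new TopicPartition("test", 0);
            Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(
                Collections.singletonMap(partition, timestampMillis)
            );
            OffsetAndTimestamp resolved = offsets.get(partition);
            // With messages at 1739459500000 and 1739459800000, a lookup at
            // 1739459600000 lands on the second message, hence the single hit
            // asserted in testKafkaIngestion_RewindByTimeStamp.
            return resolved != null ? resolved.offset() : -1L;
        }
    }
}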
KafkaIngestionBaseIT.java
@@ -0,0 +1,111 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.kafka;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.After;
import org.junit.Before;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

/**
* Base test class for Kafka ingestion tests
*/
@ThreadLeakFilters(filters = TestContainerThreadLeakFilter.class)
public class KafkaIngestionBaseIT extends OpenSearchIntegTestCase {
static final String topicName = "test";
static final String indexName = "testindex";
static final String mapping = "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}";
static final long defaultMessageTimestamp = 1739459500000L;

protected KafkaContainer kafka;
protected Producer<String, String> producer;

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(KafkaPlugin.class);
}

@Before
private void setup() {
setupKafka();
}

@After
private void cleanup() {
stopKafka();
}

private void setupKafka() {
kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
// disable topic auto creation
.withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false");
kafka.start();

// set up a shared producer for the test topic
String bootstrapServers = kafka.getBootstrapServers();
KafkaUtils.createTopic(topicName, 1, bootstrapServers);
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
}

private void stopKafka() {
if (producer != null) {
producer.close();
}

if (kafka != null) {
kafka.stop();
}
}

protected void produceData(String id, String name, String age) {
produceData(id, name, age, defaultMessageTimestamp);
}

protected void produceData(String id, String name, String age, long timestamp) {
String payload = String.format(
Locale.ROOT,
"{\"_id\":\"%s\", \"_op_type:\":\"index\",\"_source\":{\"name\":\"%s\", \"age\": %s}}",
id,
name,
age
);
producer.send(new ProducerRecord<>(topicName, null, timestamp, "null", payload));
}

protected void waitForSearchableDocs(long docCount, List<String> nodes) throws Exception {
assertBusy(() -> {
for (String node : nodes) {
final SearchResponse response = client(node).prepareSearch(indexName).setSize(0).setPreference("_only_local").get();
final long hits = response.getHits().getTotalHits().value();
if (hits < docCount) {
fail("Expected search hits on node: " + node + " to be at least " + docCount + " but was: " + hits);
}
}
}, 1, TimeUnit.MINUTES);
}
}
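
For reference, a hypothetical subclass (not part of this change) showing how the base class helpers compose into a new test. The settings mirror the earliest-reset test above; starting a node via internalCluster().startNode() is an assumption about how a subclass would run under a TEST-scoped cluster with numDataNodes = 0.

package org.opensearch.plugin.kafka;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;

import java.util.List;

public class KafkaIngestionSmokeIT extends KafkaIngestionBaseIT {
    public void testDocsBecomeSearchable() throws Exception {
        // Kafka and the shared producer are already running via the @Before hook.
        final String node = internalCluster().startNode();

        produceData("1", "alice", "30");                               // default timestamp
        produceData("2", "bob", "25", defaultMessageTimestamp + 1000); // explicit timestamp

        createIndex(
            indexName,
            Settings.builder()
                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
                .put("ingestion_source.type", "kafka")
                .put("ingestion_source.pointer.init.reset", "earliest")
                .put("ingestion_source.param.topic", topicName)
                .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
                .build(),
            mapping
        );

        // Polls for up to a minute until both documents are searchable on the node.
        waitForSearchableDocs(2, List.of(node));
    }
}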