Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Rule Based Auto-tagging] Add rule schema for auto tagging ([#17238](https://github.com/opensearch-project/OpenSearch/pull/17238))
- Renaming the node role search to warm ([#17573](https://github.com/opensearch-project/OpenSearch/pull/17573))
- Introduce a new search node role to hold search only shards ([#17620](https://github.com/opensearch-project/OpenSearch/pull/17620))
- Fix systemd integTest on deb regarding path ownership check ([#17641](https://github.com/opensearch-project/OpenSearch/pull/17641))
- Fix systemd integTest on deb regarding path ownership check ([#17641](https://github.com/opensearch-project/OpenSearch/pull/17641))
- Add dfs transformation function in XContentMapValues ([#17612](https://github.com/opensearch-project/OpenSearch/pull/17612))
- Added Kinesis support as a plugin for the pull-based ingestion ([#17615](https://github.com/opensearch-project/OpenSearch/pull/17615))
- [Security Manager Replacement] Create initial Java Agent to intercept Socket::connect calls ([#17724](https://github.com/opensearch-project/OpenSearch/pull/17724))
- Add ingestion management APIs for pause, resume and get ingestion state ([#17631](https://github.com/opensearch-project/OpenSearch/pull/17631))

### Changed
- Migrate BC libs to their FIPS counterparts ([#14912](https://github.com/opensearch-project/OpenSearch/pull/14912))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ private void check(Element element) {
case INTERFACE:
case ENUM:
case ANNOTATION_TYPE:
case RECORD:
if (level(element) >= CLASS) {
checkComment(element);
for (var subElement : element.getEnclosedElements()) {
Expand Down Expand Up @@ -343,7 +344,7 @@ private boolean isGenerated(Element element) {
if (!isGenerated && element.getEnclosingElement() != null) {
// check if enclosing element is generated
return isGenerated(element.getEnclosingElement());
}
}

return isGenerated;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import static org.awaitility.Awaitility.await;

/**
* Integration test for Kafka ingestion
* Integration test for Kafka ingestion.
*/
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class IngestFromKafkaIT extends KafkaIngestionBaseIT {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,16 @@
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.opensearch.action.admin.indices.streamingingestion.pause.PauseIngestionResponse;
import org.opensearch.action.admin.indices.streamingingestion.resume.ResumeIngestionResponse;
import org.opensearch.action.admin.indices.streamingingestion.state.GetIngestionStateResponse;
import org.opensearch.action.pagination.PageParams;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.transport.client.Requests;
import org.junit.After;
import org.junit.Before;

Expand All @@ -28,6 +33,7 @@
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.testcontainers.containers.KafkaContainer;
Expand All @@ -45,6 +51,7 @@ public class KafkaIngestionBaseIT extends OpenSearchIntegTestCase {

protected KafkaContainer kafka;
protected Producer<String, String> producer;
protected int numKafkaPartitions = 1;

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
Expand All @@ -53,23 +60,23 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {

/** Starts the Kafka container and producer before each test, using the configured partition count. */
@Before
private void setup() {
    setupKafka(numKafkaPartitions);
}

/** Tears down the Kafka test fixture (container and producer) after each test. */
@After
private void cleanup() {
stopKafka();
}

private void setupKafka() {
private void setupKafka(int numKafkaPartitions) {
kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
// disable topic auto creation
.withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false");
kafka.start();

// setup producer
String boostrapServers = kafka.getBootstrapServers();
KafkaUtils.createTopic(topicName, 1, boostrapServers);
KafkaUtils.createTopic(topicName, numKafkaPartitions, boostrapServers);
Properties props = new Properties();
props.put("bootstrap.servers", kafka.getBootstrapServers());
producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
Expand Down Expand Up @@ -112,6 +119,11 @@ protected void waitForSearchableDocs(long docCount, List<String> nodes) throws E
}, 1, TimeUnit.MINUTES);
}

/**
 * Returns the number of documents currently searchable on the given node.
 * Uses {@code _only_local} preference so only that node's shard copies are counted.
 */
protected long getSearchableDocCount(String node) throws Exception {
    final SearchResponse localSearch = client(node).prepareSearch(indexName)
        .setSize(0)
        .setPreference("_only_local")
        .get();
    return localSearch.getHits().getTotalHits().value();
}

protected void waitForState(Callable<Boolean> checkState) throws Exception {
assertBusy(() -> {
if (checkState.call() == false) {
Expand All @@ -124,7 +136,33 @@ protected String getSettings(String indexName, String setting) {
return client().admin().indices().prepareGetSettings(indexName).get().getSetting(indexName, setting);
}

/**
 * Fetches the ingestion state for all shards of a single index.
 *
 * @throws ExecutionException   if the underlying admin action fails
 * @throws InterruptedException if interrupted while waiting for the response
 */
protected GetIngestionStateResponse getIngestionState(String indexName) throws ExecutionException, InterruptedException {
    return client().admin()
        .indices()
        .getIngestionState(Requests.getIngestionStateRequest(indexName))
        .get();
}

/**
 * Fetches the ingestion state for the given shards of the given indices, without pagination.
 *
 * @param indexNames indices to query
 * @param shards     shard IDs to include
 * @throws ExecutionException   if the underlying admin action fails
 * @throws InterruptedException if interrupted while waiting for the response
 */
protected GetIngestionStateResponse getIngestionState(String[] indexNames, int[] shards) throws ExecutionException,
    InterruptedException {
    // Delegate to the paginated overload (null PageParams = no pagination) so both
    // paths build the request the same way instead of duplicating the call chain.
    return getIngestionState(indexNames, shards, null);
}

/**
 * Fetches the ingestion state for the given shards of the given indices, one page at a time.
 *
 * @param pageParams pagination token, sort order and page size for the response
 * @throws ExecutionException   if the underlying admin action fails
 * @throws InterruptedException if interrupted while waiting for the response
 */
protected GetIngestionStateResponse getIngestionState(String[] indexNames, int[] shards, PageParams pageParams)
    throws ExecutionException, InterruptedException {
    return client().admin()
        .indices()
        .getIngestionState(Requests.getIngestionStateRequest(indexNames, shards, pageParams))
        .get();
}

/**
 * Pauses pull-based ingestion on the given index and blocks until the response arrives.
 *
 * @throws ExecutionException   if the underlying admin action fails
 * @throws InterruptedException if interrupted while waiting for the response
 */
protected PauseIngestionResponse pauseIngestion(String indexName) throws ExecutionException, InterruptedException {
    return client().admin()
        .indices()
        .pauseIngestion(Requests.pauseIngestionRequest(indexName))
        .get();
}

/**
 * Resumes pull-based ingestion on the given index and blocks until the response arrives.
 *
 * @throws ExecutionException   if the underlying admin action fails
 * @throws InterruptedException if interrupted while waiting for the response
 */
protected ResumeIngestionResponse resumeIngestion(String indexName) throws ExecutionException, InterruptedException {
    return client().admin()
        .indices()
        .resumeIngestion(Requests.resumeIngestionRequest(indexName))
        .get();
}

/** Creates the default test index ({@code indexName}) with the given shard and replica counts. */
protected void createIndexWithDefaultSettings(int numShards, int numReplicas) {
createIndexWithDefaultSettings(indexName, numShards, numReplicas);
}

protected void createIndexWithDefaultSettings(String indexName, int numShards, int numReplicas) {
createIndex(
indexName,
Settings.builder()
Expand All @@ -142,4 +180,9 @@ protected void createIndexWithDefaultSettings(int numShards, int numReplicas) {
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
);
}

/**
 * Restarts the Kafka fixture and recreates the topic with the given partition count.
 * Intended for tests that need more than the default single partition.
 * NOTE(review): calls cleanup() directly, so @After will invoke stopKafka() a second
 * time after the test — confirm stopKafka is idempotent.
 */
protected void recreateKafkaTopics(int numKafkaPartitions) {
cleanup();
setupKafka(numKafkaPartitions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,13 @@

package org.opensearch.plugin.kafka;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.opensearch.action.admin.indices.streamingingestion.pause.PauseIngestionResponse;
import org.opensearch.action.admin.indices.streamingingestion.resume.ResumeIngestionResponse;
import org.opensearch.action.admin.indices.streamingingestion.state.GetIngestionStateResponse;
import org.opensearch.action.pagination.PageParams;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.allocation.command.AllocateReplicaAllocationCommand;
Expand All @@ -19,14 +25,19 @@
import org.opensearch.transport.client.Requests;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import static org.hamcrest.Matchers.is;

/**
* Integration tests for segment replication with remote store using kafka as ingestion source.
*/
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
@ThreadLeakFilters(filters = TestContainerThreadLeakFilter.class)
public class RemoteStoreKafkaIT extends KafkaIngestionBaseIT {
private static final String REPOSITORY_NAME = "test-remote-store-repo";
private Path absolutePath;
Expand Down Expand Up @@ -154,6 +165,151 @@ public void testErrorStrategy() throws Exception {
waitForSearchableDocs(2, Arrays.asList(node));
}

/**
 * Verifies that pausing ingestion stops the Kafka pollers, that the paused state
 * survives primary failover and replica relocation, and that resuming ingestion
 * restarts polling so documents produced while paused become searchable.
 */
public void testPauseAndResumeIngestion() throws Exception {
// setup nodes and index: 1 shard, 1 replica across two data nodes
produceData("1", "name1", "24");
produceData("2", "name2", "20");
internalCluster().startClusterManagerOnlyNode();
final String nodeA = internalCluster().startDataOnlyNode();
final String nodeB = internalCluster().startDataOnlyNode();

createIndexWithDefaultSettings(1, 1);
ensureGreen(indexName);
waitForSearchableDocs(2, Arrays.asList(nodeA, nodeB));

// pause ingestion and wait until every shard reports its poller as paused
PauseIngestionResponse pauseResponse = pauseIngestion(indexName);
assertTrue(pauseResponse.isAcknowledged());
assertTrue(pauseResponse.isShardsAcknowledged());
waitForState(() -> {
GetIngestionStateResponse ingestionState = getIngestionState(indexName);
return Arrays.stream(ingestionState.getShardStates())
.allMatch(state -> state.isPollerPaused() && state.pollerState().equalsIgnoreCase("paused"));
});

// verify the paused state is persisted: produce more data, kill the primary's node,
// and check the promoted primary and a freshly allocated replica both stay paused
produceData("3", "name3", "30");
produceData("4", "name4", "31");
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeA));
ensureYellowAndNoInitializingShards(indexName);
assertTrue(nodeB.equals(primaryNodeName(indexName)));

final String nodeC = internalCluster().startDataOnlyNode();
client().admin().cluster().prepareReroute().add(new AllocateReplicaAllocationCommand(indexName, 0, nodeC)).get();
ensureGreen(indexName);
assertTrue(nodeC.equals(replicaNodeName(indexName)));
// docs 3 and 4 must NOT have been ingested while paused — still only 2 searchable
assertEquals(2, getSearchableDocCount(nodeB));
waitForState(() -> {
GetIngestionStateResponse ingestionState = getIngestionState(indexName);
return Arrays.stream(ingestionState.getShardStates())
.allMatch(state -> state.isPollerPaused() && state.pollerState().equalsIgnoreCase("paused"));
});

// resume ingestion; pollers should move back to polling/processing and catch up
ResumeIngestionResponse resumeResponse = resumeIngestion(indexName);
assertTrue(resumeResponse.isAcknowledged());
assertTrue(resumeResponse.isShardsAcknowledged());
waitForState(() -> {
GetIngestionStateResponse ingestionState = getIngestionState(indexName);
return Arrays.stream(ingestionState.getShardStates())
.allMatch(
state -> state.isPollerPaused() == false
&& (state.pollerState().equalsIgnoreCase("polling") || state.pollerState().equalsIgnoreCase("processing"))
);
});
waitForSearchableDocs(4, Arrays.asList(nodeB, nodeC));
}

/**
 * Verifies the un-paginated get-ingestion-state API for a single index: a valid shard
 * returns one POLLING, non-paused shard state with the DROP error policy, and a
 * non-existent shard ID yields an empty response rather than an error.
 */
public void testDefaultGetIngestionState() throws ExecutionException, InterruptedException {
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();
internalCluster().startDataOnlyNode();
createIndexWithDefaultSettings(1, 1);
ensureGreen(indexName);

// shard 0 exists: expect exactly one successful shard state in its default configuration
GetIngestionStateResponse ingestionState = getIngestionState(new String[] { indexName }, new int[] { 0 });
assertEquals(0, ingestionState.getFailedShards());
assertEquals(1, ingestionState.getSuccessfulShards());
assertEquals(1, ingestionState.getTotalShards());
assertEquals(1, ingestionState.getShardStates().length);
assertEquals(0, ingestionState.getShardStates()[0].shardId());
assertEquals("POLLING", ingestionState.getShardStates()[0].pollerState());
assertEquals("DROP", ingestionState.getShardStates()[0].errorPolicy());
assertFalse(ingestionState.getShardStates()[0].isPollerPaused());

// shard 1 does not exist for a 1-shard index: the response simply reports zero shards
GetIngestionStateResponse ingestionStateForInvalidShard = getIngestionState(new String[] { indexName }, new int[] { 1 });
assertEquals(0, ingestionStateForInvalidShard.getTotalShards());
}

/**
 * Verifies pagination of the get-ingestion-state API across two 5-shard indices:
 * pages of size 3 are walked via the next-page token and each page is checked to
 * contain the expected (index, shard) pairs in ascending order.
 */
public void testPaginatedGetIngestionState() throws ExecutionException, InterruptedException {
// 5 Kafka partitions so each of the 5 shards has a partition to poll
recreateKafkaTopics(5);
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();
internalCluster().startDataOnlyNode();
createIndexWithDefaultSettings("index1", 5, 0);
createIndexWithDefaultSettings("index2", 5, 0);
ensureGreen("index1");
ensureGreen("index2");

// walk all pages: a null token requests the first page; a null next-page token ends the loop
List<GetIngestionStateResponse> ingestionStateResponseList = new ArrayList<>();
GetIngestionStateResponse ingestionStatePage = null;
while (ingestionStatePage == null || ingestionStatePage.getNextPageToken() != null) {
String nextToken = ingestionStatePage == null ? null : ingestionStatePage.getNextPageToken();
PageParams pageParams = new PageParams(nextToken, "asc", 3);
ingestionStatePage = getIngestionState(new String[] { "index1", "index2" }, new int[] { 0, 1, 2, 3, 4 }, pageParams);
ingestionStateResponseList.add(ingestionStatePage);
}

// we have 2 indices, each with 5 shards, total of 10 shards
// for page size of 3, we expect 4 pages in total (3 + 3 + 3 + 1)
assertEquals(4, ingestionStateResponseList.size());

// validate page 1: index1 shards 0-2
GetIngestionStateResponse responsePage1 = ingestionStateResponseList.get(0);
assertEquals(3, responsePage1.getTotalShards());
assertEquals(3, responsePage1.getSuccessfulShards());
assertEquals(3, responsePage1.getShardStates().length);
assertTrue(Arrays.stream(responsePage1.getShardStates()).allMatch(shardIngestionState -> {
boolean shardsMatch = Set.of(0, 1, 2).contains(shardIngestionState.shardId());
boolean indexMatch = "index1".equalsIgnoreCase(shardIngestionState.index());
return indexMatch && shardsMatch;
}));

// validate page 2: spans the index boundary — index1 shards 3-4 plus index2 shard 0
GetIngestionStateResponse responsePage2 = ingestionStateResponseList.get(1);
assertEquals(3, responsePage2.getTotalShards());
assertEquals(3, responsePage2.getSuccessfulShards());
assertEquals(3, responsePage2.getShardStates().length);
assertTrue(Arrays.stream(responsePage2.getShardStates()).allMatch(shardIngestionState -> {
boolean matchIndex1 = Set.of(3, 4).contains(shardIngestionState.shardId())
&& "index1".equalsIgnoreCase(shardIngestionState.index());
boolean matchIndex2 = shardIngestionState.shardId() == 0 && "index2".equalsIgnoreCase(shardIngestionState.index());
return matchIndex1 || matchIndex2;
}));

// validate page 3: index2 shards 1-3
GetIngestionStateResponse responsePage3 = ingestionStateResponseList.get(2);
assertEquals(3, responsePage3.getTotalShards());
assertEquals(3, responsePage3.getSuccessfulShards());
assertEquals(3, responsePage3.getShardStates().length);
assertTrue(Arrays.stream(responsePage3.getShardStates()).allMatch(shardIngestionState -> {
boolean shardsMatch = Set.of(1, 2, 3).contains(shardIngestionState.shardId());
boolean indexMatch = "index2".equalsIgnoreCase(shardIngestionState.index());
return indexMatch && shardsMatch;
}));

// validate page 4: the final partial page — index2 shard 4 only
GetIngestionStateResponse responsePage4 = ingestionStateResponseList.get(3);
assertEquals(1, responsePage4.getTotalShards());
assertEquals(1, responsePage4.getSuccessfulShards());
assertEquals(1, responsePage4.getShardStates().length);
assertTrue(Arrays.stream(responsePage4.getShardStates()).allMatch(shardIngestionState -> {
boolean shardsMatch = shardIngestionState.shardId() == 4;
boolean indexMatch = "index2".equalsIgnoreCase(shardIngestionState.index());
return indexMatch && shardsMatch;
}));
}

private void verifyRemoteStoreEnabled(String node) {
GetSettingsResponse settingsResponse = client(node).admin().indices().prepareGetSettings(indexName).get();
String remoteStoreEnabled = settingsResponse.getIndexToSettings().get(indexName).get("index.remote_store.enabled");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
* The {@link org.testcontainers.images.TimeLimitedLoggedPullImageResultCallback} instance used by test containers,
* for example {@link org.testcontainers.containers.KafkaContainer} creates a watcher daemon thread which is never
* stopped. This filter excludes that thread from the thread leak detection logic. It also excludes ryuk resource reaper
* thread which is not closed on time.
thread and poller threads, which are not closed in time.
*/
public final class TestContainerThreadLeakFilter implements ThreadFilter {
    @Override
    public boolean reject(Thread t) {
        // Exclude long-lived background threads spawned outside the test's control:
        // the testcontainers image-pull watchdog, the ryuk resource reaper, and the
        // kafka stream poller consumer, none of which are guaranteed to stop in time.
        final String threadName = t.getName();
        if (threadName.startsWith("testcontainers-pull-watchdog-")) {
            return true;
        }
        if (threadName.startsWith("testcontainers-ryuk")) {
            return true;
        }
        return threadName.startsWith("stream-poller-consumer");
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public void testKinesisIngestion_RewindByOffset() throws InterruptedException {
);

RangeQueryBuilder query = new RangeQueryBuilder("age").gte(0);
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
await().atMost(60, TimeUnit.SECONDS).untilAsserted(() -> {
refresh("test_rewind_by_offset");
SearchResponse response = client().prepareSearch("test_rewind_by_offset").setQuery(query).get();
assertThat(response.getHits().getTotalHits().value(), is(2L));
Expand Down
Loading
Loading