diff --git a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java index d6b099c6b24d8..d51569431506a 100644 --- a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java +++ b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java @@ -8,12 +8,6 @@ package org.opensearch.plugin.kafka; -import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; - -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringSerializer; import org.opensearch.action.admin.cluster.node.info.NodeInfo; import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest; import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse; @@ -22,40 +16,24 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.index.query.RangeQueryBuilder; -import org.opensearch.plugins.Plugin; import org.opensearch.plugins.PluginInfo; import org.opensearch.test.OpenSearchIntegTestCase; import org.junit.Assert; -import java.util.Arrays; -import java.util.Collection; import java.util.List; -import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; -import org.testcontainers.containers.KafkaContainer; -import org.testcontainers.utility.DockerImageName; - import static org.hamcrest.Matchers.is; import static org.awaitility.Awaitility.await; /** * Integration test for Kafka ingestion */ -@ThreadLeakFilters(filters = TestContainerWatchdogThreadLeakFilter.class) -public class IngestFromKafkaIT extends OpenSearchIntegTestCase { - static final String topicName = "test"; - - private KafkaContainer kafka; - - @Override - protected Collection> nodePlugins() { - return Arrays.asList(KafkaPlugin.class); - } - +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class IngestFromKafkaIT extends KafkaIngestionBaseIT { /** * test ingestion-kafka-plugin is installed */ @@ -75,128 +53,86 @@ public void testPluginsAreInstalled() { } public void testKafkaIngestion() { - try { - setupKafka(); - // create an index with ingestion source from kafka - createIndex( - "test", - Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put("ingestion_source.type", "kafka") - .put("ingestion_source.pointer.init.reset", "earliest") - .put("ingestion_source.param.topic", "test") - .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers()) - .build(), - "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}" - ); + produceData("1", "name1", "24"); + produceData("2", "name2", "20"); + + createIndex( + "test", + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("ingestion_source.type", "kafka") + .put("ingestion_source.pointer.init.reset", "earliest") + .put("ingestion_source.param.topic", "test") + .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers()) + .put("index.replication.type", "SEGMENT") + .build(), + "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}" + ); - RangeQueryBuilder query = new RangeQueryBuilder("age").gte(21); - await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { - refresh("test"); - SearchResponse response = client().prepareSearch("test").setQuery(query).get(); - assertThat(response.getHits().getTotalHits().value(), is(1L)); - }); - } finally { - stopKafka(); - } + RangeQueryBuilder query = new RangeQueryBuilder("age").gte(21); + await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + refresh("test"); + SearchResponse response = client().prepareSearch("test").setQuery(query).get(); + assertThat(response.getHits().getTotalHits().value(), is(1L)); + }); } public void testKafkaIngestion_RewindByTimeStamp() { - try { - setupKafka(); - // create an index with ingestion source from kafka - createIndex( - "test_rewind_by_timestamp", - Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put("ingestion_source.type", "kafka") - .put("ingestion_source.pointer.init.reset", "rewind_by_timestamp") - // 1739459500000 is the timestamp of the first message - // 1739459800000 is the timestamp of the second message - // by resetting to 1739459600000, only the second message will be ingested - .put("ingestion_source.pointer.init.reset.value", "1739459600000") - .put("ingestion_source.param.topic", "test") - .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers()) - .put("ingestion_source.param.auto.offset.reset", "latest") - .build(), - "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}" - ); + produceData("1", "name1", "24", 1739459500000L); + produceData("2", "name2", "20", 1739459800000L); + + // create an index with ingestion source from kafka + createIndex( + "test_rewind_by_timestamp", + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("ingestion_source.type", "kafka") + .put("ingestion_source.pointer.init.reset", "rewind_by_timestamp") + // 1739459500000 is the timestamp of the first message + // 1739459800000 is the timestamp of the second message + // by resetting to 1739459600000, only the second message will be ingested + .put("ingestion_source.pointer.init.reset.value", "1739459600000") + .put("ingestion_source.param.topic", "test") + .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers()) + .put("ingestion_source.param.auto.offset.reset", "latest") + .build(), + "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}" + ); - RangeQueryBuilder query = new RangeQueryBuilder("age").gte(0); - await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { - refresh("test_rewind_by_timestamp"); - SearchResponse response = client().prepareSearch("test_rewind_by_timestamp").setQuery(query).get(); - assertThat(response.getHits().getTotalHits().value(), is(1L)); - }); - } finally { - stopKafka(); - } + RangeQueryBuilder query = new RangeQueryBuilder("age").gte(0); + await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + refresh("test_rewind_by_timestamp"); + SearchResponse response = client().prepareSearch("test_rewind_by_timestamp").setQuery(query).get(); + assertThat(response.getHits().getTotalHits().value(), is(1L)); + }); } public void testKafkaIngestion_RewindByOffset() { - try { - setupKafka(); - // create an index with ingestion source from kafka - createIndex( - "test_rewind_by_offset", - Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put("ingestion_source.type", "kafka") - .put("ingestion_source.pointer.init.reset", "rewind_by_offset") - .put("ingestion_source.pointer.init.reset.value", "1") - .put("ingestion_source.param.topic", "test") - .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers()) - .put("ingestion_source.param.auto.offset.reset", "latest") - .build(), - "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}" - ); - - RangeQueryBuilder query = new RangeQueryBuilder("age").gte(0); - await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { - refresh("test_rewind_by_offset"); - SearchResponse response = client().prepareSearch("test_rewind_by_offset").setQuery(query).get(); - assertThat(response.getHits().getTotalHits().value(), is(1L)); - }); - } finally { - stopKafka(); - } - } - - private void setupKafka() { - kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1")) - // disable topic auto creation - .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false"); - kafka.start(); - prepareKafkaData(); - } - - private void stopKafka() { - if (kafka != null) { - kafka.stop(); - } - } - - private void prepareKafkaData() { - String boostrapServers = kafka.getBootstrapServers(); - KafkaUtils.createTopic(topicName, 1, boostrapServers); - Properties props = new Properties(); - props.put("bootstrap.servers", kafka.getBootstrapServers()); - Producer producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); - producer.send( - new ProducerRecord<>(topicName, null, 1739459500000L, "null", "{\"_id\":\"1\",\"_source\":{\"name\":\"bob\", \"age\": 24}}") - ); - producer.send( - new ProducerRecord<>( - topicName, - null, - 1739459800000L, - "null", - "{\"_id\":\"2\", \"_op_type:\":\"index\",\"_source\":{\"name\":\"alice\", \"age\": 20}}" - ) + produceData("1", "name1", "24"); + produceData("2", "name2", "20"); + // create an index with ingestion source from kafka + createIndex( + "test_rewind_by_offset", + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("ingestion_source.type", "kafka") + .put("ingestion_source.pointer.init.reset", "rewind_by_offset") + .put("ingestion_source.pointer.init.reset.value", "1") + .put("ingestion_source.param.topic", "test") + .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers()) + .put("ingestion_source.param.auto.offset.reset", "latest") + .build(), + "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}" ); - producer.close(); + + RangeQueryBuilder query = new RangeQueryBuilder("age").gte(0); + await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + refresh("test_rewind_by_offset"); + SearchResponse response = client().prepareSearch("test_rewind_by_offset").setQuery(query).get(); + assertThat(response.getHits().getTotalHits().value(), is(1L)); + }); } } diff --git a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/KafkaIngestionBaseIT.java b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/KafkaIngestionBaseIT.java new file mode 100644 index 0000000000000..087bc9786872f --- /dev/null +++ b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/KafkaIngestionBaseIT.java @@ -0,0 +1,111 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.kafka; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.plugins.Plugin; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.junit.After; +import org.junit.Before; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Locale; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.utility.DockerImageName; + +/** + * Base test class for Kafka ingestion tests + */ +@ThreadLeakFilters(filters = TestContainerThreadLeakFilter.class) +public class KafkaIngestionBaseIT extends OpenSearchIntegTestCase { + static final String topicName = "test"; + static final String indexName = "testindex"; + static final String mapping = "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"; + static final long defaultMessageTimestamp = 1739459500000L; + + protected KafkaContainer kafka; + protected Producer producer; + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(KafkaPlugin.class); + } + + @Before + private void setup() { + setupKafka(); + } + + @After + private void cleanup() { + stopKafka(); + } + + private void setupKafka() { + kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1")) + // disable topic auto creation + .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false"); + kafka.start(); + + // setup producer + String boostrapServers = kafka.getBootstrapServers(); + KafkaUtils.createTopic(topicName, 1, boostrapServers); + Properties props = new Properties(); + props.put("bootstrap.servers", kafka.getBootstrapServers()); + producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); + } + + private void stopKafka() { + if (producer != null) { + producer.close(); + } + + if (kafka != null) { + kafka.stop(); + } + } + + protected void produceData(String id, String name, String age) { + produceData(id, name, age, defaultMessageTimestamp); + } + + protected void produceData(String id, String name, String age, long timestamp) { + String payload = String.format( + Locale.ROOT, + "{\"_id\":\"%s\", \"_op_type:\":\"index\",\"_source\":{\"name\":\"%s\", \"age\": %s}}", + id, + name, + age + ); + producer.send(new ProducerRecord<>(topicName, null, timestamp, "null", payload)); + } + + protected void waitForSearchableDocs(long docCount, List nodes) throws Exception { + assertBusy(() -> { + for (String node : nodes) { + final SearchResponse response = client(node).prepareSearch(indexName).setSize(0).setPreference("_only_local").get(); + final long hits = response.getHits().getTotalHits().value(); + if (hits < docCount) { + fail("Expected search hits on node: " + node + " to be at least " + docCount + " but was: " + hits); + } + } + }, 1, TimeUnit.MINUTES); + } +} diff --git a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java new file mode 100644 index 0000000000000..a9f818a9ca825 --- /dev/null +++ b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java @@ -0,0 +1,125 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.kafka; + +import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.allocation.command.AllocateReplicaAllocationCommand; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.query.RangeQueryBuilder; +import org.opensearch.test.InternalTestCluster; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.nio.file.Path; +import java.util.Arrays; + +import static org.hamcrest.Matchers.is; + +/** + * Integration tests for segment replication with remote store using kafka as ingestion source. + */ +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class RemoteStoreKafkaIT extends KafkaIngestionBaseIT { + private static final String REPOSITORY_NAME = "test-remote-store-repo"; + private Path absolutePath; + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + if (absolutePath == null) { + absolutePath = randomRepoPath().toAbsolutePath(); + } + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(remoteStoreClusterSettings(REPOSITORY_NAME, absolutePath)) + .build(); + } + + public void testSegmentReplicationWithRemoteStore() throws Exception { + // Step 1: Create primary and replica nodes. Create index with 1 replica and kafka as ingestion source. + + internalCluster().startClusterManagerOnlyNode(); + final String nodeA = internalCluster().startDataOnlyNode(); + + createIndex( + indexName, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put("ingestion_source.type", "kafka") + .put("ingestion_source.pointer.init.reset", "earliest") + .put("ingestion_source.param.topic", topicName) + .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers()) + .put("index.replication.type", "SEGMENT") + .build(), + mapping + ); + + ensureYellowAndNoInitializingShards(indexName); + final String nodeB = internalCluster().startDataOnlyNode(); + ensureGreen(indexName); + assertTrue(nodeA.equals(primaryNodeName(indexName))); + assertTrue(nodeB.equals(replicaNodeName(indexName))); + verifyRemoteStoreEnabled(nodeA); + verifyRemoteStoreEnabled(nodeB); + + // Step 2: Produce update messages and validate segment replication + + produceData("1", "name1", "24"); + produceData("2", "name2", "20"); + refresh(indexName); + waitForSearchableDocs(2, Arrays.asList(nodeA, nodeB)); + + RangeQueryBuilder query = new RangeQueryBuilder("age").gte(21); + SearchResponse primaryResponse = client(nodeA).prepareSearch(indexName).setQuery(query).setPreference("_only_local").get(); + assertThat(primaryResponse.getHits().getTotalHits().value(), is(1L)); + SearchResponse replicaResponse = client(nodeB).prepareSearch(indexName).setQuery(query).setPreference("_only_local").get(); + assertThat(replicaResponse.getHits().getTotalHits().value(), is(1L)); + + // Step 3: Stop current primary node and validate replica promotion. + + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeA)); + ensureYellowAndNoInitializingShards(indexName); + assertTrue(nodeB.equals(primaryNodeName(indexName))); + + // Step 4: Verify new primary node is able to index documents + + produceData("3", "name3", "30"); + produceData("4", "name4", "31"); + refresh(indexName); + waitForSearchableDocs(4, Arrays.asList(nodeB)); + + SearchResponse newPrimaryResponse = client(nodeB).prepareSearch(indexName).setQuery(query).setPreference("_only_local").get(); + assertThat(newPrimaryResponse.getHits().getTotalHits().value(), is(3L)); + + // Step 5: Add a new node and assign the replica shard. Verify node recovery works. + + final String nodeC = internalCluster().startDataOnlyNode(); + client().admin().cluster().prepareReroute().add(new AllocateReplicaAllocationCommand(indexName, 0, nodeC)).get(); + ensureGreen(indexName); + assertTrue(nodeC.equals(replicaNodeName(indexName))); + verifyRemoteStoreEnabled(nodeC); + + waitForSearchableDocs(4, Arrays.asList(nodeC)); + SearchResponse newReplicaResponse = client(nodeC).prepareSearch(indexName).setQuery(query).setPreference("_only_local").get(); + assertThat(newReplicaResponse.getHits().getTotalHits().value(), is(3L)); + + // Step 6: Produce new updates and verify segment replication works when primary and replica index are not empty. + produceData("5", "name5", "40"); + produceData("6", "name6", "41"); + refresh(indexName); + waitForSearchableDocs(6, Arrays.asList(nodeB, nodeC)); + } + + private void verifyRemoteStoreEnabled(String node) { + GetSettingsResponse settingsResponse = client(node).admin().indices().prepareGetSettings(indexName).get(); + String remoteStoreEnabled = settingsResponse.getIndexToSettings().get(indexName).get("index.remote_store.enabled"); + assertEquals("Remote store should be enabled", "true", remoteStoreEnabled); + } +} diff --git a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/TestContainerWatchdogThreadLeakFilter.java b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/TestContainerThreadLeakFilter.java similarity index 76% rename from plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/TestContainerWatchdogThreadLeakFilter.java rename to plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/TestContainerThreadLeakFilter.java index 50b88c6233a46..91e2c83ebfa48 100644 --- a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/TestContainerWatchdogThreadLeakFilter.java +++ b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/TestContainerThreadLeakFilter.java @@ -13,11 +13,12 @@ /** * The {@link org.testcontainers.images.TimeLimitedLoggedPullImageResultCallback} instance used by test containers, * for example {@link org.testcontainers.containers.KafkaContainer} creates a watcher daemon thread which is never - * stopped. This filter excludes that thread from the thread leak detection logic. + * stopped. This filter excludes that thread from the thread leak detection logic. It also excludes ryuk resource reaper + * thread which is not closed on time. */ -public final class TestContainerWatchdogThreadLeakFilter implements ThreadFilter { +public final class TestContainerThreadLeakFilter implements ThreadFilter { @Override public boolean reject(Thread t) { - return t.getName().startsWith("testcontainers-pull-watchdog-"); + return t.getName().startsWith("testcontainers-pull-watchdog-") || t.getName().startsWith("testcontainers-ryuk"); } } diff --git a/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java b/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java index b37281b9d1582..72b59ba88b4c2 100644 --- a/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java @@ -8,145 +8,54 @@ package org.opensearch.index.engine; -import org.apache.lucene.analysis.standard.StandardAnalyzer; import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.index.MergePolicy; -import org.apache.lucene.index.SegmentCommitInfo; -import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.ReferenceManager; -import org.apache.lucene.store.AlreadyClosedException; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.LockObtainFailedException; -import org.apache.lucene.util.InfoStream; import org.opensearch.ExceptionsHelper; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IngestionSource; -import org.opensearch.common.Booleans; -import org.opensearch.common.Nullable; -import org.opensearch.common.SuppressForbidden; -import org.opensearch.common.concurrent.GatedCloseable; -import org.opensearch.common.lucene.LoggerInfoStream; import org.opensearch.common.lucene.Lucene; -import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; -import org.opensearch.common.unit.TimeValue; -import org.opensearch.common.util.concurrent.AbstractRunnable; -import org.opensearch.common.util.concurrent.ReleasableLock; -import org.opensearch.common.util.io.IOUtils; -import org.opensearch.core.common.unit.ByteSizeValue; -import org.opensearch.core.index.shard.ShardId; -import org.opensearch.index.IndexSettings; import org.opensearch.index.IngestionConsumerFactory; import org.opensearch.index.IngestionShardConsumer; import org.opensearch.index.IngestionShardPointer; import org.opensearch.index.mapper.DocumentMapperForType; import org.opensearch.index.mapper.IdFieldMapper; import org.opensearch.index.mapper.ParseContext; -import org.opensearch.index.merge.MergeStats; -import org.opensearch.index.merge.OnGoingMerge; -import org.opensearch.index.seqno.SeqNoStats; -import org.opensearch.index.shard.OpenSearchMergePolicy; +import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.translog.NoOpTranslogManager; import org.opensearch.index.translog.Translog; -import org.opensearch.index.translog.TranslogCorruptedException; +import org.opensearch.index.translog.TranslogDeletionPolicy; import org.opensearch.index.translog.TranslogManager; import org.opensearch.index.translog.TranslogStats; +import org.opensearch.index.translog.listener.CompositeTranslogEventListener; import org.opensearch.indices.pollingingest.DefaultStreamPoller; import org.opensearch.indices.pollingingest.StreamPoller; -import org.opensearch.search.suggest.completion.CompletionStats; -import org.opensearch.threadpool.ThreadPool; -import java.io.Closeable; import java.io.IOException; -import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.function.BiConsumer; import java.util.function.BiFunction; -import java.util.function.UnaryOperator; import static org.opensearch.index.translog.Translog.EMPTY_TRANSLOG_SNAPSHOT; /** * IngestionEngine is an engine that ingests data from a stream source. */ -public class IngestionEngine extends Engine { - - private volatile SegmentInfos lastCommittedSegmentInfos; - private final CompletionStatsCache completionStatsCache; - private final IndexWriter indexWriter; - private final OpenSearchReaderManager internalReaderManager; - private final ExternalReaderManager externalReaderManager; - private final Lock flushLock = new ReentrantLock(); - private final ReentrantLock optimizeLock = new ReentrantLock(); - private final OpenSearchConcurrentMergeScheduler mergeScheduler; - private final AtomicBoolean shouldPeriodicallyFlushAfterBigMerge = new AtomicBoolean(false); - private final TranslogManager translogManager; - private final DocumentMapperForType documentMapperForType; - private final IngestionConsumerFactory ingestionConsumerFactory; - private StreamPoller streamPoller; +public class IngestionEngine extends InternalEngine { - /** - * UUID value that is updated every time the engine is force merged. - */ - @Nullable - private volatile String forceMergeUUID; + private StreamPoller streamPoller; + private final IngestionConsumerFactory ingestionConsumerFactory; + private final DocumentMapperForType documentMapperForType; public IngestionEngine(EngineConfig engineConfig, IngestionConsumerFactory ingestionConsumerFactory) { super(engineConfig); - store.incRef(); - boolean success = false; - try { - this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); - this.completionStatsCache = new CompletionStatsCache(() -> acquireSearcher("completion_stats")); - IndexMetadata indexMetadata = engineConfig.getIndexSettings().getIndexMetadata(); - assert indexMetadata != null; - mergeScheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings()); - indexWriter = createWriter(); - externalReaderManager = createReaderManager(new InternalEngine.RefreshWarmerListener(logger, isClosed, engineConfig)); - internalReaderManager = externalReaderManager.internalReaderManager; - translogManager = new NoOpTranslogManager( - shardId, - readLock, - this::ensureOpen, - new TranslogStats(0, 0, 0, 0, 0), - EMPTY_TRANSLOG_SNAPSHOT - ); - documentMapperForType = engineConfig.getDocumentMapperForTypeSupplier().get(); - this.ingestionConsumerFactory = Objects.requireNonNull(ingestionConsumerFactory); - - success = true; - } catch (IOException | TranslogCorruptedException e) { - throw new EngineCreationFailureException(shardId, "failed to create engine", e); - } finally { - if (!success) { - if (streamPoller != null) { - try { - streamPoller.close(); - } catch (IOException e) { - logger.error("failed to close stream poller", e); - throw new RuntimeException(e); - } - } - if (!isClosed.get()) { - // failure, we need to dec the store reference - store.decRef(); - } - } - } + this.ingestionConsumerFactory = Objects.requireNonNull(ingestionConsumerFactory); + this.documentMapperForType = engineConfig.getDocumentMapperForTypeSupplier().get(); + } /** @@ -169,11 +78,11 @@ public void start() { engineConfig.getShardId().getId() ); logger.info("created ingestion consumer for shard [{}]", engineConfig.getShardId()); - - Map commitData = commitDataAsMap(); + Map commitData = commitDataAsMap(indexWriter); StreamPoller.ResetState resetState = ingestionSource.getPointerInitReset().getType(); IngestionShardPointer startPointer = null; Set persistedPointers = new HashSet<>(); + if (commitData.containsKey(StreamPoller.BATCH_START)) { // try recovering from commit data String batchStartStr = commitData.get(StreamPoller.BATCH_START); @@ -190,23 +99,13 @@ public void start() { String resetValue = ingestionSource.getPointerInitReset().getValue(); streamPoller = new DefaultStreamPoller(startPointer, persistedPointers, ingestionShardConsumer, this, resetState, resetValue); - streamPoller.start(); - } - private IndexWriter createWriter() throws IOException { - try { - final IndexWriterConfig iwc = getIndexWriterConfig(); - return createWriter(store.directory(), iwc); - } catch (LockObtainFailedException ex) { - logger.warn("could not lock IndexWriter", ex); - throw ex; + // Poller is only started on the primary shard. Replica shards will rely on segment replication. + if (!engineConfig.isReadOnlyReplica()) { + streamPoller.start(); } } - public DocumentMapperForType getDocumentMapperForType() { - return documentMapperForType; - } - protected Set fetchPersistedOffsets(DirectoryReader directoryReader, IngestionShardPointer batchStart) throws IOException { final IndexSearcher searcher = new IndexSearcher(directoryReader); @@ -228,195 +127,6 @@ protected Set fetchPersistedOffsets(DirectoryReader direc return result; } - /** - * a copy of ExternalReaderManager from InternalEngine - */ - @SuppressForbidden(reason = "reference counting is required here") - static final class ExternalReaderManager extends ReferenceManager { - private final BiConsumer refreshListener; - private final OpenSearchReaderManager internalReaderManager; - private boolean isWarmedUp; // guarded by refreshLock - - ExternalReaderManager( - OpenSearchReaderManager internalReaderManager, - BiConsumer refreshListener - ) throws IOException { - this.refreshListener = refreshListener; - this.internalReaderManager = internalReaderManager; - this.current = internalReaderManager.acquire(); // steal the reference without warming up - } - - @Override - protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader referenceToRefresh) throws IOException { - // we simply run a blocking refresh on the internal reference manager and then steal it's reader - // it's a save operation since we acquire the reader which incs it's reference but then down the road - // steal it by calling incRef on the "stolen" reader - internalReaderManager.maybeRefreshBlocking(); - final OpenSearchDirectoryReader newReader = internalReaderManager.acquire(); - if (isWarmedUp == false || newReader != referenceToRefresh) { - boolean success = false; - try { - refreshListener.accept(newReader, isWarmedUp ? referenceToRefresh : null); - isWarmedUp = true; - success = true; - } finally { - if (success == false) { - internalReaderManager.release(newReader); - } - } - } - // nothing has changed - both ref managers share the same instance so we can use reference equality - if (referenceToRefresh == newReader) { - internalReaderManager.release(newReader); - return null; - } else { - return newReader; // steal the reference - } - } - - @Override - protected boolean tryIncRef(OpenSearchDirectoryReader reference) { - return reference.tryIncRef(); - } - - @Override - protected int getRefCount(OpenSearchDirectoryReader reference) { - return reference.getRefCount(); - } - - @Override - protected void decRef(OpenSearchDirectoryReader reference) throws IOException { - reference.decRef(); - } - } - - private ExternalReaderManager createReaderManager(InternalEngine.RefreshWarmerListener externalRefreshListener) throws EngineException { - boolean success = false; - OpenSearchReaderManager internalReaderManager = null; - try { - try { - final OpenSearchDirectoryReader directoryReader = OpenSearchDirectoryReader.wrap( - DirectoryReader.open(indexWriter), - shardId - ); - internalReaderManager = new OpenSearchReaderManager(directoryReader); - lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); - ExternalReaderManager externalReaderManager = new ExternalReaderManager(internalReaderManager, externalRefreshListener); - success = true; - return externalReaderManager; - } catch (IOException e) { - maybeFailEngine("start", e); - try { - indexWriter.rollback(); - } catch (IOException inner) { // iw is closed below - e.addSuppressed(inner); - } - throw new EngineCreationFailureException(shardId, "failed to open reader on writer", e); - } - } finally { - if (success == false) { // release everything we created on a failure - IOUtils.closeWhileHandlingException(internalReaderManager, indexWriter); - } - } - } - - // pkg-private for testing - IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException { - return new IndexWriter(directory, iwc); - } - - private IndexWriterConfig getIndexWriterConfig() { - final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer()); - iwc.setCommitOnClose(false); // we by default don't commit on close - iwc.setOpenMode(IndexWriterConfig.OpenMode.APPEND); - // with tests.verbose, lucene sets this up: plumb to align with filesystem stream - boolean verbose = false; - try { - verbose = Boolean.parseBoolean(System.getProperty("tests.verbose")); - } catch (Exception ignore) {} - iwc.setInfoStream(verbose ? InfoStream.getDefault() : new LoggerInfoStream(logger)); - iwc.setMergeScheduler(mergeScheduler); - // set merge scheduler - MergePolicy mergePolicy = config().getMergePolicy(); - boolean shuffleForcedMerge = Booleans.parseBoolean(System.getProperty("opensearch.shuffle_forced_merge", Boolean.TRUE.toString())); - if (shuffleForcedMerge) { - // We wrap the merge policy for all indices even though it is mostly useful for time-based indices - // but there should be no overhead for other type of indices so it's simpler than adding a setting - // to enable it. - mergePolicy = new ShuffleForcedMergePolicy(mergePolicy); - } - - if (config().getIndexSettings().isMergeOnFlushEnabled()) { - final long maxFullFlushMergeWaitMillis = config().getIndexSettings().getMaxFullFlushMergeWaitTime().millis(); - if (maxFullFlushMergeWaitMillis > 0) { - iwc.setMaxFullFlushMergeWaitMillis(maxFullFlushMergeWaitMillis); - final Optional> mergeOnFlushPolicy = config().getIndexSettings().getMergeOnFlushPolicy(); - if (mergeOnFlushPolicy.isPresent()) { - mergePolicy = mergeOnFlushPolicy.get().apply(mergePolicy); - } - } - } else { - // Disable merge on refresh - iwc.setMaxFullFlushMergeWaitMillis(0); - } - - iwc.setCheckPendingFlushUpdate(config().getIndexSettings().isCheckPendingFlushEnabled()); - iwc.setMergePolicy(new OpenSearchMergePolicy(mergePolicy)); - iwc.setSimilarity(engineConfig.getSimilarity()); - iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().getMbFrac()); - iwc.setCodec(engineConfig.getCodec()); - iwc.setUseCompoundFile(engineConfig.useCompoundFile()); - if (config().getIndexSort() != null) { - iwc.setIndexSort(config().getIndexSort()); - } - if (config().getLeafSorter() != null) { - iwc.setLeafSorter(config().getLeafSorter()); // The default segment search order - } - - return new IndexWriterConfig(new StandardAnalyzer()); - } - - @Override - public TranslogManager translogManager() { - // ingestion engine does not have translog - return translogManager; - } - - @Override - protected SegmentInfos getLastCommittedSegmentInfos() { - return lastCommittedSegmentInfos; - } - - @Override - protected SegmentInfos getLatestSegmentInfos() { - throw new UnsupportedOperationException(); - } - - @Override - public String getHistoryUUID() { - return loadHistoryUUID(lastCommittedSegmentInfos.userData); - } - - @Override - public long getWritingBytes() { - return 0; - } - - @Override - public CompletionStats completionStats(String... fieldNamePatterns) { - return completionStatsCache.get(fieldNamePatterns); - } - - @Override - public long getIndexThrottleTimeInMillis() { - return 0; - } - - @Override - public boolean isThrottled() { - return false; - } - @Override public IndexResult index(Index index) throws IOException { assert Objects.equals(index.uid().field(), IdFieldMapper.NAME) : index.uid().field(); @@ -457,16 +167,6 @@ public GetResult get(Get get, BiFunction search return getFromSearcher(get, searcherFactory, SearcherScope.EXTERNAL); } - @Override - protected ReferenceManager getReferenceManager(SearcherScope scope) { - return externalReaderManager; - } - - @Override - public Closeable acquireHistoryRetentionLock() { - throw new UnsupportedOperationException("Not implemented"); - } - @Override public Translog.Snapshot newChangesSnapshot( String source, @@ -475,199 +175,36 @@ public Translog.Snapshot newChangesSnapshot( boolean requiredFullRange, boolean accurateCount ) throws IOException { - throw new UnsupportedOperationException("Not implemented"); - } - - @Override - public int countNumberOfHistoryOperations(String source, long fromSeqNo, long toSeqNumber) throws IOException { - return 0; - } - - @Override - public boolean hasCompleteOperationHistory(String reason, long startingSeqNo) { - return false; - } - - @Override - public long getMinRetainedSeqNo() { - return 0; - } - - @Override - public long getPersistedLocalCheckpoint() { - return 0; - } - - @Override - public long getProcessedLocalCheckpoint() { - return 0; - } - - @Override - public SeqNoStats getSeqNoStats(long globalCheckpoint) { - return null; - } - - @Override - public long getLastSyncedGlobalCheckpoint() { - return 0; - } - - @Override - public long getIndexBufferRAMBytesUsed() { - return 0; - } - - @Override - public List segments(boolean verbose) { - try (ReleasableLock lock = readLock.acquire()) { - Segment[] segmentsArr = getSegmentInfo(lastCommittedSegmentInfos, verbose); - - // fill in the merges flag - Set onGoingMerges = mergeScheduler.onGoingMerges(); - for (OnGoingMerge onGoingMerge : onGoingMerges) { - for (SegmentCommitInfo segmentInfoPerCommit : onGoingMerge.getMergedSegments()) { - for (Segment segment : segmentsArr) { - if (segment.getName().equals(segmentInfoPerCommit.info.name)) { - segment.mergeId = onGoingMerge.getId(); - break; - } - } - } - } - return Arrays.asList(segmentsArr); - } - } - - @Override - public void refresh(String source) throws EngineException { - refresh(source, SearcherScope.EXTERNAL, true); - } - - final boolean refresh(String source, SearcherScope scope, boolean block) throws EngineException { - boolean refreshed; - try { - // refresh does not need to hold readLock as ReferenceManager can handle correctly if the engine is closed in mid-way. - if (store.tryIncRef()) { - // increment the ref just to ensure nobody closes the store during a refresh - try { - // even though we maintain 2 managers we really do the heavy-lifting only once. - // the second refresh will only do the extra work we have to do for warming caches etc. - ReferenceManager referenceManager = getReferenceManager(scope); - // it is intentional that we never refresh both internal / external together - if (block) { - referenceManager.maybeRefreshBlocking(); - refreshed = true; - } else { - refreshed = referenceManager.maybeRefresh(); - } - } finally { - store.decRef(); - } - } else { - refreshed = false; - } - } catch (AlreadyClosedException e) { - failOnTragicEvent(e); - throw e; - } catch (Exception e) { - try { - failEngine("refresh failed source[" + source + "]", e); - } catch (Exception inner) { - e.addSuppressed(inner); - } - throw new RefreshFailedEngineException(shardId, e); - } - // We check for pruning in each delete request, but we also prune here e.g. in case a delete burst comes in and then no more deletes - // for a long time: - maybePruneDeletes(); - // TODO: use OS merge scheduler - mergeScheduler.refreshConfig(); - return refreshed; - } - - @Override - public boolean maybeRefresh(String source) throws EngineException { - return refresh(source, SearcherScope.EXTERNAL, false); - } - - @Override - public void writeIndexingBuffer() throws EngineException { - refresh("write indexing buffer", SearcherScope.INTERNAL, false); - } - - @Override - public boolean shouldPeriodicallyFlush() { - return false; - } - - @Override - public void flush(boolean force, boolean waitIfOngoing) throws EngineException { - ensureOpen(); - if (force && waitIfOngoing == false) { - assert false : "wait_if_ongoing must be true for a force flush: force=" + force + " wait_if_ongoing=" + waitIfOngoing; - throw new IllegalArgumentException( - "wait_if_ongoing must be true for a force flush: force=" + force + " wait_if_ongoing=" + waitIfOngoing - ); - } - try (ReleasableLock lock = readLock.acquire()) { - ensureOpen(); - if (flushLock.tryLock() == false) { - // if we can't get the lock right away we block if needed otherwise barf - if (waitIfOngoing == false) { - return; - } - logger.trace("waiting for in-flight flush to finish"); - flushLock.lock(); - logger.trace("acquired flush lock after blocking"); - } else { - logger.trace("acquired flush lock immediately"); - } - try { - // Only flush if (1) Lucene has uncommitted docs, or (2) forced by caller, - // - // do we need to consider #3 and #4 as in InternalEngine? - // (3) the newly created commit points to a different translog generation (can free translog), - // or (4) the local checkpoint information in the last commit is stale, which slows down future recoveries. - boolean hasUncommittedChanges = indexWriter.hasUncommittedChanges(); - if (hasUncommittedChanges || force) { - logger.trace("starting commit for flush;"); - - // TODO: do we need to close the latest commit as done in InternalEngine? - commitIndexWriter(indexWriter); - - logger.trace("finished commit for flush"); - - // a temporary debugging to investigate test failure - issue#32827. Remove when the issue is resolved - logger.debug("new commit on flush, hasUncommittedChanges:{}, force:{}", hasUncommittedChanges, force); - - // we need to refresh in order to clear older version values - refresh("version_table_flush", SearcherScope.INTERNAL, true); - } - } catch (FlushFailedEngineException ex) { - maybeFailEngine("flush", ex); - throw ex; - } catch (IOException e) { - throw new FlushFailedEngineException(shardId, e); - } finally { - flushLock.unlock(); - } - } + return EMPTY_TRANSLOG_SNAPSHOT; } /** - * Commits the specified index writer. - * - * @param writer the index writer to commit + * This method is a copy of commitIndexWriter method from {@link InternalEngine} with some additions for ingestion + * source. */ - protected void commitIndexWriter(final IndexWriter writer) throws IOException { + @Override + protected void commitIndexWriter(final IndexWriter writer, final String translogUUID) throws IOException { try { + final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint(); writer.setLiveCommitData(() -> { /* - * The user data captured the min and max range of the stream poller + * The user data captured above (e.g. local checkpoint) contains data that must be evaluated *before* Lucene flushes + * segments, including the local checkpoint amongst other values. The maximum sequence number is different, we never want + * the maximum sequence number to be less than the last sequence number to go into a Lucene commit, otherwise we run the + * risk of re-using a sequence number for two different documents when restoring from this commit point and subsequently + * writing new documents to the index. Since we only know which Lucene documents made it into the final commit after the + * {@link IndexWriter#commit()} call flushes all documents, we defer computation of the maximum sequence number to the time + * of invocation of the commit data iterator (which occurs after all documents have been flushed to Lucene). */ - final Map commitData = new HashMap<>(2); - + final Map commitData = new HashMap<>(7); + commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID); + commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint)); + commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(localCheckpointTracker.getMaxSeqNo())); + commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get())); + commitData.put(HISTORY_UUID_KEY, historyUUID); + commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo())); + + // ingestion engine needs to record batch start pointer commitData.put(StreamPoller.BATCH_START, streamPoller.getBatchStartPointer().asString()); final String currentForceMergeUUID = forceMergeUUID; if (currentForceMergeUUID != null) { @@ -676,6 +213,7 @@ protected void commitIndexWriter(final IndexWriter writer) throws IOException { logger.trace("committing writer with commit data [{}]", commitData); return commitData.entrySet().iterator(); }); + shouldPeriodicallyFlushAfterBigMerge.set(false); writer.commit(); } catch (final Exception ex) { try { @@ -703,268 +241,6 @@ protected void commitIndexWriter(final IndexWriter writer) throws IOException { } } - @Override - public MergeStats getMergeStats() { - return mergeScheduler.stats(); - } - - @Override - public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) { - mergeScheduler.refreshConfig(); - // TODO: do we need more? - } - - protected Map commitDataAsMap() { - return commitDataAsMap(indexWriter); - } - - /** - * Gets the commit data from {@link IndexWriter} as a map. - */ - protected static Map commitDataAsMap(final IndexWriter indexWriter) { - final Map commitData = new HashMap<>(8); - for (Map.Entry entry : indexWriter.getLiveCommitData()) { - commitData.put(entry.getKey(), entry.getValue()); - } - return commitData; - } - - @Override - public void forceMerge( - boolean flush, - int maxNumSegments, - boolean onlyExpungeDeletes, - boolean upgrade, - boolean upgradeOnlyAncientSegments, - String forceMergeUUID - ) throws EngineException, IOException { - /* - * We do NOT acquire the readlock here since we are waiting on the merges to finish - * that's fine since the IW.rollback should stop all the threads and trigger an IOException - * causing us to fail the forceMerge - * - * The way we implement upgrades is a bit hackish in the sense that we set an instance - * variable and that this setting will thus apply to the next forced merge that will be run. - * This is ok because (1) this is the only place we call forceMerge, (2) we have a single - * thread for optimize, and the 'optimizeLock' guarding this code, and (3) ConcurrentMergeScheduler - * syncs calls to findForcedMerges. - */ - assert indexWriter.getConfig().getMergePolicy() instanceof OpenSearchMergePolicy : "MergePolicy is " - + indexWriter.getConfig().getMergePolicy().getClass().getName(); - OpenSearchMergePolicy mp = (OpenSearchMergePolicy) indexWriter.getConfig().getMergePolicy(); - optimizeLock.lock(); - try { - ensureOpen(); - if (upgrade) { - logger.info("starting segment upgrade upgradeOnlyAncientSegments={}", upgradeOnlyAncientSegments); - mp.setUpgradeInProgress(true, upgradeOnlyAncientSegments); - } - store.incRef(); // increment the ref just to ensure nobody closes the store while we optimize - try { - if (onlyExpungeDeletes) { - assert upgrade == false; - indexWriter.forceMergeDeletes(true /* blocks and waits for merges*/); - } else if (maxNumSegments <= 0) { - assert upgrade == false; - indexWriter.maybeMerge(); - } else { - indexWriter.forceMerge(maxNumSegments, true /* blocks and waits for merges*/); - this.forceMergeUUID = forceMergeUUID; - } - if (flush) { - flush(false, true); - } - if (upgrade) { - logger.info("finished segment upgrade"); - } - } finally { - store.decRef(); - } - } catch (AlreadyClosedException ex) { - /* in this case we first check if the engine is still open. If so this exception is just fine - * and expected. We don't hold any locks while we block on forceMerge otherwise it would block - * closing the engine as well. If we are not closed we pass it on to failOnTragicEvent which ensures - * we are handling a tragic even exception here */ - ensureOpen(ex); - failOnTragicEvent(ex); - throw ex; - } catch (Exception e) { - try { - maybeFailEngine(FORCE_MERGE, e); - } catch (Exception inner) { - e.addSuppressed(inner); - } - throw e; - } finally { - try { - // reset it just to make sure we reset it in a case of an error - mp.setUpgradeInProgress(false, false); - } finally { - optimizeLock.unlock(); - } - } - } - - @Override - public GatedCloseable acquireLastIndexCommit(boolean flushFirst) throws EngineException { - store.incRef(); - try { - var reader = getReferenceManager(SearcherScope.INTERNAL).acquire(); - return new GatedCloseable<>(reader.getIndexCommit(), () -> { - store.decRef(); - getReferenceManager(SearcherScope.INTERNAL).release(reader); - }); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public GatedCloseable acquireSafeIndexCommit() throws EngineException { - // TODO: do we need this? likely not - return acquireLastIndexCommit(false); - } - - @Override - public SafeCommitInfo getSafeCommitInfo() { - // TODO: do we need this? - return SafeCommitInfo.EMPTY; - } - - @Override - protected void closeNoLock(String reason, CountDownLatch closedLatch) { - if (isClosed.compareAndSet(false, true)) { - assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread() - : "Either the write lock must be held or the engine must be currently be failing itself"; - try { - try { - IOUtils.close(externalReaderManager, internalReaderManager); - } catch (Exception e) { - logger.warn("Failed to close ReaderManager", e); - } - - // no need to commit in this case!, we snapshot before we close the shard, so translog and all sync'ed - logger.trace("rollback indexWriter"); - try { - indexWriter.rollback(); - } catch (AlreadyClosedException ex) { - failOnTragicEvent(ex); - throw ex; - } - logger.trace("rollback indexWriter done"); - } catch (Exception e) { - logger.warn("failed to rollback writer on close", e); - } finally { - try { - store.decRef(); - logger.debug("engine closed [{}]", reason); - } finally { - closedLatch.countDown(); - } - } - } - } - - private boolean failOnTragicEvent(AlreadyClosedException ex) { - final boolean engineFailed; - // if we are already closed due to some tragic exception - // we need to fail the engine. it might have already been failed before - // but we are double-checking it's failed and closed - if (indexWriter.isOpen() == false && indexWriter.getTragicException() != null) { - final Exception tragicException; - if (indexWriter.getTragicException() instanceof Exception) { - tragicException = (Exception) indexWriter.getTragicException(); - } else { - tragicException = new RuntimeException(indexWriter.getTragicException()); - } - failEngine("already closed by tragic event on the index writer", tragicException); - engineFailed = true; - } else if (failedEngine.get() == null && isClosed.get() == false) { // we are closed but the engine is not failed yet? - // this smells like a bug - we only expect ACE if we are in a fatal case ie. either translog or IW is closed by - // a tragic event or has closed itself. if that is not the case we are in a buggy state and raise an assertion error - throw new AssertionError("Unexpected AlreadyClosedException", ex); - } else { - engineFailed = false; - } - return engineFailed; - } - - private final class EngineMergeScheduler extends OpenSearchConcurrentMergeScheduler { - private final AtomicInteger numMergesInFlight = new AtomicInteger(0); - private final AtomicBoolean isThrottling = new AtomicBoolean(); - - EngineMergeScheduler(ShardId shardId, IndexSettings indexSettings) { - super(shardId, indexSettings); - } - - @Override - public synchronized void beforeMerge(OnGoingMerge merge) { - int maxNumMerges = mergeScheduler.getMaxMergeCount(); - if (numMergesInFlight.incrementAndGet() > maxNumMerges) { - if (isThrottling.getAndSet(true) == false) { - logger.info("now throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges); - activateThrottling(); - } - } - } - - @Override - public synchronized void afterMerge(OnGoingMerge merge) { - int maxNumMerges = mergeScheduler.getMaxMergeCount(); - if (numMergesInFlight.decrementAndGet() < maxNumMerges) { - if (isThrottling.getAndSet(false)) { - logger.info("stop throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges); - deactivateThrottling(); - } - } - if (indexWriter.hasPendingMerges() == false - && System.nanoTime() - lastWriteNanos >= engineConfig.getFlushMergesAfter().nanos()) { - // NEVER do this on a merge thread since we acquire some locks blocking here and if we concurrently rollback the writer - // we deadlock on engine#close for instance. - engineConfig.getThreadPool().executor(ThreadPool.Names.FLUSH).execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - if (isClosed.get() == false) { - logger.warn("failed to flush after merge has finished"); - } - } - - @Override - protected void doRun() { - // if we have no pending merges and we are supposed to flush once merges have finished to - // free up transient disk usage of the (presumably biggish) segments that were just merged - flush(); - } - }); - } else if (merge.getTotalBytesSize() >= engineConfig.getIndexSettings().getFlushAfterMergeThresholdSize().getBytes()) { - // we hit a significant merge which would allow us to free up memory if we'd commit it hence on the next change - // we should execute a flush on the next operation if that's a flush after inactive or indexing a document. - // we could fork a thread and do it right away but we try to minimize forking and piggyback on outside events. - shouldPeriodicallyFlushAfterBigMerge.set(true); - } - } - - @Override - protected void handleMergeException(final Throwable exc) { - engineConfig.getThreadPool().generic().execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - logger.debug("merge failure action rejected", e); - } - - @Override - protected void doRun() throws Exception { - /* - * We do this on another thread rather than the merge thread that we are initially called on so that we have complete - * confidence that the call stack does not contain catch statements that would cause the error that might be thrown - * here from being caught and never reaching the uncaught exception handler. - */ - failEngine(MERGE_FAILED, new MergePolicy.MergeException(exc)); - } - }); - } - } - @Override public void activateThrottling() { // TODO: add this when we have a thread pool for indexing in parallel @@ -975,38 +251,41 @@ public void deactivateThrottling() { // TODO: is this needed? } - @Override - public int fillSeqNoGaps(long primaryTerm) throws IOException { - // TODO: is this needed? - return 0; - } - @Override public void maybePruneDeletes() { // no need to prune deletes in ingestion engine } @Override - public void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) { - // TODO: is this needed? + public void close() throws IOException { + if (streamPoller != null) { + streamPoller.close(); + } + super.close(); } - @Override - public long getMaxSeqNoOfUpdatesOrDeletes() { - // TODO: is this needed? - return 0; + public DocumentMapperForType getDocumentMapperForType() { + return documentMapperForType; } @Override - public void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary) { - // TODO: is this needed? + protected TranslogManager createTranslogManager( + String translogUUID, + TranslogDeletionPolicy translogDeletionPolicy, + CompositeTranslogEventListener translogEventListener + ) throws IOException { + return new NoOpTranslogManager( + shardId, + readLock, + this::ensureOpen, + new TranslogStats(), + EMPTY_TRANSLOG_SNAPSHOT, + translogUUID, + true + ); } - @Override - public void close() throws IOException { - if (streamPoller != null) { - streamPoller.close(); - } - super.close(); + protected Map commitDataAsMap() { + return commitDataAsMap(indexWriter); } } diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index ff790fa1513f1..064e757c6ebb7 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -144,16 +144,28 @@ */ public class InternalEngine extends Engine { + /** + * UUID value that is updated every time the engine is force merged. + */ + @Nullable + protected volatile String forceMergeUUID; + /** * When we last pruned expired tombstones from versionMap.deletes: */ private volatile long lastDeleteVersionPruneTimeMSec; - private final InternalTranslogManager translogManager; - private final OpenSearchConcurrentMergeScheduler mergeScheduler; + protected final TranslogManager translogManager; + protected final IndexWriter indexWriter; + protected final LocalCheckpointTracker localCheckpointTracker; + protected final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1); + protected final SoftDeletesPolicy softDeletesPolicy; + protected final AtomicBoolean shouldPeriodicallyFlushAfterBigMerge = new AtomicBoolean(false); - private final IndexWriter indexWriter; + @Nullable + protected final String historyUUID; + private final OpenSearchConcurrentMergeScheduler mergeScheduler; private final ExternalReaderManager externalReaderManager; private final OpenSearchReaderManager internalReaderManager; @@ -168,15 +180,12 @@ public class InternalEngine extends Engine { private final IndexThrottle throttle; - private final LocalCheckpointTracker localCheckpointTracker; - private final CombinedDeletionPolicy combinedDeletionPolicy; // How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges // are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling // incoming indexing ops to a single thread: private final AtomicInteger throttleRequestCount = new AtomicInteger(); - private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1); private final AtomicLong maxSeenAutoIdTimestamp = new AtomicLong(-1); // max_seq_no_of_updates_or_deletes tracks the max seq_no of update or delete operations that have been processed in this engine. // An index request is considered as an update if it overwrites existing documents with the same docId in the Lucene index. @@ -189,14 +198,12 @@ public class InternalEngine extends Engine { private final CounterMetric numDocAppends = new CounterMetric(); private final CounterMetric numDocUpdates = new CounterMetric(); private final NumericDocValuesField softDeletesField = Lucene.newSoftDeletesField(); - private final SoftDeletesPolicy softDeletesPolicy; private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener; private final CompletionStatsCache completionStatsCache; private final AtomicBoolean trackTranslogLocation = new AtomicBoolean(false); private final KeyedLock noOpKeyedLock = new KeyedLock<>(); - private final AtomicBoolean shouldPeriodicallyFlushAfterBigMerge = new AtomicBoolean(false); /** * If multiple writes passed {@link InternalEngine#tryAcquireInFlightDocs(Operation, int)} but they haven't adjusted @@ -210,15 +217,6 @@ public class InternalEngine extends Engine { private final int maxDocs; - @Nullable - private final String historyUUID; - - /** - * UUID value that is updated every time the engine is force merged. - */ - @Nullable - private volatile String forceMergeUUID; - public InternalEngine(EngineConfig engineConfig) { this(engineConfig, IndexWriter.MAX_DOCS, LocalCheckpointTracker::new, TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER); } @@ -249,7 +247,7 @@ public TranslogManager translogManager() { ExternalReaderManager externalReaderManager = null; OpenSearchReaderManager internalReaderManager = null; EngineMergeScheduler scheduler = null; - InternalTranslogManager translogManagerRef = null; + TranslogManager translogManagerRef = null; boolean success = false; try { this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().relativeTimeInMillis(); @@ -280,20 +278,11 @@ public void onFailure(String reason, Exception ex) { } } }; - translogManagerRef = new InternalTranslogManager( - engineConfig.getTranslogConfig(), - engineConfig.getPrimaryTermSupplier(), - engineConfig.getGlobalCheckpointSupplier(), - translogDeletionPolicy, - shardId, - readLock, - this::getLocalCheckpointTracker, - translogUUID, - new CompositeTranslogEventListener(Arrays.asList(internalTranslogEventListener, translogEventListener), shardId), - this::ensureOpen, - engineConfig.getTranslogFactory(), - engineConfig.getStartedPrimarySupplier() + CompositeTranslogEventListener compositeTranslogEventListener = new CompositeTranslogEventListener( + Arrays.asList(internalTranslogEventListener, translogEventListener), + shardId ); + translogManagerRef = createTranslogManager(translogUUID, translogDeletionPolicy, compositeTranslogEventListener); this.translogManager = translogManagerRef; this.softDeletesPolicy = newSoftDeletesPolicy(); this.combinedDeletionPolicy = new CombinedDeletionPolicy( @@ -362,6 +351,27 @@ public void onFailure(String reason, Exception ex) { logger.trace("created new InternalEngine"); } + protected TranslogManager createTranslogManager( + String translogUUID, + TranslogDeletionPolicy translogDeletionPolicy, + CompositeTranslogEventListener translogEventListener + ) throws IOException { + return new InternalTranslogManager( + engineConfig.getTranslogConfig(), + engineConfig.getPrimaryTermSupplier(), + engineConfig.getGlobalCheckpointSupplier(), + translogDeletionPolicy, + shardId, + readLock, + this::getLocalCheckpointTracker, + translogUUID, + translogEventListener, + this::ensureOpen, + engineConfig.getTranslogFactory(), + engineConfig.getStartedPrimarySupplier() + ); + } + private LocalCheckpointTracker createLocalCheckpointTracker( BiFunction localCheckpointTrackerSupplier ) throws IOException { @@ -2773,7 +2783,7 @@ public Closeable acquireHistoryRetentionLock() { /** * Gets the commit data from {@link IndexWriter} as a map. */ - private static Map commitDataAsMap(final IndexWriter indexWriter) { + protected static Map commitDataAsMap(final IndexWriter indexWriter) { final Map commitData = new HashMap<>(8); for (Map.Entry entry : indexWriter.getLiveCommitData()) { commitData.put(entry.getKey(), entry.getValue()); diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java index e2210217672ef..d2c81c4274ebd 100644 --- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java @@ -21,7 +21,6 @@ import org.opensearch.index.translog.listener.TranslogEventListener; import org.opensearch.index.translog.transfer.TranslogUploadFailedException; -import java.io.Closeable; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BooleanSupplier; @@ -31,12 +30,12 @@ import java.util.stream.Stream; /** - * The {@link TranslogManager} implementation capable of orchestrating all read/write {@link Translog} operations while - * interfacing with the {@link org.opensearch.index.engine.InternalEngine} + * The {@link TranslogManager} implementation capable of orchestrating all read/write {@link Translog} operations for + * the {@link org.opensearch.index.engine.InternalEngine} * * @opensearch.internal */ -public class InternalTranslogManager implements TranslogManager, Closeable { +public class InternalTranslogManager implements TranslogManager { private final ReleasableLock readLock; private final LifecycleAware engineLifeCycleAware; diff --git a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java index b4aa7865570a6..7ae80f88b0595 100644 --- a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java @@ -11,6 +11,7 @@ import org.opensearch.common.lease.Releasable; import org.opensearch.common.util.concurrent.ReleasableLock; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.seqno.SequenceNumbers; import java.io.IOException; import java.util.stream.Stream; @@ -27,6 +28,9 @@ public class NoOpTranslogManager implements TranslogManager { private final Runnable ensureOpen; private final ShardId shardId; private final TranslogStats translogStats; + private final TranslogDeletionPolicy translogDeletionPolicy; + private final String translogUUID; + private final boolean skipRecoveryStep; public NoOpTranslogManager( ShardId shardId, @@ -34,12 +38,27 @@ public NoOpTranslogManager( Runnable ensureOpen, TranslogStats translogStats, Translog.Snapshot emptyTranslogSnapshot + ) throws IOException { + this(shardId, readLock, ensureOpen, translogStats, emptyTranslogSnapshot, "", false); + } + + public NoOpTranslogManager( + ShardId shardId, + ReleasableLock readLock, + Runnable ensureOpen, + TranslogStats translogStats, + Translog.Snapshot emptyTranslogSnapshot, + String translogUUID, + boolean skipRecoveryStep ) throws IOException { this.emptyTranslogSnapshot = emptyTranslogSnapshot; this.readLock = readLock; this.shardId = shardId; this.ensureOpen = ensureOpen; this.translogStats = translogStats; + this.translogDeletionPolicy = new DefaultTranslogDeletionPolicy(0, 0, 0); + this.translogUUID = translogUUID; + this.skipRecoveryStep = skipRecoveryStep; } @Override @@ -48,6 +67,11 @@ public void rollTranslogGeneration() throws TranslogException {} @Override public int recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long localCheckpoint, long recoverUpToSeqNo) throws IOException { + // skip translog recovery attempt when skipRecoveryStep is true + if (skipRecoveryStep) { + return 0; + } + try (ReleasableLock ignored = readLock.acquire()) { ensureOpen.run(); try (Translog.Snapshot snapshot = emptyTranslogSnapshot) { @@ -132,6 +156,42 @@ public Releasable drainSync() { @Override public Translog.TranslogGeneration getTranslogGeneration() { + return new Translog.TranslogGeneration(translogUUID, 0); + } + + @Override + public long getLastSyncedGlobalCheckpoint() { + return 0; + } + + @Override + public long getMaxSeqNo() { + return SequenceNumbers.NO_OPS_PERFORMED; + } + + @Override + public void trimUnreferencedReaders() throws IOException {} + + @Override + public boolean shouldPeriodicallyFlush(long localCheckpointOfLastCommit, long flushThreshold) { + return false; + } + + @Override + public Exception getTragicExceptionIfClosed() { return null; } + + @Override + public TranslogDeletionPolicy getDeletionPolicy() { + return translogDeletionPolicy; + } + + @Override + public String getTranslogUUID() { + return translogUUID; + } + + @Override + public void close() throws IOException {} } diff --git a/server/src/main/java/org/opensearch/index/translog/Translog.java b/server/src/main/java/org/opensearch/index/translog/Translog.java index ffda06d8d8292..b1e88624c9906 100644 --- a/server/src/main/java/org/opensearch/index/translog/Translog.java +++ b/server/src/main/java/org/opensearch/index/translog/Translog.java @@ -899,6 +899,8 @@ public TranslogDeletionPolicy getDeletionPolicy() { return deletionPolicy; } + public static final Translog.Location EMPTY_TRANSLOG_LOCATION = new Translog.Location(0, 0, 0); + /** * Location in the translot * diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java index e1a0b7d1c1293..ec312636e7ee1 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java @@ -11,6 +11,7 @@ import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.lease.Releasable; +import java.io.Closeable; import java.io.IOException; import java.util.stream.Stream; @@ -20,7 +21,7 @@ * @opensearch.api */ @PublicApi(since = "1.0.0") -public interface TranslogManager { +public interface TranslogManager extends Closeable { /** * Rolls the translog generation and cleans unneeded. @@ -142,4 +143,46 @@ public interface TranslogManager { Releasable drainSync(); Translog.TranslogGeneration getTranslogGeneration(); + + /** + * Retrieves last synced global checkpoint. + */ + long getLastSyncedGlobalCheckpoint(); + + /** + * Retrieves the max seq no. + */ + long getMaxSeqNo(); + + /** + * Trims unreferenced translog generations by asking {@link TranslogDeletionPolicy} for the minimum required + * generation. + */ + void trimUnreferencedReaders() throws IOException; + + /** + * + * @param localCheckpointOfLastCommit local checkpoint reference of last commit to translog + * @param flushThreshold threshold to flush to translog + * @return if the translog should be flushed + */ + boolean shouldPeriodicallyFlush(long localCheckpointOfLastCommit, long flushThreshold); + + /** + * Retrieves the underlying translog tragic exception + * @return the tragic exception + */ + Exception getTragicExceptionIfClosed(); + + /** + * Retrieves the translog deletion policy + * @return TranslogDeletionPolicy + */ + TranslogDeletionPolicy getDeletionPolicy(); + + /** + * Retrieves the translog unique identifier + * @return the uuid of the translog + */ + String getTranslogUUID(); } diff --git a/server/src/main/java/org/opensearch/indices/pollingingest/IngestionEngineFactory.java b/server/src/main/java/org/opensearch/indices/pollingingest/IngestionEngineFactory.java index e124adb90365b..16688feddf53c 100644 --- a/server/src/main/java/org/opensearch/indices/pollingingest/IngestionEngineFactory.java +++ b/server/src/main/java/org/opensearch/indices/pollingingest/IngestionEngineFactory.java @@ -13,6 +13,7 @@ import org.opensearch.index.engine.EngineConfig; import org.opensearch.index.engine.EngineFactory; import org.opensearch.index.engine.IngestionEngine; +import org.opensearch.index.engine.NRTReplicationEngine; import java.util.Objects; @@ -29,6 +30,10 @@ public IngestionEngineFactory(IngestionConsumerFactory ingestionConsumerFactory) @Override public Engine newReadWriteEngine(EngineConfig config) { + if (config.isReadOnlyReplica()) { + return new NRTReplicationEngine(config); + } + IngestionEngine ingestionEngine = new IngestionEngine(config, ingestionConsumerFactory); ingestionEngine.start(); return ingestionEngine; diff --git a/server/src/test/java/org/opensearch/index/engine/IngestionEngineTests.java b/server/src/test/java/org/opensearch/index/engine/IngestionEngineTests.java index 19718384bd926..2d00bbcba0c8c 100644 --- a/server/src/test/java/org/opensearch/index/engine/IngestionEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/IngestionEngineTests.java @@ -36,8 +36,9 @@ import java.util.concurrent.atomic.AtomicLong; import static org.awaitility.Awaitility.await; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; public class IngestionEngineTests extends EngineTestCase { @@ -46,6 +47,7 @@ public class IngestionEngineTests extends EngineTestCase { private IngestionEngine ingestionEngine; // the messages of the stream to ingest from private List messages; + private EngineConfig engineConfig; @Override @Before @@ -86,6 +88,7 @@ public void tearDown() throws Exception { ingestionEngineStore.close(); } super.tearDown(); + engineConfig = null; } public void testCreateEngine() throws IOException { @@ -95,7 +98,7 @@ public void testCreateEngine() throws IOException { ingestionEngine.flush(false, true); Map commitData = ingestionEngine.commitDataAsMap(); // verify the commit data - Assert.assertEquals(1, commitData.size()); + Assert.assertEquals(7, commitData.size()); Assert.assertEquals("2", commitData.get(StreamPoller.BATCH_START)); // verify the stored offsets @@ -120,21 +123,19 @@ public void testRecovery() throws IOException { publishData("{\"_id\":\"3\",\"_source\":{\"name\":\"john\", \"age\": 30}}"); publishData("{\"_id\":\"4\",\"_source\":{\"name\":\"jane\", \"age\": 25}}"); ingestionEngine.close(); - ingestionEngine = buildIngestionEngine(new AtomicLong(2), ingestionEngineStore, indexSettings); + ingestionEngine = buildIngestionEngine(new AtomicLong(0), ingestionEngineStore, indexSettings); waitForResults(ingestionEngine, 4); } public void testCreationFailure() throws IOException { - // Simulate an error scenario - Store mockStore = mock(Store.class); - doThrow(new IOException("Simulated IOException")).when(mockStore).readLastCommittedSegmentsInfo(); - final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); - FakeIngestionSource.FakeIngestionConsumerFactory consumerFactory = new FakeIngestionSource.FakeIngestionConsumerFactory(messages); + Store mockStore = spy(store); + doThrow(new IOException("Simulated IOException")).when(mockStore).trimUnsafeCommits(any()); + EngineConfig engineConfig = config( indexSettings, - store, + mockStore, createTempDir(), NoMergePolicy.INSTANCE, null, @@ -156,7 +157,9 @@ public void testCreationFailure() throws IOException { private IngestionEngine buildIngestionEngine(AtomicLong globalCheckpoint, Store store, IndexSettings settings) throws IOException { FakeIngestionSource.FakeIngestionConsumerFactory consumerFactory = new FakeIngestionSource.FakeIngestionConsumerFactory(messages); - EngineConfig engineConfig = config(settings, store, createTempDir(), NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get); + if (engineConfig == null) { + engineConfig = config(settings, store, createTempDir(), NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get); + } // overwrite the config with ingestion engine settings String mapping = "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"; MapperService mapperService = createMapperService(mapping); diff --git a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java index 4f04c0b08fd0a..f9a09c088095b 100644 --- a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java @@ -98,6 +98,7 @@ import org.opensearch.index.engine.DocIdSeqNoAndSource; import org.opensearch.index.engine.Engine; import org.opensearch.index.engine.EngineTestCase; +import org.opensearch.index.engine.IngestionEngine; import org.opensearch.index.engine.InternalEngine; import org.opensearch.index.seqno.SeqNoStats; import org.opensearch.index.seqno.SequenceNumbers; @@ -1378,7 +1379,9 @@ private void assertOpenTranslogReferences() throws Exception { for (IndexService indexService : indexServices) { for (IndexShard indexShard : indexService) { try { - if (IndexShardTestCase.getEngine(indexShard) instanceof InternalEngine) { + if (IndexShardTestCase.getEngine(indexShard) instanceof IngestionEngine) { + // no-op, as IngestionEngine does not use translog. + } else if (IndexShardTestCase.getEngine(indexShard) instanceof InternalEngine) { IndexShardTestCase.getTranslog(indexShard).getDeletionPolicy().assertNoOpenTranslogRefs(); } } catch (AlreadyClosedException ok) {