diff --git a/CHANGELOG.md b/CHANGELOG.md index d12a0555ce97c..233a668b61d4b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add circuit breaker support for gRPC transport to prevent out-of-memory errors ([#20203](https://github.com/opensearch-project/OpenSearch/pull/20203)) - Add index-level-encryption support for snapshots and remote-store ([#20095](https://github.com/opensearch-project/OpenSearch/pull/20095)) - Adding BackWardCompatibility test for remote publication enabled cluster ([#20221](https://github.com/opensearch-project/OpenSearch/pull/20221)) +- Adding chaos test cases for Context Aware Segments ([#20468](https://github.com/opensearch-project/OpenSearch/pull/20468)) ### Changed diff --git a/server/src/test/java/org/apache/lucene/index/IndexWriterUtil.java b/server/src/test/java/org/apache/lucene/index/IndexWriterUtil.java new file mode 100644 index 0000000000000..b285a41a0f1e9 --- /dev/null +++ b/server/src/test/java/org/apache/lucene/index/IndexWriterUtil.java @@ -0,0 +1,24 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.apache.lucene.index; + +/** + * Utility class that lets tests override index writer configuration which is only accessible from within this (Lucene index) package. + * + */ +public class IndexWriterUtil { + public static void suppressMergePolicyException(MergeScheduler mergeScheduler) { + if (mergeScheduler instanceof ConcurrentMergeScheduler) { + // This test intentionally produces exceptions + // in the threads that CMS launches; we don't + // want to pollute test output with these. 
+ ((ConcurrentMergeScheduler) mergeScheduler).setSuppressExceptions(); + } + } +} diff --git a/server/src/test/java/org/opensearch/index/engine/chaos/InternalEngineOnDiskWriterFullTests.java b/server/src/test/java/org/opensearch/index/engine/chaos/InternalEngineOnDiskWriterFullTests.java new file mode 100644 index 0000000000000..5409aa628e245 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/engine/chaos/InternalEngineOnDiskWriterFullTests.java @@ -0,0 +1,316 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine.chaos; + +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterUtil; +import org.apache.lucene.index.MergeScheduler; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.TotalHitCountCollector; +import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.tests.store.MockDirectoryWrapper; +import org.apache.lucene.tests.util.TestUtil; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.engine.Engine; +import org.opensearch.index.engine.EngineTestCase; +import org.opensearch.index.engine.InternalEngine; +import org.opensearch.index.engine.RefreshFailedEngineException; +import org.opensearch.index.mapper.ParsedDocument; +import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.index.store.Store; +import org.opensearch.test.IndexSettingsModule; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import static org.opensearch.common.util.FeatureFlags.CONTEXT_AWARE_MIGRATION_EXPERIMENTAL_FLAG; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; + +public class 
InternalEngineOnDiskWriterFullTests extends EngineTestCase { + + @LockFeatureFlag(CONTEXT_AWARE_MIGRATION_EXPERIMENTAL_FLAG) + public void testAddDocumentOnDiskFull() throws IOException { + final Path storeDirPath = createTempDir(); + final Path translogPath = createTempDir(); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + MockDirectoryWrapper dir = new MockDirectoryWrapper(random(), newFSDirectory(storeDirPath)); + IndexWriterFactory indexWriterFactory = (directory, iwc) -> { + MergeScheduler ms = iwc.getMergeScheduler(); + IndexWriterUtil.suppressMergePolicyException(ms); + return new IndexWriter(directory, iwc); + }; + boolean hitException = false; + final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings( + "test", + Settings.builder() + .put(defaultSettings.getSettings()) + .put(IndexSettings.INDEX_CONTEXT_AWARE_ENABLED_SETTING.getKey(), true) + .build() + ); + + int flushedDocCount = 0; + + try ( + Store store = createStore(dir); + InternalEngine engine = createEngine( + indexSettings, + store, + translogPath, + newMergePolicy(), + indexWriterFactory, + null, + globalCheckpoint::get + ) + ) { + int numDocsFirstSegment = 300; + long diskUsage = dir.sizeInBytes(); + // Start with 10 bytes more than we are currently using: + long diskFree = diskUsage + TestUtil.nextInt(random(), 10, 20); + dir.setTrackDiskUsage(true); + dir.setMaxSizeInBytes(diskFree); + try { + for (int i = 0; i < numDocsFirstSegment; i++) { + String id = Integer.toString(i); + ParsedDocument doc = testParsedDocument(id, null, testDocument(), TENANT_SOURCE, null); + engine.index(indexForDoc(doc)); + if (i % 5 == 0) { + engine.refresh("Testing"); + } + + if (i % 20 == 0) { + flushedDocCount = i + 1; + engine.flush(); + } + } + } catch (Exception ex) { + hitException = true; + } + + assertTrue(hitException); + ParsedDocument doc = testParsedDocument("-1", null, testDocument(), B_1, null); + assertThrows(AlreadyClosedException.class, () 
-> engine.index(indexForDoc(doc))); + } + + try (Store store = createStore(newFSDirectory(storeDirPath))) { + try ( + InternalEngine engine = createEngine( + indexSettings, + store, + translogPath, + newMergePolicy(), + indexWriterFactory, + null, + globalCheckpoint::get + ) + ) { + engine.refresh("Testing"); + // Ensure that whenever Engine re initialises,correctly. All documents may not be present in case translog + // does not gets persisted on node and translog remains in buffer during the crash. + try (Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { + final TotalHitCountCollector collector = new TotalHitCountCollector(); + searcher.search(new MatchAllDocsQuery(), collector); + assertThat(collector.getTotalHits(), greaterThanOrEqualTo(flushedDocCount)); + } + } catch (Exception ex) { + ex.printStackTrace(); + fail(ex.getMessage()); + } + } + } + + @LockFeatureFlag(CONTEXT_AWARE_MIGRATION_EXPERIMENTAL_FLAG) + public void testUpdateOrDeleteDocumentOnDiskFull() throws IOException { + final Path storeDirPath = createTempDir(); + final Path translogPath = createTempDir(); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + MockDirectoryWrapper dir = new MockDirectoryWrapper(random(), newFSDirectory(storeDirPath)); + IndexWriterFactory indexWriterFactory = (directory, iwc) -> { + MergeScheduler ms = iwc.getMergeScheduler(); + IndexWriterUtil.suppressMergePolicyException(ms); + return new IndexWriter(directory, iwc); + }; + boolean hitException = false; + final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings( + "test", + Settings.builder() + .put(defaultSettings.getSettings()) + .put(IndexSettings.INDEX_CONTEXT_AWARE_ENABLED_SETTING.getKey(), true) + .build() + ); + + int flushedDocCount = 0; + int delCountAtFlush = 0; + + try ( + Store store = createStore(dir); + InternalEngine engine = createEngine( + indexSettings, + store, + translogPath, + newMergePolicy(), + 
indexWriterFactory, + null, + globalCheckpoint::get + ) + ) { + int numDocsFirstSegment = 300; + AtomicInteger delCount = new AtomicInteger(0); + long diskUsage = dir.sizeInBytes(); + // Start with 10 bytes more than we are currently using: + long diskFree = diskUsage + TestUtil.nextInt(random(), 10, 20); + dir.setTrackDiskUsage(true); + dir.setMaxSizeInBytes(diskFree); + try { + for (int i = 0; i < numDocsFirstSegment; i++) { + String id = Integer.toString(i); + ParsedDocument doc = testParsedDocument(id, null, testDocument(), TENANT_SOURCE, null); + Engine.Index operation = indexForDoc(doc); + engine.index(operation); + if (i % 2 == 0) { + engine.delete(new Engine.Delete(operation.id(), operation.uid(), operation.primaryTerm())); + delCount.incrementAndGet(); + } else if (i % 3 == 0) { + engine.index(indexForDoc(doc)); + } + + if (i % 5 == 0) { + engine.refresh("Testing"); + } + + if (i % 20 == 0) { + flushedDocCount = i + 1; + delCountAtFlush = delCount.get(); + engine.flush(); + } + } + } catch (Exception ex) { + hitException = true; + } + + assertTrue(hitException); + ParsedDocument doc = testParsedDocument("-1", null, testDocument(), B_1, null); + assertThrows(AlreadyClosedException.class, () -> engine.index(indexForDoc(doc))); + } + + try (Store store = createStore(newFSDirectory(storeDirPath))) { + try ( + InternalEngine engine = createEngine( + indexSettings, + store, + translogPath, + newMergePolicy(), + indexWriterFactory, + null, + globalCheckpoint::get + ) + ) { + engine.refresh("Testing"); + // Ensure that whenever Engine re initialises,correctly. All documents may not be present in case translog + // does not gets persisted on node and translog remains in buffer during the crash. 
+ try (Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { + final TotalHitCountCollector collector = new TotalHitCountCollector(); + searcher.search(new MatchAllDocsQuery(), collector); + assertThat(collector.getTotalHits(), greaterThanOrEqualTo(flushedDocCount - delCountAtFlush)); + } + } catch (Exception ex) { + ex.printStackTrace(); + fail(ex.getMessage()); + } + } + } + + @LockFeatureFlag(CONTEXT_AWARE_MIGRATION_EXPERIMENTAL_FLAG) + public void testEngineRefreshOnDiskFull() throws IOException { + final Path storeDirPath = createTempDir(); + final Path translogPath = createTempDir(); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + MockDirectoryWrapper dir = new MockDirectoryWrapper(random(), newFSDirectory(storeDirPath)); + IndexWriterFactory indexWriterFactory = (directory, iwc) -> { + MergeScheduler ms = iwc.getMergeScheduler(); + IndexWriterUtil.suppressMergePolicyException(ms); + return new IndexWriter(directory, iwc); + }; + final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings( + "test", + Settings.builder() + .put(defaultSettings.getSettings()) + .put(IndexSettings.INDEX_CONTEXT_AWARE_ENABLED_SETTING.getKey(), true) + .build() + ); + + int flushedDocCount = 0; + try ( + Store store = createStore(dir); + InternalEngine engine = createEngine( + indexSettings, + store, + translogPath, + newMergePolicy(), + indexWriterFactory, + null, + globalCheckpoint::get + ) + ) { + int numDocsFirstSegment = randomIntBetween(50, 100); + for (int i = 0; i < numDocsFirstSegment; i++) { + String id = Integer.toString(i); + ParsedDocument doc = testParsedDocument(id, null, testContextSpecificDocument(), TENANT_SOURCE, null); + engine.index(indexForDoc(doc)); + if (i % 20 == 0) { + flushedDocCount = i + 1; + engine.flush(); + } + } + + long diskUsage = dir.sizeInBytes(); + // Start with 100 bytes more than we are currently using: + long diskFree = diskUsage + 
TestUtil.nextInt(random(), 10, 20); + dir.setTrackDiskUsage(true); + dir.setMaxSizeInBytes(diskFree); + for (int i = numDocsFirstSegment; i < numDocsFirstSegment + 60; i++) { + String id = Integer.toString(i); + ParsedDocument doc = testParsedDocument(id, null, testContextSpecificDocument(), TENANT_SOURCE, null); + engine.index(indexForDoc(doc)); + } + + expectThrows(RefreshFailedEngineException.class, () -> engine.refresh("testing")); + ParsedDocument doc = testParsedDocument("-1", null, testDocument(), B_1, null); + assertThrows(AlreadyClosedException.class, () -> engine.index(indexForDoc(doc))); + } + + try (Store store = createStore(newFSDirectory(storeDirPath))) { + try ( + InternalEngine engine = createEngine( + indexSettings, + store, + translogPath, + newMergePolicy(), + indexWriterFactory, + null, + globalCheckpoint::get + ) + ) { + engine.refresh("Testing"); + // Ensure that whenever Engine re initialises,correctly. All documents may not be present in case translog + // does not gets persisted on node and translog remains in buffer during the crash. 
+ try (Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { + final TotalHitCountCollector collector = new TotalHitCountCollector(); + searcher.search(new MatchAllDocsQuery(), collector); + assertThat(collector.getTotalHits(), greaterThanOrEqualTo(flushedDocCount)); + } + } catch (Exception ex) { + ex.printStackTrace(); + fail(ex.getMessage()); + } + } + } +} diff --git a/server/src/test/java/org/opensearch/index/engine/chaos/InternalEngineOnJRECrashTests.java b/server/src/test/java/org/opensearch/index/engine/chaos/InternalEngineOnJRECrashTests.java new file mode 100644 index 0000000000000..5d7e656eea439 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/engine/chaos/InternalEngineOnJRECrashTests.java @@ -0,0 +1,138 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine.chaos; + +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.store.Directory; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.engine.Engine; +import org.opensearch.index.engine.EngineTestCase; +import org.opensearch.index.engine.InternalEngine; +import org.opensearch.index.mapper.ParsedDocument; +import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.index.store.Store; +import org.opensearch.test.IndexSettingsModule; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import static org.opensearch.common.util.FeatureFlags.CONTEXT_AWARE_MIGRATION_EXPERIMENTAL_FLAG; + +public class InternalEngineOnJRECrashTests extends EngineTestCase { + + private void 
runIndexingWorkload(Engine engine, AtomicInteger operationCount) throws IOException { + long i = 0; + while (!Thread.currentThread().isInterrupted()) { + final String id = "docid#" + i; + ParsedDocument doc = testParsedDocument( + id, + null, + testContextSpecificDocumentWithTenantField("grouping_criteria"), + TENANT_SOURCE, + null + ); + engine.index(indexForDoc(doc)); + operationCount.incrementAndGet(); + i++; + if (i % 100 == 0) { + engine.refresh("testing"); + } + } + } + + public void verifyDataPersistenceAfterCrash(int crashDelayMs) throws Exception { + final Path storeDirPath = createTempDir(); + final Path translogPath = createTempDir(); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + AtomicInteger operationCount = new AtomicInteger(0); + final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings( + "test", + Settings.builder() + .put(defaultSettings.getSettings()) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1) + .put(IndexSettings.INDEX_CONTEXT_AWARE_ENABLED_SETTING.getKey(), true) + .build() + ); + + try (Store store = createStore(newFSDirectory(storeDirPath))) { + try ( + InternalEngine engine = createEngine( + config(indexSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpoint::get) + ) + ) { + CountDownLatch started = new CountDownLatch(1); + CountDownLatch finished = new CountDownLatch(1); + // Thread that runs the actual test + Thread testThread = new Thread(() -> { + try { + started.countDown(); + runIndexingWorkload(engine, operationCount); + } catch (IOException ignore) { + logger.error("Ignoring exception " + ignore.getMessage(), ignore); + } finally { + finished.countDown(); + } + }, "engine-test-thread"); + + testThread.setUncaughtExceptionHandler( + (t, e) -> { logger.info("Ignoring uncaught exception in an interrupted thread", e); } + ); + + // Start the indexing thread. + testThread.start(); + // Wait for the indexing thread to start. 
+ started.await(); + + // Wait main thread for specified time period, so that write continues on indexing thread. + Thread.sleep(crashDelayMs); + + // Interrupt the thread. + testThread.interrupt(); + // Wait for thread to finish (with timeout) + boolean threadFinished = finished.await(5, TimeUnit.SECONDS); + if (!threadFinished) { + logger.warn("Thread did not finish in time"); + } + } + } + + // Validate if there is no corruption on JVM crash. + // Do we need to remove Write.lock file incase rollback did not happened correctly. + // TODO: Validate if doc count remained same post jvm crash via translog replay (Validate JVM crash prevents any + // translog entry to be written to disk from memory). + try (Directory directory = newFSDirectory(storeDirPath)) { + assertTrue(DirectoryReader.indexExists(directory)); + } + } + + @LockFeatureFlag(CONTEXT_AWARE_MIGRATION_EXPERIMENTAL_FLAG) + public void testEngineOperationsForJRECrashAfterDelay_10ms() throws Exception { + verifyDataPersistenceAfterCrash(10); + } + + @LockFeatureFlag(CONTEXT_AWARE_MIGRATION_EXPERIMENTAL_FLAG) + public void testEngineOperationsForJRECrashAfterDelay_100ms() throws Exception { + verifyDataPersistenceAfterCrash(100); + } + + @LockFeatureFlag(CONTEXT_AWARE_MIGRATION_EXPERIMENTAL_FLAG) + public void testEngineOperationsForJRECrashAfterDelay_1000ms() throws Exception { + verifyDataPersistenceAfterCrash(1000); + } + + @LockFeatureFlag(CONTEXT_AWARE_MIGRATION_EXPERIMENTAL_FLAG) + public void testEngineOperationsForJRECrashAfterDelay_10000ms() throws Exception { + verifyDataPersistenceAfterCrash(10000); + } +} diff --git a/server/src/test/java/org/opensearch/index/engine/chaos/InternalEngineOnOutOfFileDescriptorsTests.java b/server/src/test/java/org/opensearch/index/engine/chaos/InternalEngineOnOutOfFileDescriptorsTests.java new file mode 100644 index 0000000000000..8ab4549d3ff02 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/engine/chaos/InternalEngineOnOutOfFileDescriptorsTests.java 
@@ -0,0 +1,269 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine.chaos; + +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterUtil; +import org.apache.lucene.index.MergeScheduler; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.TotalHitCountCollector; +import org.apache.lucene.tests.store.MockDirectoryWrapper; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.engine.Engine; +import org.opensearch.index.engine.EngineException; +import org.opensearch.index.engine.EngineTestCase; +import org.opensearch.index.engine.InternalEngine; +import org.opensearch.index.mapper.ParsedDocument; +import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.index.store.Store; +import org.opensearch.test.IndexSettingsModule; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import static org.opensearch.common.util.FeatureFlags.CONTEXT_AWARE_MIGRATION_EXPERIMENTAL_FLAG; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; + +public class InternalEngineOnOutOfFileDescriptorsTests extends EngineTestCase { + + @LockFeatureFlag(CONTEXT_AWARE_MIGRATION_EXPERIMENTAL_FLAG) + public void testAddDocumentOnOutOfFileDescriptors() throws IOException { + final Path storeDirPath = createTempDir(); + final Path translogPath = createTempDir(); + double rate = 0.0; + MockDirectoryWrapper dir = new MockDirectoryWrapper(random(), newFSDirectory(storeDirPath)); + dir.setRandomIOExceptionRateOnOpen(rate); + IndexWriterFactory indexWriterFactory = (directory, iwc) -> { + MergeScheduler ms 
= iwc.getMergeScheduler(); + IndexWriterUtil.suppressMergePolicyException(ms); + return new IndexWriter(directory, iwc); + }; + boolean hitException = false; + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings( + "test", + Settings.builder() + .put(defaultSettings.getSettings()) + .put(IndexSettings.INDEX_CONTEXT_AWARE_ENABLED_SETTING.getKey(), true) + .build() + ); + + AtomicInteger docCount = new AtomicInteger(0); + int docCountAtFlush = 0; + try ( + Store store = createStore(dir); + InternalEngine engine = createEngine( + indexSettings, + store, + translogPath, + newMergePolicy(), + indexWriterFactory, + null, + globalCheckpoint::get + ) + ) { + int numDocsFirstSegment = randomIntBetween(50, 100); + try { + for (int i = 0; i < numDocsFirstSegment; i++) { + String id = Integer.toString(i); + ParsedDocument doc = testParsedDocument(id, null, testDocument(), TENANT_SOURCE, null); + engine.index(indexForDoc(doc)); + docCount.incrementAndGet(); + + if (i % 20 == 0) { + docCountAtFlush = docCount.get(); + engine.flush(); + } + } + } catch (IOException ex) { + hitException = true; + } + + assertFalse(hitException); + assertTrue(DirectoryReader.indexExists(dir)); + + try { + engine.refresh("testing"); + } catch (EngineException e) { + hitException = true; + } + + assertTrue(DirectoryReader.indexExists(dir)); + rate = 1.0; + dir.setRandomIOExceptionRateOnOpen(rate); + try { + for (int i = numDocsFirstSegment; i < numDocsFirstSegment + numDocsFirstSegment; i++) { + String id = Integer.toString(i); + ParsedDocument doc = testParsedDocument(id, null, testDocument(), TENANT_SOURCE, null); + engine.index(indexForDoc(doc)); + } + + engine.refresh("testing"); + } catch (EngineException e) { + hitException = true; + } + + assertTrue(hitException); + assertTrue(DirectoryReader.indexExists(dir)); + rate = 0.0; + dir.setRandomIOExceptionRateOnOpen(rate); + } + + try (Store 
store = createStore(newFSDirectory(storeDirPath))) { + try ( + InternalEngine engine = createEngine( + indexSettings, + store, + translogPath, + newMergePolicy(), + indexWriterFactory, + null, + globalCheckpoint::get + ) + ) { + engine.refresh("Testing"); + // Ensure that whenever Engine re initialises,correctly. All documents may not be present in case translog + // does not gets persisted on node and translog remains in buffer during the crash. + try (Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { + final TotalHitCountCollector collector = new TotalHitCountCollector(); + searcher.search(new MatchAllDocsQuery(), collector); + assertThat(collector.getTotalHits(), greaterThanOrEqualTo(docCountAtFlush)); + } + } catch (Exception ex) { + ex.printStackTrace(); + fail(ex.getMessage()); + } + } + } + + @LockFeatureFlag(CONTEXT_AWARE_MIGRATION_EXPERIMENTAL_FLAG) + public void testUpdateOrDeleteDocumentOnOutOfFileDescriptors() throws IOException { + final Path storeDirPath = createTempDir(); + final Path translogPath = createTempDir(); + double rate = 0.0; + MockDirectoryWrapper dir = new MockDirectoryWrapper(random(), newFSDirectory(storeDirPath)); + dir.setRandomIOExceptionRateOnOpen(rate); + IndexWriterFactory indexWriterFactory = (directory, iwc) -> { + MergeScheduler ms = iwc.getMergeScheduler(); + IndexWriterUtil.suppressMergePolicyException(ms); + return new IndexWriter(directory, iwc); + }; + boolean hitException = false; + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings( + "test", + Settings.builder() + .put(defaultSettings.getSettings()) + .put(IndexSettings.INDEX_CONTEXT_AWARE_ENABLED_SETTING.getKey(), true) + .build() + ); + + AtomicInteger docCount = new AtomicInteger(0); + AtomicInteger delCount = new AtomicInteger(0); + int docCountAtFlush = 0; + int delCountAtFlush = 0; + try ( + Store store = 
createStore(dir); + InternalEngine engine = createEngine( + indexSettings, + store, + translogPath, + newMergePolicy(), + indexWriterFactory, + null, + globalCheckpoint::get + ) + ) { + int numDocsFirstSegment = randomIntBetween(50, 100); + try { + for (int i = 0; i < numDocsFirstSegment; i++) { + String id = Integer.toString(i); + ParsedDocument doc = testParsedDocument(id, null, testDocument(), TENANT_SOURCE, null); + Engine.Index operation = indexForDoc(doc); + engine.index(operation); + docCount.incrementAndGet(); + + if (i % 2 == 0) { + engine.delete(new Engine.Delete(operation.id(), operation.uid(), operation.primaryTerm())); + delCount.incrementAndGet(); + } else if (i % 3 == 0) { + engine.index(indexForDoc(doc)); + } + + if (i % 20 == 0) { + docCountAtFlush = docCount.get(); + delCountAtFlush = delCount.get(); + engine.flush(); + } + } + } catch (IOException ex) { + hitException = true; + } + + assertFalse(hitException); + assertTrue(DirectoryReader.indexExists(dir)); + + try { + engine.refresh("testing"); + } catch (EngineException e) { + hitException = true; + } + + assertTrue(DirectoryReader.indexExists(dir)); + rate = 1.0; + dir.setRandomIOExceptionRateOnOpen(rate); + try { + for (int i = numDocsFirstSegment; i < numDocsFirstSegment + numDocsFirstSegment; i++) { + String id = Integer.toString(i); + ParsedDocument doc = testParsedDocument(id, null, testDocument(), TENANT_SOURCE, null); + engine.index(indexForDoc(doc)); + } + + engine.refresh("testing"); + } catch (EngineException e) { + hitException = true; + } + + assertTrue(hitException); + assertTrue(DirectoryReader.indexExists(dir)); + rate = 0.0; + dir.setRandomIOExceptionRateOnOpen(rate); + } + + try (Store store = createStore(newFSDirectory(storeDirPath))) { + try ( + InternalEngine engine = createEngine( + indexSettings, + store, + translogPath, + newMergePolicy(), + indexWriterFactory, + null, + globalCheckpoint::get + ) + ) { + engine.refresh("Testing"); + // Ensure that whenever Engine re 
initialises,correctly. All documents may not be present in case translog + // does not gets persisted on node and translog remains in buffer during the crash. + try (Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { + final TotalHitCountCollector collector = new TotalHitCountCollector(); + searcher.search(new MatchAllDocsQuery(), collector); + assertThat(collector.getTotalHits(), greaterThanOrEqualTo(docCountAtFlush - delCountAtFlush)); + } + } catch (Exception ex) { + ex.printStackTrace(); + fail(ex.getMessage()); + } + } + } +} diff --git a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java index 2220be6daba81..264695dcbc5d1 100644 --- a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java @@ -198,6 +198,7 @@ public abstract class EngineTestCase extends OpenSearchTestCase { protected Path replicaTranslogDir; // A default primary term is used by engine instances created in this test. 
protected final PrimaryTermSupplier primaryTerm = new PrimaryTermSupplier(1L); + protected static final BytesArray TENANT_SOURCE = bytesArray("{\"grouping_criteria\": \"grouping_criteria\"}"); protected static void assertVisibleCount(Engine engine, int numDocs, boolean refresh) throws IOException { if (refresh) { @@ -395,6 +396,13 @@ protected static ParseContext.Document testDocumentWithTextField(String value) { return document; } + protected static ParseContext.Document testContextSpecificDocumentWithTenantField(String tenant) { + ParseContext.Document document = testDocument(); + document.add(new TextField("grouping_criteria", tenant, Field.Store.YES)); + document.setGroupingCriteria(tenant); + return document; + } + protected static ParseContext.Document testDocumentWithGroupingCriteria() { ParseContext.Document document = new ParseContext.Document(); document.setGroupingCriteria("grouping_criteria"); @@ -816,6 +824,17 @@ protected long doGenerateSeqNoForOperation(final Operation operation) { ? seqNoForOperation.applyAsLong(this, operation) : super.doGenerateSeqNoForOperation(operation); } + + @Override + public IndexResult index(Index index) throws IOException { + if (config().getIndexSettings().isContextAwareEnabled()) { + for (ParseContext.Document doc : index.docs()) { + doc.setGroupingCriteria("grouping_criteria"); + } + } + + return super.index(index); + } }; } else { return new InternalTestEngine(config, IndexWriter.MAX_DOCS, localCheckpointTrackerSupplier) { @@ -832,6 +851,17 @@ protected long doGenerateSeqNoForOperation(final Operation operation) { ? 
seqNoForOperation.applyAsLong(this, operation) : super.doGenerateSeqNoForOperation(operation); } + + @Override + public IndexResult index(Index index) throws IOException { + if (config().getIndexSettings().isContextAwareEnabled()) { + for (ParseContext.Document doc : index.docs()) { + doc.setGroupingCriteria("grouping_criteria"); + } + } + + return super.index(index); + } }; } @@ -1663,7 +1693,8 @@ public static void assertAtMostOneLuceneDocumentPerSequenceNumber(IndexSettings } public static MapperService createMapperServiceForContextAwareIndex() throws IOException { - String mapping = "{\"properties\": {}}"; + String mapping = + "{\"properties\":{\"grouping_criteria\":{\"type\":\"text\",\"store\":true}},\"context_aware_grouping\":{\"fields\":[\"grouping_criteria\"],\"script\":{\"source\":\"String.valueOf(grouping_criteria)\"}}}"; IndexMetadata indexMetadata = IndexMetadata.builder("test") .settings( Settings.builder()