|
27 | 27 | import org.opensearch.common.blobstore.BlobPath; |
28 | 28 | import org.opensearch.common.io.PathUtils; |
29 | 29 | import org.opensearch.common.settings.Settings; |
| 30 | +import org.opensearch.common.unit.TimeValue; |
30 | 31 | import org.opensearch.common.util.io.IOUtils; |
31 | 32 | import org.opensearch.core.index.Index; |
32 | 33 | import org.opensearch.core.rest.RestStatus; |
33 | 34 | import org.opensearch.index.IndexService; |
34 | 35 | import org.opensearch.index.IndexSettings; |
| 36 | +import org.opensearch.index.mapper.MapperService; |
35 | 37 | import org.opensearch.index.remote.RemoteStoreEnums; |
36 | 38 | import org.opensearch.index.shard.IndexShard; |
37 | 39 | import org.opensearch.indices.IndicesService; |
38 | 40 | import org.opensearch.indices.RemoteStoreSettings; |
39 | 41 | import org.opensearch.indices.recovery.RecoveryState; |
40 | 42 | import org.opensearch.indices.replication.common.ReplicationType; |
| 43 | +import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService; |
41 | 44 | import org.opensearch.repositories.blobstore.BlobStoreRepository; |
42 | 45 | import org.opensearch.snapshots.AbstractSnapshotIntegTestCase; |
43 | 46 | import org.opensearch.snapshots.SnapshotInfo; |
44 | 47 | import org.opensearch.snapshots.SnapshotRestoreException; |
45 | 48 | import org.opensearch.snapshots.SnapshotState; |
| 49 | +import org.opensearch.test.BackgroundIndexer; |
46 | 50 | import org.opensearch.test.InternalTestCluster; |
47 | 51 | import org.opensearch.test.OpenSearchIntegTestCase; |
48 | 52 | import org.junit.After; |
|
53 | 57 | import java.nio.file.Path; |
54 | 58 | import java.util.ArrayList; |
55 | 59 | import java.util.Arrays; |
| 60 | +import java.util.HashMap; |
56 | 61 | import java.util.List; |
57 | 62 | import java.util.Map; |
58 | 63 | import java.util.Objects; |
59 | 64 | import java.util.Optional; |
60 | 65 | import java.util.concurrent.ExecutionException; |
| 66 | +import java.util.concurrent.TimeUnit; |
61 | 67 | import java.util.stream.Collectors; |
62 | 68 | import java.util.stream.Stream; |
63 | 69 |
|
64 | 70 | import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED; |
| 71 | +import static org.opensearch.index.query.QueryBuilders.matchAllQuery; |
65 | 72 | import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS; |
66 | 73 | import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG; |
67 | 74 | import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA; |
@@ -885,4 +892,88 @@ public void testRestoreOperationsUsingDifferentRepos() throws Exception { |
885 | 892 | ensureGreen(indexName1); |
886 | 893 | assertDocsPresentInIndex(client, indexName1, 3 * numDocsInIndex1); |
887 | 894 | } |
| 895 | + |
| 896 | + public void testContinuousIndexing() throws Exception { |
| 897 | + internalCluster().startClusterManagerOnlyNode(); |
| 898 | + internalCluster().startDataOnlyNode(); |
| 899 | + String index = "test-index"; |
| 900 | + String snapshotRepo = "test-restore-snapshot-repo"; |
| 901 | + String baseSnapshotName = "snapshot_"; |
| 902 | + Path absolutePath1 = randomRepoPath().toAbsolutePath(); |
| 903 | + logger.info("Snapshot Path [{}]", absolutePath1); |
| 904 | + |
| 905 | + createRepository(snapshotRepo, "fs", getRepositorySettings(absolutePath1, true)); |
| 906 | + |
| 907 | + Client client = client(); |
| 908 | + Settings indexSettings = Settings.builder() |
| 909 | + .put(super.indexSettings()) |
| 910 | + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) |
| 911 | + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) |
| 912 | + .build(); |
| 913 | + |
| 914 | + createIndex(index, indexSettings); |
| 915 | + ensureGreen(index); |
| 916 | + |
| 917 | + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( |
| 918 | + RemoteStorePinnedTimestampService.class, |
| 919 | + primaryNodeName(index) |
| 920 | + ); |
| 921 | + |
| 922 | + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(randomIntBetween(1, 5))); |
| 923 | + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.timeValueSeconds(randomIntBetween(1, 5))); |
| 924 | + |
| 925 | + long totalDocs = 0; |
| 926 | + Map<String, Long> snapshots = new HashMap<>(); |
| 927 | + int numDocs = randomIntBetween(200, 300); |
| 928 | + totalDocs += numDocs; |
| 929 | + try (BackgroundIndexer indexer = new BackgroundIndexer(index, MapperService.SINGLE_MAPPING_NAME, client(), numDocs)) { |
| 930 | + int numberOfSnapshots = 5; |
| 931 | + for (int i = 0; i < numberOfSnapshots; i++) { |
| 932 | + logger.info("--> waiting for {} docs to be indexed ...", numDocs); |
| 933 | + long finalTotalDocs1 = totalDocs; |
| 934 | + assertBusy(() -> assertEquals(finalTotalDocs1, indexer.totalIndexedDocs()), 120, TimeUnit.SECONDS); |
| 935 | + logger.info("--> {} total docs indexed", totalDocs); |
| 936 | + String snapshotName = baseSnapshotName + i; |
| 937 | + createSnapshot(snapshotRepo, snapshotName, new ArrayList<>()); |
| 938 | + snapshots.put(snapshotName, totalDocs); |
| 939 | + if (i < numberOfSnapshots - 1) { |
| 940 | + numDocs = randomIntBetween(200, 300); |
| 941 | + indexer.continueIndexing(numDocs); |
| 942 | + totalDocs += numDocs; |
| 943 | + } |
| 944 | + } |
| 945 | + } |
| 946 | + |
| 947 | + logger.info("Snapshots Status: " + snapshots); |
| 948 | + |
| 949 | + for (String snapshot : snapshots.keySet()) { |
| 950 | + logger.info("Restoring snapshot: {}", snapshot); |
| 951 | + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(index)).get()); |
| 952 | + |
| 953 | + RestoreSnapshotResponse restoreSnapshotResponse1 = client.admin() |
| 954 | + .cluster() |
| 955 | + .prepareRestoreSnapshot(snapshotRepo, snapshot) |
| 956 | + .setWaitForCompletion(true) |
| 957 | + .setIndices() |
| 958 | + .get(); |
| 959 | + |
| 960 | + assertEquals(RestStatus.OK, restoreSnapshotResponse1.status()); |
| 961 | + |
| 962 | + // Verify restored index's stats |
| 963 | + ensureGreen(TimeValue.timeValueSeconds(60), index); |
| 964 | + long finalTotalDocs = totalDocs; |
| 965 | + assertBusy(() -> { |
| 966 | + Long hits = client().prepareSearch(index) |
| 967 | + .setQuery(matchAllQuery()) |
| 968 | + .setSize((int) finalTotalDocs) |
| 969 | + .storedFields() |
| 970 | + .execute() |
| 971 | + .actionGet() |
| 972 | + .getHits() |
| 973 | + .getTotalHits().value; |
| 974 | + |
| 975 | + assertEquals(snapshots.get(snapshot), hits); |
| 976 | + }); |
| 977 | + } |
| 978 | + } |
888 | 979 | } |
0 commit comments