Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -421,8 +421,8 @@ public void testIndexDeletionNoPinnedTimestamps() throws Exception {
client().admin().indices().prepareDelete(INDEX_NAME).get();

assertBusy(() -> {
assertEquals(0, Files.list(translogMetadataPath).collect(Collectors.toList()).size());
assertEquals(0, Files.list(translogDataPath).collect(Collectors.toList()).size());
assertEquals(1, Files.list(translogMetadataPath).collect(Collectors.toList()).size());
assertEquals(4, Files.list(translogDataPath).collect(Collectors.toList()).size());
});
}

Expand Down Expand Up @@ -490,7 +490,7 @@ public void testIndexDeletionWithPinnedTimestamps() throws Exception {

assertBusy(() -> {
List<Path> metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList());
assertEquals(1, metadataFiles.size());
assertEquals(2, metadataFiles.size());

verifyTranslogDataFileCount(metadataFiles, translogDataPath);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,26 @@
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.index.Index;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.remote.RemoteStoreEnums;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.snapshots.AbstractSnapshotIntegTestCase;
import org.opensearch.snapshots.SnapshotInfo;
import org.opensearch.snapshots.SnapshotRestoreException;
import org.opensearch.snapshots.SnapshotState;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.After;
Expand All @@ -53,15 +57,18 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED;
import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS;
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
Expand Down Expand Up @@ -885,4 +892,88 @@ public void testRestoreOperationsUsingDifferentRepos() throws Exception {
ensureGreen(indexName1);
assertDocsPresentInIndex(client, indexName1, 3 * numDocsInIndex1);
}

public void testContinuousIndexing() throws Exception {
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();
String index = "test-index";
String snapshotRepo = "test-restore-snapshot-repo";
String baseSnapshotName = "snapshot_";
Path absolutePath1 = randomRepoPath().toAbsolutePath();
logger.info("Snapshot Path [{}]", absolutePath1);

createRepository(snapshotRepo, "fs", getRepositorySettings(absolutePath1, true));

Client client = client();
Settings indexSettings = Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build();

createIndex(index, indexSettings);
ensureGreen(index);

RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
RemoteStorePinnedTimestampService.class,
primaryNodeName(index)
);

remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(randomIntBetween(1, 5)));
RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.timeValueSeconds(randomIntBetween(1, 5)));

long totalDocs = 0;
Map<String, Long> snapshots = new HashMap<>();
int numDocs = randomIntBetween(200, 300);
totalDocs += numDocs;
try (BackgroundIndexer indexer = new BackgroundIndexer(index, MapperService.SINGLE_MAPPING_NAME, client(), numDocs)) {
int numberOfSnapshots = 5;
for (int i = 0; i < numberOfSnapshots; i++) {
logger.info("--> waiting for {} docs to be indexed ...", numDocs);
long finalTotalDocs1 = totalDocs;
assertBusy(() -> assertEquals(finalTotalDocs1, indexer.totalIndexedDocs()), 120, TimeUnit.SECONDS);
logger.info("--> {} total docs indexed", totalDocs);
String snapshotName = baseSnapshotName + i;
createSnapshot(snapshotRepo, snapshotName, new ArrayList<>());
snapshots.put(snapshotName, totalDocs);
if (i < numberOfSnapshots - 1) {
numDocs = randomIntBetween(200, 300);
indexer.continueIndexing(numDocs);
totalDocs += numDocs;
}
}
}

logger.info("Snapshots Status: " + snapshots);

for (String snapshot : snapshots.keySet()) {
logger.info("Restoring snapshot: {}", snapshot);
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(index)).get());

RestoreSnapshotResponse restoreSnapshotResponse1 = client.admin()
.cluster()
.prepareRestoreSnapshot(snapshotRepo, snapshot)
.setWaitForCompletion(true)
.setIndices()
.get();

assertEquals(RestStatus.OK, restoreSnapshotResponse1.status());

// Verify restored index's stats
ensureGreen(TimeValue.timeValueSeconds(60), index);
long finalTotalDocs = totalDocs;
assertBusy(() -> {
Long hits = client().prepareSearch(index)
.setQuery(matchAllQuery())
.setSize((int) finalTotalDocs)
.storedFields()
.execute()
.actionGet()
.getHits()
.getTotalHits().value;

assertEquals(snapshots.get(snapshot), hits);
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -891,6 +892,16 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
lastSuccessfulFetchOfPinnedTimestamps
);

if (metadataFilesEligibleToDelete.isEmpty()) {
logger.debug("No metadata files are eligible to be deleted based on lastNMetadataFilesToKeep and age");
return;
}

// If pinned timestamps are enabled, make sure to not delete last metadata file.
if (RemoteStoreSettings.isPinnedTimestampsEnabled()) {
metadataFilesEligibleToDelete.remove(sortedMetadataFileList.get(0));
}

List<String> metadataFilesToBeDeleted = metadataFilesEligibleToDelete.stream()
.filter(metadataFile -> allLockFiles.contains(metadataFile) == false)
.collect(Collectors.toList());
Expand All @@ -905,7 +916,7 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
Set<String> activeSegmentRemoteFilenames = new HashSet<>();

final Set<String> metadataFilesToFilterActiveSegments = getMetadataFilesToFilterActiveSegments(
lastNMetadataFilesToKeep,
sortedMetadataFileList.indexOf(metadataFilesEligibleToDelete.get(0)),
sortedMetadataFileList,
allLockFiles
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
List<String> metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(metadataFiles, indexDeleted);

// If index is not deleted, make sure to keep latest metadata file
if (indexDeleted == false) {
if (indexDeleted == false || RemoteStoreSettings.isPinnedTimestampsEnabled()) {
metadataFilesToBeDeleted.remove(metadataFiles.get(0));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,9 +285,9 @@ public void testIndexDeletionWithNoPinnedTimestampNoRecentMdFiles() throws Excep
assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable()));

assertBusy(() -> {
assertEquals(0, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size());
assertEquals(1, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size());
assertEquals(
0,
12,
blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size()
);
});
Expand Down