Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest;
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.action.support.master.AcknowledgedResponse;
Expand Down Expand Up @@ -174,6 +175,118 @@ public void testCloneShallowCopyV2() throws Exception {
assertThat(cloneSnapshotInfo.totalShards(), equalTo(sourceSnapshotInfo.totalShards()));
}

public void testCloneShallowCopyV2DeletedIndex() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used in the test");
final Path remoteStoreRepoPath = randomRepoPath();
internalCluster().startClusterManagerOnlyNode(snapshotV2Settings(remoteStoreRepoPath));
internalCluster().startDataOnlyNode(snapshotV2Settings(remoteStoreRepoPath));
internalCluster().startDataOnlyNode(snapshotV2Settings(remoteStoreRepoPath));

String indexName1 = "testindex1";
String indexName2 = "testindex2";
String indexName3 = "testindex3";
String snapshotRepoName = "test-clone-snapshot-repo";
String snapshotName1 = "test-create-snapshot1";
Path absolutePath1 = randomRepoPath().toAbsolutePath();
logger.info("Snapshot Path [{}]", absolutePath1);

Client client = client();

assertAcked(
client.admin()
.cluster()
.preparePutRepository(snapshotRepoName)
.setType(FsRepository.TYPE)
.setSettings(
Settings.builder()
.put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1)
.put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean())
.put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
.put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true)
)
);

createIndex(indexName1, getRemoteStoreBackedIndexSettings());
createIndex(indexName2, getRemoteStoreBackedIndexSettings());

final int numDocsInIndex1 = 10;
final int numDocsInIndex2 = 20;
indexRandomDocs(indexName1, numDocsInIndex1);
indexRandomDocs(indexName2, numDocsInIndex2);
ensureGreen(indexName1, indexName2);

CreateSnapshotResponse createSnapshotResponse = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName1)
.setWaitForCompletion(true)
.get();
SnapshotInfo sourceSnapshotInfo = createSnapshotResponse.getSnapshotInfo();
assertThat(sourceSnapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(sourceSnapshotInfo.successfulShards(), greaterThan(0));
assertThat(sourceSnapshotInfo.successfulShards(), equalTo(sourceSnapshotInfo.totalShards()));
assertThat(sourceSnapshotInfo.snapshotId().getName(), equalTo(snapshotName1));

// Validate that the snapshot was created
final BlobStoreRepository repository = (BlobStoreRepository) internalCluster().getCurrentClusterManagerNodeInstance(
RepositoriesService.class
).repository(snapshotRepoName);
PlainActionFuture<RepositoryData> repositoryDataPlainActionFuture = new PlainActionFuture<>();
repository.getRepositoryData(repositoryDataPlainActionFuture);

RepositoryData repositoryData = repositoryDataPlainActionFuture.get();

assertTrue(repositoryData.getSnapshotIds().contains(sourceSnapshotInfo.snapshotId()));

createIndex(indexName3, getRemoteStoreBackedIndexSettings());
indexRandomDocs(indexName3, 10);
ensureGreen(indexName3);

assertAcked(client().admin().indices().delete(new DeleteIndexRequest(indexName1)).get());
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(indexName2)).get());

AcknowledgedResponse response = client().admin()
.cluster()
.prepareCloneSnapshot(snapshotRepoName, snapshotName1, "test_clone_snapshot1")
.setIndices("*")
.get();
assertTrue(response.isAcknowledged());
awaitClusterManagerFinishRepoOperations();

AtomicReference<SnapshotId> cloneSnapshotId = new AtomicReference<>();
// Validate that snapshot is present in repository data
waitUntil(() -> {
PlainActionFuture<RepositoryData> repositoryDataPlainActionFutureClone = new PlainActionFuture<>();
repository.getRepositoryData(repositoryDataPlainActionFutureClone);

RepositoryData repositoryData1;
try {
repositoryData1 = repositoryDataPlainActionFutureClone.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
for (SnapshotId snapshotId : repositoryData1.getSnapshotIds()) {
if (snapshotId.getName().equals("test_clone_snapshot1")) {
cloneSnapshotId.set(snapshotId);
return true;
}
}
return false;
}, 90, TimeUnit.SECONDS);

final SnapshotId cloneSnapshotIdFinal = cloneSnapshotId.get();
SnapshotInfo cloneSnapshotInfo = PlainActionFuture.get(
f -> repository.threadPool().generic().execute(ActionRunnable.supply(f, () -> repository.getSnapshotInfo(cloneSnapshotIdFinal)))
);

assertThat(cloneSnapshotInfo.getPinnedTimestamp(), equalTo(sourceSnapshotInfo.getPinnedTimestamp()));
for (String index : sourceSnapshotInfo.indices()) {
assertTrue(cloneSnapshotInfo.indices().contains(index));

}
assertThat(cloneSnapshotInfo.totalShards(), equalTo(sourceSnapshotInfo.totalShards()));
}

public void testCloneShallowCopyAfterDisablingV2() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used in the test");
final Path remoteStoreRepoPath = randomRepoPath();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1068,7 +1068,6 @@ public void cloneSnapshotV2(
) {

long startTime = System.currentTimeMillis();
ClusterState currentState = clusterService.state();
String snapshotName = snapshot.getSnapshotId().getName();
repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask(Priority.URGENT) {
private SnapshotsInProgress.Entry newEntry;
Expand Down Expand Up @@ -1146,8 +1145,6 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);

executor.execute(ActionRunnable.supply(snapshotInfoListener, () -> repository.getSnapshotInfo(sourceSnapshotId)));
final ShardGenerations shardGenerations = repositoryData.shardGenerations();

snapshotInfoListener.whenComplete(snapshotInfo -> {
final SnapshotInfo cloneSnapshotInfo = new SnapshotInfo(
snapshot.getSnapshotId(),
Expand All @@ -1167,17 +1164,28 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl
throw new SnapshotException(repositoryName, snapshotName, "Aborting snapshot-v2 clone, no longer cluster manager");
}
final StepListener<RepositoryData> pinnedTimestampListener = new StepListener<>();
pinnedTimestampListener.whenComplete(repoData -> {
final StepListener<Metadata> metadataListener = new StepListener<>();
pinnedTimestampListener.whenComplete(
rData -> threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(metadataListener, () -> {
final Metadata.Builder metaBuilder = Metadata.builder(repository.getSnapshotGlobalMetadata(newEntry.source()));
for (IndexId index : newEntry.indices()) {
metaBuilder.put(repository.getSnapshotIndexMetaData(repositoryData, newEntry.source(), index), false);
}
return metaBuilder.build();
})),
e -> {
logger.error("Failed to update pinned timestamp for snapshot-v2 {} {} ", repositoryName, snapshotName);
stateWithoutSnapshotV2(newState);
leaveRepoLoop(repositoryName);
listener.onFailure(e);
}
);
metadataListener.whenComplete(meta -> {
ShardGenerations shardGenerations = buildGenerationsV2(newEntry, meta);
repository.finalizeSnapshot(
shardGenerations,
repositoryData.getGenId(),
metadataForSnapshot(
currentState.metadata(),
newEntry.includeGlobalState(),
false,
newEntry.dataStreams(),
newEntry.indices()
),
metadataForSnapshot(meta, newEntry.includeGlobalState(), false, newEntry.dataStreams(), newEntry.indices()),
cloneSnapshotInfo,
repositoryData.getVersion(sourceSnapshotId),
state -> stateWithoutSnapshot(state, snapshot),
Expand Down Expand Up @@ -1221,7 +1229,7 @@ public void onFailure(Exception e) {
}
);
}, e -> {
logger.error("Failed to update pinned timestamp for snapshot-v2 {} {} ", repositoryName, snapshotName);
logger.error("Failed to retrieve metadata for snapshot-v2 {} {} ", repositoryName, snapshotName);
stateWithoutSnapshotV2(newState);
leaveRepoLoop(repositoryName);
listener.onFailure(e);
Expand Down Expand Up @@ -1962,6 +1970,17 @@ private static ShardGenerations buildGenerations(SnapshotsInProgress.Entry snaps
return builder.build();
}

private static ShardGenerations buildGenerationsV2(SnapshotsInProgress.Entry snapshot, Metadata metadata) {
ShardGenerations.Builder builder = ShardGenerations.builder();
snapshot.indices().forEach(indexId -> {
int shardCount = metadata.index(indexId.getName()).getNumberOfShards();
for (int i = 0; i < shardCount; i++) {
builder.put(indexId, i, null);
}
});
return builder.build();
}

private static Metadata metadataForSnapshot(
Metadata metadata,
boolean includeGlobalState,
Expand Down