Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,126 @@ public void testCloneSnapshotV2MasterSwitch() throws Exception {
assertThat(snapInfo, containsInAnyOrder(csr.getSnapshotInfo(), csr2.getSnapshotInfo()));
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/pull/16191")
public void testDeleteWhileV2CreateOngoing() throws Exception {
final String clusterManagerName = internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings());
internalCluster().startDataOnlyNode(pinnedTimestampSettings());
internalCluster().startDataOnlyNode(pinnedTimestampSettings());
String indexName1 = "testindex1";
String indexName2 = "testindex2";
String repoName = "test-create-snapshot-repo";
Path absolutePath1 = randomRepoPath().toAbsolutePath();
logger.info("Snapshot Path [{}]", absolutePath1);

Settings.Builder settings = Settings.builder()
.put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1)
.put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean())
.put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
.put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), false);
createRepository(repoName, "mock", settings);

Client client = client();
Settings indexSettings = getIndexSettings(20, 0).build();
createIndex(indexName1, indexSettings);

Settings indexSettings2 = getIndexSettings(15, 0).build();
createIndex(indexName2, indexSettings2);

final int numDocsInIndex1 = 10;
final int numDocsInIndex2 = 20;
indexDocuments(client, indexName1, numDocsInIndex1);
indexDocuments(client, indexName2, numDocsInIndex2);
ensureGreen(indexName1, indexName2);

startFullSnapshot(repoName, "snapshot-v1").actionGet();

// Creating a v2 repo
settings = Settings.builder()
.put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1)
.put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean())
.put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
.put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true);
createRepository(repoName, "mock", settings);

blockClusterManagerOnWriteIndexFile(repoName);

final ActionFuture<CreateSnapshotResponse> snapshotFuture = startFullSnapshot(repoName, "snapshot-v2");
awaitNumberOfSnapshotsInProgress(1);

ActionFuture<AcknowledgedResponse> a = startDeleteSnapshot(repoName, "snapshot-v1");

unblockNode(repoName, clusterManagerName);
CreateSnapshotResponse csr = snapshotFuture.actionGet();
assertTrue(csr.getSnapshotInfo().getPinnedTimestamp() != 0);
assertTrue(a.actionGet().isAcknowledged());
List<SnapshotInfo> snapInfo = client().admin().cluster().prepareGetSnapshots(repoName).get().getSnapshots();
assertEquals(1, snapInfo.size());
assertThat(snapInfo, contains(csr.getSnapshotInfo()));
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/pull/16191")
public void testDeleteAndCloneV1WhileV2CreateOngoing() throws Exception {
final String clusterManagerName = internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings());
internalCluster().startDataOnlyNode(pinnedTimestampSettings());
internalCluster().startDataOnlyNode(pinnedTimestampSettings());
String indexName1 = "testindex1";
String indexName2 = "testindex2";
String repoName = "test-create-snapshot-repo";
Path absolutePath1 = randomRepoPath().toAbsolutePath();
logger.info("Snapshot Path [{}]", absolutePath1);

Settings.Builder settings = Settings.builder()
.put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1)
.put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean())
.put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
.put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), false);
createRepository(repoName, "mock", settings);

Client client = client();
Settings indexSettings = getIndexSettings(20, 0).build();
createIndex(indexName1, indexSettings);

Settings indexSettings2 = getIndexSettings(15, 0).build();
createIndex(indexName2, indexSettings2);

final int numDocsInIndex1 = 10;
final int numDocsInIndex2 = 20;
indexDocuments(client, indexName1, numDocsInIndex1);
indexDocuments(client, indexName2, numDocsInIndex2);
ensureGreen(indexName1, indexName2);

startFullSnapshot(repoName, "snapshot-v1").actionGet();
startFullSnapshot(repoName, "snapshot-v1-2").actionGet();

// Creating a v2 repo
settings = Settings.builder()
.put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1)
.put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean())
.put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
.put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true);
createRepository(repoName, "mock", settings);

blockClusterManagerOnWriteIndexFile(repoName);

final ActionFuture<CreateSnapshotResponse> snapshotFuture = startFullSnapshot(repoName, "snapshot-v2");
awaitNumberOfSnapshotsInProgress(1);

ActionFuture<AcknowledgedResponse> startDeleteSnapshot = startDeleteSnapshot(repoName, "snapshot-v1");
ActionFuture<AcknowledgedResponse> startCloneSnapshot = startCloneSnapshot(repoName, "snapshot-v1-2", "snapshot-v1-2-clone");

unblockNode(repoName, clusterManagerName);
CreateSnapshotResponse csr = snapshotFuture.actionGet();
assertTrue(csr.getSnapshotInfo().getPinnedTimestamp() != 0);
assertTrue(startDeleteSnapshot.actionGet().isAcknowledged());
assertTrue(startCloneSnapshot.actionGet().isAcknowledged());
List<SnapshotInfo> snapInfo = client().admin().cluster().prepareGetSnapshots(repoName).get().getSnapshots();
assertEquals(3, snapInfo.size());
}

protected ActionFuture<AcknowledgedResponse> startCloneSnapshot(String repoName, String sourceSnapshotName, String snapshotName) {
logger.info("--> creating full snapshot [{}] to repo [{}]", snapshotName, repoName);
return clusterAdmin().prepareCloneSnapshot(repoName, sourceSnapshotName, snapshotName).setIndices("*").execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ public ClusterState execute(ClusterState currentState) {
);
if (request.partial() == false) {
Set<String> missing = new HashSet<>();
for (final Map.Entry<ShardId, SnapshotsInProgress.ShardSnapshotStatus> entry : shards.entrySet()) {
for (final Map.Entry<ShardId, ShardSnapshotStatus> entry : shards.entrySet()) {
if (entry.getValue().state() == ShardState.MISSING) {
missing.add(entry.getKey().getIndex().getName());
}
Expand Down Expand Up @@ -789,8 +789,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl
new ActionListener<RepositoryData>() {
@Override
public void onResponse(RepositoryData repositoryData) {
leaveRepoLoop(repositoryName);
if (clusterService.state().nodes().isLocalNodeElectedClusterManager() == false) {
leaveRepoLoop(repositoryName);
failSnapshotCompletionListeners(
snapshot,
new SnapshotException(snapshot, "Aborting snapshot-v2, no longer cluster manager")
Expand All @@ -805,6 +805,9 @@ public void onResponse(RepositoryData repositoryData) {
return;
}
listener.onResponse(snapshotInfo);
// For snapshot-v2, we don't allow concurrent snapshots . But meanwhile non-v2 snapshot operations
// can get queued . This is triggering them.
runNextQueuedOperation(repositoryData, repositoryName, true);
cleanOrphanTimestamp(repositoryName, repositoryData);
}

Expand Down Expand Up @@ -1193,8 +1196,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl
new ActionListener<RepositoryData>() {
@Override
public void onResponse(RepositoryData repositoryData) {
leaveRepoLoop(repositoryName);
if (!clusterService.state().nodes().isLocalNodeElectedClusterManager()) {
leaveRepoLoop(repositoryName);
failSnapshotCompletionListeners(
snapshot,
new SnapshotException(snapshot, "Aborting Snapshot-v2 clone, no longer cluster manager")
Expand All @@ -1210,6 +1213,9 @@ public void onResponse(RepositoryData repositoryData) {
}
logger.info("snapshot-v2 clone [{}] completed successfully", snapshot);
listener.onResponse(null);
// For snapshot-v2, we don't allow concurrent snapshots . But meanwhile non-v2 snapshot operations
// can get queued . This is triggering them.
runNextQueuedOperation(repositoryData, repositoryName, true);
}

@Override
Expand Down Expand Up @@ -2173,7 +2179,7 @@ private static boolean assertNoDanglingSnapshots(ClusterState state) {
/**
* Updates the state of in-progress snapshots in reaction to a change in the configuration of the cluster nodes (cluster-manager fail-over or
* disconnect of a data node that was executing a snapshot) or a routing change that started shards whose snapshot state is
* {@link SnapshotsInProgress.ShardState#WAITING}.
* {@link ShardState#WAITING}.
*
* @param changedNodes true iff either a cluster-manager fail-over occurred or a data node that was doing snapshot work got removed from the
* cluster
Expand Down Expand Up @@ -3181,7 +3187,7 @@ private static List<SnapshotId> matchingSnapshotIds(
}
}
}
return Collections.unmodifiableList(new ArrayList<>(foundSnapshots));
return unmodifiableList(new ArrayList<>(foundSnapshots));
}

// Return in-progress snapshot entries by name and repository in the given cluster state or null if none is found
Expand Down Expand Up @@ -3351,7 +3357,7 @@ public ClusterState execute(ClusterState currentState) {
reusedExistingDelete = true;
return currentState;
}
final List<SnapshotId> toDelete = Collections.unmodifiableList(new ArrayList<>(snapshotIdsRequiringCleanup));
final List<SnapshotId> toDelete = unmodifiableList(new ArrayList<>(snapshotIdsRequiringCleanup));
ensureBelowConcurrencyLimit(repoName, toDelete.get(0).getName(), snapshots, deletionsInProgress);
newDelete = new SnapshotDeletionsInProgress.Entry(
toDelete,
Expand Down Expand Up @@ -3941,7 +3947,7 @@ private static <T> void completeListenersIgnoringException(@Nullable List<Action
* @param useShardGenerations whether to write {@link ShardGenerations} during the snapshot
* @return list of shard to be included into current snapshot
*/
private static Map<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards(
private static Map<ShardId, ShardSnapshotStatus> shards(
SnapshotsInProgress snapshotsInProgress,
@Nullable SnapshotDeletionsInProgress deletionsInProgress,
Metadata metadata,
Expand All @@ -3951,7 +3957,7 @@ private static Map<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards(
RepositoryData repositoryData,
String repoName
) {
final Map<ShardId, SnapshotsInProgress.ShardSnapshotStatus> builder = new HashMap<>();
final Map<ShardId, ShardSnapshotStatus> builder = new HashMap<>();
final ShardGenerations shardGenerations = repositoryData.shardGenerations();
final InFlightShardSnapshotStates inFlightShardStates = InFlightShardSnapshotStates.forRepo(
repoName,
Expand Down Expand Up @@ -3988,7 +3994,7 @@ private static Map<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards(
}
final ShardSnapshotStatus shardSnapshotStatus;
if (indexRoutingTable == null) {
shardSnapshotStatus = new SnapshotsInProgress.ShardSnapshotStatus(
shardSnapshotStatus = new ShardSnapshotStatus(
null,
ShardState.MISSING,
"missing routing table",
Expand Down