Skip to content

Commit e217065

Browse files
author
Shivansh Arora
committed
Create Remote Object managers and use them in orchestration from RemoteClusterStateService
Signed-off-by: Shivansh Arora <hishiv@amazon.com>
1 parent 49282c1 commit e217065

21 files changed

Lines changed: 1760 additions & 1152 deletions

server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,13 @@
2626
import java.util.function.Function;
2727
import java.util.stream.Collectors;
2828

29-
import static org.opensearch.gateway.remote.RemoteClusterStateService.COORDINATION_METADATA;
30-
import static org.opensearch.gateway.remote.RemoteClusterStateService.CUSTOM_METADATA;
31-
import static org.opensearch.gateway.remote.RemoteClusterStateService.DELIMITER;
32-
import static org.opensearch.gateway.remote.RemoteClusterStateService.METADATA_FILE_PREFIX;
3329
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
34-
import static org.opensearch.gateway.remote.RemoteClusterStateService.SETTING_METADATA;
35-
import static org.opensearch.gateway.remote.RemoteClusterStateService.TEMPLATES_METADATA;
30+
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
31+
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_FILE_PREFIX;
32+
import static org.opensearch.gateway.remote.model.RemoteCoordinationMetadata.COORDINATION_METADATA;
33+
import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.CUSTOM_METADATA;
34+
import static org.opensearch.gateway.remote.model.RemotePersistentSettingsMetadata.SETTING_METADATA;
35+
import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA;
3636

3737
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
3838
public class RemoteClusterStateServiceIT extends RemoteStoreBaseIntegTestCase {

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreClusterStateRestoreIT.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import static org.opensearch.cluster.metadata.Metadata.CLUSTER_READ_ONLY_BLOCK;
5858
import static org.opensearch.cluster.metadata.Metadata.SETTING_READ_ONLY_SETTING;
5959
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
60+
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.encodeString;
6061
import static org.opensearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE;
6162
import static org.opensearch.repositories.blobstore.BlobStoreRepository.SYSTEM_REPOSITORY_SETTING;
6263
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
@@ -326,9 +327,7 @@ public void testFullClusterRestoreManifestFilePointsToInvalidIndexMetadataPathTh
326327
// Step - 3 Delete index metadata file in remote
327328
try {
328329
Files.move(
329-
segmentRepoPath.resolve(
330-
RemoteClusterStateService.encodeString(clusterName) + "/cluster-state/" + prevClusterUUID + "/index"
331-
),
330+
segmentRepoPath.resolve(encodeString(clusterName) + "/cluster-state/" + prevClusterUUID + "/index"),
332331
segmentRepoPath.resolve("cluster-state/")
333332
);
334333
} catch (IOException e) {
@@ -354,10 +353,7 @@ public void testRemoteStateFullRestart() throws Exception {
354353
try {
355354
Files.move(
356355
segmentRepoPath.resolve(
357-
RemoteClusterStateService.encodeString(clusterService().state().getClusterName().value())
358-
+ "/cluster-state/"
359-
+ prevClusterUUID
360-
+ "/manifest"
356+
encodeString(clusterService().state().getClusterName().value()) + "/cluster-state/" + prevClusterUUID + "/manifest"
361357
),
362358
segmentRepoPath.resolve("cluster-state/")
363359
);

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,10 @@
180180
import java.util.Set;
181181
import java.util.function.Predicate;
182182

183+
import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING;
184+
import static org.opensearch.gateway.remote.RemoteIndexMetadataManager.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING;
185+
import static org.opensearch.gateway.remote.RemoteManifestManager.METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING;
186+
183187
/**
184188
* Encapsulates all valid cluster level settings.
185189
*
@@ -715,9 +719,9 @@ public void apply(Settings value, Settings current, Settings previous) {
715719
// Remote cluster state settings
716720
RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING,
717721
RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
718-
RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING,
719-
RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING,
720-
RemoteClusterStateService.METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING,
722+
INDEX_METADATA_UPLOAD_TIMEOUT_SETTING,
723+
GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING,
724+
METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING,
721725
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
722726
RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING,
723727
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING,

server/src/main/java/org/opensearch/gateway/GatewayMetaState.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import org.opensearch.env.NodeMetadata;
6565
import org.opensearch.gateway.remote.ClusterMetadataManifest;
6666
import org.opensearch.gateway.remote.RemoteClusterStateService;
67+
import org.opensearch.gateway.remote.RemoteUploadDetails;
6768
import org.opensearch.index.recovery.RemoteStoreRestoreService;
6869
import org.opensearch.index.recovery.RemoteStoreRestoreService.RemoteRestoreResult;
6970
import org.opensearch.node.Node;
@@ -693,7 +694,7 @@ public void setCurrentTerm(long currentTerm) {
693694
@Override
694695
public void setLastAcceptedState(ClusterState clusterState) {
695696
try {
696-
final ClusterMetadataManifest manifest;
697+
final RemoteUploadDetails manifestDetails;
697698
if (shouldWriteFullClusterState(clusterState)) {
698699
final Optional<ClusterMetadataManifest> latestManifest = remoteClusterStateService.getLatestClusterMetadataManifest(
699700
clusterState.getClusterName().value(),
@@ -711,14 +712,21 @@ public void setLastAcceptedState(ClusterState clusterState) {
711712
clusterState.metadata().clusterUUID()
712713
);
713714
}
714-
manifest = remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID);
715+
manifestDetails = remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID);
715716
} else {
716717
assert verifyManifestAndClusterState(lastAcceptedManifest, lastAcceptedState) == true
717718
: "Previous manifest and previous ClusterState are not in sync";
718-
manifest = remoteClusterStateService.writeIncrementalMetadata(lastAcceptedState, clusterState, lastAcceptedManifest);
719+
manifestDetails = remoteClusterStateService.writeIncrementalMetadata(
720+
lastAcceptedState,
721+
clusterState,
722+
lastAcceptedManifest
723+
);
719724
}
720-
assert verifyManifestAndClusterState(manifest, clusterState) == true : "Manifest and ClusterState are not in sync";
721-
lastAcceptedManifest = manifest;
725+
assert verifyManifestAndClusterState(
726+
Objects.requireNonNull(manifestDetails).getClusterMetadataManifest(),
727+
clusterState
728+
) == true : "Manifest and ClusterState are not in sync";
729+
lastAcceptedManifest = manifestDetails.getClusterMetadataManifest();
722730
lastAcceptedState = clusterState;
723731
} catch (Exception e) {
724732
remoteClusterStateService.writeMetadataFailed();
@@ -767,11 +775,12 @@ public void markLastAcceptedStateAsCommitted() {
767775
metadataBuilder.clusterUUIDCommitted(true);
768776
clusterState = ClusterState.builder(lastAcceptedState).metadata(metadataBuilder).build();
769777
}
770-
final ClusterMetadataManifest committedManifest = remoteClusterStateService.markLastStateAsCommitted(
778+
final RemoteUploadDetails committedManifestDetails = remoteClusterStateService.markLastStateAsCommitted(
771779
clusterState,
772780
lastAcceptedManifest
773781
);
774-
lastAcceptedManifest = committedManifest;
782+
assert committedManifestDetails != null;
783+
lastAcceptedManifest = committedManifestDetails.getClusterMetadataManifest();
775784
lastAcceptedState = clusterState;
776785
} catch (Exception e) {
777786
handleExceptionOnWrite(e);

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java

Lines changed: 49 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,7 @@
3434
import java.util.Set;
3535
import java.util.concurrent.atomic.AtomicBoolean;
3636

37-
import static org.opensearch.gateway.remote.RemoteClusterStateService.GLOBAL_METADATA_FORMAT;
38-
import static org.opensearch.gateway.remote.RemoteClusterStateService.GLOBAL_METADATA_PATH_TOKEN;
39-
import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_METADATA_FORMAT;
40-
import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_PATH_TOKEN;
41-
import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_FILE_PREFIX;
42-
import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_PATH_TOKEN;
37+
import static org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest.MANIFEST;
4338

4439
/**
4540
* A Manager which provides APIs to clean up stale cluster state files and runs an async stale cleanup task
@@ -50,7 +45,7 @@ public class RemoteClusterStateCleanupManager implements Closeable {
5045

5146
public static final int RETAINED_MANIFESTS = 10;
5247
public static final int SKIP_CLEANUP_STATE_CHANGES = 10;
53-
public static final TimeValue CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT = TimeValue.timeValueMinutes(5);
48+
public static final TimeValue CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT = TimeValue.timeValueSeconds(15);
5449
public static final TimeValue CLUSTER_STATE_CLEANUP_INTERVAL_MINIMUM = TimeValue.MINUS_ONE;
5550

5651
/**
@@ -70,7 +65,7 @@ public class RemoteClusterStateCleanupManager implements Closeable {
7065
private BlobStoreTransferService blobStoreTransferService;
7166
private TimeValue staleFileCleanupInterval;
7267
private final AtomicBoolean deleteStaleMetadataRunning = new AtomicBoolean(false);
73-
private volatile AsyncStaleFileDeletion staleFileDeletionTask;
68+
private AsyncStaleFileDeletion staleFileDeletionTask;
7469
private long lastCleanupAttemptStateVersion;
7570
private final ThreadPool threadpool;
7671
private final ClusterApplierService clusterApplierService;
@@ -150,12 +145,7 @@ void cleanUpStaleFiles() {
150145

151146
private void addStaleGlobalMetadataPath(String fileName, Set<String> filesToKeep, Set<String> staleGlobalMetadataPaths) {
152147
if (!filesToKeep.contains(fileName)) {
153-
String[] splitPath = fileName.split("/");
154-
staleGlobalMetadataPaths.add(
155-
new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName(
156-
splitPath[splitPath.length - 1]
157-
)
158-
);
148+
staleGlobalMetadataPaths.add(fileName);
159149
}
160150
}
161151

@@ -172,15 +162,24 @@ void deleteClusterMetadata(
172162
Set<String> staleIndexMetadataPaths = new HashSet<>();
173163
Set<String> staleGlobalMetadataPaths = new HashSet<>();
174164
activeManifestBlobMetadata.forEach(blobMetadata -> {
175-
ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest(
176-
clusterName,
177-
clusterUUID,
178-
blobMetadata.name()
179-
);
165+
ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.getRemoteManifestManager()
166+
.fetchRemoteClusterMetadataManifest(clusterName, clusterUUID, blobMetadata.name());
180167
clusterMetadataManifest.getIndices()
181-
.forEach(uploadedIndexMetadata -> filesToKeep.add(uploadedIndexMetadata.getUploadedFilename()));
168+
.forEach(
169+
uploadedIndexMetadata -> filesToKeep.add(
170+
RemoteClusterStateUtils.getFormattedFileName(
171+
uploadedIndexMetadata.getUploadedFilename(),
172+
clusterMetadataManifest.getCodecVersion()
173+
)
174+
)
175+
);
182176
if (clusterMetadataManifest.getCodecVersion() == ClusterMetadataManifest.CODEC_V1) {
183-
filesToKeep.add(clusterMetadataManifest.getGlobalMetadataFileName());
177+
filesToKeep.add(
178+
RemoteClusterStateUtils.getFormattedFileName(
179+
clusterMetadataManifest.getGlobalMetadataFileName(),
180+
clusterMetadataManifest.getCodecVersion()
181+
)
182+
);
184183
} else if (clusterMetadataManifest.getCodecVersion() >= ClusterMetadataManifest.CODEC_V2) {
185184
filesToKeep.add(clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename());
186185
filesToKeep.add(clusterMetadataManifest.getSettingsMetadata().getUploadedFilename());
@@ -191,14 +190,21 @@ void deleteClusterMetadata(
191190
}
192191
});
193192
staleManifestBlobMetadata.forEach(blobMetadata -> {
194-
ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest(
195-
clusterName,
196-
clusterUUID,
197-
blobMetadata.name()
193+
ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.getRemoteManifestManager()
194+
.fetchRemoteClusterMetadataManifest(clusterName, clusterUUID, blobMetadata.name());
195+
staleManifestPaths.add(
196+
remoteClusterStateService.getRemoteManifestManager().getManifestFolderPath(clusterName, clusterUUID).buildAsString()
197+
+ blobMetadata.name()
198198
);
199-
staleManifestPaths.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blobMetadata.name());
200199
if (clusterMetadataManifest.getCodecVersion() == ClusterMetadataManifest.CODEC_V1) {
201-
addStaleGlobalMetadataPath(clusterMetadataManifest.getGlobalMetadataFileName(), filesToKeep, staleGlobalMetadataPaths);
200+
addStaleGlobalMetadataPath(
201+
RemoteClusterStateUtils.getFormattedFileName(
202+
clusterMetadataManifest.getGlobalMetadataFileName(),
203+
clusterMetadataManifest.getCodecVersion()
204+
),
205+
filesToKeep,
206+
staleGlobalMetadataPaths
207+
);
202208
} else if (clusterMetadataManifest.getCodecVersion() >= ClusterMetadataManifest.CODEC_V2) {
203209
addStaleGlobalMetadataPath(
204210
clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename(),
@@ -225,8 +231,10 @@ void deleteClusterMetadata(
225231
clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> {
226232
if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) {
227233
staleIndexMetadataPaths.add(
228-
new BlobPath().add(INDEX_PATH_TOKEN).add(uploadedIndexMetadata.getIndexUUID()).buildAsString()
229-
+ INDEX_METADATA_FORMAT.blobName(uploadedIndexMetadata.getUploadedFilename())
234+
RemoteClusterStateUtils.getFormattedFileName(
235+
uploadedIndexMetadata.getUploadedFilename(),
236+
clusterMetadataManifest.getCodecVersion()
237+
)
230238
);
231239
}
232240
});
@@ -237,9 +245,9 @@ void deleteClusterMetadata(
237245
return;
238246
}
239247

240-
deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleGlobalMetadataPaths));
241-
deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleIndexMetadataPaths));
242-
deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleManifestPaths));
248+
deleteStalePaths(new ArrayList<>(staleGlobalMetadataPaths));
249+
deleteStalePaths(new ArrayList<>(staleIndexMetadataPaths));
250+
deleteStalePaths(new ArrayList<>(staleManifestPaths));
243251
} catch (IllegalStateException e) {
244252
logger.error("Error while fetching Remote Cluster Metadata manifests", e);
245253
} catch (IOException e) {
@@ -267,8 +275,8 @@ void deleteStaleClusterMetadata(String clusterName, String clusterUUID, int mani
267275
try {
268276
getBlobStoreTransferService().listAllInSortedOrderAsync(
269277
ThreadPool.Names.REMOTE_PURGE,
270-
remoteClusterStateService.getManifestFolderPath(clusterName, clusterUUID),
271-
MANIFEST_FILE_PREFIX,
278+
remoteClusterStateService.getRemoteManifestManager().getManifestFolderPath(clusterName, clusterUUID),
279+
MANIFEST,
272280
Integer.MAX_VALUE,
273281
new ActionListener<>() {
274282
@Override
@@ -312,7 +320,11 @@ void deleteStaleUUIDsClusterMetadata(String clusterName, List<String> clusterUUI
312320
clusterUUIDs.forEach(
313321
clusterUUID -> getBlobStoreTransferService().deleteAsync(
314322
ThreadPool.Names.REMOTE_PURGE,
315-
remoteClusterStateService.getCusterMetadataBasePath(clusterName, clusterUUID),
323+
RemoteClusterStateUtils.getCusterMetadataBasePath(
324+
remoteClusterStateService.getBlobStoreRepository(),
325+
clusterName,
326+
clusterUUID
327+
),
316328
new ActionListener<>() {
317329
@Override
318330
public void onResponse(Void unused) {
@@ -336,12 +348,9 @@ public void onFailure(Exception e) {
336348
}
337349

338350
// package private for testing
339-
void deleteStalePaths(String clusterName, String clusterUUID, List<String> stalePaths) throws IOException {
351+
void deleteStalePaths(List<String> stalePaths) throws IOException {
340352
logger.debug(String.format(Locale.ROOT, "Deleting stale files from remote - %s", stalePaths));
341-
getBlobStoreTransferService().deleteBlobs(
342-
remoteClusterStateService.getCusterMetadataBasePath(clusterName, clusterUUID),
343-
stalePaths
344-
);
353+
getBlobStoreTransferService().deleteBlobs(BlobPath.cleanPath(), stalePaths);
345354
}
346355

347356
/**

0 commit comments

Comments
 (0)