Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.opensearch.gateway.remote.RemoteClusterStateService.COORDINATION_METADATA;
import static org.opensearch.gateway.remote.RemoteClusterStateService.CUSTOM_METADATA;
import static org.opensearch.gateway.remote.RemoteClusterStateService.DELIMITER;
import static org.opensearch.gateway.remote.RemoteClusterStateService.METADATA_FILE_PREFIX;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateService.SETTING_METADATA;
import static org.opensearch.gateway.remote.RemoteClusterStateService.TEMPLATES_METADATA;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_FILE_PREFIX;
import static org.opensearch.gateway.remote.model.RemoteCoordinationMetadata.COORDINATION_METADATA;
import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.CUSTOM_METADATA;
import static org.opensearch.gateway.remote.model.RemotePersistentSettingsMetadata.SETTING_METADATA;
import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteClusterStateServiceIT extends RemoteStoreBaseIntegTestCase {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import static org.opensearch.cluster.metadata.Metadata.CLUSTER_READ_ONLY_BLOCK;
import static org.opensearch.cluster.metadata.Metadata.SETTING_READ_ONLY_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.encodeString;
import static org.opensearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE;
import static org.opensearch.repositories.blobstore.BlobStoreRepository.SYSTEM_REPOSITORY_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand Down Expand Up @@ -326,9 +327,7 @@ public void testFullClusterRestoreManifestFilePointsToInvalidIndexMetadataPathTh
// Step - 3 Delete index metadata file in remote
try {
Files.move(
segmentRepoPath.resolve(
RemoteClusterStateService.encodeString(clusterName) + "/cluster-state/" + prevClusterUUID + "/index"
),
segmentRepoPath.resolve(encodeString(clusterName) + "/cluster-state/" + prevClusterUUID + "/index"),
segmentRepoPath.resolve("cluster-state/")
);
} catch (IOException e) {
Expand All @@ -354,10 +353,7 @@ public void testRemoteStateFullRestart() throws Exception {
try {
Files.move(
segmentRepoPath.resolve(
RemoteClusterStateService.encodeString(clusterService().state().getClusterName().value())
+ "/cluster-state/"
+ prevClusterUUID
+ "/manifest"
encodeString(clusterService().state().getClusterName().value()) + "/cluster-state/" + prevClusterUUID + "/manifest"
),
segmentRepoPath.resolve("cluster-state/")
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.gateway.remote.RemoteStateTransferException;
import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable;
import org.opensearch.index.remote.RemoteStoreEnums;
import org.opensearch.index.remote.RemoteStorePathStrategy;
Expand All @@ -52,6 +52,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;

/**
Expand Down Expand Up @@ -87,7 +88,6 @@ public class InternalRemoteRoutingTableService extends AbstractLifecycleComponen

public static final String INDEX_ROUTING_PATH_TOKEN = "index-routing";
public static final String INDEX_ROUTING_FILE_PREFIX = "index_routing";
public static final String DELIMITER = "__";
public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--";

private static final Logger logger = LogManager.getLogger(InternalRemoteRoutingTableService.class);
Expand Down Expand Up @@ -175,10 +175,7 @@ public CheckedRunnable<IOException> getIndexRoutingAsyncAction(
)
),
ex -> latchedActionListener.onFailure(
new RemoteClusterStateService.RemoteStateTransferException(
"Exception in writing index to remote store: " + indexRouting.getIndex().toString(),
ex
)
new RemoteStateTransferException("Exception in writing index to remote store: " + indexRouting.getIndex().toString(), ex)
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public AbstractRemoteWritableBlobEntity(

public abstract BlobPathParameters getBlobPathParameters();

public abstract String getType();

public String getFullBlobName() {
return blobName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@
import java.util.Set;
import java.util.function.Predicate;

import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING;
import static org.opensearch.gateway.remote.RemoteIndexMetadataManager.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING;
import static org.opensearch.gateway.remote.RemoteManifestManager.METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING;

/**
* Encapsulates all valid cluster level settings.
*
Expand Down Expand Up @@ -717,9 +721,9 @@ public void apply(Settings value, Settings current, Settings previous) {
// Remote cluster state settings
RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING,
RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING,
RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING,
RemoteClusterStateService.METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING,
INDEX_METADATA_UPLOAD_TIMEOUT_SETTING,
GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING,
METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING,
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING,
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1158,8 +1158,7 @@ public String getComponent() {
}

public String getUploadedFilename() {
String[] splitPath = uploadedFilename.split("/");
return splitPath[splitPath.length - 1];
return uploadedFilename;
}

public String getIndexName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,9 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.opensearch.gateway.remote.RemoteClusterStateService.GLOBAL_METADATA_FORMAT;
import static org.opensearch.gateway.remote.RemoteClusterStateService.GLOBAL_METADATA_PATH_TOKEN;
import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_METADATA_FORMAT;
import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_PATH_TOKEN;
import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_FILE_PREFIX;
import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_PATH_TOKEN;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.GLOBAL_METADATA_PATH_TOKEN;
import static org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest.MANIFEST;
import static org.opensearch.gateway.remote.model.RemoteGlobalMetadata.GLOBAL_METADATA_FORMAT;

/**
* A Manager which provides APIs to clean up stale cluster state files and runs an async stale cleanup task
Expand Down Expand Up @@ -74,6 +71,7 @@ public class RemoteClusterStateCleanupManager implements Closeable {
private long lastCleanupAttemptStateVersion;
private final ThreadPool threadpool;
private final ClusterApplierService clusterApplierService;
private RemoteManifestManager remoteManifestManager;

public RemoteClusterStateCleanupManager(RemoteClusterStateService remoteClusterStateService, ClusterService clusterService) {
this.remoteClusterStateService = remoteClusterStateService;
Expand All @@ -89,6 +87,7 @@ public RemoteClusterStateCleanupManager(RemoteClusterStateService remoteClusterS

void start() {
staleFileDeletionTask = new AsyncStaleFileDeletion(this);
remoteManifestManager = remoteClusterStateService.getRemoteManifestManager();
}

@Override
Expand Down Expand Up @@ -172,13 +171,17 @@ void deleteClusterMetadata(
Set<String> staleIndexMetadataPaths = new HashSet<>();
Set<String> staleGlobalMetadataPaths = new HashSet<>();
activeManifestBlobMetadata.forEach(blobMetadata -> {
ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest(
ClusterMetadataManifest clusterMetadataManifest = remoteManifestManager.fetchRemoteClusterMetadataManifest(
clusterName,
clusterUUID,
blobMetadata.name()
);
clusterMetadataManifest.getIndices()
.forEach(uploadedIndexMetadata -> filesToKeep.add(uploadedIndexMetadata.getUploadedFilename()));
.forEach(
uploadedIndexMetadata -> filesToKeep.add(
RemoteClusterStateUtils.getFormattedIndexFileName(uploadedIndexMetadata.getUploadedFilename())
)
);
if (clusterMetadataManifest.getCodecVersion() == ClusterMetadataManifest.CODEC_V1) {
filesToKeep.add(clusterMetadataManifest.getGlobalMetadataFileName());
} else if (clusterMetadataManifest.getCodecVersion() >= ClusterMetadataManifest.CODEC_V2) {
Expand All @@ -191,43 +194,38 @@ void deleteClusterMetadata(
}
});
staleManifestBlobMetadata.forEach(blobMetadata -> {
ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest(
ClusterMetadataManifest clusterMetadataManifest = remoteManifestManager.fetchRemoteClusterMetadataManifest(
clusterName,
clusterUUID,
blobMetadata.name()
);
staleManifestPaths.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blobMetadata.name());
staleManifestPaths.add(
remoteManifestManager.getManifestFolderPath(clusterName, clusterUUID).buildAsString() + blobMetadata.name()
);
if (clusterMetadataManifest.getCodecVersion() == ClusterMetadataManifest.CODEC_V1) {
addStaleGlobalMetadataPath(clusterMetadataManifest.getGlobalMetadataFileName(), filesToKeep, staleGlobalMetadataPaths);
} else if (clusterMetadataManifest.getCodecVersion() >= ClusterMetadataManifest.CODEC_V2) {
addStaleGlobalMetadataPath(
clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename(),
filesToKeep,
staleGlobalMetadataPaths
);
addStaleGlobalMetadataPath(
clusterMetadataManifest.getSettingsMetadata().getUploadedFilename(),
filesToKeep,
staleGlobalMetadataPaths
);
addStaleGlobalMetadataPath(
clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename(),
filesToKeep,
staleGlobalMetadataPaths
);
if (filesToKeep.contains(clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename()) == false) {
staleGlobalMetadataPaths.add(clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename());
}
if (filesToKeep.contains(clusterMetadataManifest.getSettingsMetadata().getUploadedFilename()) == false) {
staleGlobalMetadataPaths.add(clusterMetadataManifest.getSettingsMetadata().getUploadedFilename());
}
if (filesToKeep.contains(clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename()) == false) {
staleGlobalMetadataPaths.add(clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename());
}
clusterMetadataManifest.getCustomMetadataMap()
.values()
.forEach(
attribute -> addStaleGlobalMetadataPath(attribute.getUploadedFilename(), filesToKeep, staleGlobalMetadataPaths)
);
.stream()
.map(ClusterMetadataManifest.UploadedMetadataAttribute::getUploadedFilename)
.filter(file -> filesToKeep.contains(file) == false)
.forEach(staleGlobalMetadataPaths::add);
}

clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> {
if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) {
staleIndexMetadataPaths.add(
new BlobPath().add(INDEX_PATH_TOKEN).add(uploadedIndexMetadata.getIndexUUID()).buildAsString()
+ INDEX_METADATA_FORMAT.blobName(uploadedIndexMetadata.getUploadedFilename())
);
String fileName = RemoteClusterStateUtils.getFormattedIndexFileName(uploadedIndexMetadata.getUploadedFilename());
if (filesToKeep.contains(fileName) == false) {
staleIndexMetadataPaths.add(fileName);
}
});
});
Expand All @@ -237,9 +235,9 @@ void deleteClusterMetadata(
return;
}

deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleGlobalMetadataPaths));
deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleIndexMetadataPaths));
deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleManifestPaths));
deleteStalePaths(new ArrayList<>(staleGlobalMetadataPaths));
deleteStalePaths(new ArrayList<>(staleIndexMetadataPaths));
deleteStalePaths(new ArrayList<>(staleManifestPaths));
} catch (IllegalStateException e) {
logger.error("Error while fetching Remote Cluster Metadata manifests", e);
} catch (IOException e) {
Expand Down Expand Up @@ -267,8 +265,8 @@ void deleteStaleClusterMetadata(String clusterName, String clusterUUID, int mani
try {
getBlobStoreTransferService().listAllInSortedOrderAsync(
ThreadPool.Names.REMOTE_PURGE,
remoteClusterStateService.getManifestFolderPath(clusterName, clusterUUID),
MANIFEST_FILE_PREFIX,
remoteManifestManager.getManifestFolderPath(clusterName, clusterUUID),
MANIFEST,
Integer.MAX_VALUE,
new ActionListener<>() {
@Override
Expand Down Expand Up @@ -312,7 +310,11 @@ void deleteStaleUUIDsClusterMetadata(String clusterName, List<String> clusterUUI
clusterUUIDs.forEach(
clusterUUID -> getBlobStoreTransferService().deleteAsync(
ThreadPool.Names.REMOTE_PURGE,
remoteClusterStateService.getCusterMetadataBasePath(clusterName, clusterUUID),
RemoteClusterStateUtils.getClusterMetadataBasePath(
remoteClusterStateService.getBlobStoreRepository(),
clusterName,
clusterUUID
),
new ActionListener<>() {
@Override
public void onResponse(Void unused) {
Expand All @@ -336,12 +338,9 @@ public void onFailure(Exception e) {
}

// package private for testing
void deleteStalePaths(String clusterName, String clusterUUID, List<String> stalePaths) throws IOException {
void deleteStalePaths(List<String> stalePaths) throws IOException {
logger.debug(String.format(Locale.ROOT, "Deleting stale files from remote - %s", stalePaths));
getBlobStoreTransferService().deleteBlobs(
remoteClusterStateService.getCusterMetadataBasePath(clusterName, clusterUUID),
stalePaths
);
getBlobStoreTransferService().deleteBlobs(BlobPath.cleanPath(), stalePaths);
}

/**
Expand Down
Loading