Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions server/src/main/java/org/opensearch/cluster/DiffableUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,18 @@ public void writeDiff(Diff<V> value, StreamOutput out) throws IOException {
* @opensearch.internal
*/
public abstract static class NonDiffableValueSerializer<K, V> implements ValueSerializer<K, V> {
private static final NonDiffableValueSerializer ABSTRACT_INSTANCE = new NonDiffableValueSerializer<>() {
@Override
public void write(Object value, StreamOutput out) {
throw new UnsupportedOperationException();
}

@Override
public Object read(StreamInput in, Object key) {
throw new UnsupportedOperationException();
}
};

@Override
public boolean supportsDiffableValues() {
return false;
Expand All @@ -513,6 +525,10 @@ public void writeDiff(Diff<V> value, StreamOutput out) throws IOException {
public Diff<V> readDiff(StreamInput in, K key) throws IOException {
throw new UnsupportedOperationException();
}

public static <K, V> NonDiffableValueSerializer<K, V> getAbstractInstance() {
return ABSTRACT_INSTANCE;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public static DiffableStringMap readFrom(StreamInput in) throws IOException {
return map.isEmpty() ? EMPTY : new DiffableStringMap(map);
}

DiffableStringMap(final Map<String, String> map) {
public DiffableStringMap(final Map<String, String> map) {
this.innerMap = Collections.unmodifiableMap(map);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -973,6 +973,10 @@ public static boolean isSettingsMetadataEqual(Metadata metadata1, Metadata metad
return metadata1.persistentSettings.equals(metadata2.persistentSettings);
}

public static boolean isTransientSettingsMetadataEqual(Metadata metadata1, Metadata metadata2) {
return metadata1.transientSettings.equals(metadata2.transientSettings);
}

public static boolean isTemplatesMetadataEqual(Metadata metadata1, Metadata metadata2) {
return metadata1.templates.equals(metadata2.templates);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* Interface for RemoteRoutingTableService. Exposes methods to orchestrate upload and download of routing table from remote store.
*/
public interface RemoteRoutingTableService extends LifecycleComponent {
static final DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable> CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER =
public static final DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable> CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER =
new DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable>() {
@Override
public void write(IndexRoutingTable value, StreamOutput out) throws IOException {
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote;

import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterState.Custom;
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.compress.Compressor;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.gateway.remote.model.RemoteClusterBlocks;
import org.opensearch.gateway.remote.model.RemoteClusterStateBlobStore;
import org.opensearch.gateway.remote.model.RemoteClusterStateCustoms;
import org.opensearch.gateway.remote.model.RemoteDiscoveryNodes;
import org.opensearch.gateway.remote.model.RemoteReadResult;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
* A Manager which provides APIs to upload and download attributes of ClusterState to the {@link RemoteClusterStateBlobStore}
*
* @opensearch.internal
*/
public class RemoteClusterStateAttributesManager {
public static final String CLUSTER_STATE_ATTRIBUTE = "cluster_state_attribute";
public static final String DISCOVERY_NODES = "nodes";
public static final String CLUSTER_BLOCKS = "blocks";
public static final int CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION = 1;
private final RemoteClusterStateBlobStore<ClusterBlocks, RemoteClusterBlocks> clusterBlocksBlobStore;
private final RemoteClusterStateBlobStore<DiscoveryNodes, RemoteDiscoveryNodes> discoveryNodesBlobStore;
private final RemoteClusterStateBlobStore<Custom, RemoteClusterStateCustoms> customsBlobStore;
private final Compressor compressor;
private final NamedXContentRegistry namedXContentRegistry;
private final NamedWriteableRegistry namedWriteableRegistry;

RemoteClusterStateAttributesManager(
RemoteClusterStateBlobStore<ClusterBlocks, RemoteClusterBlocks> clusterBlocksBlobStore,
RemoteClusterStateBlobStore<DiscoveryNodes, RemoteDiscoveryNodes> discoveryNodesBlobStore,
RemoteClusterStateBlobStore<Custom, RemoteClusterStateCustoms> customsBlobStore,
Compressor compressor,
NamedXContentRegistry namedXContentRegistry,
NamedWriteableRegistry namedWriteableRegistry
) {
this.clusterBlocksBlobStore = clusterBlocksBlobStore;
this.discoveryNodesBlobStore = discoveryNodesBlobStore;
this.customsBlobStore = customsBlobStore;
this.compressor = compressor;
this.namedXContentRegistry = namedXContentRegistry;
this.namedWriteableRegistry = namedWriteableRegistry;
}

/**
* Allows async upload of Cluster State Attribute components to remote
*/
CheckedRunnable<IOException> getAsyncMetadataWriteAction(
String component,
AbstractRemoteWritableBlobEntity blobEntity,
RemoteClusterStateBlobStore remoteEntityStore,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
) {
return () -> remoteEntityStore.writeAsync(blobEntity, getActionListener(component, blobEntity, latchedActionListener));
}

private ActionListener<Void> getActionListener(
String component,
AbstractRemoteWritableBlobEntity remoteObject,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
) {
return ActionListener.wrap(
resp -> latchedActionListener.onResponse(remoteObject.getUploadedMetadata()),
ex -> latchedActionListener.onFailure(new RemoteStateTransferException(component, remoteObject, ex))
);
}

public CheckedRunnable<IOException> getAsyncMetadataReadAction(
String component,
AbstractRemoteWritableBlobEntity blobEntity,
RemoteClusterStateBlobStore remoteEntityStore,
LatchedActionListener<RemoteReadResult> listener
) {
final ActionListener actionListener = ActionListener.wrap(
response -> listener.onResponse(new RemoteReadResult((ToXContent) response, CLUSTER_STATE_ATTRIBUTE, component)),
listener::onFailure
);
return () -> remoteEntityStore.readAsync(blobEntity, actionListener);
}

public Map<String, ClusterState.Custom> getUpdatedCustoms(ClusterState clusterState, ClusterState previousClusterState) {
Map<String, ClusterState.Custom> updatedCustoms = new HashMap<>();
Set<String> currentCustoms = new HashSet<>(clusterState.customs().keySet());
for (Map.Entry<String, ClusterState.Custom> entry : previousClusterState.customs().entrySet()) {
if (currentCustoms.contains(entry.getKey()) && !entry.getValue().equals(clusterState.customs().get(entry.getKey()))) {
updatedCustoms.put(entry.getKey(), clusterState.customs().get(entry.getKey()));
}
currentCustoms.remove(entry.getKey());
}
for (String custom : currentCustoms) {
updatedCustoms.put(custom, clusterState.customs().get(custom));
}
return updatedCustoms;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@

import static java.util.Objects.requireNonNull;
import static org.opensearch.gateway.PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD;
import static org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
Expand Down Expand Up @@ -171,12 +172,6 @@ public class RemoteClusterStateService implements Closeable {
/**
* Manifest format compatible with codec v2, where global metadata file is replaced with multiple metadata attribute files
*/
public static final ChecksumBlobStoreFormat<ClusterMetadataManifest> CLUSTER_METADATA_MANIFEST_FORMAT_V2 =
new ChecksumBlobStoreFormat<>("cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContentV2);

/**
* Manifest format compatible with codec v3, where global metadata file is replaced with multiple metadata attribute files
*/
public static final ChecksumBlobStoreFormat<ClusterMetadataManifest> CLUSTER_METADATA_MANIFEST_FORMAT = new ChecksumBlobStoreFormat<>(
"cluster-metadata-manifest",
METADATA_MANIFEST_NAME_FORMAT,
Expand Down Expand Up @@ -226,7 +221,6 @@ public class RemoteClusterStateService implements Closeable {
+ "indices, coordination metadata updated : [{}], settings metadata updated : [{}], templates metadata "
+ "updated : [{}], custom metadata updated : [{}], indices routing updated : [{}]";
public static final int INDEX_METADATA_CURRENT_CODEC_VERSION = 1;
public static final int MANIFEST_CURRENT_CODEC_VERSION = ClusterMetadataManifest.CODEC_V3;
public static final int GLOBAL_METADATA_CURRENT_CODEC_VERSION = 2;

// ToXContent Params with gateway mode.
Expand Down Expand Up @@ -836,26 +830,25 @@ private RemoteClusterStateManifestInfo uploadManifest(
committed,
MANIFEST_CURRENT_CODEC_VERSION
);
final ClusterMetadataManifest manifest = new ClusterMetadataManifest(
clusterState.term(),
clusterState.getVersion(),
clusterState.metadata().clusterUUID(),
clusterState.stateUUID(),
Version.CURRENT,
nodeId,
committed,
MANIFEST_CURRENT_CODEC_VERSION,
null,
uploadedIndexMetadata,
previousClusterUUID,
clusterState.metadata().clusterUUIDCommitted(),
uploadedCoordinationMetadata,
uploadedSettingsMetadata,
uploadedTemplatesMetadata,
uploadedCustomMetadataMap,
clusterState.routingTable().version(),
uploadedIndicesRouting
);
final ClusterMetadataManifest manifest = ClusterMetadataManifest.builder()
.clusterTerm(clusterState.term())
.stateVersion(clusterState.getVersion())
.clusterUUID(clusterState.metadata().clusterUUID())
.stateUUID(clusterState.stateUUID())
.opensearchVersion(Version.CURRENT)
.nodeId(nodeId)
.committed(committed)
.codecVersion(MANIFEST_CURRENT_CODEC_VERSION)
.indices(uploadedIndexMetadata)
.previousClusterUUID(previousClusterUUID)
.clusterUUIDCommitted(clusterState.metadata().clusterUUIDCommitted())
.coordinationMetadata(uploadedCoordinationMetadata)
.settingMetadata(uploadedSettingsMetadata)
.templatesMetadata(uploadedTemplatesMetadata)
.customMetadataMap(uploadedCustomMetadataMap)
.routingTableVersion(clusterState.routingTable().version())
.indicesRouting(uploadedIndicesRouting)
.build();
writeMetadataManifest(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), manifest, manifestFileName);
return new RemoteClusterStateManifestInfo(manifest, manifestFileName);
}
Expand Down Expand Up @@ -1540,8 +1533,6 @@ private ChecksumBlobStoreFormat<ClusterMetadataManifest> getClusterMetadataManif
long codecVersion = getManifestCodecVersion(fileName);
if (codecVersion == MANIFEST_CURRENT_CODEC_VERSION) {
return CLUSTER_METADATA_MANIFEST_FORMAT;
} else if (codecVersion == ClusterMetadataManifest.CODEC_V2) {
return CLUSTER_METADATA_MANIFEST_FORMAT_V2;
} else if (codecVersion == ClusterMetadataManifest.CODEC_V1) {
return CLUSTER_METADATA_MANIFEST_FORMAT_V1;
} else if (codecVersion == ClusterMetadataManifest.CODEC_V0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,17 @@
package org.opensearch.gateway.remote;

import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.repositories.blobstore.BlobStoreRepository;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/**
Expand All @@ -21,19 +28,99 @@
public class RemoteClusterStateUtils {

public static final String DELIMITER = "__";
public static final String PATH_DELIMITER = "/";
public static final String GLOBAL_METADATA_PATH_TOKEN = "global-metadata";
public static final String METADATA_NAME_FORMAT = "%s.dat";
public static final String METADATA_NAME_PLAIN_FORMAT = "%s";
public static final String CLUSTER_STATE_PATH_TOKEN = "cluster-state";
public static final String GLOBAL_METADATA_PATH_TOKEN = "global-metadata";
public static final String CLUSTER_STATE_EPHEMERAL_PATH_TOKEN = "ephemeral";
public static final int GLOBAL_METADATA_CURRENT_CODEC_VERSION = 1;
public static final String CUSTOM_DELIMITER = "--";
public static final String PATH_DELIMITER = "/";
public static final String METADATA_NAME_PLAIN_FORMAT = "%s";

// ToXContent Params with gateway mode.
// We are using gateway context mode to persist all custom metadata.
public static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(
Map.of(Metadata.CONTEXT_MODE_PARAM, Metadata.CONTEXT_MODE_GATEWAY)
);

public static BlobPath getCusterMetadataBasePath(BlobStoreRepository blobStoreRepository, String clusterName, String clusterUUID) {
return blobStoreRepository.basePath().add(encodeString(clusterName)).add(CLUSTER_STATE_PATH_TOKEN).add(clusterUUID);
}

public static String encodeString(String content) {
return Base64.getUrlEncoder().withoutPadding().encodeToString(content.getBytes(StandardCharsets.UTF_8));
}

public static String getFormattedFileName(String fileName, int codecVersion) {
if (codecVersion < ClusterMetadataManifest.CODEC_V2) {
return String.format(Locale.ROOT, METADATA_NAME_FORMAT, fileName);
}
return fileName;
}

static BlobContainer clusterUUIDContainer(BlobStoreRepository blobStoreRepository, String clusterName) {
return blobStoreRepository.blobStore()
.blobContainer(
blobStoreRepository.basePath()
.add(Base64.getUrlEncoder().withoutPadding().encodeToString(clusterName.getBytes(StandardCharsets.UTF_8)))
.add(CLUSTER_STATE_PATH_TOKEN)
);
}

/**
* Container class to keep metadata of all uploaded attributes
*/
public static class UploadedMetadataResults {
List<ClusterMetadataManifest.UploadedIndexMetadata> uploadedIndexMetadata;
Map<String, ClusterMetadataManifest.UploadedMetadataAttribute> uploadedCustomMetadataMap;
Map<String, ClusterMetadataManifest.UploadedMetadataAttribute> uploadedClusterStateCustomMetadataMap;
ClusterMetadataManifest.UploadedMetadataAttribute uploadedCoordinationMetadata;
ClusterMetadataManifest.UploadedMetadataAttribute uploadedSettingsMetadata;
ClusterMetadataManifest.UploadedMetadataAttribute uploadedTransientSettingsMetadata;
ClusterMetadataManifest.UploadedMetadataAttribute uploadedTemplatesMetadata;
ClusterMetadataManifest.UploadedMetadataAttribute uploadedDiscoveryNodes;
ClusterMetadataManifest.UploadedMetadataAttribute uploadedClusterBlocks;
List<ClusterMetadataManifest.UploadedIndexMetadata> uploadedIndicesRoutingMetadata;
ClusterMetadataManifest.UploadedMetadataAttribute uploadedHashesOfConsistentSettings;

public UploadedMetadataResults(
List<ClusterMetadataManifest.UploadedIndexMetadata> uploadedIndexMetadata,
Map<String, ClusterMetadataManifest.UploadedMetadataAttribute> uploadedCustomMetadataMap,
ClusterMetadataManifest.UploadedMetadataAttribute uploadedCoordinationMetadata,
ClusterMetadataManifest.UploadedMetadataAttribute uploadedSettingsMetadata,
ClusterMetadataManifest.UploadedMetadataAttribute uploadedTransientSettingsMetadata,
ClusterMetadataManifest.UploadedMetadataAttribute uploadedTemplatesMetadata,
ClusterMetadataManifest.UploadedMetadataAttribute uploadedDiscoveryNodes,
ClusterMetadataManifest.UploadedMetadataAttribute uploadedClusterBlocks,
List<ClusterMetadataManifest.UploadedIndexMetadata> uploadedIndicesRoutingMetadata,
ClusterMetadataManifest.UploadedMetadataAttribute uploadedHashesOfConsistentSettings,
Map<String, ClusterMetadataManifest.UploadedMetadataAttribute> uploadedClusterStateCustomMap
) {
this.uploadedIndexMetadata = uploadedIndexMetadata;
this.uploadedCustomMetadataMap = uploadedCustomMetadataMap;
this.uploadedCoordinationMetadata = uploadedCoordinationMetadata;
this.uploadedSettingsMetadata = uploadedSettingsMetadata;
this.uploadedTransientSettingsMetadata = uploadedTransientSettingsMetadata;
this.uploadedTemplatesMetadata = uploadedTemplatesMetadata;
this.uploadedDiscoveryNodes = uploadedDiscoveryNodes;
this.uploadedClusterBlocks = uploadedClusterBlocks;
this.uploadedIndicesRoutingMetadata = uploadedIndicesRoutingMetadata;
this.uploadedHashesOfConsistentSettings = uploadedHashesOfConsistentSettings;
this.uploadedClusterStateCustomMetadataMap = uploadedClusterStateCustomMap;
}

public UploadedMetadataResults() {
this.uploadedIndexMetadata = new ArrayList<>();
this.uploadedCustomMetadataMap = new HashMap<>();
this.uploadedCoordinationMetadata = null;
this.uploadedSettingsMetadata = null;
this.uploadedTransientSettingsMetadata = null;
this.uploadedTemplatesMetadata = null;
this.uploadedDiscoveryNodes = null;
this.uploadedClusterBlocks = null;
this.uploadedIndicesRoutingMetadata = new ArrayList<>();
this.uploadedHashesOfConsistentSettings = null;
this.uploadedClusterStateCustomMetadataMap = new HashMap<>();
}
}
}
Loading