Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.stream.Collectors;

import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
Expand Down Expand Up @@ -91,6 +92,16 @@ public Settings buildRemoteStoreNodeAttributes(Path repoLocation, double ioFailu
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
TRANSLOG_REPOSITORY_NAME
);
String stateRepoTypeAttributeKey = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
REPOSITORY_NAME
);
String stateRepoSettingsAttributeKeyPrefix = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
REPOSITORY_NAME
);

return Settings.builder()
.put("node.attr." + REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, REPOSITORY_NAME)
Expand All @@ -104,6 +115,9 @@ public Settings buildRemoteStoreNodeAttributes(Path repoLocation, double ioFailu
.put("node.attr." + REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, TRANSLOG_REPOSITORY_NAME)
.put(translogRepoTypeAttributeKey, "mock")
.put(translogRepoSettingsAttributeKeyPrefix + "location", repoLocation)
.put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, REPOSITORY_NAME)
.put(stateRepoTypeAttributeKey, "mock")
.put(stateRepoSettingsAttributeKeyPrefix + "location", repoLocation)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.stream.Collectors;

import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
Expand Down Expand Up @@ -222,14 +223,27 @@ public static Settings buildRemoteStoreNodeAttributes(
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
translogRepoName
);
String stateRepoTypeAttributeKey = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
segmentRepoName
);
String stateRepoSettingsAttributeKeyPrefix = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
segmentRepoName
);

Settings.Builder settings = Settings.builder()
.put("node.attr." + REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, segmentRepoName)
.put(segmentRepoTypeAttributeKey, FsRepository.TYPE)
.put(segmentRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath)
.put("node.attr." + REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, translogRepoName)
.put(translogRepoTypeAttributeKey, FsRepository.TYPE)
.put(translogRepoSettingsAttributeKeyPrefix + "location", translogRepoPath);
.put(translogRepoSettingsAttributeKeyPrefix + "location", translogRepoPath)
.put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, segmentRepoName)
.put(stateRepoTypeAttributeKey, FsRepository.TYPE)
.put(stateRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath);

if (withRateLimiterAttributes) {
settings.put(segmentRepoSettingsAttributeKeyPrefix + "compress", randomBoolean())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.concurrent.ExecutionException;

import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_REPOSITORY_SETTING;
import static org.opensearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE;
import static org.opensearch.indices.ShardLimitValidator.SETTING_MAX_SHARDS_PER_CLUSTER_KEY;

Expand All @@ -32,11 +31,7 @@ public class RemoteStoreClusterStateRestoreIT extends BaseRemoteStoreRestoreIT {

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
.put(REMOTE_CLUSTER_STATE_REPOSITORY_SETTING.getKey(), REPOSITORY_NAME)
.build();
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build();
}

private void addNewNodes(int dataNodeCount, int clusterManagerNodeCount) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
import java.util.Set;

import static org.opensearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
* The core class of the cluster state coordination algorithm, directly implementing the
Expand Down Expand Up @@ -101,7 +101,7 @@ public CoordinationState(
.getLastAcceptedState()
.getLastAcceptedConfiguration();
this.publishVotes = new VoteCollection();
this.isRemoteStateEnabled = REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings);
this.isRemoteStateEnabled = isRemoteStoreClusterStateEnabled(settings);
}

public long getCurrentTerm() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,6 @@ public void apply(Settings value, Settings current, Settings previous) {

// Remote cluster state settings
RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
RemoteClusterStateService.REMOTE_CLUSTER_STATE_REPOSITORY_SETTING,
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
import java.util.function.UnaryOperator;

import static org.opensearch.common.util.concurrent.OpenSearchExecutors.daemonThreadFactory;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
* Loads (and maybe upgrades) cluster metadata at startup, and persistently stores cluster metadata for future restarts.
Expand Down Expand Up @@ -165,7 +165,7 @@ public void start(

if (DiscoveryNode.isClusterManagerNode(settings)) {
persistedState = new LucenePersistedState(persistedClusterStateService, currentTerm, clusterState);
if (REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true) {
if (isRemoteStoreClusterStateEnabled(settings)) {
remotePersistedState = new RemotePersistedState(remoteClusterStateService);
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.opensearch.core.index.Index;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
Expand All @@ -52,6 +54,7 @@
import java.util.stream.Collectors;

import static org.opensearch.gateway.PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
* A Service which provides APIs to upload and download cluster metadata from remote store.
Expand Down Expand Up @@ -80,23 +83,13 @@ public class RemoteClusterStateService implements Closeable {
/**
* Used to specify if cluster state metadata should be published to remote store
*/
// TODO The remote state enabled and repository settings should be read from node attributes.
// Dependent on https://github.com/opensearch-project/OpenSearch/pull/9105/
public static final Setting<Boolean> REMOTE_CLUSTER_STATE_ENABLED_SETTING = Setting.boolSetting(
"cluster.remote_store.state.enabled",
false,
Property.NodeScope,
Property.Final
);
/**
* Used to specify default repo to use for cluster state metadata upload
*/
public static final Setting<String> REMOTE_CLUSTER_STATE_REPOSITORY_SETTING = Setting.simpleString(
"cluster.remote_store.state.repository",
"",
Property.NodeScope,
Property.Final
);

private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class);

public static final String DELIMITER = "__";
Expand All @@ -115,7 +108,7 @@ public RemoteClusterStateService(
ClusterSettings clusterSettings,
LongSupplier relativeTimeNanosSupplier
) {
assert REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true : "Remote cluster state is not enabled";
assert isRemoteStoreClusterStateEnabled(settings) : "Remote cluster state is not enabled";
this.nodeId = nodeId;
this.repositoriesService = repositoriesService;
this.settings = settings;
Expand Down Expand Up @@ -384,8 +377,10 @@ void ensureRepositorySet() {
if (blobStoreRepository != null) {
return;
}
assert REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true : "Remote cluster state is not enabled";
final String remoteStoreRepo = REMOTE_CLUSTER_STATE_REPOSITORY_SETTING.get(settings);
assert isRemoteStoreClusterStateEnabled(settings) : "Remote cluster state is not enabled";
final String remoteStoreRepo = settings.get(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY
);
assert remoteStoreRepo != null : "Remote Cluster State repository is not configured";
final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
Expand Down
3 changes: 2 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@
import static org.opensearch.env.NodeEnvironment.collectFileCacheDataPath;
import static org.opensearch.index.ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreAttributePresent;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
* A node represent a node within a cluster ({@code cluster.name}). The {@link #client()} can be used
Expand Down Expand Up @@ -681,7 +682,7 @@ protected Node(
threadPool::relativeTimeInMillis
);
final RemoteClusterStateService remoteClusterStateService;
if (RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true) {
if (isRemoteStoreClusterStateEnabled(settings)) {
remoteClusterStateService = new RemoteClusterStateService(
nodeEnvironment.nodeId(),
repositoriesServiceReference::get,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.Settings;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.node.Node;
import org.opensearch.repositories.blobstore.BlobStoreRepository;

Expand All @@ -36,6 +37,7 @@ public class RemoteStoreNodeAttribute {
public static final String REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX = "remote_store";
public static final String REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY = "remote_store.segment.repository";
public static final String REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY = "remote_store.translog.repository";
public static final String REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY = "remote_store.state.repository";
public static final String REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT = "remote_store.repository.%s.type";
public static final String REMOTE_STORE_REPOSITORY_CRYPTO_ATTRIBUTE_KEY_FORMAT = "remote_store.repository.%s."
+ CryptoMetadata.CRYPTO_METADATA_KEY;
Expand Down Expand Up @@ -134,6 +136,7 @@ private RepositoriesMetadata buildRepositoriesMetadata(DiscoveryNode node) {

repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY));
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY));
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY));

for (String repositoryName : repositoryNames) {
repositoryMetadataList.add(buildRepositoryMetadata(node, repositoryName));
Expand All @@ -146,6 +149,10 @@ public static boolean isRemoteStoreAttributePresent(Settings settings) {
return settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX).isEmpty() == false;
}

public static boolean isRemoteStoreClusterStateEnabled(Settings settings) {
return RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) && isRemoteStoreAttributePresent(settings);
}

public RepositoriesMetadata getRepositoriesMetadata() {
return this.repositoriesMetadata;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,14 @@
import org.opensearch.gateway.GatewayMetaState.RemotePersistedState;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.test.EqualsHashCodeTestUtils;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;

import java.io.IOException;
import java.util.Collections;
import java.util.Locale;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand All @@ -62,6 +64,9 @@

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
Expand Down Expand Up @@ -937,9 +942,26 @@ public void testHandlePrePublishAndCommitWhenRemoteStateEnabled() throws IOExcep
final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry();
persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, ps1);
persistedStateRegistry.addPersistedState(PersistedStateType.REMOTE, new RemotePersistedState(remoteClusterStateService));
final Settings settings = Settings.builder()

String randomRepoName = "randomRepoName";
String stateRepoTypeAttributeKey = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
randomRepoName
);
String stateRepoSettingsAttributeKeyPrefix = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
randomRepoName
);

Settings settings = Settings.builder()
.put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, randomRepoName)
.put(stateRepoTypeAttributeKey, FsRepository.TYPE)
.put(stateRepoSettingsAttributeKeyPrefix + "location", "randomRepoPath")
.put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
.build();

final CoordinationState coordinationState = createCoordinationState(persistedStateRegistry, node1, settings);
coordinationState.handlePrePublish(clusterState);
Mockito.verifyNoInteractions(remoteClusterStateService);
Expand Down
Loading