Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote;

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.client.Client;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
import org.opensearch.test.OpenSearchIntegTestCase.Scope;
import org.junit.Before;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.DISCOVERY_NODES;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
import static org.opensearch.gateway.remote.model.RemoteClusterBlocks.CLUSTER_BLOCKS;
import static org.opensearch.gateway.remote.model.RemoteCoordinationMetadata.COORDINATION_METADATA;
import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.CUSTOM_METADATA;
import static org.opensearch.gateway.remote.model.RemotePersistentSettingsMetadata.SETTING_METADATA;
import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA;
import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
public class RemoteStatePublicationIT extends RemoteStoreBaseIntegTestCase {

private static String INDEX_NAME = "test-index";

@Before
public void setup() {
asyncUploadMockFsRepo = false;
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL, "true").build();
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
String routingTableRepoName = "remote-routing-repo";
String routingTableRepoTypeAttributeKey = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
routingTableRepoName
);
String routingTableRepoSettingsAttributeKeyPrefix = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
routingTableRepoName
);
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, routingTableRepoName)
.put(routingTableRepoTypeAttributeKey, ReloadableFsRepository.TYPE)
.put(routingTableRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath)
.build();
}

public void testPublication() throws Exception {
// create cluster with multi node (3 master + 2 data)
prepareCluster(3, 2, INDEX_NAME, 1, 2);
ensureStableCluster(5);
ensureGreen(INDEX_NAME);
// update settings on a random node
assertAcked(
internalCluster().client()
.admin()
.cluster()
.updateSettings(
new ClusterUpdateSettingsRequest().persistentSettings(
Settings.builder().put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), "10mb").build()
)
)
.actionGet()
);

RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class);
BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME);

Map<String, Integer> globalMetadataFiles = getMetadataFiles(repository, RemoteClusterStateUtils.GLOBAL_METADATA_PATH_TOKEN);

assertTrue(globalMetadataFiles.containsKey(COORDINATION_METADATA));
assertTrue(globalMetadataFiles.containsKey(SETTING_METADATA));
assertTrue(globalMetadataFiles.containsKey(TRANSIENT_SETTING_METADATA));
assertTrue(globalMetadataFiles.containsKey(TEMPLATES_METADATA));
assertTrue(globalMetadataFiles.keySet().stream().anyMatch(key -> key.startsWith(CUSTOM_METADATA)));

Map<String, Integer> ephemeralMetadataFiles = getMetadataFiles(
repository,
RemoteClusterStateUtils.CLUSTER_STATE_EPHEMERAL_PATH_TOKEN
);

assertTrue(ephemeralMetadataFiles.containsKey(CLUSTER_BLOCKS));
assertTrue(ephemeralMetadataFiles.containsKey(DISCOVERY_NODES));

Map<String, Integer> manifestFiles = getMetadataFiles(repository, RemoteClusterMetadataManifest.MANIFEST);
assertTrue(manifestFiles.containsKey(RemoteClusterMetadataManifest.MANIFEST));

// get settings from each node and verify that it is updated
Settings settings = clusterService().getSettings();
logger.info("settings : {}", settings);
for (Client client : clients()) {
ClusterStateResponse response = client.admin().cluster().prepareState().clear().setMetadata(true).get();
String refreshSetting = response.getState()
.metadata()
.settings()
.get(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey());
assertEquals("10mb", refreshSetting);
}
}

private Map<String, Integer> getMetadataFiles(BlobStoreRepository repository, String subDirectory) throws IOException {
BlobPath metadataPath = repository.basePath()
.add(
Base64.getUrlEncoder()
.withoutPadding()
.encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8))
)
.add(RemoteClusterStateUtils.CLUSTER_STATE_PATH_TOKEN)
.add(getClusterState().metadata().clusterUUID())
.add(subDirectory);
return repository.blobStore().blobContainer(metadataPath).listBlobs().keySet().stream().map(fileName -> {
logger.info(fileName);
return fileName.split(DELIMITER)[0];
}).collect(Collectors.toMap(Function.identity(), key -> 1, Integer::sum));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ UploadedMetadataResults writeMetadataInParallel(
new RemoteDiscoveryNodes(
clusterState.nodes(),
clusterState.version(),
clusterState.stateUUID(),
clusterState.metadata().clusterUUID(),
blobStoreRepository.getCompressor()
),
listener
Expand Down