Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Allow setting query parameters on requests ([#13776](https://github.com/opensearch-project/OpenSearch/issues/13776))
- Add dynamic action retry timeout setting ([#14022](https://github.com/opensearch-project/OpenSearch/issues/14022))
- Add capability to disable source recovery_source for an index ([#13590](https://github.com/opensearch-project/OpenSearch/pull/13590))
- Add remote routing table for remote state publication with experimental feature flag ([#13304](https://github.com/opensearch-project/OpenSearch/pull/13304))
- Add upload flow for writing routing table to remote store ([#13870](https://github.com/opensearch-project/OpenSearch/pull/13870))
- Add dynamic action retry timeout setting ([#14022](https://github.com/opensearch-project/OpenSearch/issues/14022))
- [Remote Store] Add support to disable flush based on translog reader count ([#14027](https://github.com/opensearch-project/OpenSearch/pull/14027))
- [Query Insights] Add exporter support for top n queries ([#12982](https://github.com/opensearch-project/OpenSearch/pull/12982))
- [Query Insights] Add X-Opaque-Id to search request metadata for top n queries ([#13374](https://github.com/opensearch-project/OpenSearch/pull/13374))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -738,9 +738,7 @@ public boolean equals(Object o) {
IndexShardRoutingTable that = (IndexShardRoutingTable) o;

if (!shardId.equals(that.shardId)) return false;
if (!shards.equals(that.shards)) return false;

return true;
return shards.size() == that.shards.size() && shards.containsAll(that.shards) && that.shards.containsAll(shards);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,303 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing.remote;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.IndexInput;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.blobstore.transfer.RemoteTransferContainer;
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable;
import org.opensearch.index.remote.RemoteStoreEnums;
import org.opensearch.index.remote.RemoteStorePathStrategy;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;

/**
* A Service which provides APIs to upload and download routing table from remote store.
*
* @opensearch.internal
*/
public class InternalRemoteRoutingTableService extends AbstractLifecycleComponent implements RemoteRoutingTableService {

/**
* This setting is used to set the remote routing table store blob store path type strategy.
*/
public static final Setting<RemoteStoreEnums.PathType> REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING = new Setting<>(
"cluster.remote_store.routing_table.path_type",
RemoteStoreEnums.PathType.HASHED_PREFIX.toString(),
RemoteStoreEnums.PathType::parseString,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/**
* This setting is used to set the remote routing table store blob store path hash algorithm strategy.
* This setting will come to effect if the {@link #REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING}
* is either {@code HASHED_PREFIX} or {@code HASHED_INFIX}.
*/
public static final Setting<RemoteStoreEnums.PathHashAlgorithm> REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING = new Setting<>(
"cluster.remote_store.routing_table.path_hash_algo",
RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64.toString(),
RemoteStoreEnums.PathHashAlgorithm::parseString,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public static final String INDEX_ROUTING_PATH_TOKEN = "index-routing";
public static final String INDEX_ROUTING_FILE_PREFIX = "index_routing";
public static final String DELIMITER = "__";
public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--";

private static final Logger logger = LogManager.getLogger(InternalRemoteRoutingTableService.class);
private final Settings settings;
private final Supplier<RepositoriesService> repositoriesService;
private BlobStoreRepository blobStoreRepository;
private RemoteStoreEnums.PathType pathType;
private RemoteStoreEnums.PathHashAlgorithm pathHashAlgo;

public InternalRemoteRoutingTableService(
Supplier<RepositoriesService> repositoriesService,
Settings settings,
ClusterSettings clusterSettings
) {
assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled";
this.repositoriesService = repositoriesService;
this.settings = settings;
this.pathType = clusterSettings.get(REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING);
this.pathHashAlgo = clusterSettings.get(REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING);
clusterSettings.addSettingsUpdateConsumer(REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING, this::setPathTypeSetting);
clusterSettings.addSettingsUpdateConsumer(REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING, this::setPathHashAlgoSetting);
}

private void setPathTypeSetting(RemoteStoreEnums.PathType pathType) {
this.pathType = pathType;
}

private void setPathHashAlgoSetting(RemoteStoreEnums.PathHashAlgorithm pathHashAlgo) {
this.pathHashAlgo = pathHashAlgo;
}

public List<IndexRoutingTable> getIndicesRouting(RoutingTable routingTable) {
return new ArrayList<>(routingTable.indicesRouting().values());
}

/**
* Returns diff between the two routing tables, which includes upserts and deletes.
* @param before previous routing table
* @param after current routing table
* @return diff of the previous and current routing table
*/
public DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> getIndicesRoutingMapDiff(
RoutingTable before,
RoutingTable after
) {
return DiffableUtils.diff(
before.getIndicesRouting(),
after.getIndicesRouting(),
DiffableUtils.getStringKeySerializer(),
CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER
);
}

/**
* Create async action for writing one {@code IndexRoutingTable} to remote store
* @param clusterState current cluster state
* @param indexRouting indexRoutingTable to write to remote store
* @param latchedActionListener listener for handling async action response
* @param clusterBasePath base path for remote file
* @return returns runnable async action
*/
public CheckedRunnable<IOException> getIndexRoutingAsyncAction(
ClusterState clusterState,
IndexRoutingTable indexRouting,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener,
BlobPath clusterBasePath
) {

BlobPath indexRoutingPath = clusterBasePath.add(INDEX_ROUTING_PATH_TOKEN);
BlobPath path = pathType.path(
RemoteStorePathStrategy.BasePathInput.builder().basePath(indexRoutingPath).indexUUID(indexRouting.getIndex().getUUID()).build(),
pathHashAlgo
);
final BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer(path);

final String fileName = getIndexRoutingFileName(clusterState.term(), clusterState.version());

ActionListener<Void> completionListener = ActionListener.wrap(
resp -> latchedActionListener.onResponse(
new ClusterMetadataManifest.UploadedIndexMetadata(
indexRouting.getIndex().getName(),
indexRouting.getIndex().getUUID(),
path.buildAsString() + fileName,
INDEX_ROUTING_METADATA_PREFIX
)
),
ex -> latchedActionListener.onFailure(
new RemoteClusterStateService.RemoteStateTransferException(
"Exception in writing index to remote store: " + indexRouting.getIndex().toString(),
ex
)
)
);

return () -> uploadIndex(indexRouting, fileName, blobContainer, completionListener);
}

/**
* Combines IndicesRoutingMetadata from previous manifest and current uploaded indices, removes deleted indices.
* @param previousManifest previous manifest, used to get all existing indices routing paths
* @param indicesRoutingUploaded current uploaded indices routings
* @param indicesRoutingToDelete indices to delete
* @return combined list of metadata
*/
public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndicesRouting(
ClusterMetadataManifest previousManifest,
List<ClusterMetadataManifest.UploadedIndexMetadata> indicesRoutingUploaded,
List<String> indicesRoutingToDelete
) {
final Map<String, ClusterMetadataManifest.UploadedIndexMetadata> allUploadedIndicesRouting = previousManifest.getIndicesRouting()
.stream()
.collect(Collectors.toMap(ClusterMetadataManifest.UploadedIndexMetadata::getIndexName, Function.identity()));

indicesRoutingUploaded.forEach(
uploadedIndexRouting -> allUploadedIndicesRouting.put(uploadedIndexRouting.getIndexName(), uploadedIndexRouting)
);
indicesRoutingToDelete.forEach(allUploadedIndicesRouting::remove);

return new ArrayList<>(allUploadedIndicesRouting.values());
}

private void uploadIndex(
IndexRoutingTable indexRouting,
String fileName,
BlobContainer blobContainer,
ActionListener<Void> completionListener
) {
RemoteIndexRoutingTable indexRoutingInput = new RemoteIndexRoutingTable(indexRouting);
BytesReference bytesInput = null;
try (BytesStreamOutput streamOutput = new BytesStreamOutput()) {
indexRoutingInput.writeTo(streamOutput);
bytesInput = streamOutput.bytes();
} catch (IOException e) {
logger.error("Failed to serialize IndexRoutingTable for [{}]: [{}]", indexRouting, e);
completionListener.onFailure(e);
return;
}

if (blobContainer instanceof AsyncMultiStreamBlobContainer == false) {
try {
blobContainer.writeBlob(fileName, bytesInput.streamInput(), bytesInput.length(), true);
completionListener.onResponse(null);
} catch (IOException e) {
logger.error("Failed to write IndexRoutingTable to remote store for indexRouting [{}]: [{}]", indexRouting, e);
completionListener.onFailure(e);
}
return;
}

try (IndexInput input = new ByteArrayIndexInput("indexrouting", BytesReference.toBytes(bytesInput))) {
try (
RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
fileName,
fileName,
input.length(),
true,
WritePriority.URGENT,
(size, position) -> new OffsetRangeIndexInputStream(input, size, position),
null,
false
)
) {
((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(
remoteTransferContainer.createWriteContext(),
completionListener
);
} catch (IOException e) {
logger.error("Failed to write IndexRoutingTable to remote store for indexRouting [{}]: [{}]", indexRouting, e);
completionListener.onFailure(e);
}
} catch (IOException e) {
logger.error(
"Failed to create transfer object for IndexRoutingTable for remote store upload for indexRouting [{}]: [{}]",
indexRouting,
e
);
completionListener.onFailure(e);
}
}

private String getIndexRoutingFileName(long term, long version) {
return String.join(
DELIMITER,
INDEX_ROUTING_FILE_PREFIX,
RemoteStoreUtils.invertLong(term),
RemoteStoreUtils.invertLong(version),
RemoteStoreUtils.invertLong(System.currentTimeMillis())
);
}

@Override
protected void doClose() throws IOException {
if (blobStoreRepository != null) {
IOUtils.close(blobStoreRepository);
}
}

@Override
protected void doStart() {
assert isRemoteRoutingTableEnabled(settings) == true : "Remote routing table is not enabled";
final String remoteStoreRepo = settings.get(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY
);
assert remoteStoreRepo != null : "Remote routing table repository is not configured";
final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
blobStoreRepository = (BlobStoreRepository) repository;
}

@Override
protected void doStop() {}

}
Loading