Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.snapshots.RestoreService;
import org.opensearch.index.recovery.RemoteStoreRestoreService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand All @@ -33,14 +33,14 @@
public final class TransportRestoreRemoteStoreAction extends TransportClusterManagerNodeAction<
RestoreRemoteStoreRequest,
RestoreRemoteStoreResponse> {
private final RestoreService restoreService;
private final RemoteStoreRestoreService restoreService;

@Inject
public TransportRestoreRemoteStoreAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
RestoreService restoreService,
RemoteStoreRestoreService restoreService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
) {
Expand Down Expand Up @@ -84,20 +84,17 @@ protected void clusterManagerOperation(
final ClusterState state,
final ActionListener<RestoreRemoteStoreResponse> listener
) {
restoreService.restoreFromRemoteStore(
request,
ActionListener.delegateFailure(listener, (delegatedListener, restoreCompletionResponse) -> {
if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) {
RestoreClusterStateListener.createAndRegisterListener(
clusterService,
restoreCompletionResponse,
delegatedListener,
RestoreRemoteStoreResponse::new
);
} else {
delegatedListener.onResponse(new RestoreRemoteStoreResponse(restoreCompletionResponse.getRestoreInfo()));
}
})
);
restoreService.restore(request, ActionListener.delegateFailure(listener, (delegatedListener, restoreCompletionResponse) -> {
if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) {
RestoreClusterStateListener.createAndRegisterListener(
clusterService,
restoreCompletionResponse,
delegatedListener,
RestoreRemoteStoreResponse::new
);
} else {
delegatedListener.onResponse(new RestoreRemoteStoreResponse(restoreCompletionResponse.getRestoreInfo()));
}
}));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.recovery;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateUpdateTask;
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.UUIDs;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.repositories.IndexId;
import org.opensearch.snapshots.RestoreInfo;
import org.opensearch.snapshots.RestoreService;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED;

/**
* Service responsible for restoring index data from remote store
*
* @opensearch.internal
*/
public class RemoteStoreRestoreService {
private static final Logger logger = LogManager.getLogger(RemoteStoreRestoreService.class);

private final ClusterService clusterService;

private final AllocationService allocationService;

public RemoteStoreRestoreService(ClusterService clusterService, AllocationService allocationService) {
this.clusterService = clusterService;
this.allocationService = allocationService;
}

public void restore(RestoreRemoteStoreRequest request, final ActionListener<RestoreService.RestoreCompletionResponse> listener) {
clusterService.submitStateUpdateTask("restore[remote_store]", new ClusterStateUpdateTask() {
final String restoreUUID = UUIDs.randomBase64UUID();
RestoreInfo restoreInfo = null;

@Override
public ClusterState execute(ClusterState currentState) {
// Updating cluster state
ClusterState.Builder builder = ClusterState.builder(currentState);
Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata());
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
RoutingTable.Builder rtBuilder = RoutingTable.builder(currentState.routingTable());

List<String> indicesToBeRestored = new ArrayList<>();
int totalShards = 0;
for (String index : request.indices()) {
IndexMetadata currentIndexMetadata = currentState.metadata().index(index);
if (currentIndexMetadata == null) {
// ToDo: Handle index metadata does not exist case. (GitHub #3457)
logger.warn("Remote store restore is not supported for non-existent index. Skipping: {}", index);
continue;
}
if (currentIndexMetadata.getSettings().getAsBoolean(SETTING_REMOTE_STORE_ENABLED, false)) {
IndexMetadata updatedIndexMetadata = currentIndexMetadata;
Map<ShardId, ShardRouting> activeInitializingShards = new HashMap<>();
if (request.restoreAllShards()) {
if (currentIndexMetadata.getState() != IndexMetadata.State.CLOSE) {
throw new IllegalStateException(
"cannot restore index ["
+ index
+ "] because an open index "
+ "with same name already exists in the cluster. Close the existing index"
);
}
updatedIndexMetadata = IndexMetadata.builder(currentIndexMetadata)
.state(IndexMetadata.State.OPEN)
.version(1 + currentIndexMetadata.getVersion())
.mappingVersion(1 + currentIndexMetadata.getMappingVersion())
.settingsVersion(1 + currentIndexMetadata.getSettingsVersion())
.aliasesVersion(1 + currentIndexMetadata.getAliasesVersion())
.build();
} else {
activeInitializingShards = currentState.routingTable()
.index(index)
.shards()
.values()
.stream()
.map(IndexShardRoutingTable::primaryShard)
.filter(shardRouting -> shardRouting.unassigned() == false)
.collect(Collectors.toMap(ShardRouting::shardId, Function.identity()));
}

IndexId indexId = new IndexId(index, updatedIndexMetadata.getIndexUUID());

RecoverySource.RemoteStoreRecoverySource recoverySource = new RecoverySource.RemoteStoreRecoverySource(
restoreUUID,
updatedIndexMetadata.getCreationVersion(),
indexId
);
rtBuilder.addAsRemoteStoreRestore(updatedIndexMetadata, recoverySource, activeInitializingShards);
blocks.updateBlocks(updatedIndexMetadata);
mdBuilder.put(updatedIndexMetadata, true);
indicesToBeRestored.add(index);
totalShards += updatedIndexMetadata.getNumberOfShards();
} else {
logger.warn("Remote store is not enabled for index: {}", index);
}
}

restoreInfo = new RestoreInfo("remote_store", indicesToBeRestored, totalShards, totalShards);

RoutingTable rt = rtBuilder.build();
ClusterState updatedState = builder.metadata(mdBuilder).blocks(blocks).routingTable(rt).build();
return allocationService.reroute(updatedState, "restored from remote store");
}

@Override
public void onFailure(String source, Exception e) {
logger.warn("failed to restore from remote store", e);
listener.onFailure(e);
}

@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(new RestoreService.RestoreCompletionResponse(restoreUUID, null, restoreInfo));
}
});

}
}
6 changes: 6 additions & 0 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.opensearch.index.IndexingPressureService;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.threadpool.RunnableTaskExecutionListener;
import org.opensearch.index.recovery.RemoteStoreRestoreService;
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FileCacheCleaner;
Expand Down Expand Up @@ -947,6 +948,10 @@ protected Node(
indicesService,
clusterInfoService::getClusterInfo
);
RemoteStoreRestoreService remoteStoreRestoreService = new RemoteStoreRestoreService(
clusterService,
clusterModule.getAllocationService()
);

final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(
settings,
Expand Down Expand Up @@ -1143,6 +1148,7 @@ protected Node(
b.bind(SnapshotShardsService.class).toInstance(snapshotShardsService);
b.bind(TransportNodesSnapshotsStatus.class).toInstance(nodesSnapshotsStatus);
b.bind(RestoreService.class).toInstance(restoreService);
b.bind(RemoteStoreRestoreService.class).toInstance(remoteStoreRestoreService);
b.bind(RerouteService.class).toInstance(rerouteService);
b.bind(ShardLimitValidator.class).toInstance(shardLimitValidator);
b.bind(FsHealthService.class).toInstance(fsHealthService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1003,7 +1003,7 @@ public static final class RestoreCompletionResponse {
private final Snapshot snapshot;
private final RestoreInfo restoreInfo;

private RestoreCompletionResponse(final String uuid, final Snapshot snapshot, final RestoreInfo restoreInfo) {
public RestoreCompletionResponse(final String uuid, final Snapshot snapshot, final RestoreInfo restoreInfo) {
this.uuid = uuid;
this.snapshot = snapshot;
this.restoreInfo = restoreInfo;
Expand Down