Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix _list/shards API failing when closed indices are present ([#16606](https://github.com/opensearch-project/OpenSearch/pull/16606))
- Fix remote shards balance ([#15335](https://github.com/opensearch-project/OpenSearch/pull/15335))
- Always use `constant_score` query for `match_only_text` field ([#16964](https://github.com/opensearch-project/OpenSearch/pull/16964))
- Fix shallow copy snapshot failures on closed index ([#16868](https://github.com/opensearch-project/OpenSearch/pull/16868))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.snapshots.SnapshotInfo;
import org.opensearch.snapshots.SnapshotState;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
Expand Down Expand Up @@ -1078,4 +1081,79 @@ public void testCloseIndexWithNoOpSyncAndFlushForAsyncTranslog() throws Interrup
Thread.sleep(10000);
ensureGreen(INDEX_NAME);
}

/**
 * Verifies that a shallow-copy snapshot succeeds both before and after the index is closed, and that
 * the snapshot taken on the closed index can be restored with every indexed document present.
 */
public void testSuccessfulShallowV1SnapshotPostIndexClose() throws Exception {
    // Number of single-document bulk requests sent in each indexing round (two rounds in total).
    final int bulkRequestsPerRound = 10;

    internalCluster().startClusterManagerOnlyNode();
    final String dataNode = internalCluster().startDataOnlyNodes(1).get(0);
    createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1));
    ensureGreen(INDEX_NAME);

    // Set the remote translog buffer interval to 0ms for the whole cluster.
    final ClusterUpdateSettingsRequest translogBufferRequest = new ClusterUpdateSettingsRequest();
    translogBufferRequest.persistentSettings(
        Settings.builder().put(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), "0ms")
    );
    assertAcked(client().admin().cluster().updateSettings(translogBufferRequest).actionGet());

    logger.info("Create shallow snapshot setting enabled repo");
    final String repoName = "shallow-snapshot-repo-name";
    final Path repoLocation = randomRepoPath();
    final Settings.Builder repoSettings = Settings.builder()
        .put("location", repoLocation)
        .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), Boolean.TRUE);
    createRepository(repoName, "fs", repoSettings);

    // First indexing round, flushed and refreshed before the first snapshot.
    for (int request = 0; request < bulkRequestsPerRound; request++) {
        indexBulk(INDEX_NAME, 1);
    }
    flushAndRefresh(INDEX_NAME);

    logger.info("Verify shallow snapshot created before close");
    final String openIndexSnapshot = "snapshot1";
    final SnapshotInfo openIndexSnapshotInfo = internalCluster().client()
        .admin()
        .cluster()
        .prepareCreateSnapshot(repoName, openIndexSnapshot)
        .setIndices(INDEX_NAME)
        .setWaitForCompletion(true)
        .get()
        .getSnapshotInfo();

    assertEquals(SnapshotState.SUCCESS, openIndexSnapshotInfo.state());
    assertTrue(openIndexSnapshotInfo.successfulShards() > 0);
    assertEquals(0, openIndexSnapshotInfo.failedShards());

    // Second indexing round; deliberately not flushed before the index is closed.
    for (int request = 0; request < bulkRequestsPerRound; request++) {
        indexBulk(INDEX_NAME, 1);
    }

    // Close the index, then pause briefly before snapshotting it again.
    client().admin().indices().close(Requests.closeIndexRequest(INDEX_NAME)).actionGet();
    Thread.sleep(1000);

    logger.info("Verify shallow snapshot created after close");
    final String closedIndexSnapshot = "snapshot2";
    final SnapshotInfo closedIndexSnapshotInfo = internalCluster().client()
        .admin()
        .cluster()
        .prepareCreateSnapshot(repoName, closedIndexSnapshot)
        .setIndices(INDEX_NAME)
        .setWaitForCompletion(true)
        .get()
        .getSnapshotInfo();

    assertEquals(SnapshotState.SUCCESS, closedIndexSnapshotInfo.state());
    assertTrue(closedIndexSnapshotInfo.successfulShards() > 0);
    assertEquals(0, closedIndexSnapshotInfo.failedShards());

    // Drop the index so the restore below has to rebuild it from the closed-index snapshot.
    cluster().wipeIndices(INDEX_NAME);
    final RestoreSnapshotResponse restoreResponse = clusterAdmin().prepareRestoreSnapshot(repoName, closedIndexSnapshot)
        .setWaitForCompletion(true)
        .execute()
        .actionGet();
    assertThat(restoreResponse.getRestoreInfo().totalShards(), greaterThan(0));
    ensureGreen(INDEX_NAME);
    flushAndRefresh(INDEX_NAME);

    // Both indexing rounds must be searchable after the restore.
    assertBusy(() -> assertHitCount(client(dataNode).prepareSearch(INDEX_NAME).setSize(0).get(), 2 * bulkRequestsPerRound));
}
}
16 changes: 16 additions & 0 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1624,6 +1624,22 @@ public org.apache.lucene.util.Version minimumCompatibleVersion() {
return luceneVersion == null ? indexSettings.getIndexVersionCreated().luceneVersion : luceneVersion;
}

/**
 * Reads the latest segment metadata file from this shard's remote directory.
 *
 * @return the latest {@link RemoteSegmentMetadata}; never {@code null}
 * @throws IllegalStateException if the index settings report that the index is not assigned on a remote node
 * @throws FileNotFoundException if the remote directory returns no metadata file
 * @throws IOException if reading the latest metadata file fails
 */
public RemoteSegmentMetadata fetchLastRemoteUploadedSegmentMetadata() throws IOException {
    if (indexSettings.isAssignedOnRemoteNode() == false) {
        throw new IllegalStateException("Index is not assigned on Remote Node");
    }
    final RemoteSegmentMetadata latestMetadata = getRemoteDirectory().readLatestMetadataFile();
    if (latestMetadata != null) {
        return latestMetadata;
    }
    // A null result means nothing has been uploaded that could be snapshotted.
    throw new FileNotFoundException("No metadata file found in remote store");
}

/**
* Creates a new {@link IndexCommit} snapshot from the currently running engine. All resources referenced by this
* commit won't be freed until the commit / snapshot is closed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,45 @@ default void snapshotRemoteStoreIndexShard(
throw new UnsupportedOperationException();
}

/**
 * Adds a reference to remote store data for an index commit point.
 * <p>
 * The index commit point can be obtained by using the {@link org.opensearch.index.engine.Engine#acquireLastIndexCommit} method.
 * For a closed index, the commit information can instead be obtained by reading the last remote uploaded metadata
 * using the {@link org.opensearch.index.shard.IndexShard#fetchLastRemoteUploadedSegmentMetadata()} method.
 * Repository implementations shouldn't release the snapshot index commit point. It is done by the method caller.
 * <p>
 * As the snapshot process progresses, implementations of this method should update the {@link IndexShardSnapshotStatus}
 * object and check {@link IndexShardSnapshotStatus#isAborted()} to see if the snapshot process should be aborted.
 * <p>
 * The default implementation throws {@link UnsupportedOperationException}.
 *
 * @param store                     store to be snapshotted
 * @param snapshotId                snapshot id
 * @param indexId                   id for the index being snapshotted
 * @param snapshotIndexCommit       commit point; may be {@code null}
 * @param shardStateIdentifier      a unique identifier of the state of the shard that is stored with the shard's snapshot and used
 *                                  to detect if the shard has changed between snapshots. If {@code null} is passed as the identifier
 *                                  snapshotting will be done by inspecting the physical files referenced by {@code snapshotIndexCommit}
 * @param snapshotStatus            snapshot status
 * @param primaryTerm               current primary term
 * @param commitGeneration          current commit generation
 * @param startTime                 start time of the snapshot commit, this will be used as the start time for snapshot.
 * @param indexFilesToFileLengthMap map of index file names to file lengths; may be {@code null}
 * @param listener                  listener invoked on completion
 */
default void snapshotRemoteStoreIndexShard(
Store store,
SnapshotId snapshotId,
IndexId indexId,
@Nullable IndexCommit snapshotIndexCommit,
@Nullable String shardStateIdentifier,
IndexShardSnapshotStatus snapshotStatus,
long primaryTerm,
long commitGeneration,
long startTime,
@Nullable Map<String, Long> indexFilesToFileLengthMap,
ActionListener<String> listener
) {
throw new UnsupportedOperationException();
}

/**
* Restores snapshot of the shard.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3744,6 +3744,33 @@ private void writeAtomic(BlobContainer container, final String blobName, final B
}
}

/**
 * Commit-based variant of the shallow-copy shard snapshot. It delegates to the overload that also accepts
 * a commit generation and a file-length map, taking the generation from {@code snapshotIndexCommit} and
 * passing no file-length map.
 */
@Override
public void snapshotRemoteStoreIndexShard(
    Store store,
    SnapshotId snapshotId,
    IndexId indexId,
    IndexCommit snapshotIndexCommit,
    @Nullable String shardStateIdentifier,
    IndexShardSnapshotStatus snapshotStatus,
    long primaryTerm,
    long startTime,
    ActionListener<String> listener
) {
    // The commit is dereferenced here, so this variant requires a non-null commit.
    final long generationFromCommit = snapshotIndexCommit.getGeneration();
    // No explicit file listing is supplied on this path.
    final Map<String, Long> noFileLengths = null;

    this.snapshotRemoteStoreIndexShard(
        store,
        snapshotId,
        indexId,
        snapshotIndexCommit,
        shardStateIdentifier,
        snapshotStatus,
        primaryTerm,
        generationFromCommit,
        startTime,
        noFileLengths,
        listener
    );
}

@Override
public void snapshotRemoteStoreIndexShard(
Store store,
Expand All @@ -3753,27 +3780,38 @@ public void snapshotRemoteStoreIndexShard(
String shardStateIdentifier,
IndexShardSnapshotStatus snapshotStatus,
long primaryTerm,
long commitGeneration,
long startTime,
Map<String, Long> indexFilesToFileLengthMap,
ActionListener<String> listener
) {
if (isReadOnly()) {
listener.onFailure(new RepositoryException(metadata.name(), "cannot snapshot shard on a readonly repository"));
return;
}

final ShardId shardId = store.shardId();
try {
final String generation = snapshotStatus.generation();
logger.info("[{}] [{}] shallow copy snapshot to [{}] [{}] ...", shardId, snapshotId, metadata.name(), generation);
final BlobContainer shardContainer = shardContainer(indexId, shardId);

long indexTotalFileSize = 0;
// local store is being used here to fetch the files metadata instead of remote store as currently
// remote store is mirroring the local store.
List<String> fileNames = new ArrayList<>(snapshotIndexCommit.getFileNames());
Store.MetadataSnapshot commitSnapshotMetadata = store.getMetadata(snapshotIndexCommit);
for (String fileName : fileNames) {
indexTotalFileSize += commitSnapshotMetadata.get(fileName).length();
List<String> fileNames;

if (snapshotIndexCommit != null) {
// local store is being used here to fetch the files metadata instead of remote store as currently
// remote store is mirroring the local store.
fileNames = new ArrayList<>(snapshotIndexCommit.getFileNames());
Store.MetadataSnapshot commitSnapshotMetadata = store.getMetadata(snapshotIndexCommit);
for (String fileName : fileNames) {
indexTotalFileSize += commitSnapshotMetadata.get(fileName).length();
}
} else {
fileNames = new ArrayList<>(indexFilesToFileLengthMap.keySet());
indexTotalFileSize = indexFilesToFileLengthMap.values().stream().mapToLong(Long::longValue).sum();
}

int indexTotalNumberOfFiles = fileNames.size();

snapshotStatus.moveToStarted(
Expand All @@ -3784,7 +3822,7 @@ public void snapshotRemoteStoreIndexShard(
indexTotalFileSize
);

final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.moveToFinalize(snapshotIndexCommit.getGeneration());
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.moveToFinalize(commitGeneration);

// now create and write the commit point
logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
Expand All @@ -3795,7 +3833,7 @@ public void snapshotRemoteStoreIndexShard(
snapshotId.getName(),
lastSnapshotStatus.getIndexVersion(),
primaryTerm,
snapshotIndexCommit.getGeneration(),
commitGeneration,
lastSnapshotStatus.getStartTime(),
threadPool.absoluteTimeInMillis() - lastSnapshotStatus.getStartTime(),
indexTotalNumberOfFiles,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
import org.opensearch.cluster.SnapshotsInProgress.ShardState;
import org.opensearch.cluster.SnapshotsInProgress.State;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
Expand All @@ -63,6 +64,7 @@
import org.opensearch.index.shard.IndexShardState;
import org.opensearch.index.snapshots.IndexShardSnapshotStatus;
import org.opensearch.index.snapshots.IndexShardSnapshotStatus.Stage;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.indices.IndicesService;
import org.opensearch.repositories.IndexId;
import org.opensearch.repositories.RepositoriesService;
Expand All @@ -74,7 +76,6 @@
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
Expand Down Expand Up @@ -371,7 +372,9 @@ private void snapshot(
ActionListener<String> listener
) {
try {
final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id());
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
final IndexShard indexShard = indexService.getShardOrNull(shardId.id());
final boolean closedIndex = indexService.getMetadata().getState() == IndexMetadata.State.CLOSE;
if (indexShard.routingEntry().primary() == false) {
throw new IndexShardSnapshotFailedException(shardId, "snapshot should be performed only on primary");
}
Expand All @@ -398,36 +401,56 @@ private void snapshot(
if (remoteStoreIndexShallowCopy && indexShard.indexSettings().isRemoteStoreEnabled()) {
long startTime = threadPool.relativeTimeInMillis();
long primaryTerm = indexShard.getOperationPrimaryTerm();
// we flush first to make sure we get the latest writes snapshotted
wrappedSnapshot = indexShard.acquireLastIndexCommitAndRefresh(true);
IndexCommit snapshotIndexCommit = wrappedSnapshot.get();
long commitGeneration = snapshotIndexCommit.getGeneration();
long commitGeneration = 0L;
Map<String, Long> indexFilesToFileLengthMap = null;
IndexCommit snapshotIndexCommit = null;

try {
if (closedIndex) {
RemoteSegmentMetadata lastRemoteUploadedIndexCommit = indexShard.fetchLastRemoteUploadedSegmentMetadata();
indexFilesToFileLengthMap = lastRemoteUploadedIndexCommit.getMetadata()
.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getLength()));
primaryTerm = lastRemoteUploadedIndexCommit.getPrimaryTerm();
commitGeneration = lastRemoteUploadedIndexCommit.getGeneration();
} else {
wrappedSnapshot = indexShard.acquireLastIndexCommitAndRefresh(true);
snapshotIndexCommit = wrappedSnapshot.get();
commitGeneration = snapshotIndexCommit.getGeneration();
}
indexShard.acquireLockOnCommitData(snapshot.getSnapshotId().getUUID(), primaryTerm, commitGeneration);
} catch (NoSuchFileException e) {
wrappedSnapshot.close();
logger.warn(
"Exception while acquiring lock on primaryTerm = {} and generation = {}",
primaryTerm,
commitGeneration
);
indexShard.flush(new FlushRequest(shardId.getIndexName()).force(true));
wrappedSnapshot = indexShard.acquireLastIndexCommit(false);
snapshotIndexCommit = wrappedSnapshot.get();
commitGeneration = snapshotIndexCommit.getGeneration();
indexShard.acquireLockOnCommitData(snapshot.getSnapshotId().getUUID(), primaryTerm, commitGeneration);
} catch (IOException e) {
if (closedIndex) {
logger.warn("Exception while reading latest metadata file from remote store");
listener.onFailure(e);
} else {
wrappedSnapshot.close();
logger.warn(
"Exception while acquiring lock on primaryTerm = {} and generation = {}",
primaryTerm,
commitGeneration
);
indexShard.flush(new FlushRequest(shardId.getIndexName()).force(true));
wrappedSnapshot = indexShard.acquireLastIndexCommit(false);
snapshotIndexCommit = wrappedSnapshot.get();
commitGeneration = snapshotIndexCommit.getGeneration();
indexShard.acquireLockOnCommitData(snapshot.getSnapshotId().getUUID(), primaryTerm, commitGeneration);
}
}
try {
repository.snapshotRemoteStoreIndexShard(
indexShard.store(),
snapshot.getSnapshotId(),
indexId,
snapshotIndexCommit,
getShardStateId(indexShard, snapshotIndexCommit),
null,
snapshotStatus,
primaryTerm,
commitGeneration,
startTime,
ActionListener.runBefore(listener, wrappedSnapshot::close)
indexFilesToFileLengthMap,
closedIndex ? listener : ActionListener.runBefore(listener, wrappedSnapshot::close)
);
} catch (IndexShardSnapshotFailedException e) {
logger.error(
Expand Down
Loading