Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -137,8 +137,8 @@ public void testReturnsEmptyListWhenThereAreNotAvailableRepositories() throws Ex
createIndex(indexName, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build());
ShardId shardId = getShardIdForIndex(indexName);

List<ShardSnapshot> shardSnapshotData = getShardSnapshotShard(shardId);
assertThat(shardSnapshotData, is(empty()));
Optional<ShardSnapshot> shardSnapshot = getLatestShardSnapshot(shardId);
assertThat(shardSnapshot.isPresent(), is(equalTo(false)));
}

public void testOnlyFetchesSnapshotFromEnabledRepositories() throws Exception {
Expand Down Expand Up @@ -171,18 +171,22 @@ public void testOnlyFetchesSnapshotFromEnabledRepositories() throws Exception {
createSnapshot(repositoryName, snapshotName, indexName);
}

List<ShardSnapshot> shardSnapshotDataForShard = getShardSnapshotShard(shardId);
Optional<ShardSnapshot> latestShardSnapshot = getLatestShardSnapshot(shardId);

assertThat(shardSnapshotDataForShard.size(), is(equalTo(numberOfRecoveryEnabledRepositories)));
for (ShardSnapshot shardSnapshotData : shardSnapshotDataForShard) {
assertThat(recoveryEnabledRepos.contains(shardSnapshotData.getRepository()), is(equalTo(true)));
assertThat(shardSnapshotData.getMetadataSnapshot().size(), is(greaterThan(0)));
if (numberOfRecoveryEnabledRepositories == 0) {
assertThat(latestShardSnapshot.isPresent(), is(equalTo(false)));
} else {
assertThat(latestShardSnapshot.isPresent(), is(equalTo(true)));

ShardSnapshot shardSnapshotData = latestShardSnapshot.get();
ShardSnapshotInfo shardSnapshotInfo = shardSnapshotData.getShardSnapshotInfo();
assertThat(shardSnapshotInfo.getShardId(), is(equalTo(shardId)));
assertThat(shardSnapshotInfo.getSnapshot().getSnapshotId().getName(), is(equalTo(snapshotName)));
assertThat(recoveryEnabledRepos.contains(shardSnapshotInfo.getRepository()), is(equalTo(true)));
assertThat(nonEnabledRepos.contains(shardSnapshotInfo.getRepository()), is(equalTo(false)));

assertThat(shardSnapshotData.getMetadataSnapshot().size(), is(greaterThan(0)));

assertThat(shardSnapshotInfo.getShardId(), is(equalTo(shardId)));
assertThat(shardSnapshotInfo.getSnapshot().getSnapshotId().getName(), is(equalTo(snapshotName)));
}
}

Expand All @@ -199,12 +203,14 @@ public void testFailingReposAreTreatedAsNonExistingShardSnapshots() throws Excep

int numberOfFailingRepos = randomIntBetween(1, 3);
List<Tuple<String, Path>> failingRepos = new ArrayList<>();
List<String> failingRepoNames = new ArrayList<>();
for (int i = 0; i < numberOfFailingRepos; i++) {
String repositoryName = "failing-repo-" + i;
Path repoPath = randomRepoPath();
createRepository(repositoryName, FailingRepoPlugin.TYPE, repoPath, true);
createSnapshot(repositoryName, snapshotName, indexName);
failingRepos.add(Tuple.tuple(repositoryName, repoPath));
failingRepoNames.add(repositoryName);
}

int numberOfWorkingRepositories = randomIntBetween(0, 4);
Expand All @@ -227,20 +233,25 @@ public void testFailingReposAreTreatedAsNonExistingShardSnapshots() throws Excep
assertAcked(client().admin().cluster().preparePutRepository(failingRepo.v1())
.setType(FailingRepoPlugin.TYPE)
.setVerify(false)
.setSettings(Settings.builder().put(repoFailureType, true).put("location", randomRepoPath()))
.setSettings(Settings.builder().put(repoFailureType, true).put("location", failingRepo.v2()))
);
}

List<ShardSnapshot> shardSnapshotDataForShard = getShardSnapshotShard(shardId);
Optional<ShardSnapshot> latestShardSnapshot = getLatestShardSnapshot(shardId);

if (numberOfWorkingRepositories == 0) {
assertThat(latestShardSnapshot.isPresent(), is(equalTo(false)));
} else {
assertThat(latestShardSnapshot.isPresent(), is(equalTo(true)));
ShardSnapshot shardSnapshotData = latestShardSnapshot.get();
ShardSnapshotInfo shardSnapshotInfo = shardSnapshotData.getShardSnapshotInfo();
assertThat(workingRepos.contains(shardSnapshotInfo.getRepository()), is(equalTo(true)));
assertThat(failingRepoNames.contains(shardSnapshotInfo.getRepository()), is(equalTo(false)));

assertThat(shardSnapshotDataForShard.size(), is(equalTo(numberOfWorkingRepositories)));
for (ShardSnapshot shardSnapshotData : shardSnapshotDataForShard) {
assertThat(workingRepos.contains(shardSnapshotData.getRepository()), is(equalTo(true)));
assertThat(shardSnapshotData.getMetadataSnapshot().size(), is(greaterThan(0)));

ShardSnapshotInfo shardSnapshotInfo = shardSnapshotData.getShardSnapshotInfo();
assertThat(shardSnapshotInfo.getShardId(), equalTo(shardId));
assertThat(shardSnapshotInfo.getSnapshot().getSnapshotId().getName(), equalTo(snapshotName));
assertThat(shardSnapshotInfo.getShardId(), is(equalTo(shardId)));
assertThat(shardSnapshotInfo.getSnapshot().getSnapshotId().getName(), is(equalTo(snapshotName)));
}
}

Expand Down Expand Up @@ -268,15 +279,15 @@ protected boolean masterSupportsFetchingLatestSnapshots() {
}
};

PlainActionFuture<List<ShardSnapshot>> latestSnapshots = PlainActionFuture.newFuture();
PlainActionFuture<Optional<ShardSnapshot>> latestSnapshots = PlainActionFuture.newFuture();
shardSnapshotsService.fetchLatestSnapshotsForShard(shardId, latestSnapshots);
assertThat(latestSnapshots.actionGet(), is(empty()));
assertThat(latestSnapshots.actionGet().isPresent(), is(equalTo(false)));
}

private List<ShardSnapshot> getShardSnapshotShard(ShardId shardId) throws Exception {
private Optional<ShardSnapshot> getLatestShardSnapshot(ShardId shardId) throws Exception {
ShardSnapshotsService shardSnapshotsService = getShardSnapshotsService();

PlainActionFuture<List<ShardSnapshot>> future = PlainActionFuture.newFuture();
PlainActionFuture<Optional<ShardSnapshot>> future = PlainActionFuture.newFuture();
shardSnapshotsService.fetchLatestSnapshotsForShard(shardId, future);
return future.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand All @@ -54,7 +53,7 @@ public void testGetShardSnapshotFromUnknownRepoReturnsAnError() throws Exception

if (useMultipleUnknownRepositories) {
GetShardSnapshotResponse response = responseFuture.get();
assertThat(response.getRepositoryShardSnapshots(), is(anEmptyMap()));
assertThat(response.getLatestShardSnapshot().isPresent(), is(equalTo(false)));

final Map<String, RepositoryException> failures = response.getRepositoryFailures();
for (String repository : repositories) {
Expand Down Expand Up @@ -209,9 +208,11 @@ public void testGetShardSnapshotFailureHandlingLetOtherRepositoriesRequestsMakeP
final String indexName = "test-idx";
createIndexWithContent(indexName);

createSnapshot(failingRepoName, "empty-snap", Collections.singletonList(indexName));
int snapshotIdx = 0;
createSnapshot(failingRepoName, "empty-snap-" + snapshotIdx++, Collections.singletonList(indexName));
SnapshotInfo latestSnapshot = null;
for (String workingRepoName : workingRepoNames) {
createSnapshot(workingRepoName, "empty-snap", Collections.singletonList(indexName));
latestSnapshot = createSnapshot(workingRepoName, "empty-snap-" + snapshotIdx++, Collections.singletonList(indexName));
}

final MockRepository repository = getRepositoryOnMaster(failingRepoName);
Expand Down Expand Up @@ -240,11 +241,17 @@ public void testGetShardSnapshotFailureHandlingLetOtherRepositoriesRequestsMakeP

for (String workingRepoName : workingRepoNames) {
assertThat(response.getFailureForRepository(workingRepoName).isPresent(), is(equalTo(false)));
assertThat(response.getIndexShardSnapshotInfoForRepository(workingRepoName).isPresent(), equalTo(true));
}

Optional<ShardSnapshotInfo> shardSnapshotInfoOpt = response.getLatestShardSnapshot();

assertThat(shardSnapshotInfoOpt.isPresent(), equalTo(true));
ShardSnapshotInfo shardSnapshotInfo = shardSnapshotInfoOpt.get();
assertThat(shardSnapshotInfo.getSnapshot(), equalTo(latestSnapshot.snapshot()));
assertThat(shardSnapshotInfo.getRepository(), equalTo(latestSnapshot.repository()));
}

public void testGetShardSnapshotInMultipleRepositories() {
public void testGetShardSnapshotInMultipleRepositoriesReturnsTheLatestSnapshot() {
int repoCount = randomIntBetween(2, 10);
List<String> repositories = new ArrayList<>();
for (int i = 0; i < repoCount; i++) {
Expand All @@ -256,21 +263,21 @@ public void testGetShardSnapshotInMultipleRepositories() {
final String indexName = "test-idx";
createIndexWithContent(indexName);

Map<String, SnapshotInfo> repositorySnapshots = new HashMap<>();
int snapshotIdx = 0;
SnapshotInfo expectedLatestSnapshot = null;
for (String repository : repositories) {
repositorySnapshots.put(repository, createSnapshot(repository, "snap-1", Collections.singletonList(indexName)));
expectedLatestSnapshot = createSnapshot(repository, "snap-" + snapshotIdx++, Collections.singletonList(indexName));
}

GetShardSnapshotResponse response = getLatestSnapshotForShardFuture(repositories, indexName, 0).actionGet();

for (String repository : repositories) {
assertThat(response.getFailureForRepository(repository).isPresent(), is(equalTo(false)));
Optional<ShardSnapshotInfo> shardSnapshotInfoOpt = response.getIndexShardSnapshotInfoForRepository(repository);
assertThat(shardSnapshotInfoOpt.isPresent(), equalTo(true));
assertThat(response.getRepositoryFailures(), is(anEmptyMap()));
Optional<ShardSnapshotInfo> shardSnapshotInfoOpt = response.getLatestShardSnapshot();

ShardSnapshotInfo shardSnapshotInfo = shardSnapshotInfoOpt.get();
assertThat(shardSnapshotInfo.getSnapshot(), equalTo(repositorySnapshots.get(repository).snapshot()));
}
assertThat(shardSnapshotInfoOpt.isPresent(), equalTo(true));
ShardSnapshotInfo shardSnapshotInfo = shardSnapshotInfoOpt.get();
assertThat(shardSnapshotInfo.getSnapshot(), equalTo(expectedLatestSnapshot.snapshot()));
assertThat(shardSnapshotInfo.getRepository(), equalTo(expectedLatestSnapshot.repository()));
}

public void testFailedSnapshotsAreNotReturned() throws Exception {
Expand Down Expand Up @@ -308,12 +315,13 @@ public void testFailedSnapshotsAreNotReturned() throws Exception {
Optional<ShardSnapshotInfo> latestSnapshotForShard = getLatestSnapshotForShard(repoName, indexName, 0);
assertThat(latestSnapshotForShard.isPresent(), equalTo(true));
assertThat(latestSnapshotForShard.get().getSnapshot(), equalTo(snapshotInfo.snapshot()));
assertThat(latestSnapshotForShard.get().getRepository(), equalTo(snapshotInfo.repository()));
}

private Optional<ShardSnapshotInfo> getLatestSnapshotForShard(String repository, String indexName, int shard) {
final GetShardSnapshotResponse response = getLatestSnapshotForShardFuture(Collections.singletonList(repository), indexName, shard)
.actionGet();
return response.getIndexShardSnapshotInfoForRepository(repository);
return response.getLatestShardSnapshot();
}

private PlainActionFuture<GetShardSnapshotResponse> getLatestSnapshotForShardFuture(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.ShardSnapshotInfo;

Expand All @@ -20,38 +21,34 @@
import java.util.Optional;

public class GetShardSnapshotResponse extends ActionResponse {
public static GetShardSnapshotResponse EMPTY = new GetShardSnapshotResponse(Collections.emptyMap(), Collections.emptyMap());
public static GetShardSnapshotResponse EMPTY = new GetShardSnapshotResponse(null, Collections.emptyMap());

private final Map<String, ShardSnapshotInfo> repositoryShardSnapshots;
private final ShardSnapshotInfo latestShardSnapshot;
private final Map<String, RepositoryException> repositoryFailures;

GetShardSnapshotResponse(Map<String, ShardSnapshotInfo> repositoryShardSnapshots, Map<String, RepositoryException> repositoryFailures) {
this.repositoryShardSnapshots = repositoryShardSnapshots;
GetShardSnapshotResponse(@Nullable ShardSnapshotInfo latestShardSnapshot, Map<String, RepositoryException> repositoryFailures) {
this.latestShardSnapshot = latestShardSnapshot;
this.repositoryFailures = repositoryFailures;
}

GetShardSnapshotResponse(StreamInput in) throws IOException {
super(in);
this.repositoryShardSnapshots = in.readMap(StreamInput::readString, ShardSnapshotInfo::new);
this.latestShardSnapshot = in.readOptionalWriteable(ShardSnapshotInfo::new);
this.repositoryFailures = in.readMap(StreamInput::readString, RepositoryException::new);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(repositoryShardSnapshots, StreamOutput::writeString, (o, info) -> info.writeTo(o));
out.writeOptionalWriteable(latestShardSnapshot);
out.writeMap(repositoryFailures, StreamOutput::writeString, (o, err) -> err.writeTo(o));
}

public Optional<ShardSnapshotInfo> getIndexShardSnapshotInfoForRepository(String repositoryName) {
return Optional.ofNullable(repositoryShardSnapshots.get(repositoryName));
}

public Optional<RepositoryException> getFailureForRepository(String repository) {
return Optional.ofNullable(repositoryFailures.get(repository));
}

public Map<String, ShardSnapshotInfo> getRepositoryShardSnapshots() {
return repositoryShardSnapshots;
public Optional<ShardSnapshotInfo> getLatestShardSnapshot() {
return Optional.ofNullable(latestShardSnapshot);
}

public Map<String, RepositoryException> getRepositoryFailures() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.transport.TransportService;

import java.util.Collection;
import java.util.Comparator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -40,7 +41,8 @@
import java.util.stream.Collectors;

public class TransportGetShardSnapshotAction extends TransportMasterNodeAction<GetShardSnapshotRequest, GetShardSnapshotResponse> {

private static final Comparator<ShardSnapshotInfo> LATEST_SNAPSHOT_COMPARATOR = Comparator.comparing(ShardSnapshotInfo::getStartedAt)
.thenComparing(snapshotInfo -> snapshotInfo.getSnapshot().getSnapshotId());
private final IndexSnapshotsService indexSnapshotsService;

@Inject
Expand Down Expand Up @@ -120,19 +122,19 @@ private void getShardSnapshots(
private GetShardSnapshotResponse transformToResponse(
Collection<Tuple<Optional<ShardSnapshotInfo>, RepositoryException>> shardSnapshots
) {
final Map<String, ShardSnapshotInfo> repositoryShardSnapshot = shardSnapshots.stream()
final Optional<ShardSnapshotInfo> latestSnapshot = shardSnapshots.stream()
.map(Tuple::v1)
.filter(Objects::nonNull)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toMap(ShardSnapshotInfo::getRepository, Function.identity()));
.max(LATEST_SNAPSHOT_COMPARATOR);

final Map<String, RepositoryException> failures = shardSnapshots.stream()
.map(Tuple::v2)
.filter(Objects::nonNull)
.collect(Collectors.toMap(RepositoryException::repository, Function.identity()));

return new GetShardSnapshotResponse(repositoryShardSnapshot, failures);
return new GetShardSnapshotResponse(latestSnapshot.orElse(null), failures);
}

private Set<String> getRequestedRepositories(GetShardSnapshotRequest request, ClusterState state) {
Expand Down
Loading