Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
449be28
Add peer recovery planners to take into account available snapshots
fcofdez Jul 29, 2021
63269b1
Renaming mistake
fcofdez Jul 29, 2021
7302d3e
Wire everything together
fcofdez Jul 30, 2021
7e12ecf
Merge remote-tracking branch 'origin/master' into peer-recovery-from-…
fcofdez Jul 30, 2021
0f76146
Renaming mistake
fcofdez Jul 30, 2021
2fecdc9
Make setting static
fcofdez Jul 30, 2021
9bf26ca
Remove dead code
fcofdez Jul 30, 2021
a4f803e
Add a setting to specify which repository to use during peer recoveries
fcofdez Aug 3, 2021
5029e16
Merge remote-tracking branch 'origin/master' into peer-recovery-from-…
fcofdez Aug 3, 2021
b1d5520
Simplify code
fcofdez Aug 3, 2021
eb86720
Merge remote-tracking branch 'origin/master' into peer-recovery-from-…
fcofdez Aug 3, 2021
a44509e
Fix error
fcofdez Aug 3, 2021
ce1c94b
Add dynamic settings
fcofdez Aug 3, 2021
661b51e
Add test to RecoverySourceHandlerTests
fcofdez Aug 3, 2021
4b43100
Generalize expected error
fcofdez Aug 3, 2021
4f6c493
Precommit
fcofdez Aug 3, 2021
4e65240
Update docs/reference/modules/indices/recovery.asciidoc
fcofdez Aug 4, 2021
21772c7
Update docs/reference/modules/indices/recovery.asciidoc
fcofdez Aug 4, 2021
9e9174f
Remove unnecessary annotation
fcofdez Aug 4, 2021
2c58137
Merge remote-tracking branch 'origin/master' into peer-recovery-from-…
fcofdez Aug 4, 2021
fde53c9
Support only 1 repository
fcofdez Aug 4, 2021
ceeae01
Merge remote-tracking branch 'origin/master' into peer-recovery-from-…
fcofdez Aug 4, 2021
05bdcd9
Revert "Support only 1 repository"
fcofdez Aug 4, 2021
34047af
Take into account repo path
fcofdez Aug 4, 2021
ddf8cfd
Take into account node versions
fcofdez Aug 4, 2021
b09b0b5
Merge remote-tracking branch 'origin/master' into peer-recovery-from-…
fcofdez Aug 4, 2021
9a77bf3
Merge remote-tracking branch 'origin/master' into peer-recovery-from-…
fcofdez Aug 8, 2021
ff1f120
Simplify
fcofdez Aug 8, 2021
18959b6
Get rid of repository setting and use per repository setting
fcofdez Aug 8, 2021
dc83c55
Review comments
fcofdez Aug 9, 2021
c1a1bbb
Add logic to recover files from snapshot
fcofdez Aug 2, 2021
33bb1dc
Default use_snapshots
fcofdez Aug 9, 2021
697589a
Fix tests
fcofdez Aug 9, 2021
f6df2a3
Fix test mistake
fcofdez Aug 9, 2021
2d82d9c
Fix compilation
fcofdez Aug 9, 2021
058495b
Invalid setting
fcofdez Aug 9, 2021
0ed510f
Only create snapshot when the blocked action is recover snapshot file
fcofdez Aug 9, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions docs/reference/modules/indices/recovery.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,18 @@ and may interfere with indexing, search, and other activities in your cluster.
Do not increase this setting without carefully verifying that your cluster has
the resources available to handle the extra load that will result.

`indices.recovery.use_snapshots`::
(<<cluster-update-settings,Dynamic>>, Expert) Enables snapshot-based peer recoveries.
+
{es} recovers replicas and relocates primary shards using the _peer recovery_
process, which involves constructing a new copy of a shard on the target node.
When `indices.recovery.use_snapshots` is `false` {es} will construct this new
copy by transferring the index data from the current primary. When this setting
is `true` {es} will attempt to copy the index data from a recent snapshot
first, and will only copy data from the primary if it cannot identify a
suitable snapshot.
+
Setting this option to `true` reduces your operating costs if your cluster runs
in an environment where the node-to-node data transfer costs are higher than
the costs of recovering data from a snapshot. It also reduces the amount of
work that the primary must do during a recovery.
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.upgrades;

import org.apache.http.client.methods.HttpPost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThan;

public class SnapshotBasedRecoveryIT extends AbstractRollingTestCase {
public void testSnapshotBasedRecovery() throws Exception {
final String indexName = "snapshot_based_recovery";
final String repositoryName = "snapshot_based_recovery_repo";
final int numDocs = 200;
switch (CLUSTER_TYPE) {
case OLD:
Settings.Builder settings = Settings.builder()
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
.put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0"); // fail faster
createIndex(indexName, settings.build());
ensureGreen(indexName);
indexDocs(indexName, numDocs);
flush(indexName, true);

registerRepository(
repositoryName,
"fs",
true,
Settings.builder()
.put("location", "./snapshot_based_recovery")
.put(BlobStoreRepository.USE_FOR_PEER_RECOVERY_SETTING.getKey(), true)
.build()
);

createSnapshot(repositoryName, "snap", true);

updateIndexSettings(indexName, Settings.builder().put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2));
ensureGreen(indexName);
break;
case MIXED:
case UPGRADED:
ensureGreen(indexName);
for (int i = 0; i < 4; i++) {
assertSearchResultsAreCorrect(indexName, numDocs);
}
break;
default:
throw new IllegalStateException("unknown type " + CLUSTER_TYPE);
}
}

private void assertSearchResultsAreCorrect(String indexName, int numDocs) throws IOException {
if (randomBoolean()) {
Map<String, Object> searchResults = search(indexName, QueryBuilders.matchAllQuery());
assertThat(extractValue(searchResults, "hits.total.value"), equalTo(numDocs));
List<Map<String, Object>> hits = extractValue(searchResults, "hits.hits");
for (Map<String, Object> hit : hits) {
String docId = extractValue(hit, "_id");
assertThat(Integer.parseInt(docId), allOf(greaterThanOrEqualTo(0), lessThan(numDocs)));
assertThat(extractValue(hit, "_source.field"), equalTo(Integer.parseInt(docId)));
assertThat(extractValue(hit, "_source.text"), equalTo("Some text " + docId));
}
} else {
Map<String, Object> searchResults = search(indexName, QueryBuilders.matchQuery("text", "some"));
assertThat(extractValue(searchResults, "hits.total.value"), equalTo(numDocs));
}
}

private static Map<String, Object> search(String index, QueryBuilder query) throws IOException {
final Request request = new Request(HttpPost.METHOD_NAME, '/' + index + "/_search");
request.setJsonEntity(new SearchSourceBuilder().trackTotalHits(true).query(query).toString());

final Response response = client().performRequest(request);
assertOK(response);

final Map<String, Object> responseAsMap = responseAsMap(response);
assertThat(
extractValue(responseAsMap, "_shards.failed"),
equalTo(0)
);
return responseAsMap;
}

private void indexDocs(String indexName, int numDocs) throws IOException {
final StringBuilder bulkBody = new StringBuilder();
for (int i = 0; i < numDocs; i++) {
bulkBody.append("{\"index\":{\"_id\":\"").append(i).append("\"}}\n");
bulkBody.append("{\"field\":").append(i).append(",\"text\":\"Some text ").append(i).append("\"}\n");
}

final Request documents = new Request(HttpPost.METHOD_NAME, '/' + indexName + "/_bulk");
documents.addParameter("refresh", "true");
documents.setJsonEntity(bulkBody.toString());
assertOK(client().performRequest(documents));
}

@SuppressWarnings("unchecked")
private static <T> T extractValue(Map<String, Object> map, String path) {
return (T) XContentMapValues.extractValue(path, map);
}
}
2 changes: 2 additions & 0 deletions qa/snapshot-based-recoveries/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
apply plugin: 'elasticsearch.standalone-rest-test'
apply plugin: 'elasticsearch.internal-test-artifact'
31 changes: 31 additions & 0 deletions qa/snapshot-based-recoveries/fs/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

apply plugin: 'elasticsearch.java-rest-test'
apply plugin: 'elasticsearch.rest-resources'

dependencies {
javaRestTestImplementation(testArtifact(project(':qa:snapshot-based-recoveries')))
}

final File repoDir = file("$buildDir/testclusters/snapshot-recoveries-repo")

restResources {
restApi {
include 'indices', 'search', 'bulk', 'snapshot'
}
}

tasks.withType(Test).configureEach {
systemProperty 'tests.path.repo', repoDir
}

testClusters.all {
numberOfNodes = 3
setting 'path.repo', repoDir.absolutePath
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.recovery;

import org.elasticsearch.common.settings.Settings;

public class FsSnapshotBasedRecoveryIT extends AbstractSnapshotBasedRecoveryRestTestCase {

@Override
protected String repositoryType() {
return "fs";
}

@Override
protected Settings repositorySettings() {
return Settings.builder()
.put("location", System.getProperty("tests.path.repo"))
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.recovery;

import org.apache.http.client.methods.HttpPost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.rest.ESRestTestCase;

import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Map;

import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThan;

public abstract class AbstractSnapshotBasedRecoveryRestTestCase extends ESRestTestCase {
private static final String REPOSITORY_NAME = "repository";
private static final String SNAPSHOT_NAME = "snapshot-for-recovery";

protected abstract String repositoryType();

protected abstract Settings repositorySettings();

public void testRecoveryUsingSnapshots() throws Exception {
final String repositoryType = repositoryType();
Settings repositorySettings = Settings.builder().put(repositorySettings())
.put(BlobStoreRepository.USE_FOR_PEER_RECOVERY_SETTING.getKey(), true)
.build();

registerRepository(REPOSITORY_NAME, repositoryType, true, repositorySettings);

final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);

createIndex(
indexName,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build()
);
ensureGreen(indexName);

final int numDocs = randomIntBetween(1, 500);
indexDocs(indexName, numDocs);

forceMerge(indexName, randomBoolean(), randomBoolean());

deleteSnapshot(REPOSITORY_NAME, SNAPSHOT_NAME, true);
createSnapshot(REPOSITORY_NAME, SNAPSHOT_NAME, true);

// Add a new replica
updateIndexSettings(indexName, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1));
ensureGreen(indexName);

for (int i = 0; i < 4; i++) {
assertSearchResultsAreCorrect(indexName, numDocs);
}
deleteSnapshot(REPOSITORY_NAME, SNAPSHOT_NAME, false);
}

private void assertSearchResultsAreCorrect(String indexName, int numDocs) throws IOException {
if (randomBoolean()) {
Map<String, Object> searchResults = search(indexName, QueryBuilders.matchAllQuery());
assertThat(extractValue(searchResults, "hits.total.value"), equalTo(numDocs));
List<Map<String, Object>> hits = extractValue(searchResults, "hits.hits");
for (Map<String, Object> hit : hits) {
String docId = extractValue(hit, "_id");
assertThat(Integer.parseInt(docId), allOf(greaterThanOrEqualTo(0), lessThan(numDocs)));
assertThat(extractValue(hit, "_source.field"), equalTo(Integer.parseInt(docId)));
assertThat(extractValue(hit, "_source.text"), equalTo("Some text " + docId));
}
} else {
Map<String, Object> searchResults = search(indexName, QueryBuilders.matchQuery("text", "some"));
assertThat(extractValue(searchResults, "hits.total.value"), equalTo(numDocs));
}
}

private static void forceMerge(String index, boolean onlyExpungeDeletes, boolean flush) throws IOException {
final Request request = new Request(HttpPost.METHOD_NAME, '/' + index + "/_forcemerge");
request.addParameter("only_expunge_deletes", Boolean.toString(onlyExpungeDeletes));
request.addParameter("flush", Boolean.toString(flush));
assertOK(client().performRequest(request));
}

private void indexDocs(String indexName, int numDocs) throws IOException {
final StringBuilder bulkBody = new StringBuilder();
for (int i = 0; i < numDocs; i++) {
bulkBody.append("{\"index\":{\"_id\":\"").append(i).append("\"}}\n");
bulkBody.append("{\"field\":").append(i).append(",\"text\":\"Some text ").append(i).append("\"}\n");
}

final Request documents = new Request(HttpPost.METHOD_NAME, '/' + indexName + "/_bulk");
documents.addParameter("refresh", Boolean.TRUE.toString());
documents.setJsonEntity(bulkBody.toString());
assertOK(client().performRequest(documents));
}

private static Map<String, Object> search(String index, QueryBuilder query) throws IOException {
final Request request = new Request(HttpPost.METHOD_NAME, '/' + index + "/_search");
request.setJsonEntity(new SearchSourceBuilder().trackTotalHits(true).query(query).toString());

final Response response = client().performRequest(request);
assertOK(response);

final Map<String, Object> responseAsMap = responseAsMap(response);
assertThat(
extractValue(responseAsMap, "_shards.failed"),
equalTo(0)
);
return responseAsMap;
}

@SuppressWarnings("unchecked")
private static <T> T extractValue(Map<String, Object> map, String path) {
return (T) XContentMapValues.extractValue(path, map);
}
}
Loading