Skip to content

Commit e039358

Browse files
committed
[Remote segments] Add backpressure in write path on segments lag between local and remote store (opensearch-project#7459)
Signed-off-by: Ashish Singh <ssashish@amazon.com>
1 parent 92571b7 commit e039358

29 files changed

Lines changed: 815 additions & 411 deletions

server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -678,6 +678,7 @@ public static final IndexShard newIndexShard(
678678
cbs,
679679
(indexSettings, shardRouting) -> new InternalTranslogFactory(),
680680
SegmentReplicationCheckpointPublisher.EMPTY,
681+
null,
681682
null
682683
);
683684
}
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.remotestore;
10+
11+
import org.junit.Before;
12+
import org.opensearch.action.index.IndexResponse;
13+
import org.opensearch.cluster.metadata.IndexMetadata;
14+
import org.opensearch.common.UUIDs;
15+
import org.opensearch.common.io.FileSystemUtils;
16+
import org.opensearch.common.settings.Settings;
17+
import org.opensearch.common.util.FeatureFlags;
18+
import org.opensearch.index.IndexModule;
19+
import org.opensearch.index.IndexSettings;
20+
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
21+
import org.opensearch.indices.replication.common.ReplicationType;
22+
import org.opensearch.snapshots.AbstractSnapshotIntegTestCase;
23+
import org.opensearch.test.FeatureFlagSetter;
24+
25+
import java.io.IOException;
26+
import java.nio.file.Path;
27+
import java.util.Arrays;
28+
import java.util.Collections;
29+
import java.util.Set;
30+
import java.util.stream.Collectors;
31+
32+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
33+
34+
public abstract class AbstractRemoteStoreMockRepositoryIntegTestCase extends AbstractSnapshotIntegTestCase {
35+
36+
protected static final String REPOSITORY_NAME = "my-segment-repo-1";
37+
protected static final String INDEX_NAME = "remote-store-test-idx-1";
38+
39+
/** Returns the inherited feature-flag settings with {@code FeatureFlags.REMOTE_STORE} additionally set to "true". */
@Override
40+
protected Settings featureFlagSettings() {
41+
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE, "true").build();
42+
}
43+
44+
/**
* JUnit {@code @Before} hook: sets the {@code FeatureFlags.REMOTE_STORE} flag through {@code FeatureFlagSetter}
* and starts a cluster-manager-only node. No data node is started here.
*/
@Before
45+
public void setup() {
46+
FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);
47+
internalCluster().startClusterManagerOnlyNode();
48+
}
49+
50+
/** Returns the index settings produced by {@code remoteStoreIndexSettings} for zero replicas. */
@Override
51+
public Settings indexSettings() {
52+
return remoteStoreIndexSettings(0);
53+
}
54+
55+
/**
* Builds index settings on top of {@code super.indexSettings()}: a "300s" refresh interval, one shard, the
* requested replica count, query cache disabled, SEGMENT replication, and the remote store enabled against
* {@code REPOSITORY_NAME}.
*
* @param numberOfReplicas value stored under {@code IndexMetadata.SETTING_NUMBER_OF_REPLICAS}.
* @return the assembled settings.
*/
protected Settings remoteStoreIndexSettings(int numberOfReplicas) {
56+
return Settings.builder()
57+
.put(super.indexSettings())
58+
// NOTE(review): presumably a long interval so that refreshes only happen when a test requests one - confirm.
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s")
59+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
60+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
61+
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
62+
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
63+
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
64+
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
65+
.build();
66+
}
67+
68+
/** Deletes the repository named {@code REPOSITORY_NAME} and asserts that the delete request was acknowledged. */
protected void deleteRepo() {
69+
logger.info("--> Deleting the repository={}", REPOSITORY_NAME);
70+
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
71+
}
72+
73+
/**
* Creates the "mock" repository {@code REPOSITORY_NAME} at {@code repoLocation}, starts one data-only node,
* creates {@code INDEX_NAME} and waits for the index to become yellow and then green.
*
* @param repoLocation value of the repository's "location" setting.
* @param ioFailureRate value of the repository's "random_control_io_exception_rate" setting.
* @param skipExceptionBlobList value of the repository's "skip_exception_on_blobs" setting
*        (presumably the blobs exempted from injected failures - confirm against the mock repository).
*/
protected void setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList) {
73+
logger.info("--> Creating repository={} at the path={}", REPOSITORY_NAME, repoLocation);
74+
// The random_control_io_exception_rate setting makes the given fraction (ioFailureRate) of all operations to the
75+
// remote store result in an IOException. The skip_exception_on_verification_file and skip_exception_on_list_blobs
76+
// settings ensure that the repository creation can happen without failure.
77+
createRepository(
78+
REPOSITORY_NAME,
79+
"mock",
80+
Settings.builder()
81+
.put("location", repoLocation)
82+
.put("random_control_io_exception_rate", ioFailureRate)
83+
.put("skip_exception_on_verification_file", true)
84+
.put("skip_exception_on_list_blobs", true)
85+
// Skipping is required for metadata as it is part of recovery
86+
.put("skip_exception_on_blobs", skipExceptionBlobList)
87+
.put("max_failure_number", Long.MAX_VALUE)
88+
);
89+
90+
internalCluster().startDataOnlyNodes(1);
91+
createIndex(INDEX_NAME);
92+
logger.info("--> Created index={}", INDEX_NAME);
93+
ensureYellowAndNoInitializingShards(INDEX_NAME);
94+
logger.info("--> Cluster is yellow with no initializing shards");
95+
ensureGreen(INDEX_NAME);
96+
logger.info("--> Cluster is green");
97+
}
99+
100+
/**
101+
* Gets the names of all segment files, i.e. files whose names start with "_" (for instance _0.cfe, _o.cfs),
* among the paths returned by {@code FileSystemUtils.files(location)}. Each name is passed through
* {@code getLocalSegmentFilename}, so only the part before the remote UUID separator is kept.
102+
*
103+
* @param location the path to the location where segment files are searched.
104+
* @return the set of segment file names, or an empty set if an IOException was thrown.
105+
*/
106+
protected Set<String> getSegmentFiles(Path location) {
107+
try {
108+
return Arrays.stream(FileSystemUtils.files(location))
109+
.filter(path -> path.getFileName().toString().startsWith("_"))
110+
.map(path -> path.getFileName().toString())
111+
.map(this::getLocalSegmentFilename)
112+
.collect(Collectors.toSet());
113+
} catch (IOException exception) {
114+
// Best effort: the failure is only logged and the empty set below is returned instead.
logger.error("Exception occurred while getting segment files", exception);
115+
}
116+
return Collections.emptySet();
117+
}
118+
119+
/**
* Returns the first element obtained by splitting {@code remoteFilename} on
* {@code RemoteSegmentStoreDirectory.SEGMENT_NAME_UUID_SEPARATOR}. Note that {@code String.split} interprets
* the separator as a regular expression.
*/
private String getLocalSegmentFilename(String remoteFilename) {
120+
return remoteFilename.split(RemoteSegmentStoreDirectory.SEGMENT_NAME_UUID_SEPARATOR)[0];
121+
}
122+
123+
/**
* Indexes one document into {@code INDEX_NAME}. The id comes from {@code UUIDs.randomBase64UUID()} and the
* source is a single field whose name and value each come from {@code randomAlphaOfLength(5)}.
*
* @return the response of the index request.
*/
private IndexResponse indexSingleDoc() {
124+
return client().prepareIndex(INDEX_NAME)
125+
.setId(UUIDs.randomBase64UUID())
126+
.setSource(randomAlphaOfLength(5), randomAlphaOfLength(5))
127+
.get();
128+
}
129+
130+
/**
* Indexes documents into {@code INDEX_NAME} in batches. Each iteration indexes
* {@code randomIntBetween(1, 5)} single documents and then either flushes or refreshes the index.
*
* @param numberOfIterations number of index-then-flush/refresh rounds to run.
* @param invokeFlush if true each round ends with a flush, otherwise with a refresh.
*/
protected void indexData(int numberOfIterations, boolean invokeFlush) {
131+
logger.info("--> Indexing data for {} iterations with flush={}", numberOfIterations, invokeFlush);
132+
for (int i = 0; i < numberOfIterations; i++) {
133+
int numberOfOperations = randomIntBetween(1, 5);
134+
logger.info("--> Indexing {} operations in iteration #{}", numberOfOperations, i);
135+
for (int j = 0; j < numberOfOperations; j++) {
136+
indexSingleDoc();
137+
}
138+
if (invokeFlush) {
139+
flush(INDEX_NAME);
140+
} else {
141+
refresh(INDEX_NAME);
142+
}
143+
}
144+
}
145+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.remotestore;
10+
11+
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
12+
import org.opensearch.common.settings.Settings;
13+
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
14+
import org.opensearch.test.OpenSearchIntegTestCase;
15+
16+
import java.nio.file.Path;
17+
18+
import static org.opensearch.index.remote.RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED;
19+
20+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
21+
public class RemoteStoreBackpressureIT extends AbstractRemoteStoreMockRepositoryIntegTestCase {
22+
23+
/**
* Verifies that indexing is rejected once remote refresh segment pressure is enabled: the repository is set up
* with an I/O failure rate of 1d, the pressure setting is switched on through persistent cluster settings, and
* indexing is then expected to throw an {@code OpenSearchRejectedExecutionException} whose message contains
* "rejected execution on primary shard".
*/
public void testWritesRejected() {
24+
Path location = randomRepoPath().toAbsolutePath();
25+
// "metadata" is passed as the skip-exception blob list; 1d is the I/O failure rate.
// NOTE(review): presumably this makes every segment upload fail so the local/remote lag keeps growing - confirm.
setup(location, 1d, "metadata");
26+
27+
Settings request = Settings.builder().put(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true).build();
28+
ClusterUpdateSettingsResponse clusterUpdateResponse = client().admin()
29+
.cluster()
30+
.prepareUpdateSettings()
31+
.setPersistentSettings(request)
32+
.get();
33+
// NOTE(review): arguments are in (actual, expected) order; JUnit's assertEquals takes the expected value first,
// so a failure message here would label the two values the wrong way round.
assertEquals(clusterUpdateResponse.getPersistentSettings().get(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey()), "true");
34+
35+
logger.info("--> Indexing data");
36+
OpenSearchRejectedExecutionException ex = assertThrows(
37+
OpenSearchRejectedExecutionException.class,
38+
() -> indexData(randomIntBetween(10, 20), randomBoolean())
39+
);
40+
assertTrue(ex.getMessage().contains("rejected execution on primary shard"));
41+
deleteRepo();
42+
}
43+
}

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIT.java renamed to server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import static java.util.Arrays.asList;
2727
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
2828

29-
public class RemoteStoreBaseIT extends OpenSearchIntegTestCase {
29+
public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase {
3030
protected static final String REPOSITORY_NAME = "test-remore-store-repo";
3131
protected static final int SHARD_COUNT = 1;
3232
protected static final int REPLICA_COUNT = 1;

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
3535

3636
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
37-
public class RemoteStoreIT extends RemoteStoreBaseIT {
37+
public class RemoteStoreIT extends RemoteStoreBaseIntegTestCase {
3838

3939
private static final String INDEX_NAME = "remote-store-test-idx-1";
4040
private static final String TOTAL_OPERATIONS = "total-operations";

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java

Lines changed: 11 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -8,79 +8,21 @@
88

99
package org.opensearch.remotestore;
1010

11-
import org.junit.After;
12-
import org.junit.Before;
11+
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
1312
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
1413
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
15-
import org.opensearch.action.index.IndexResponse;
16-
import org.opensearch.cluster.metadata.IndexMetadata;
17-
import org.opensearch.common.UUIDs;
18-
import org.opensearch.common.io.FileSystemUtils;
1914
import org.opensearch.common.settings.Settings;
20-
import org.opensearch.common.util.FeatureFlags;
21-
import org.opensearch.index.IndexModule;
22-
import org.opensearch.index.IndexSettings;
23-
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
24-
import org.opensearch.indices.replication.common.ReplicationType;
25-
import org.opensearch.snapshots.AbstractSnapshotIntegTestCase;
26-
import org.opensearch.test.FeatureFlagSetter;
2715
import org.opensearch.test.OpenSearchIntegTestCase;
2816

29-
import java.io.IOException;
3017
import java.nio.file.Path;
31-
import java.util.Arrays;
32-
import java.util.Collections;
3318
import java.util.Locale;
3419
import java.util.Set;
3520
import java.util.concurrent.TimeUnit;
36-
import java.util.stream.Collectors;
3721

38-
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
22+
import static org.opensearch.index.remote.RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED;
3923

4024
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
41-
public class RemoteStoreRefreshListenerIT extends AbstractSnapshotIntegTestCase {
42-
43-
private static final String REPOSITORY_NAME = "my-segment-repo-1";
44-
private static final String INDEX_NAME = "remote-store-test-idx-1";
45-
46-
@Override
47-
protected Settings featureFlagSettings() {
48-
return Settings.builder()
49-
.put(super.featureFlagSettings())
50-
.put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true")
51-
.put(FeatureFlags.REMOTE_STORE, "true")
52-
.build();
53-
}
54-
55-
@Before
56-
public void setup() {
57-
FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);
58-
internalCluster().startClusterManagerOnlyNode();
59-
}
60-
61-
@Override
62-
public Settings indexSettings() {
63-
return remoteStoreIndexSettings(0);
64-
}
65-
66-
private Settings remoteStoreIndexSettings(int numberOfReplicas) {
67-
return Settings.builder()
68-
.put(super.indexSettings())
69-
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s")
70-
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
71-
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
72-
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
73-
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
74-
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
75-
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
76-
.build();
77-
}
78-
79-
@After
80-
public void teardown() {
81-
logger.info("--> Deleting the repository={}", REPOSITORY_NAME);
82-
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
83-
}
25+
public class RemoteStoreRefreshListenerIT extends AbstractRemoteStoreMockRepositoryIntegTestCase {
8426

8527
public void testRemoteRefreshRetryOnFailure() throws Exception {
8628

@@ -107,76 +49,16 @@ public void testRemoteRefreshRetryOnFailure() throws Exception {
10749
Set<String> filesInRepo = getSegmentFiles(segmentDataRepoPath);
10850
assertTrue(filesInRepo.containsAll(filesInLocal));
10951
}, 60, TimeUnit.SECONDS);
52+
deleteRepo();
11053
}
11154

112-
private void setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList) {
113-
logger.info("--> Creating repository={} at the path={}", REPOSITORY_NAME, repoLocation);
114-
// The random_control_io_exception_rate setting ensures that 10-25% of all operations to remote store results in
115-
/// IOException. skip_exception_on_verification_file & skip_exception_on_list_blobs settings ensures that the
116-
// repository creation can happen without failure.
117-
createRepository(
118-
REPOSITORY_NAME,
119-
"mock",
120-
Settings.builder()
121-
.put("location", repoLocation)
122-
.put("random_control_io_exception_rate", ioFailureRate)
123-
.put("skip_exception_on_verification_file", true)
124-
.put("skip_exception_on_list_blobs", true)
125-
.put("max_failure_number", Long.MAX_VALUE)
126-
);
127-
128-
internalCluster().startDataOnlyNodes(1);
129-
createIndex(INDEX_NAME);
130-
logger.info("--> Created index={}", INDEX_NAME);
131-
ensureYellowAndNoInitializingShards(INDEX_NAME);
132-
logger.info("--> Cluster is yellow with no initializing shards");
133-
ensureGreen(INDEX_NAME);
134-
logger.info("--> Cluster is green");
135-
}
136-
137-
/**
138-
* Gets all segment files which starts with "_". For instance, _0.cfe, _o.cfs etc.
139-
*
140-
* @param location the path to location where segment files are being searched.
141-
* @return set of file names of all segment file or empty set if there was IOException thrown.
142-
*/
143-
private Set<String> getSegmentFiles(Path location) {
144-
try {
145-
return Arrays.stream(FileSystemUtils.files(location))
146-
.filter(path -> path.getFileName().toString().startsWith("_"))
147-
.map(path -> path.getFileName().toString())
148-
.map(this::getLocalSegmentFilename)
149-
.collect(Collectors.toSet());
150-
} catch (IOException exception) {
151-
logger.error("Exception occurred while getting segment files", exception);
152-
}
153-
return Collections.emptySet();
154-
}
155-
156-
private String getLocalSegmentFilename(String remoteFilename) {
157-
return remoteFilename.split(RemoteSegmentStoreDirectory.SEGMENT_NAME_UUID_SEPARATOR)[0];
158-
}
159-
160-
private IndexResponse indexSingleDoc() {
161-
return client().prepareIndex(INDEX_NAME)
162-
.setId(UUIDs.randomBase64UUID())
163-
.setSource(randomAlphaOfLength(5), randomAlphaOfLength(5))
164-
.get();
165-
}
55+
public void testRemoteRefreshSegmentPressureSettingChanged() {
56+
Settings request = Settings.builder().put(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true).build();
57+
ClusterUpdateSettingsResponse response = client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get();
58+
assertEquals(response.getPersistentSettings().get(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey()), "true");
16659

167-
private void indexData(int numberOfIterations, boolean invokeFlush) {
168-
logger.info("--> Indexing data for {} iterations with flush={}", numberOfIterations, invokeFlush);
169-
for (int i = 0; i < numberOfIterations; i++) {
170-
int numberOfOperations = randomIntBetween(1, 5);
171-
logger.info("--> Indexing {} operations in iteration #{}", numberOfOperations, i);
172-
for (int j = 0; j < numberOfOperations; j++) {
173-
indexSingleDoc();
174-
}
175-
if (invokeFlush) {
176-
flush(INDEX_NAME);
177-
} else {
178-
refresh(INDEX_NAME);
179-
}
180-
}
60+
request = Settings.builder().put(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), false).build();
61+
response = client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get();
62+
assertEquals(response.getPersistentSettings().get(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey()), "false");
18163
}
18264
}

server/src/internalClusterTest/java/org/opensearch/remotestore/ReplicaToPrimaryPromotionIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
2929

3030
@OpenSearchIntegTestCase.ClusterScope(numDataNodes = 0)
31-
public class ReplicaToPrimaryPromotionIT extends RemoteStoreBaseIT {
31+
public class ReplicaToPrimaryPromotionIT extends RemoteStoreBaseIntegTestCase {
3232
private int shard_count = 5;
3333

3434
@Override

0 commit comments

Comments
 (0)