Skip to content

Commit 843ea40

Browse files
committed
Refactor code
Signed-off-by: Ashish Singh <ssashish@amazon.com>
1 parent f80f4ac commit 843ea40

3 files changed

Lines changed: 48 additions & 28 deletions

File tree

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java

Lines changed: 37 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.opensearch.core.util.FileSystemUtils;
2121
import org.opensearch.index.IndexModule;
2222
import org.opensearch.index.IndexSettings;
23+
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
2324
import org.opensearch.indices.replication.common.ReplicationType;
2425
import org.opensearch.snapshots.AbstractSnapshotIntegTestCase;
2526
import org.opensearch.test.FeatureFlagSetter;
@@ -31,6 +32,7 @@
3132
import java.util.Collections;
3233
import java.util.Locale;
3334
import java.util.Set;
35+
import java.util.concurrent.TimeUnit;
3436
import java.util.stream.Collectors;
3537

3638
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
@@ -83,19 +85,44 @@ public void teardown() {
8385
public void testRemoteRefreshRetryOnFailure() throws Exception {
8486

8587
Path location = randomRepoPath().toAbsolutePath();
86-
logger.info("--> Creating repository={} at the path={}", REPOSITORY_NAME, location);
88+
setup(location, randomDoubleBetween(0.1, 0.25, true), "metadata");
8789

90+
// Here we are having flush/refresh after each iteration of indexing. However, the refresh will not always succeed
91+
// due to IOExceptions that are thrown while doing uploadBlobs.
92+
indexData(randomIntBetween(5, 10), randomBoolean());
93+
logger.info("--> Indexed data");
94+
95+
// TODO - Once the segments stats api is available, we need to verify that there were failed upload attempts.
96+
IndicesStatsResponse response = client().admin().indices().stats(new IndicesStatsRequest()).get();
97+
assertEquals(1, response.getShards().length);
98+
99+
String indexUuid = response.getShards()[0].getShardRouting().index().getUUID();
100+
Path segmentDataRepoPath = location.resolve(String.format(Locale.ROOT, "%s/0/segments/data", indexUuid));
101+
String segmentDataLocalPath = String.format(Locale.ROOT, "%s/indices/%s/0/index", response.getShards()[0].getDataPath(), indexUuid);
102+
103+
logger.info("--> Verify that the segment files are same on local and repository eventually");
104+
// This can take time as the retry interval is exponential and capped at the max retry interval (10s after this change)
105+
assertBusy(() -> {
106+
Set<String> filesInLocal = getSegmentFiles(location.getRoot().resolve(segmentDataLocalPath));
107+
Set<String> filesInRepo = getSegmentFiles(segmentDataRepoPath);
108+
assertTrue(filesInRepo.containsAll(filesInLocal));
109+
}, 60, TimeUnit.SECONDS);
110+
}
111+
112+
private void setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList) {
113+
logger.info("--> Creating repository={} at the path={}", REPOSITORY_NAME, repoLocation);
88114
// The random_control_io_exception_rate setting ensures that 10-25% of all operations to remote store results in
89115
// IOException. The skip_exception_on_verification_file & skip_exception_on_list_blobs settings ensure that the
90116
// repository creation can happen without failure.
91117
createRepository(
92118
REPOSITORY_NAME,
93119
"mock",
94120
Settings.builder()
95-
.put("location", location)
96-
.put("random_control_io_exception_rate", randomIntBetween(10, 25) / 100f)
121+
.put("location", repoLocation)
122+
.put("random_control_io_exception_rate", ioFailureRate)
97123
.put("skip_exception_on_verification_file", true)
98124
.put("skip_exception_on_list_blobs", true)
125+
.put("max_failure_number", Long.MAX_VALUE)
99126
);
100127

101128
internalCluster().startDataOnlyNodes(1);
@@ -105,24 +132,6 @@ public void testRemoteRefreshRetryOnFailure() throws Exception {
105132
logger.info("--> Cluster is yellow with no initializing shards");
106133
ensureGreen(INDEX_NAME);
107134
logger.info("--> Cluster is green");
108-
109-
// Here we are having flush/refresh after each iteration of indexing. However, the refresh will not always succeed
110-
// due to IOExceptions that are thrown while doing uploadBlobs.
111-
indexData(randomIntBetween(5, 10), randomBoolean());
112-
logger.info("--> Indexed data");
113-
114-
// TODO - Once the segments stats api is available, we need to verify that there were failed upload attempts.
115-
IndicesStatsResponse response = client().admin().indices().stats(new IndicesStatsRequest()).get();
116-
assertEquals(1, response.getShards().length);
117-
118-
String indexUuid = response.getShards()[0].getShardRouting().index().getUUID();
119-
Path segmentDataRepoPath = location.resolve(String.format(Locale.ROOT, "%s/0/segments/data", indexUuid));
120-
String segmentDataLocalPath = String.format(Locale.ROOT, "%s/indices/%s/0/index", response.getShards()[0].getDataPath(), indexUuid);
121-
122-
logger.info("--> Verify that the segment files are same on local and repository eventually");
123-
assertBusy(
124-
() -> assertEquals(getSegmentFiles(location.getRoot().resolve(segmentDataLocalPath)), getSegmentFiles(segmentDataRepoPath))
125-
);
126135
}
127136

128137
/**
@@ -134,15 +143,20 @@ public void testRemoteRefreshRetryOnFailure() throws Exception {
134143
private Set<String> getSegmentFiles(Path location) {
135144
try {
136145
return Arrays.stream(FileSystemUtils.files(location))
137-
.filter(path -> path.getFileName().startsWith("_"))
146+
.filter(path -> path.getFileName().toString().startsWith("_"))
138147
.map(path -> path.getFileName().toString())
148+
.map(this::getLocalSegmentFilename)
139149
.collect(Collectors.toSet());
140150
} catch (IOException exception) {
141151
logger.error("Exception occurred while getting segment files", exception);
142152
}
143153
return Collections.emptySet();
144154
}
145155

156+
private String getLocalSegmentFilename(String remoteFilename) {
157+
return remoteFilename.split(RemoteSegmentStoreDirectory.SEGMENT_NAME_UUID_SEPARATOR)[0];
158+
}
159+
146160
private IndexResponse indexSingleDoc() {
147161
return client().prepareIndex(INDEX_NAME)
148162
.setId(UUIDs.randomBase64UUID())
@@ -153,7 +167,7 @@ private IndexResponse indexSingleDoc() {
153167
private void indexData(int numberOfIterations, boolean invokeFlush) {
154168
logger.info("--> Indexing data for {} iterations with flush={}", numberOfIterations, invokeFlush);
155169
for (int i = 0; i < numberOfIterations; i++) {
156-
int numberOfOperations = randomIntBetween(20, 50);
170+
int numberOfOperations = randomIntBetween(1, 5);
157171
logger.info("--> Indexing {} operations in iteration #{}", numberOfOperations, i);
158172
for (int j = 0; j < numberOfOperations; j++) {
159173
indexSingleDoc();

server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres
6565
/**
6666
* In an exponential back off setup, this is the upper bound on the retry interval; the interval grows exponentially until it reaches this cap.
6767
*/
68-
private static final int REMOTE_REFRESH_RETRY_MAX_INTERVAL_MILLIS = 30_000;
68+
private static final int REMOTE_REFRESH_RETRY_MAX_INTERVAL_MILLIS = 10_000;
6969

7070
/**
7171
* Exponential back off policy with max retry interval.

server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.io.IOException;
3030
import java.util.Collections;
3131
import java.util.Map;
32+
import java.util.Objects;
3233
import java.util.concurrent.CountDownLatch;
3334
import java.util.concurrent.atomic.AtomicLong;
3435

@@ -251,7 +252,7 @@ public void testRefreshSuccessOnThirdAttemptAttempt() throws Exception {
251252
}
252253

253254
private void mockIndexShardWithRetryAndScheduleRefresh(
254-
int SucceedOnAttempt,
255+
int succeedOnAttempt,
255256
CountDownLatch refreshCountLatch,
256257
CountDownLatch successLatch
257258
) throws IOException {
@@ -289,22 +290,27 @@ private void mockIndexShardWithRetryAndScheduleRefresh(
289290
when(shard.getThreadPool()).thenReturn(threadPool);
290291

291292
// Mock indexShard.getReplicationTracker().isPrimaryMode()
293+
292294
doAnswer(invocation -> {
293-
refreshCountLatch.countDown();
295+
if (Objects.nonNull(refreshCountLatch)) {
296+
refreshCountLatch.countDown();
297+
}
294298
return indexShard.getReplicationTracker();
295299
}).when(shard).getReplicationTracker();
296300

297301
AtomicLong counter = new AtomicLong();
298302
// Mock indexShard.getSegmentInfosSnapshot()
299303
doAnswer(invocation -> {
300-
if (counter.incrementAndGet() <= SucceedOnAttempt - 1) {
304+
if (counter.incrementAndGet() <= succeedOnAttempt - 1) {
301305
throw new RuntimeException("Inducing failure in upload");
302306
}
303307
return indexShard.getSegmentInfosSnapshot();
304308
}).when(shard).getSegmentInfosSnapshot();
305309

306310
doAnswer(invocation -> {
307-
successLatch.countDown();
311+
if (Objects.nonNull(successLatch)) {
312+
successLatch.countDown();
313+
}
308314
return indexShard.getEngine();
309315
}).when(shard).getEngine();
310316

0 commit comments

Comments
 (0)