Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,7 @@ public static final IndexShard newIndexShard(
cbs,
(indexSettings, shardRouting) -> new InternalTranslogFactory(),
SegmentReplicationCheckpointPublisher.EMPTY,
null,
null
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.junit.Before;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.util.FileSystemUtils;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.snapshots.AbstractSnapshotIntegTestCase;
import org.opensearch.test.FeatureFlagSetter;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

public abstract class AbstractRemoteStoreMockRepositoryIntegTestCase extends AbstractSnapshotIntegTestCase {

protected static final String REPOSITORY_NAME = "my-segment-repo-1";
protected static final String INDEX_NAME = "remote-store-test-idx-1";

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE, "true").build();
}

@Before
public void setup() {
FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);
internalCluster().startClusterManagerOnlyNode();
}

@Override
public Settings indexSettings() {
return remoteStoreIndexSettings(0);
}

protected Settings remoteStoreIndexSettings(int numberOfReplicas) {
return Settings.builder()
.put(super.indexSettings())
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s")
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
.build();
}

protected void deleteRepo() {
logger.info("--> Deleting the repository={}", REPOSITORY_NAME);
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
}

protected void setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList) {
logger.info("--> Creating repository={} at the path={}", REPOSITORY_NAME, repoLocation);
// The random_control_io_exception_rate setting ensures that 10-25% of all operations to remote store results in
/// IOException. skip_exception_on_verification_file & skip_exception_on_list_blobs settings ensures that the
// repository creation can happen without failure.
createRepository(
REPOSITORY_NAME,
"mock",
Settings.builder()
.put("location", repoLocation)
.put("random_control_io_exception_rate", ioFailureRate)
.put("skip_exception_on_verification_file", true)
.put("skip_exception_on_list_blobs", true)
// Skipping is required for metadata as it is part of recovery
.put("skip_exception_on_blobs", skipExceptionBlobList)
.put("max_failure_number", Long.MAX_VALUE)
);

internalCluster().startDataOnlyNodes(1);
createIndex(INDEX_NAME);
logger.info("--> Created index={}", INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
logger.info("--> Cluster is yellow with no initializing shards");
ensureGreen(INDEX_NAME);
logger.info("--> Cluster is green");
}

/**
* Gets all segment files which starts with "_". For instance, _0.cfe, _o.cfs etc.
*
* @param location the path to location where segment files are being searched.
* @return set of file names of all segment file or empty set if there was IOException thrown.
*/
protected Set<String> getSegmentFiles(Path location) {
try {
return Arrays.stream(FileSystemUtils.files(location))
.filter(path -> path.getFileName().toString().startsWith("_"))
.map(path -> path.getFileName().toString())
.map(this::getLocalSegmentFilename)
.collect(Collectors.toSet());
} catch (IOException exception) {
logger.error("Exception occurred while getting segment files", exception);
}
return Collections.emptySet();
}

private String getLocalSegmentFilename(String remoteFilename) {
return remoteFilename.split(RemoteSegmentStoreDirectory.SEGMENT_NAME_UUID_SEPARATOR)[0];
}

private IndexResponse indexSingleDoc() {
return client().prepareIndex(INDEX_NAME)
.setId(UUIDs.randomBase64UUID())
.setSource(randomAlphaOfLength(5), randomAlphaOfLength(5))
.get();
}

protected void indexData(int numberOfIterations, boolean invokeFlush) {
logger.info("--> Indexing data for {} iterations with flush={}", numberOfIterations, invokeFlush);
for (int i = 0; i < numberOfIterations; i++) {
int numberOfOperations = randomIntBetween(1, 5);
logger.info("--> Indexing {} operations in iteration #{}", numberOfOperations, i);
for (int j = 0; j < numberOfOperations; j++) {
indexSingleDoc();
}
if (invokeFlush) {
flush(INDEX_NAME);
} else {
refresh(INDEX_NAME);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;

import static org.opensearch.index.remote.RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStoreBackpressureIT extends AbstractRemoteStoreMockRepositoryIntegTestCase {
Comment thread
ashking94 marked this conversation as resolved.

public void testWritesRejected() {
Path location = randomRepoPath().toAbsolutePath();
setup(location, 1d, "metadata");

Settings request = Settings.builder().put(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true).build();
ClusterUpdateSettingsResponse clusterUpdateResponse = client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(request)
.get();
assertEquals(clusterUpdateResponse.getPersistentSettings().get(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey()), "true");

logger.info("--> Indexing data");
OpenSearchRejectedExecutionException ex = assertThrows(
OpenSearchRejectedExecutionException.class,
() -> indexData(randomIntBetween(10, 20), randomBoolean())
);
assertTrue(ex.getMessage().contains("rejected execution on primary shard"));
deleteRepo();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import static java.util.Arrays.asList;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

public class RemoteStoreBaseIT extends OpenSearchIntegTestCase {
public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase {
protected static final String REPOSITORY_NAME = "test-remore-store-repo";
protected static final int SHARD_COUNT = 1;
protected static final int REPLICA_COUNT = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStoreIT extends RemoteStoreBaseIT {
public class RemoteStoreIT extends RemoteStoreBaseIntegTestCase {

private static final String INDEX_NAME = "remote-store-test-idx-1";
private static final String TOTAL_OPERATIONS = "total-operations";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,79 +8,21 @@

package org.opensearch.remotestore;

import org.junit.After;
import org.junit.Before;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.util.FileSystemUtils;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.snapshots.AbstractSnapshotIntegTestCase;
import org.opensearch.test.FeatureFlagSetter;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.index.remote.RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStoreRefreshListenerIT extends AbstractSnapshotIntegTestCase {

private static final String REPOSITORY_NAME = "my-segment-repo-1";
private static final String INDEX_NAME = "remote-store-test-idx-1";

@Override
protected Settings featureFlagSettings() {
return Settings.builder()
.put(super.featureFlagSettings())
.put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true")
.put(FeatureFlags.REMOTE_STORE, "true")
.build();
}

@Before
public void setup() {
FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);
internalCluster().startClusterManagerOnlyNode();
}

@Override
public Settings indexSettings() {
return remoteStoreIndexSettings(0);
}

private Settings remoteStoreIndexSettings(int numberOfReplicas) {
return Settings.builder()
.put(super.indexSettings())
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s")
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
.build();
}

@After
public void teardown() {
logger.info("--> Deleting the repository={}", REPOSITORY_NAME);
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
}
public class RemoteStoreRefreshListenerIT extends AbstractRemoteStoreMockRepositoryIntegTestCase {

public void testRemoteRefreshRetryOnFailure() throws Exception {

Expand All @@ -107,76 +49,16 @@ public void testRemoteRefreshRetryOnFailure() throws Exception {
Set<String> filesInRepo = getSegmentFiles(segmentDataRepoPath);
assertTrue(filesInRepo.containsAll(filesInLocal));
}, 60, TimeUnit.SECONDS);
deleteRepo();
}

private void setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList) {
logger.info("--> Creating repository={} at the path={}", REPOSITORY_NAME, repoLocation);
// The random_control_io_exception_rate setting ensures that 10-25% of all operations to remote store results in
/// IOException. skip_exception_on_verification_file & skip_exception_on_list_blobs settings ensures that the
// repository creation can happen without failure.
createRepository(
REPOSITORY_NAME,
"mock",
Settings.builder()
.put("location", repoLocation)
.put("random_control_io_exception_rate", ioFailureRate)
.put("skip_exception_on_verification_file", true)
.put("skip_exception_on_list_blobs", true)
.put("max_failure_number", Long.MAX_VALUE)
);

internalCluster().startDataOnlyNodes(1);
createIndex(INDEX_NAME);
logger.info("--> Created index={}", INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
logger.info("--> Cluster is yellow with no initializing shards");
ensureGreen(INDEX_NAME);
logger.info("--> Cluster is green");
}

/**
* Gets all segment files which starts with "_". For instance, _0.cfe, _o.cfs etc.
*
* @param location the path to location where segment files are being searched.
* @return set of file names of all segment file or empty set if there was IOException thrown.
*/
private Set<String> getSegmentFiles(Path location) {
try {
return Arrays.stream(FileSystemUtils.files(location))
.filter(path -> path.getFileName().toString().startsWith("_"))
.map(path -> path.getFileName().toString())
.map(this::getLocalSegmentFilename)
.collect(Collectors.toSet());
} catch (IOException exception) {
logger.error("Exception occurred while getting segment files", exception);
}
return Collections.emptySet();
}

private String getLocalSegmentFilename(String remoteFilename) {
return remoteFilename.split(RemoteSegmentStoreDirectory.SEGMENT_NAME_UUID_SEPARATOR)[0];
}

private IndexResponse indexSingleDoc() {
return client().prepareIndex(INDEX_NAME)
.setId(UUIDs.randomBase64UUID())
.setSource(randomAlphaOfLength(5), randomAlphaOfLength(5))
.get();
}
public void testRemoteRefreshSegmentPressureSettingChanged() {
Settings request = Settings.builder().put(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true).build();
ClusterUpdateSettingsResponse response = client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get();
assertEquals(response.getPersistentSettings().get(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey()), "true");

private void indexData(int numberOfIterations, boolean invokeFlush) {
logger.info("--> Indexing data for {} iterations with flush={}", numberOfIterations, invokeFlush);
for (int i = 0; i < numberOfIterations; i++) {
int numberOfOperations = randomIntBetween(1, 5);
logger.info("--> Indexing {} operations in iteration #{}", numberOfOperations, i);
for (int j = 0; j < numberOfOperations; j++) {
indexSingleDoc();
}
if (invokeFlush) {
flush(INDEX_NAME);
} else {
refresh(INDEX_NAME);
}
}
request = Settings.builder().put(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), false).build();
response = client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get();
assertEquals(response.getPersistentSettings().get(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey()), "false");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@OpenSearchIntegTestCase.ClusterScope(numDataNodes = 0)
public class ReplicaToPrimaryPromotionIT extends RemoteStoreBaseIT {
public class ReplicaToPrimaryPromotionIT extends RemoteStoreBaseIntegTestCase {
private int shard_count = 5;

@Override
Expand Down
Loading