Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexService;
import org.opensearch.index.SegmentReplicationShardStats;
Expand Down Expand Up @@ -175,17 +174,6 @@ private IndexShard getIndexShard(ClusterState state, ShardRouting routing, Strin
return getIndexShard(state.nodes().get(routing.currentNodeId()).getName(), routing.shardId(), indexName);
}

/**
* Fetch IndexShard by shardId, multiple shards per node allowed.
*/
protected IndexShard getIndexShard(String node, ShardId shardId, String indexName) {
final Index index = resolveIndex(indexName);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexService indexService = indicesService.indexServiceSafe(index);
final Optional<Integer> id = indexService.shardIds().stream().filter(sid -> sid == shardId.id()).findFirst();
return indexService.getShard(id.get());
}

/**
* Fetch IndexShard, assumes only a single shard per node.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAllSuccessful;
Expand All @@ -130,7 +131,8 @@ public IndexStatsIT(Settings settings) {
public static Collection<Object[]> parameters() {
return Arrays.asList(
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() },
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() },
new Object[] { Settings.builder().put(CLUSTER_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT).build() }
);
}

Expand Down Expand Up @@ -175,7 +177,7 @@ public void testFieldDataStats() throws InterruptedException {
ensureGreen();
client().prepareIndex("test").setId("1").setSource("field", "value1", "field2", "value1").execute().actionGet();
client().prepareIndex("test").setId("2").setSource("field", "value2", "field2", "value2").execute().actionGet();
client().admin().indices().prepareRefresh().execute().actionGet();
refreshAndWaitForReplication();
indexRandomForConcurrentSearch("test");

NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats("data:true").setIndices(true).execute().actionGet();
Expand Down Expand Up @@ -299,7 +301,7 @@ public void testClearAllCaches() throws Exception {
client().admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
client().prepareIndex("test").setId("1").setSource("field", "value1").execute().actionGet();
client().prepareIndex("test").setId("2").setSource("field", "value2").execute().actionGet();
client().admin().indices().prepareRefresh().execute().actionGet();
refreshAndWaitForReplication();
indexRandomForConcurrentSearch("test");

NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats("data:true").setIndices(true).execute().actionGet();
Expand Down Expand Up @@ -667,7 +669,7 @@ public void testSimpleStats() throws Exception {
client().prepareIndex("test1").setId(Integer.toString(1)).setSource("field", "value").execute().actionGet();
client().prepareIndex("test1").setId(Integer.toString(2)).setSource("field", "value").execute().actionGet();
client().prepareIndex("test2").setId(Integer.toString(1)).setSource("field", "value").execute().actionGet();
refresh();
refreshAndWaitForReplication();

NumShards test1 = getNumShards("test1");
long test1ExpectedWrites = 2 * test1.dataCopies;
Expand All @@ -682,7 +684,13 @@ public void testSimpleStats() throws Exception {
assertThat(stats.getPrimaries().getIndexing().getTotal().getIndexFailedCount(), equalTo(0L));
assertThat(stats.getPrimaries().getIndexing().getTotal().isThrottled(), equalTo(false));
assertThat(stats.getPrimaries().getIndexing().getTotal().getThrottleTime().millis(), equalTo(0L));
assertThat(stats.getTotal().getIndexing().getTotal().getIndexCount(), equalTo(totalExpectedWrites));

// This assert should not be done on segrep enabled indices because we are asserting Indexing/Write operations count on
// all primary and replica shards. But in case of segrep, Indexing/Write operation don't happen on replica shards. So we can
// ignore this assert check for segrep enabled indices.
if (isSegmentReplicationEnabledForIndex("test1") == false && isSegmentReplicationEnabledForIndex("test2") == false) {
assertThat(stats.getTotal().getIndexing().getTotal().getIndexCount(), equalTo(totalExpectedWrites));
}
assertThat(stats.getTotal().getStore(), notNullValue());
assertThat(stats.getTotal().getMerge(), notNullValue());
assertThat(stats.getTotal().getFlush(), notNullValue());
Expand Down Expand Up @@ -825,6 +833,7 @@ public void testMergeStats() {
client().admin().indices().prepareForceMerge().setMaxNumSegments(1).execute().actionGet();
stats = client().admin().indices().prepareStats().setMerge(true).execute().actionGet();

refreshAndWaitForReplication();
assertThat(stats.getTotal().getMerge(), notNullValue());
assertThat(stats.getTotal().getMerge().getTotal(), greaterThan(0L));
}
Expand All @@ -851,7 +860,7 @@ public void testSegmentsStats() {

client().admin().indices().prepareFlush().get();
client().admin().indices().prepareForceMerge().setMaxNumSegments(1).execute().actionGet();
client().admin().indices().prepareRefresh().get();
refreshAndWaitForReplication();
stats = client().admin().indices().prepareStats().setSegments(true).get();

assertThat(stats.getTotal().getSegments(), notNullValue());
Expand All @@ -869,7 +878,7 @@ public void testAllFlags() throws Exception {
client().prepareIndex("test_index").setId(Integer.toString(2)).setSource("field", "value").execute().actionGet();
client().prepareIndex("test_index_2").setId(Integer.toString(1)).setSource("field", "value").execute().actionGet();

client().admin().indices().prepareRefresh().execute().actionGet();
refreshAndWaitForReplication();
IndicesStatsRequestBuilder builder = client().admin().indices().prepareStats();
Flag[] values = CommonStatsFlags.Flag.values();
for (Flag flag : values) {
Expand Down Expand Up @@ -1453,6 +1462,7 @@ public void testZeroRemoteStoreStatsOnNonRemoteStoreIndex() {
.get()
.status()
);
refreshAndWaitForReplication();
ShardStats shard = client().admin().indices().prepareStats(indexName).setSegments(true).setTranslog(true).get().getShards()[0];
RemoteSegmentStats remoteSegmentStatsFromIndexStats = shard.getStats().getSegments().getRemoteSegmentStats();
assertZeroRemoteSegmentStats(remoteSegmentStatsFromIndexStats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.recovery;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.refresh.RefreshResponse;
Expand All @@ -52,10 +54,11 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.DocsStats;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;

import java.util.Arrays;
import java.util.Collection;
Expand All @@ -69,12 +72,26 @@
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAllSuccessful;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoTimeout;

public class RecoveryWhileUnderLoadIT extends OpenSearchIntegTestCase {
public class RecoveryWhileUnderLoadIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase {

public RecoveryWhileUnderLoadIT(Settings settings) {
super(settings);
}

@ParametersFactory
public static Collection<Object[]> parameters() {
return Arrays.asList(
new Object[] { Settings.builder().put(CLUSTER_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.DOCUMENT).build() },
new Object[] { Settings.builder().put(CLUSTER_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT).build() }
);
}

private final Logger logger = LogManager.getLogger(RecoveryWhileUnderLoadIT.class);

public static final class RetentionLeaseSyncIntervalSettingPlugin extends Plugin {
Expand Down Expand Up @@ -150,7 +167,7 @@ public void testRecoverWhileUnderLoadAllocateReplicasTest() throws Exception {
logger.info("--> indexing threads stopped");

logger.info("--> refreshing the index");
refreshAndAssert();
assertAfterRefreshAndWaitForReplication();
logger.info("--> verifying indexed content");
iterateAssertCount(numberOfShards, 10, indexer.getIds());
}
Expand Down Expand Up @@ -211,7 +228,7 @@ public void testRecoverWhileUnderLoadAllocateReplicasRelocatePrimariesTest() thr
logger.info("--> indexing threads stopped");

logger.info("--> refreshing the index");
refreshAndAssert();
assertAfterRefreshAndWaitForReplication();
logger.info("--> verifying indexed content");
iterateAssertCount(numberOfShards, 10, indexer.getIds());
}
Expand Down Expand Up @@ -325,7 +342,7 @@ public void testRecoverWhileUnderLoadWithReducedAllowedNodes() throws Exception
);

logger.info("--> refreshing the index");
refreshAndAssert();
assertAfterRefreshAndWaitForReplication();
logger.info("--> verifying indexed content");
iterateAssertCount(numberOfShards, 10, indexer.getIds());
}
Expand Down Expand Up @@ -375,7 +392,7 @@ public void testRecoverWhileRelocating() throws Exception {
ensureGreen(TimeValue.timeValueMinutes(5));

logger.info("--> refreshing the index");
refreshAndAssert();
assertAfterRefreshAndWaitForReplication();
logger.info("--> verifying indexed content");
iterateAssertCount(numShards, 10, indexer.getIds());
}
Expand Down Expand Up @@ -474,10 +491,11 @@ private void logSearchResponse(int numberOfShards, long numberOfDocs, int iterat
);
}

private void refreshAndAssert() throws Exception {
private void assertAfterRefreshAndWaitForReplication() throws Exception {
assertBusy(() -> {
RefreshResponse actionGet = client().admin().indices().prepareRefresh().get();
assertAllSuccessful(actionGet);
}, 5, TimeUnit.MINUTES);
waitForReplication();
}
}
Loading