Skip to content

Commit e41873e

Browse files
committed
[RW Separation] Add polling segment replication for search replicas (opensearch-project#15627)
(cherry picked from commit 375c0bf) Signed-off-by: Marc Handalian <marc.handalian@gmail.com> (cherry picked from commit d3b3a93) Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
1 parent bf946d9 commit e41873e

12 files changed

Lines changed: 478 additions & 15 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3939
- Adding translog durability validation in index templates ([#15494](https://github.com/opensearch-project/OpenSearch/pull/15494))
4040
- [Range Queries] Add new approximateable query framework to short-circuit range queries ([#13788](https://github.com/opensearch-project/OpenSearch/pull/13788))
4141
- [Workload Management] Add query group level failure tracking ([#15227](https://github.com/opensearch-project/OpenSearch/pull/15527))
42-
- [Reader Writer Separation] Add searchOnly replica routing configuration ([#15410](https://github.com/opensearch-project/OpenSearch/pull/15410))
42+
- [Reader Writer Separation] Add experimental search replica shard type to achieve reader writer separation ([#15237](https://github.com/opensearch-project/OpenSearch/pull/15237))
4343
- Add index creation using the context field ([#15290](https://github.com/opensearch-project/OpenSearch/pull/15290))
4444
- [Remote Publication] Add remote download stats ([#15291](https://github.com/opensearch-project/OpenSearch/pull/15291))
4545
- Add support to upload snapshot shard blobs with hashed prefix ([#15426](https://github.com/opensearch-project/OpenSearch/pull/15426))
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.indices.replication;
10+
11+
import org.opensearch.cluster.metadata.IndexMetadata;
12+
import org.opensearch.common.settings.Settings;
13+
import org.opensearch.common.util.FeatureFlags;
14+
import org.opensearch.test.OpenSearchIntegTestCase;
15+
import org.junit.After;
16+
import org.junit.Before;
17+
18+
import java.nio.file.Path;
19+
20+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
21+
public class SearchReplicaReplicationIT extends SegmentReplicationBaseIT {
22+
23+
private static final String REPOSITORY_NAME = "test-remote-store-repo";
24+
protected Path absolutePath;
25+
26+
private Boolean useRemoteStore;
27+
28+
@Before
29+
public void randomizeRemoteStoreEnabled() {
30+
useRemoteStore = randomBoolean();
31+
}
32+
33+
@Override
34+
protected Settings nodeSettings(int nodeOrdinal) {
35+
if (useRemoteStore) {
36+
if (absolutePath == null) {
37+
absolutePath = randomRepoPath().toAbsolutePath();
38+
}
39+
return Settings.builder()
40+
.put(super.nodeSettings(nodeOrdinal))
41+
.put(remoteStoreClusterSettings(REPOSITORY_NAME, absolutePath))
42+
.build();
43+
}
44+
return super.nodeSettings(nodeOrdinal);
45+
}
46+
47+
@After
48+
public void teardown() {
49+
if (useRemoteStore) {
50+
clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get();
51+
}
52+
}
53+
54+
@Override
55+
public Settings indexSettings() {
56+
return Settings.builder()
57+
.put(super.indexSettings())
58+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
59+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
60+
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
61+
.build();
62+
}
63+
64+
@Override
65+
protected Settings featureFlagSettings() {
66+
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, true).build();
67+
}
68+
69+
public void testReplication() throws Exception {
70+
internalCluster().startClusterManagerOnlyNode();
71+
final String primary = internalCluster().startDataOnlyNode();
72+
createIndex(INDEX_NAME);
73+
ensureYellowAndNoInitializingShards(INDEX_NAME);
74+
final String replica = internalCluster().startDataOnlyNode();
75+
ensureGreen(INDEX_NAME);
76+
77+
final int docCount = 10;
78+
for (int i = 0; i < docCount; i++) {
79+
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
80+
}
81+
refresh(INDEX_NAME);
82+
waitForSearchableDocs(docCount, primary, replica);
83+
}
84+
85+
}

server/src/main/java/org/opensearch/index/IndexModule.java

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
import org.opensearch.index.engine.EngineFactory;
7474
import org.opensearch.index.mapper.MapperService;
7575
import org.opensearch.index.shard.IndexEventListener;
76+
import org.opensearch.index.shard.IndexShard;
7677
import org.opensearch.index.shard.IndexingOperationListener;
7778
import org.opensearch.index.shard.SearchOperationListener;
7879
import org.opensearch.index.similarity.SimilarityService;
@@ -729,6 +730,56 @@ public IndexService newIndexService(
729730
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
730731
RecoverySettings recoverySettings,
731732
RemoteStoreSettings remoteStoreSettings
733+
) throws IOException {
734+
return newIndexService(
735+
indexCreationContext,
736+
environment,
737+
xContentRegistry,
738+
shardStoreDeleter,
739+
circuitBreakerService,
740+
bigArrays,
741+
threadPool,
742+
scriptService,
743+
clusterService,
744+
client,
745+
indicesQueryCache,
746+
mapperRegistry,
747+
indicesFieldDataCache,
748+
namedWriteableRegistry,
749+
idFieldDataEnabled,
750+
valuesSourceRegistry,
751+
remoteDirectoryFactory,
752+
translogFactorySupplier,
753+
clusterDefaultRefreshIntervalSupplier,
754+
recoverySettings,
755+
remoteStoreSettings,
756+
(s) -> {}
757+
);
758+
}
759+
760+
public IndexService newIndexService(
761+
IndexService.IndexCreationContext indexCreationContext,
762+
NodeEnvironment environment,
763+
NamedXContentRegistry xContentRegistry,
764+
IndexService.ShardStoreDeleter shardStoreDeleter,
765+
CircuitBreakerService circuitBreakerService,
766+
BigArrays bigArrays,
767+
ThreadPool threadPool,
768+
ScriptService scriptService,
769+
ClusterService clusterService,
770+
Client client,
771+
IndicesQueryCache indicesQueryCache,
772+
MapperRegistry mapperRegistry,
773+
IndicesFieldDataCache indicesFieldDataCache,
774+
NamedWriteableRegistry namedWriteableRegistry,
775+
BooleanSupplier idFieldDataEnabled,
776+
ValuesSourceRegistry valuesSourceRegistry,
777+
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
778+
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
779+
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
780+
RecoverySettings recoverySettings,
781+
RemoteStoreSettings remoteStoreSettings,
782+
Consumer<IndexShard> replicator
732783
) throws IOException {
733784
final IndexEventListener eventListener = freeze();
734785
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
@@ -789,7 +840,8 @@ public IndexService newIndexService(
789840
recoverySettings,
790841
remoteStoreSettings,
791842
fileCache,
792-
compositeIndexSettings
843+
compositeIndexSettings,
844+
replicator
793845
);
794846
success = true;
795847
return indexService;

server/src/main/java/org/opensearch/index/IndexService.java

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@
136136
import static java.util.Collections.emptyMap;
137137
import static java.util.Collections.unmodifiableMap;
138138
import static org.opensearch.common.collect.MapBuilder.newMapBuilder;
139+
import static org.opensearch.common.util.FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING;
139140
import static org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdater.indexHasRemoteStoreSettings;
140141

141142
/**
@@ -174,6 +175,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
174175
private volatile AsyncTranslogFSync fsyncTask;
175176
private volatile AsyncGlobalCheckpointTask globalCheckpointTask;
176177
private volatile AsyncRetentionLeaseSyncTask retentionLeaseSyncTask;
178+
private volatile AsyncReplicationTask asyncReplicationTask;
177179

178180
// don't convert to Setting<> and register... we only set this in tests and register via a plugin
179181
private final String INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING = "index.translog.retention.check_interval";
@@ -194,6 +196,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
194196
private final RemoteStoreSettings remoteStoreSettings;
195197
private final FileCache fileCache;
196198
private final CompositeIndexSettings compositeIndexSettings;
199+
private final Consumer<IndexShard> replicator;
197200

198201
public IndexService(
199202
IndexSettings indexSettings,
@@ -231,7 +234,8 @@ public IndexService(
231234
RecoverySettings recoverySettings,
232235
RemoteStoreSettings remoteStoreSettings,
233236
FileCache fileCache,
234-
CompositeIndexSettings compositeIndexSettings
237+
CompositeIndexSettings compositeIndexSettings,
238+
Consumer<IndexShard> replicator
235239
) {
236240
super(indexSettings);
237241
this.allowExpensiveQueries = allowExpensiveQueries;
@@ -306,11 +310,15 @@ public IndexService(
306310
this.trimTranslogTask = new AsyncTrimTranslogTask(this);
307311
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
308312
this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this);
313+
if (READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.get(indexSettings.getNodeSettings())) {
314+
this.asyncReplicationTask = new AsyncReplicationTask(this);
315+
}
309316
this.translogFactorySupplier = translogFactorySupplier;
310317
this.recoverySettings = recoverySettings;
311318
this.remoteStoreSettings = remoteStoreSettings;
312319
this.compositeIndexSettings = compositeIndexSettings;
313320
this.fileCache = fileCache;
321+
this.replicator = replicator;
314322
updateFsyncTaskIfNecessary();
315323
}
316324

@@ -463,7 +471,8 @@ public IndexService(
463471
recoverySettings,
464472
remoteStoreSettings,
465473
null,
466-
null
474+
null,
475+
s -> {}
467476
);
468477
}
469478

@@ -472,6 +481,11 @@ static boolean needsMapperService(IndexSettings indexSettings, IndexCreationCont
472481
&& indexCreationContext == IndexCreationContext.CREATE_INDEX); // metadata verification needs a mapper service
473482
}
474483

484+
// visible for tests
485+
AsyncReplicationTask getReplicationTask() {
486+
return asyncReplicationTask;
487+
}
488+
475489
/**
476490
* Context for index creation
477491
*
@@ -1142,11 +1156,22 @@ public synchronized void updateMetadata(final IndexMetadata currentIndexMetadata
11421156
}
11431157
onRefreshIntervalChange();
11441158
updateFsyncTaskIfNecessary();
1159+
if (READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.get(indexSettings.getNodeSettings())) {
1160+
updateReplicationTask();
1161+
}
11451162
}
11461163

11471164
metadataListeners.forEach(c -> c.accept(newIndexMetadata));
11481165
}
11491166

1167+
private void updateReplicationTask() {
1168+
try {
1169+
asyncReplicationTask.close();
1170+
} finally {
1171+
asyncReplicationTask = new AsyncReplicationTask(this);
1172+
}
1173+
}
1174+
11501175
/**
11511176
* Called whenever the refresh interval changes. This can happen in 2 cases -
11521177
* 1. {@code cluster.default.index.refresh_interval} cluster setting changes. The change would only happen for
@@ -1411,6 +1436,47 @@ public String toString() {
14111436
}
14121437
}
14131438

1439+
final class AsyncReplicationTask extends BaseAsyncTask {
1440+
1441+
AsyncReplicationTask(IndexService indexService) {
1442+
super(indexService, indexService.getRefreshInterval());
1443+
}
1444+
1445+
@Override
1446+
protected void runInternal() {
1447+
indexService.maybeSyncSegments(false);
1448+
}
1449+
1450+
@Override
1451+
protected String getThreadPool() {
1452+
return ThreadPool.Names.GENERIC;
1453+
}
1454+
1455+
@Override
1456+
public String toString() {
1457+
return "replication";
1458+
}
1459+
1460+
@Override
1461+
protected boolean mustReschedule() {
1462+
return indexSettings.isSegRepEnabledOrRemoteNode() && super.mustReschedule();
1463+
}
1464+
}
1465+
1466+
private void maybeSyncSegments(boolean force) {
1467+
if (getRefreshInterval().millis() > 0 || force) {
1468+
for (IndexShard shard : this.shards.values()) {
1469+
try {
1470+
if (shard.routingEntry().isSearchOnly() && shard.routingEntry().active()) {
1471+
replicator.accept(shard);
1472+
}
1473+
} catch (IndexShardClosedException | AlreadyClosedException ex) {
1474+
// do nothing
1475+
}
1476+
}
1477+
}
1478+
}
1479+
14141480
final class AsyncTrimTranslogTask extends BaseAsyncTask {
14151481

14161482
AsyncTrimTranslogTask(IndexService indexService) {

server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1253,12 +1253,13 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() {
12531253
return this.latestReplicationCheckpoint;
12541254
}
12551255

1256-
private boolean isPrimaryRelocation(String allocationId) {
1256+
// skip any shard that is a relocating primary or search only replica (not tracked by primary)
1257+
private boolean shouldSkipReplicationTimer(String allocationId) {
12571258
Optional<ShardRouting> shardRouting = routingTable.shards()
12581259
.stream()
12591260
.filter(routing -> routing.allocationId().getId().equals(allocationId))
12601261
.findAny();
1261-
return shardRouting.isPresent() && shardRouting.get().primary();
1262+
return shardRouting.isPresent() && (shardRouting.get().primary() || shardRouting.get().isSearchOnly());
12621263
}
12631264

12641265
private void createReplicationLagTimers() {
@@ -1270,7 +1271,7 @@ private void createReplicationLagTimers() {
12701271
// it is possible for a shard to be in-sync but not yet removed from the checkpoints collection after a failover event.
12711272
if (cps.inSync
12721273
&& replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false
1273-
&& isPrimaryRelocation(allocationId) == false
1274+
&& shouldSkipReplicationTimer(allocationId) == false
12741275
&& latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint)
12751276
&& (indexSettings.isSegRepLocalEnabled() == true
12761277
|| isShardOnRemoteEnabledNode.apply(routingTable.getByAllocationId(allocationId).currentNodeId()))) {
@@ -1304,7 +1305,7 @@ public synchronized void startReplicationLagTimers(ReplicationCheckpoint checkpo
13041305
final CheckpointState cps = e.getValue();
13051306
if (cps.inSync
13061307
&& replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false
1307-
&& isPrimaryRelocation(e.getKey()) == false
1308+
&& shouldSkipReplicationTimer(e.getKey()) == false
13081309
&& latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint)
13091310
&& cps.checkpointTimers.containsKey(latestReplicationCheckpoint)) {
13101311
cps.checkpointTimers.get(latestReplicationCheckpoint).start();
@@ -1332,7 +1333,7 @@ public synchronized Set<SegmentReplicationShardStats> getSegmentReplicationStats
13321333
entry -> entry.getKey().equals(this.shardAllocationId) == false
13331334
&& entry.getValue().inSync
13341335
&& replicationGroup.getUnavailableInSyncShards().contains(entry.getKey()) == false
1335-
&& isPrimaryRelocation(entry.getKey()) == false
1336+
&& shouldSkipReplicationTimer(entry.getKey()) == false
13361337
/*Check if the current primary shard is migrating to remote and
13371338
all the other shard copies of the same index still hasn't completely moved over
13381339
to the remote enabled nodes. Ensures that:

server/src/main/java/org/opensearch/indices/IndicesService.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,7 @@ public class IndicesService extends AbstractLifecycleComponent
361361
private final SearchRequestStats searchRequestStats;
362362
private final FileCache fileCache;
363363
private final CompositeIndexSettings compositeIndexSettings;
364+
private final Consumer<IndexShard> replicator;
364365

365366
@Override
366367
protected void doStart() {
@@ -397,7 +398,8 @@ public IndicesService(
397398
CacheService cacheService,
398399
RemoteStoreSettings remoteStoreSettings,
399400
FileCache fileCache,
400-
CompositeIndexSettings compositeIndexSettings
401+
CompositeIndexSettings compositeIndexSettings,
402+
Consumer<IndexShard> replicator
401403
) {
402404
this.settings = settings;
403405
this.threadPool = threadPool;
@@ -506,6 +508,7 @@ protected void closeInternal() {
506508
this.remoteStoreSettings = remoteStoreSettings;
507509
this.compositeIndexSettings = compositeIndexSettings;
508510
this.fileCache = fileCache;
511+
this.replicator = replicator;
509512
}
510513

511514
public IndicesService(
@@ -629,6 +632,7 @@ public IndicesService(
629632
cacheService,
630633
remoteStoreSettings,
631634
null,
635+
null,
632636
null
633637
);
634638
}
@@ -1046,7 +1050,8 @@ private synchronized IndexService createIndexService(
10461050
translogFactorySupplier,
10471051
this::getClusterDefaultRefreshInterval,
10481052
this.recoverySettings,
1049-
this.remoteStoreSettings
1053+
this.remoteStoreSettings,
1054+
replicator
10501055
);
10511056
}
10521057

0 commit comments

Comments
 (0)