From 04692db7214ba80271244d575db0ca08e03327dd Mon Sep 17 00:00:00 2001 From: Manik Garg Date: Thu, 17 Apr 2025 12:55:43 +0530 Subject: [PATCH] Enabled Async Shard Batch Fetch by default Signed-off-by: Manik Garg --- CHANGELOG.md | 2 +- .../indices/recovery/IndexRecoveryIT.java | 62 +++++++++---------- .../allocation/ExistingShardsAllocator.java | 14 ++--- .../gateway/ShardsBatchGatewayAllocator.java | 5 +- .../allocation/AllocationServiceTests.java | 4 ++ .../opensearch/index/SearchSlowLogTests.java | 7 ++- 6 files changed, 49 insertions(+), 45 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e1a670a87b85d..3078a1a34441b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,7 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Implement parallel shard refresh behind cluster settings ([#17782](https://github.com/opensearch-project/OpenSearch/pull/17782)) - Bump OpenSearch Core main branch to 3.0.0 ([#18039](https://github.com/opensearch-project/OpenSearch/pull/18039)) - Update API of Message in index to add the timestamp for lag calculation in ingestion polling ([#17977](https://github.com/opensearch-project/OpenSearch/pull/17977/)) - +- Enabled Async Shard Batch Fetch by default ([#17987](https://github.com/opensearch-project/OpenSearch/pull/17987)) ### Changed - Change the default max header size from 8KB to 16KB. ([#18024](https://github.com/opensearch-project/OpenSearch/pull/18024)) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java index 9d893cb6f33c7..da0bce5cfa144 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java @@ -1505,42 +1505,40 @@ public void testOngoingRecoveryAndClusterManagerFailOver() throws Exception { allowToCompletePhase1Latch.await(); } catch (InterruptedException e) { throw new AssertionError(e); + } finally { + blockRecovery.release(); } } connection.sendRequest(requestId, action, request, options); }); - try { - String nodeWithReplica = internalCluster().startDataOnlyNode(); - assertAcked( - client().admin() - .indices() - .prepareUpdateSettings(indexName) - .setSettings( - Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) - .put("index.routing.allocation.include._name", nodeWithPrimary + "," + nodeWithReplica) - ) - ); - phase1ReadyBlocked.await(); - internalCluster().restartNode( - clusterService().state().nodes().getClusterManagerNode().getName(), - new InternalTestCluster.RestartCallback() - ); - internalCluster().ensureAtLeastNumDataNodes(3); - assertAcked( - client().admin() - .indices() - .prepareUpdateSettings(indexName) - .setSettings( - Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2) - .putNull("index.routing.allocation.include._name") - ) - ); - assertFalse(client().admin().cluster().prepareHealth(indexName).setWaitForActiveShards(2).get().isTimedOut()); - } finally { - allowToCompletePhase1Latch.countDown(); - } + String nodeWithReplica = internalCluster().startDataOnlyNode(); + assertAcked( + client().admin() + .indices() + .prepareUpdateSettings(indexName) + .setSettings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put("index.routing.allocation.include._name", nodeWithPrimary + "," + nodeWithReplica) + ) + ); + phase1ReadyBlocked.await(); + internalCluster().restartNode( + clusterService().state().nodes().getClusterManagerNode().getName(), + new InternalTestCluster.RestartCallback() + ); + internalCluster().ensureAtLeastNumDataNodes(3); + + allowToCompletePhase1Latch.countDown(); + assertAcked( + client().admin() + .indices() + .prepareUpdateSettings(indexName) + .setSettings( + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2).putNull("index.routing.allocation.include._name") + ) + ); + assertFalse(client().admin().cluster().prepareHealth(indexName).setWaitForActiveShards(2).get().isTimedOut()); ensureGreen(indexName); } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java index eb7a1e7209c37..893376abf07c1 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java @@ -64,21 +64,19 @@ public interface ExistingShardsAllocator { /** * Boolean setting to enable/disable batch allocation of unassigned shards already existing on disk. - * This will allow sending all Unassigned Shards to the ExistingShard Allocator to make decision to allocate + * This will allow sending all Unassigned Shards to the ExistingShard Allocator to make decision to allocate * in one or more go. - * - * Enable this setting if your ExistingShardAllocator is implementing the + *

+ * This setting is enabled by default. In your ExistingShardAllocator implement the * {@link ExistingShardsAllocator#allocateAllUnassignedShards(RoutingAllocation, boolean)} method. * The default implementation of this method is not optimized and assigns shards one by one. - * - * If no plugin overrides {@link ExistingShardsAllocator} then default implementation will be use for it , i.e, + *

+ * If no plugin overrides {@link ExistingShardsAllocator} then default implementation will be used for it , i.e, * {@link ShardsBatchGatewayAllocator}. - * - * This setting is experimental at this point. */ Setting EXISTING_SHARDS_ALLOCATOR_BATCH_MODE = Setting.boolSetting( "cluster.allocator.existing_shards_allocator.batch_enabled", - false, + true, Setting.Property.NodeScope ); diff --git a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java index 82229f244239f..ce14cb3442f75 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java @@ -85,6 +85,7 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator { private TimeValue replicaShardsBatchGatewayAllocatorTimeout; private volatile Priority followUpRerouteTaskPriority; public static final TimeValue MIN_ALLOCATOR_TIMEOUT = TimeValue.timeValueSeconds(20); + public static final TimeValue DEFAULT_ALLOCATOR_TIMEOUT = TimeValue.timeValueSeconds(20); private final ClusterManagerMetrics clusterManagerMetrics; /** @@ -105,7 +106,7 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator { */ public static final Setting PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING = Setting.timeSetting( PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, - TimeValue.MINUS_ONE, + DEFAULT_ALLOCATOR_TIMEOUT, TimeValue.MINUS_ONE, new Setting.Validator<>() { @Override @@ -129,7 +130,7 @@ public void validate(TimeValue timeValue) { */ public static final Setting REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING = Setting.timeSetting( REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, - TimeValue.MINUS_ONE, + DEFAULT_ALLOCATOR_TIMEOUT, TimeValue.MINUS_ONE, new Setting.Validator<>() { @Override diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationServiceTests.java index b1f4b45bb2441..d3ac8b3c198f1 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationServiceTests.java @@ -59,12 +59,14 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.gateway.GatewayAllocator; +import org.opensearch.gateway.ShardsBatchGatewayAllocator; import org.opensearch.snapshots.EmptySnapshotsInfoService; import org.opensearch.telemetry.metrics.Histogram; import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.gateway.TestGatewayAllocator; +import org.opensearch.test.gateway.TestShardBatchGatewayAllocator; import java.util.Arrays; import java.util.Collections; @@ -192,8 +194,10 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing final String unrealisticAllocatorName = "unrealistic"; final Map allocatorMap = new HashMap<>(); final TestGatewayAllocator testGatewayAllocator = new TestGatewayAllocator(); + final TestShardBatchGatewayAllocator testShardBatchGatewayAllocator = new TestShardBatchGatewayAllocator(); allocatorMap.put(GatewayAllocator.ALLOCATOR_NAME, testGatewayAllocator); allocatorMap.put(unrealisticAllocatorName, new UnrealisticAllocator()); + allocatorMap.put(ShardsBatchGatewayAllocator.ALLOCATOR_NAME, testShardBatchGatewayAllocator); allocationService.setExistingShardsAllocators(allocatorMap); final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(); diff --git a/server/src/test/java/org/opensearch/index/SearchSlowLogTests.java b/server/src/test/java/org/opensearch/index/SearchSlowLogTests.java index 0c0dec29c9dbf..c681fb077efe0 100644 --- a/server/src/test/java/org/opensearch/index/SearchSlowLogTests.java +++ b/server/src/test/java/org/opensearch/index/SearchSlowLogTests.java @@ -211,7 +211,10 @@ public void testTwoLoggersDifferentLevel() { public void testMultipleSlowLoggersUseSingleLog4jLogger() { LoggerContext context = (LoggerContext) LogManager.getContext(false); - SearchContext ctx1 = searchContextWithSourceAndTask(createIndex("index-1")); + IndexService index1 = createIndex("index-1"); + IndexService index2 = createIndex("index-2"); + + SearchContext ctx1 = searchContextWithSourceAndTask(index1); IndexSettings settings1 = new IndexSettings( createIndexMetadata(SlowLogLevel.WARN, "index-1", UUIDs.randomBase64UUID()), Settings.EMPTY @@ -219,7 +222,7 @@ public void testMultipleSlowLoggersUseSingleLog4jLogger() { SearchSlowLog log1 = new SearchSlowLog(settings1); int numberOfLoggersBefore = context.getLoggers().size(); - SearchContext ctx2 = searchContextWithSourceAndTask(createIndex("index-2")); + SearchContext ctx2 = searchContextWithSourceAndTask(index2); IndexSettings settings2 = new IndexSettings( createIndexMetadata(SlowLogLevel.TRACE, "index-2", UUIDs.randomBase64UUID()), Settings.EMPTY