Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Implement parallel shard refresh behind cluster settings ([#17782](https://github.com/opensearch-project/OpenSearch/pull/17782))
- Bump OpenSearch Core main branch to 3.0.0 ([#18039](https://github.com/opensearch-project/OpenSearch/pull/18039))
- Update API of Message in index to add the timestamp for lag calculation in ingestion polling ([#17977](https://github.com/opensearch-project/OpenSearch/pull/17977/))

- Enabled Async Shard Batch Fetch by default ([#17987](https://github.com/opensearch-project/OpenSearch/pull/17987))

### Changed
- Change the default max header size from 8KB to 16KB. ([#18024](https://github.com/opensearch-project/OpenSearch/pull/18024))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1505,42 +1505,40 @@ public void testOngoingRecoveryAndClusterManagerFailOver() throws Exception {
allowToCompletePhase1Latch.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
} finally {
blockRecovery.release();
}
}
connection.sendRequest(requestId, action, request, options);
});
try {
String nodeWithReplica = internalCluster().startDataOnlyNode();
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(indexName)
.setSettings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put("index.routing.allocation.include._name", nodeWithPrimary + "," + nodeWithReplica)
)
);
phase1ReadyBlocked.await();
internalCluster().restartNode(
clusterService().state().nodes().getClusterManagerNode().getName(),
new InternalTestCluster.RestartCallback()
);
internalCluster().ensureAtLeastNumDataNodes(3);
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(indexName)
.setSettings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)
.putNull("index.routing.allocation.include._name")
)
);
assertFalse(client().admin().cluster().prepareHealth(indexName).setWaitForActiveShards(2).get().isTimedOut());
} finally {
allowToCompletePhase1Latch.countDown();
}
String nodeWithReplica = internalCluster().startDataOnlyNode();
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(indexName)
.setSettings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put("index.routing.allocation.include._name", nodeWithPrimary + "," + nodeWithReplica)
)
);
phase1ReadyBlocked.await();
internalCluster().restartNode(
clusterService().state().nodes().getClusterManagerNode().getName(),
new InternalTestCluster.RestartCallback()
);
internalCluster().ensureAtLeastNumDataNodes(3);

allowToCompletePhase1Latch.countDown();
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(indexName)
.setSettings(
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2).putNull("index.routing.allocation.include._name")
)
);
assertFalse(client().admin().cluster().prepareHealth(indexName).setWaitForActiveShards(2).get().isTimedOut());
ensureGreen(indexName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,19 @@ public interface ExistingShardsAllocator {

/**
* Boolean setting to enable/disable batch allocation of unassigned shards already existing on disk.
* This will allow sending all Unassigned Shards to the ExistingShard Allocator to make decision to allocate
* This will allow sending all unassigned shards to the ExistingShardsAllocator to make allocation decisions
* in one or more passes.
*
* Enable this setting if your ExistingShardAllocator is implementing the
* <p>
* This setting is enabled by default. Implement the
* {@link ExistingShardsAllocator#allocateAllUnassignedShards(RoutingAllocation, boolean)} method in your ExistingShardsAllocator.
* The default implementation of this method is not optimized and assigns shards one by one.
*
* If no plugin overrides {@link ExistingShardsAllocator} then default implementation will be use for it , i.e,
* <p>
* If no plugin overrides {@link ExistingShardsAllocator}, then the default implementation will be used, i.e.,
* {@link ShardsBatchGatewayAllocator}.
*
* This setting is experimental at this point.
*/
Setting<Boolean> EXISTING_SHARDS_ALLOCATOR_BATCH_MODE = Setting.boolSetting(
"cluster.allocator.existing_shards_allocator.batch_enabled",
false,
true,
Setting.Property.NodeScope
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator {
private TimeValue replicaShardsBatchGatewayAllocatorTimeout;
private volatile Priority followUpRerouteTaskPriority;
public static final TimeValue MIN_ALLOCATOR_TIMEOUT = TimeValue.timeValueSeconds(20);
public static final TimeValue DEFAULT_ALLOCATOR_TIMEOUT = TimeValue.timeValueSeconds(20);
private final ClusterManagerMetrics clusterManagerMetrics;

/**
Expand All @@ -105,7 +106,7 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator {
*/
public static final Setting<TimeValue> PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING = Setting.timeSetting(
PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY,
TimeValue.MINUS_ONE,
DEFAULT_ALLOCATOR_TIMEOUT,
TimeValue.MINUS_ONE,
new Setting.Validator<>() {
@Override
Expand All @@ -129,7 +130,7 @@ public void validate(TimeValue timeValue) {
*/
public static final Setting<TimeValue> REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING = Setting.timeSetting(
REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY,
TimeValue.MINUS_ONE,
DEFAULT_ALLOCATOR_TIMEOUT,
TimeValue.MINUS_ONE,
new Setting.Validator<>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,14 @@
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.gateway.ShardsBatchGatewayAllocator;
import org.opensearch.snapshots.EmptySnapshotsInfoService;
import org.opensearch.telemetry.metrics.Histogram;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.gateway.TestGatewayAllocator;
import org.opensearch.test.gateway.TestShardBatchGatewayAllocator;

import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -192,8 +194,10 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing
final String unrealisticAllocatorName = "unrealistic";
final Map<String, ExistingShardsAllocator> allocatorMap = new HashMap<>();
final TestGatewayAllocator testGatewayAllocator = new TestGatewayAllocator();
final TestShardBatchGatewayAllocator testShardBatchGatewayAllocator = new TestShardBatchGatewayAllocator();
allocatorMap.put(GatewayAllocator.ALLOCATOR_NAME, testGatewayAllocator);
allocatorMap.put(unrealisticAllocatorName, new UnrealisticAllocator());
allocatorMap.put(ShardsBatchGatewayAllocator.ALLOCATOR_NAME, testShardBatchGatewayAllocator);
allocationService.setExistingShardsAllocators(allocatorMap);

final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,15 +211,18 @@ public void testTwoLoggersDifferentLevel() {
public void testMultipleSlowLoggersUseSingleLog4jLogger() {
LoggerContext context = (LoggerContext) LogManager.getContext(false);

SearchContext ctx1 = searchContextWithSourceAndTask(createIndex("index-1"));
IndexService index1 = createIndex("index-1");
IndexService index2 = createIndex("index-2");

SearchContext ctx1 = searchContextWithSourceAndTask(index1);
IndexSettings settings1 = new IndexSettings(
createIndexMetadata(SlowLogLevel.WARN, "index-1", UUIDs.randomBase64UUID()),
Settings.EMPTY
);
SearchSlowLog log1 = new SearchSlowLog(settings1);
int numberOfLoggersBefore = context.getLoggers().size();

SearchContext ctx2 = searchContextWithSourceAndTask(createIndex("index-2"));
SearchContext ctx2 = searchContextWithSourceAndTask(index2);
IndexSettings settings2 = new IndexSettings(
createIndexMetadata(SlowLogLevel.TRACE, "index-2", UUIDs.randomBase64UUID()),
Settings.EMPTY
Expand Down
Loading