Skip to content

Commit d8a19dc

Browse files
jed326Jay Deng
authored andcommitted
Use feature flagged settings map
Signed-off-by: Jay Deng <jayd0104@gmail.com>
1 parent 07aebc8 commit d8a19dc

8 files changed

Lines changed: 71 additions & 54 deletions

File tree

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,6 @@ public void apply(Settings value, Settings current, Settings previous) {
360360
ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
361361
SearchService.DEFAULT_SEARCH_TIMEOUT_SETTING,
362362
SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS,
363-
SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING,
364363
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
365364
TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING,
366365
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE,
@@ -669,6 +668,8 @@ public void apply(Settings value, Settings current, Settings previous) {
669668
IndicesService.CLUSTER_REMOTE_STORE_REPOSITORY_SETTING,
670669
IndicesService.CLUSTER_REMOTE_TRANSLOG_STORE_ENABLED_SETTING,
671670
IndicesService.CLUSTER_REMOTE_TRANSLOG_REPOSITORY_SETTING
672-
)
671+
),
672+
List.of(FeatureFlags.CONCURRENT_SEGMENT_SEARCH),
673+
List.of(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING)
673674
);
674675
}

server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,6 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
198198
IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME,
199199
IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY,
200200
IndexSettings.DEFAULT_SEARCH_PIPELINE,
201-
IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING,
202201

203202
// Settings for Searchable Snapshots
204203
IndexSettings.SEARCHABLE_SNAPSHOT_REPOSITORY,
@@ -236,7 +235,9 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
236235
IndexMetadata.INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING,
237236
IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING,
238237
IndexMetadata.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING
239-
)
238+
),
239+
FeatureFlags.CONCURRENT_SEGMENT_SEARCH,
240+
List.of(IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING)
240241
);
241242

242243
public static final IndexScopedSettings DEFAULT_SCOPED_SETTINGS = new IndexScopedSettings(Settings.EMPTY, BUILT_IN_INDEX_SETTINGS);

server/src/main/java/org/opensearch/index/IndexSettings.java

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@
6262
import java.util.function.Function;
6363
import java.util.function.UnaryOperator;
6464

65-
import static org.opensearch.common.util.FeatureFlags.CONCURRENT_SEGMENT_SEARCH;
6665
import static org.opensearch.common.util.FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY;
6766
import static org.opensearch.common.util.FeatureFlags.SEARCH_PIPELINE;
6867
import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_DEPTH_LIMIT_SETTING;
@@ -673,7 +672,6 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) {
673672
private volatile long mappingTotalFieldsLimit;
674673
private volatile long mappingDepthLimit;
675674
private volatile long mappingFieldNameLengthLimit;
676-
private volatile boolean indexConcurrentSegmentSearchEnabled;
677675

678676
/**
679677
* The maximum number of refresh listeners allows on this shard.
@@ -838,7 +836,6 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
838836
mergeOnFlushEnabled = scopedSettings.get(INDEX_MERGE_ON_FLUSH_ENABLED);
839837
setMergeOnFlushPolicy(scopedSettings.get(INDEX_MERGE_ON_FLUSH_POLICY));
840838
defaultSearchPipeline = scopedSettings.get(DEFAULT_SEARCH_PIPELINE);
841-
indexConcurrentSegmentSearchEnabled = scopedSettings.get(INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING);
842839

843840
scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING, mergePolicyConfig::setNoCFSRatio);
844841
scopedSettings.addSettingsUpdateConsumer(
@@ -913,7 +910,6 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
913910
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_ENABLED, this::setMergeOnFlushEnabled);
914911
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_POLICY, this::setMergeOnFlushPolicy);
915912
scopedSettings.addSettingsUpdateConsumer(DEFAULT_SEARCH_PIPELINE, this::setDefaultSearchPipeline);
916-
scopedSettings.addSettingsUpdateConsumer(INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING, this::setIndexConcurrentSegmentSearchEnabled);
917913
}
918914

919915
private void setSearchIdleAfter(TimeValue searchIdleAfter) {
@@ -1610,22 +1606,4 @@ public void setDefaultSearchPipeline(String defaultSearchPipeline) {
16101606
);
16111607
}
16121608
}
1613-
1614-
public void setIndexConcurrentSegmentSearchEnabled(boolean indexConcurrentSegmentSearchEnabled) {
1615-
if (FeatureFlags.isEnabled(CONCURRENT_SEGMENT_SEARCH)) {
1616-
this.indexConcurrentSegmentSearchEnabled = indexConcurrentSegmentSearchEnabled;
1617-
} else {
1618-
throw new SettingsException(
1619-
"Unable to update setting: "
1620-
+ INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey()
1621-
+ ". This is an experimental feature that is currently disabled, please enable the "
1622-
+ CONCURRENT_SEGMENT_SEARCH
1623-
+ " feature flag first."
1624-
);
1625-
}
1626-
}
1627-
1628-
public boolean isIndexConcurrentSegmentSearchEnabled() {
1629-
return indexConcurrentSegmentSearchEnabled;
1630-
}
16311609
}

server/src/main/java/org/opensearch/search/DefaultSearchContext.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.opensearch.common.lucene.search.Queries;
4949
import org.opensearch.common.unit.TimeValue;
5050
import org.opensearch.common.util.BigArrays;
51+
import org.opensearch.common.util.FeatureFlags;
5152
import org.opensearch.core.common.lease.Releasables;
5253
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
5354
import org.opensearch.index.IndexService;
@@ -876,11 +877,11 @@ public Profilers getProfilers() {
876877
*/
877878
@Override
878879
public boolean isConcurrentSegmentSearchEnabled() {
879-
if (clusterService != null) {
880+
if (FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH) && (clusterService != null)) {
880881
return indexService.getIndexSettings()
881882
.getSettings()
882883
.getAsBoolean(
883-
"index.search.concurrent_segment_search.enabled",
884+
IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(),
884885
clusterService.getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING)
885886
);
886887
} else {

server/src/main/java/org/opensearch/search/SearchService.java

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,9 @@
6161
import org.opensearch.common.settings.Setting;
6262
import org.opensearch.common.settings.Setting.Property;
6363
import org.opensearch.common.settings.Settings;
64-
import org.opensearch.common.settings.SettingsException;
6564
import org.opensearch.common.unit.TimeValue;
6665
import org.opensearch.common.util.BigArrays;
6766
import org.opensearch.common.util.CollectionUtils;
68-
import org.opensearch.common.util.FeatureFlags;
6967
import org.opensearch.common.util.concurrent.ConcurrentCollections;
7068
import org.opensearch.common.util.concurrent.ConcurrentMapLong;
7169
import org.opensearch.common.util.io.IOUtils;
@@ -159,7 +157,6 @@
159157
import static org.opensearch.common.unit.TimeValue.timeValueHours;
160158
import static org.opensearch.common.unit.TimeValue.timeValueMillis;
161159
import static org.opensearch.common.unit.TimeValue.timeValueMinutes;
162-
import static org.opensearch.common.util.FeatureFlags.CONCURRENT_SEGMENT_SEARCH;
163160

164161
/**
165162
* The main search service
@@ -293,7 +290,6 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
293290
private volatile int maxOpenScrollContext;
294291

295292
private volatile int maxOpenPitContext;
296-
private volatile boolean clusterConcurrentSegmentSearchEnabled;
297293

298294
private final Cancellable keepAliveReaper;
299295

@@ -366,10 +362,6 @@ public SearchService(
366362

367363
lowLevelCancellation = LOW_LEVEL_CANCELLATION_SETTING.get(settings);
368364
clusterService.getClusterSettings().addSettingsUpdateConsumer(LOW_LEVEL_CANCELLATION_SETTING, this::setLowLevelCancellation);
369-
370-
clusterConcurrentSegmentSearchEnabled = CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.get(settings);
371-
clusterService.getClusterSettings()
372-
.addSettingsUpdateConsumer(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING, this::setClusterConcurrentSegmentSearchEnabled);
373365
}
374366

375367
private void validateKeepAlives(TimeValue defaultKeepAlive, TimeValue maxKeepAlive) {
@@ -446,20 +438,6 @@ private void setLowLevelCancellation(Boolean lowLevelCancellation) {
446438
this.lowLevelCancellation = lowLevelCancellation;
447439
}
448440

449-
private void setClusterConcurrentSegmentSearchEnabled(boolean clusterConcurrentSegmentSearchEnabled) {
450-
if (FeatureFlags.isEnabled(CONCURRENT_SEGMENT_SEARCH)) {
451-
this.clusterConcurrentSegmentSearchEnabled = clusterConcurrentSegmentSearchEnabled;
452-
} else {
453-
throw new SettingsException(
454-
"Unable to update setting: "
455-
+ CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey()
456-
+ ". This is an experimental feature that is currently disabled, please enable the "
457-
+ CONCURRENT_SEGMENT_SEARCH
458-
+ " feature flag first."
459-
);
460-
}
461-
}
462-
463441
@Override
464442
public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRemovalReason reason) {
465443
// once an index is removed due to deletion or closing, we can just clean up all the pending search context information

server/src/main/java/org/opensearch/search/query/ConcurrentQueryPhaseSearcher.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
*/
3333
public class ConcurrentQueryPhaseSearcher extends DefaultQueryPhaseSearcher {
3434
private static final Logger LOGGER = LogManager.getLogger(ConcurrentQueryPhaseSearcher.class);
35+
private final AggregationProcessor aggregationProcessor = new ConcurrentAggregationProcessor();
3536

3637
/**
3738
* Default constructor
@@ -103,6 +104,11 @@ private static boolean searchWithCollectorManager(
103104
return topDocsFactory.shouldRescore();
104105
}
105106

107+
@Override
108+
public AggregationProcessor aggregationProcessor(SearchContext searchContext) {
109+
return aggregationProcessor;
110+
}
111+
106112
private static boolean allowConcurrentSegmentSearch(final ContextIndexSearcher searcher) {
107113
return (searcher.getExecutor() != null);
108114
}

server/src/main/java/org/opensearch/search/query/QueryPhaseSearcherWrapper.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import org.apache.lucene.search.Query;
1414
import org.opensearch.common.util.FeatureFlags;
1515
import org.opensearch.search.aggregations.AggregationProcessor;
16-
import org.opensearch.search.aggregations.ConcurrentAggregationProcessor;
1716
import org.opensearch.search.internal.ContextIndexSearcher;
1817
import org.opensearch.search.internal.SearchContext;
1918
import org.apache.lucene.search.CollectorManager;
@@ -59,7 +58,7 @@ public boolean searchWith(
5958
boolean hasTimeout
6059
) throws IOException {
6160
if (searchContext.isConcurrentSegmentSearchEnabled()) {
62-
LOGGER.debug("Using concurrent search over segments (experimental)");
61+
LOGGER.info("Using concurrent search over segments (experimental)");
6362
return concurrentQueryPhaseSearcher.searchWith(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout);
6463
} else {
6564
return defaultQueryPhaseSearcher.searchWith(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout);
@@ -74,8 +73,8 @@ public boolean searchWith(
7473
@Override
7574
public AggregationProcessor aggregationProcessor(SearchContext searchContext) {
7675
if (searchContext.isConcurrentSegmentSearchEnabled()) {
77-
LOGGER.debug("Using concurrent search over segments (experimental)");
78-
return new ConcurrentAggregationProcessor();
76+
LOGGER.info("Using concurrent search over segments (experimental)");
77+
return concurrentQueryPhaseSearcher.aggregationProcessor(searchContext);
7978
} else {
8079
return defaultQueryPhaseSearcher.aggregationProcessor(searchContext);
8180
}

server/src/test/java/org/opensearch/common/settings/SettingsModuleTests.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import org.opensearch.common.settings.Setting.Property;
3737
import org.hamcrest.Matchers;
3838
import org.opensearch.common.util.FeatureFlags;
39+
import org.opensearch.index.IndexSettings;
40+
import org.opensearch.search.SearchService;
3941
import org.opensearch.test.FeatureFlagSetter;
4042

4143
import java.util.Arrays;
@@ -282,4 +284,55 @@ public void testDynamicIndexSettingsRegistration() {
282284
() -> module.registerDynamicSetting(Setting.floatSetting("index.custom.setting2", 1.0f, Property.IndexScope))
283285
);
284286
}
287+
288+
public void testConcurrentSegmentSearchClusterSettings() {
289+
// Test that we throw an exception without the feature flag
290+
Settings settings = Settings.builder().put(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build();
291+
SettingsException ex = expectThrows(SettingsException.class, () -> new SettingsModule(settings));
292+
assertEquals(
293+
"unknown setting ["
294+
+ SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey()
295+
+ "] please check that any required plugins are installed, or check the breaking "
296+
+ "changes documentation for removed settings",
297+
ex.getMessage()
298+
);
299+
300+
// Test that the settings updates correctly with the feature flag
301+
FeatureFlagSetter.set(FeatureFlags.CONCURRENT_SEGMENT_SEARCH);
302+
boolean settingValue = randomBoolean();
303+
Settings settingsWithFeatureFlag = Settings.builder()
304+
.put(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), settingValue)
305+
.build();
306+
SettingsModule settingsModule = new SettingsModule(settingsWithFeatureFlag);
307+
assertEquals(settingValue, SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.get(settingsModule.getSettings()));
308+
}
309+
310+
public void testConcurrentSegmentSearchIndexSettings() {
311+
Settings.Builder target = Settings.builder().put(Settings.EMPTY);
312+
Settings.Builder update = Settings.builder();
313+
314+
// Test that we throw an exception without the feature flag
315+
SettingsModule module = new SettingsModule(Settings.EMPTY);
316+
IndexScopedSettings indexScopedSettings = module.getIndexScopedSettings();
317+
expectThrows(
318+
SettingsException.class,
319+
() -> indexScopedSettings.updateDynamicSettings(
320+
Settings.builder().put(IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build(),
321+
target,
322+
update,
323+
"node"
324+
)
325+
);
326+
327+
// Test that the settings updates correctly with the feature flag
328+
FeatureFlagSetter.set(FeatureFlags.CONCURRENT_SEGMENT_SEARCH);
329+
SettingsModule moduleWithFeatureFlag = new SettingsModule(Settings.EMPTY);
330+
IndexScopedSettings indexScopedSettingsWithFeatureFlag = moduleWithFeatureFlag.getIndexScopedSettings();
331+
indexScopedSettingsWithFeatureFlag.updateDynamicSettings(
332+
Settings.builder().put(IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build(),
333+
target,
334+
update,
335+
"node"
336+
);
337+
}
285338
}

0 commit comments

Comments
 (0)