Skip to content

Commit de25df2

Browse files
jed326Jay Deng
authored andcommitted
Add dynamic index and cluster setting for concurrent segment search (#7956)
* Add dynamic index and cluster setting for concurrent segment search Signed-off-by: Jay Deng <jayd0104@gmail.com> * Use feature flagged settings map Signed-off-by: Jay Deng <jayd0104@gmail.com> --------- Signed-off-by: Jay Deng <jayd0104@gmail.com>
1 parent 78de94d commit de25df2

17 files changed

Lines changed: 333 additions & 30 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
88
- Add TokenManager Interface ([#7452](https://github.com/opensearch-project/OpenSearch/pull/7452))
99
- Add Remote store as a segment replication source ([#7653](https://github.com/opensearch-project/OpenSearch/pull/7653))
1010
- Implement concurrent aggregations support without profile option ([#7514](https://github.com/opensearch-project/OpenSearch/pull/7514))
11+
- Add dynamic index and cluster setting for concurrent segment search ([#7956](https://github.com/opensearch-project/OpenSearch/pull/7956))
1112

1213
### Dependencies
1314
- Bump `com.azure:azure-storage-common` from 12.21.0 to 12.21.1 (#7566, #7814)

distribution/src/config/opensearch.yml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,4 +130,11 @@ ${path.logs}
130130
#
131131
# Gates the search pipeline feature. This feature enables configurable processors
132132
# for search requests and search responses, similar to ingest pipelines.
133+
#
133134
#opensearch.experimental.feature.search_pipeline.enabled: false
135+
#
136+
#
137+
# Gates the concurrent segment search feature. This feature enables concurrent segment search in a separate
138+
# index searcher threadpool.
139+
#
140+
#opensearch.experimental.feature.concurrent_segment_search.enabled: false

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -670,6 +670,8 @@ public void apply(Settings value, Settings current, Settings previous) {
670670
IndicesService.CLUSTER_REMOTE_STORE_REPOSITORY_SETTING,
671671
IndicesService.CLUSTER_REMOTE_TRANSLOG_STORE_ENABLED_SETTING,
672672
IndicesService.CLUSTER_REMOTE_TRANSLOG_REPOSITORY_SETTING
673-
)
673+
),
674+
List.of(FeatureFlags.CONCURRENT_SEGMENT_SEARCH),
675+
List.of(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING)
674676
);
675677
}

server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,9 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
235235
IndexMetadata.INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING,
236236
IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING,
237237
IndexMetadata.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING
238-
)
238+
),
239+
FeatureFlags.CONCURRENT_SEGMENT_SEARCH,
240+
List.of(IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING)
239241
);
240242

241243
public static final IndexScopedSettings DEFAULT_SCOPED_SETTINGS = new IndexScopedSettings(Settings.EMPTY, BUILT_IN_INDEX_SETTINGS);

server/src/main/java/org/opensearch/index/IndexSettings.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -589,6 +589,13 @@ public final class IndexSettings {
589589
Property.IndexScope
590590
);
591591

592+
public static final Setting<Boolean> INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING = Setting.boolSetting(
593+
"index.search.concurrent_segment_search.enabled",
594+
false,
595+
Property.IndexScope,
596+
Property.Dynamic
597+
);
598+
592599
private final Index index;
593600
private final Version version;
594601
private final Logger logger;
@@ -1590,7 +1597,13 @@ public void setDefaultSearchPipeline(String defaultSearchPipeline) {
15901597
if (FeatureFlags.isEnabled(SEARCH_PIPELINE)) {
15911598
this.defaultSearchPipeline = defaultSearchPipeline;
15921599
} else {
1593-
throw new SettingsException("Unsupported setting: " + DEFAULT_SEARCH_PIPELINE.getKey());
1600+
throw new SettingsException(
1601+
"Unable to update setting: "
1602+
+ DEFAULT_SEARCH_PIPELINE.getKey()
1603+
+ ". This is an experimental feature that is currently disabled, please enable the "
1604+
+ SEARCH_PIPELINE
1605+
+ " feature flag first."
1606+
);
15941607
}
15951608
}
15961609
}

server/src/main/java/org/opensearch/search/DefaultSearchContext.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.opensearch.common.lucene.search.Queries;
5050
import org.opensearch.common.unit.TimeValue;
5151
import org.opensearch.common.util.BigArrays;
52+
import org.opensearch.common.util.FeatureFlags;
5253
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
5354
import org.opensearch.index.IndexService;
5455
import org.opensearch.index.IndexSettings;
@@ -104,6 +105,8 @@
104105
import java.util.function.Function;
105106
import java.util.function.LongSupplier;
106107

108+
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
109+
107110
/**
108111
* The main search context used during search phase
109112
*
@@ -869,6 +872,25 @@ public Profilers getProfilers() {
869872
return profilers;
870873
}
871874

875+
/**
876+
* Returns concurrent segment search status for the search context
877+
*/
878+
@Override
879+
public boolean isConcurrentSegmentSearchEnabled() {
880+
if (FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH)
881+
&& (clusterService != null)
882+
&& (searcher().getExecutor() != null)) {
883+
return indexService.getIndexSettings()
884+
.getSettings()
885+
.getAsBoolean(
886+
IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(),
887+
clusterService.getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING)
888+
);
889+
} else {
890+
return false;
891+
}
892+
}
893+
872894
public void setProfilers(Profilers profilers) {
873895
this.profilers = profilers;
874896
}

server/src/main/java/org/opensearch/search/SearchModule.java

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -273,9 +273,9 @@
273273
import org.opensearch.search.fetch.subphase.highlight.Highlighter;
274274
import org.opensearch.search.fetch.subphase.highlight.PlainHighlighter;
275275
import org.opensearch.search.fetch.subphase.highlight.UnifiedHighlighter;
276-
import org.opensearch.search.query.ConcurrentQueryPhaseSearcher;
277276
import org.opensearch.search.query.QueryPhase;
278277
import org.opensearch.search.query.QueryPhaseSearcher;
278+
import org.opensearch.search.query.QueryPhaseSearcherWrapper;
279279
import org.opensearch.search.rescore.QueryRescorerBuilder;
280280
import org.opensearch.search.rescore.RescorerBuilder;
281281
import org.opensearch.search.sort.FieldSortBuilder;
@@ -1294,8 +1294,8 @@ private QueryPhaseSearcher registerQueryPhaseSearcher(List<SearchPlugin> plugins
12941294
}
12951295
}
12961296

1297-
if (searcher == null && FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH)) {
1298-
searcher = new ConcurrentQueryPhaseSearcher();
1297+
if (searcher == null) {
1298+
searcher = new QueryPhaseSearcherWrapper();
12991299
}
13001300
return searcher;
13011301
}
@@ -1326,14 +1326,7 @@ public FetchPhase getFetchPhase() {
13261326
}
13271327

13281328
public QueryPhase getQueryPhase() {
1329-
QueryPhase queryPhase;
1330-
if (queryPhaseSearcher == null) {
1331-
// use the defaults
1332-
queryPhase = new QueryPhase();
1333-
} else {
1334-
queryPhase = new QueryPhase(queryPhaseSearcher);
1335-
}
1336-
return queryPhase;
1329+
return new QueryPhase(queryPhaseSearcher);
13371330
}
13381331

13391332
public @Nullable ExecutorService getIndexSearcherExecutor(ThreadPool pool) {

server/src/main/java/org/opensearch/search/SearchService.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,13 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
248248
Property.NodeScope
249249
);
250250

251+
public static final Setting<Boolean> CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING = Setting.boolSetting(
252+
"search.concurrent_segment_search.enabled",
253+
true,
254+
Property.Dynamic,
255+
Property.NodeScope
256+
);
257+
251258
public static final int DEFAULT_SIZE = 10;
252259
public static final int DEFAULT_FROM = 0;
253260

server/src/main/java/org/opensearch/search/internal/SearchContext.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,13 @@ public final void assignRescoreDocIds(RescoreDocIds rescoreDocIds) {
366366
*/
367367
public abstract Profilers getProfilers();
368368

369+
/**
370+
* Returns concurrent segment search status for the search context
371+
*/
372+
public boolean isConcurrentSegmentSearchEnabled() {
373+
return false;
374+
}
375+
369376
/**
370377
* Adds a releasable that will be freed when this context is closed.
371378
*/

server/src/main/java/org/opensearch/search/query/ConcurrentQueryPhaseSearcher.java

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,7 @@ protected boolean searchWithCollector(
4848
boolean hasFilterCollector,
4949
boolean hasTimeout
5050
) throws IOException {
51-
boolean couldUseConcurrentSegmentSearch = allowConcurrentSegmentSearch(searcher);
52-
53-
if (couldUseConcurrentSegmentSearch) {
54-
LOGGER.debug("Using concurrent search over index segments (experimental)");
55-
return searchWithCollectorManager(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout);
56-
} else {
57-
return super.searchWithCollector(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout);
58-
}
51+
return searchWithCollectorManager(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout);
5952
}
6053

6154
private static boolean searchWithCollectorManager(
@@ -108,9 +101,4 @@ private static boolean searchWithCollectorManager(
108101
public AggregationProcessor aggregationProcessor(SearchContext searchContext) {
109102
return aggregationProcessor;
110103
}
111-
112-
private static boolean allowConcurrentSegmentSearch(final ContextIndexSearcher searcher) {
113-
return (searcher.getExecutor() != null);
114-
}
115-
116104
}

0 commit comments

Comments
 (0)