Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add events correlation engine plugin ([#6854](https://github.com/opensearch-project/OpenSearch/issues/6854))
- Add support for ignoring missing Javadoc on generated code using annotation ([#7604](https://github.com/opensearch-project/OpenSearch/pull/7604))
- Implement concurrent aggregations support without profile option ([#7514](https://github.com/opensearch-project/OpenSearch/pull/7514))
- Add dynamic index and cluster setting for concurrent segment search ([#7956](https://github.com/opensearch-project/OpenSearch/pull/7956))

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
Expand Down
7 changes: 7 additions & 0 deletions distribution/src/config/opensearch.yml
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,11 @@ ${path.logs}
#
# Gates the search pipeline feature. This feature enables configurable processors
# for search requests and search responses, similar to ingest pipelines.
#
#opensearch.experimental.feature.search_pipeline.enabled: false
#
#
# Gates the concurrent segment search feature. This feature enables concurrent segment search in a separate
# index searcher threadpool.
#
#opensearch.experimental.feature.concurrent_segment_search.enabled: false
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,8 @@ public void apply(Settings value, Settings current, Settings previous) {
IndicesService.CLUSTER_REMOTE_STORE_REPOSITORY_SETTING,
IndicesService.CLUSTER_REMOTE_TRANSLOG_STORE_ENABLED_SETTING,
IndicesService.CLUSTER_REMOTE_TRANSLOG_REPOSITORY_SETTING
)
),
List.of(FeatureFlags.CONCURRENT_SEGMENT_SEARCH),
List.of(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING)
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,9 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexMetadata.INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING,
IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING,
IndexMetadata.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING
)
),
FeatureFlags.CONCURRENT_SEGMENT_SEARCH,
List.of(IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING)
);

public static final IndexScopedSettings DEFAULT_SCOPED_SETTINGS = new IndexScopedSettings(Settings.EMPTY, BUILT_IN_INDEX_SETTINGS);
Expand Down
15 changes: 14 additions & 1 deletion server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,13 @@ public final class IndexSettings {
Property.IndexScope
);

public static final Setting<Boolean> INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING = Setting.boolSetting(
"index.search.concurrent_segment_search.enabled",
false,
Property.IndexScope,
Property.Dynamic
);

private final Index index;
private final Version version;
private final Logger logger;
Expand Down Expand Up @@ -1590,7 +1597,13 @@ public void setDefaultSearchPipeline(String defaultSearchPipeline) {
if (FeatureFlags.isEnabled(SEARCH_PIPELINE)) {
this.defaultSearchPipeline = defaultSearchPipeline;
} else {
throw new SettingsException("Unsupported setting: " + DEFAULT_SEARCH_PIPELINE.getKey());
throw new SettingsException(
"Unable to update setting: "
+ DEFAULT_SEARCH_PIPELINE.getKey()
+ ". This is an experimental feature that is currently disabled, please enable the "
+ SEARCH_PIPELINE
+ " feature flag first."
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opensearch.common.lucene.search.Queries;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.BigArrays;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.common.lease.Releasables;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.index.IndexService;
Expand Down Expand Up @@ -104,6 +105,8 @@
import java.util.function.Function;
import java.util.function.LongSupplier;

import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;

/**
* The main search context used during search phase
*
Expand Down Expand Up @@ -869,6 +872,25 @@ public Profilers getProfilers() {
return profilers;
}

/**
* Returns concurrent segment search status for the search context
*/
@Override
public boolean isConcurrentSegmentSearchEnabled() {
if (FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH)
&& (clusterService != null)
&& (searcher().getExecutor() != null)) {
return indexService.getIndexSettings()
.getSettings()
.getAsBoolean(
IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(),
clusterService.getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING)
);
} else {
return false;
}
}

public void setProfilers(Profilers profilers) {
this.profilers = profilers;
}
Expand Down
15 changes: 4 additions & 11 deletions server/src/main/java/org/opensearch/search/SearchModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,9 @@
import org.opensearch.search.fetch.subphase.highlight.Highlighter;
import org.opensearch.search.fetch.subphase.highlight.PlainHighlighter;
import org.opensearch.search.fetch.subphase.highlight.UnifiedHighlighter;
import org.opensearch.search.query.ConcurrentQueryPhaseSearcher;
import org.opensearch.search.query.QueryPhase;
import org.opensearch.search.query.QueryPhaseSearcher;
import org.opensearch.search.query.QueryPhaseSearcherWrapper;
import org.opensearch.search.rescore.QueryRescorerBuilder;
import org.opensearch.search.rescore.RescorerBuilder;
import org.opensearch.search.sort.FieldSortBuilder;
Expand Down Expand Up @@ -1258,8 +1258,8 @@ private QueryPhaseSearcher registerQueryPhaseSearcher(List<SearchPlugin> plugins
}
}

if (searcher == null && FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH)) {
searcher = new ConcurrentQueryPhaseSearcher();
if (searcher == null) {
searcher = new QueryPhaseSearcherWrapper();
}
return searcher;
}
Expand Down Expand Up @@ -1290,14 +1290,7 @@ public FetchPhase getFetchPhase() {
}

public QueryPhase getQueryPhase() {
QueryPhase queryPhase;
if (queryPhaseSearcher == null) {
// use the defaults
queryPhase = new QueryPhase();
} else {
queryPhase = new QueryPhase(queryPhaseSearcher);
}
return queryPhase;
return new QueryPhase(queryPhaseSearcher);
}

public @Nullable ExecutorService getIndexSearcherExecutor(ThreadPool pool) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,13 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
Property.NodeScope
);

public static final Setting<Boolean> CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING = Setting.boolSetting(
"search.concurrent_segment_search.enabled",
true,
Property.Dynamic,
Property.NodeScope
);

public static final int DEFAULT_SIZE = 10;
public static final int DEFAULT_FROM = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,13 @@ public final void assignRescoreDocIds(RescoreDocIds rescoreDocIds) {
*/
public abstract Profilers getProfilers();

/**
* Returns concurrent segment search status for the search context
*/
public boolean isConcurrentSegmentSearchEnabled() {
return false;
}

/**
* Adds a releasable that will be freed when this context is closed.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,7 @@ protected boolean searchWithCollector(
boolean hasFilterCollector,
boolean hasTimeout
) throws IOException {
boolean couldUseConcurrentSegmentSearch = allowConcurrentSegmentSearch(searcher);

if (couldUseConcurrentSegmentSearch) {
LOGGER.debug("Using concurrent search over index segments (experimental)");
return searchWithCollectorManager(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout);
} else {
return super.searchWithCollector(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout);
}
return searchWithCollectorManager(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout);
}

private static boolean searchWithCollectorManager(
Expand Down Expand Up @@ -108,9 +101,4 @@ private static boolean searchWithCollectorManager(
public AggregationProcessor aggregationProcessor(SearchContext searchContext) {
return aggregationProcessor;
}

private static boolean allowConcurrentSegmentSearch(final ContextIndexSearcher searcher) {
return (searcher.getExecutor() != null);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.query;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.Query;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.search.aggregations.AggregationProcessor;
import org.opensearch.search.internal.ContextIndexSearcher;
import org.opensearch.search.internal.SearchContext;
import org.apache.lucene.search.CollectorManager;

import java.io.IOException;
import java.util.LinkedList;

/**
* Wrapper class for QueryPhaseSearcher that handles path selection for concurrent vs
* non-concurrent search query phase and aggregation processor.
*
* @opensearch.internal
*/
public class QueryPhaseSearcherWrapper implements QueryPhaseSearcher {
private static final Logger LOGGER = LogManager.getLogger(QueryPhaseSearcherWrapper.class);
private final QueryPhaseSearcher defaultQueryPhaseSearcher;
private final QueryPhaseSearcher concurrentQueryPhaseSearcher;

public QueryPhaseSearcherWrapper() {
this.defaultQueryPhaseSearcher = new QueryPhase.DefaultQueryPhaseSearcher();
this.concurrentQueryPhaseSearcher = FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH)
? new ConcurrentQueryPhaseSearcher()
: null;
}

/**
* Perform search using {@link CollectorManager}
*
* @param searchContext search context
* @param searcher context index searcher
* @param query query
* @param hasTimeout "true" if timeout was set, "false" otherwise
* @return is rescoring required or not
* @throws java.io.IOException IOException
*/
@Override
public boolean searchWith(
SearchContext searchContext,
ContextIndexSearcher searcher,
Query query,
LinkedList<QueryCollectorContext> collectors,
boolean hasFilterCollector,
boolean hasTimeout
) throws IOException {
if (searchContext.isConcurrentSegmentSearchEnabled()) {
LOGGER.info("Using concurrent search over segments (experimental)");
return concurrentQueryPhaseSearcher.searchWith(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout);
} else {
return defaultQueryPhaseSearcher.searchWith(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout);
}
}

/**
* {@link AggregationProcessor} to use to setup and post process aggregation related collectors during search request
* @param searchContext search context
* @return {@link AggregationProcessor} to use
*/
@Override
public AggregationProcessor aggregationProcessor(SearchContext searchContext) {
if (searchContext.isConcurrentSegmentSearchEnabled()) {
LOGGER.info("Using concurrent search over segments (experimental)");
return concurrentQueryPhaseSearcher.aggregationProcessor(searchContext);
} else {
return defaultQueryPhaseSearcher.aggregationProcessor(searchContext);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.opensearch.common.settings.Setting.Property;
import org.hamcrest.Matchers;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexSettings;
import org.opensearch.search.SearchService;
import org.opensearch.test.FeatureFlagSetter;

import java.util.Arrays;
Expand Down Expand Up @@ -282,4 +284,55 @@ public void testDynamicIndexSettingsRegistration() {
() -> module.registerDynamicSetting(Setting.floatSetting("index.custom.setting2", 1.0f, Property.IndexScope))
);
}

public void testConcurrentSegmentSearchClusterSettings() {
// Test that we throw an exception without the feature flag
Settings settings = Settings.builder().put(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build();
SettingsException ex = expectThrows(SettingsException.class, () -> new SettingsModule(settings));
assertEquals(
"unknown setting ["
+ SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey()
+ "] please check that any required plugins are installed, or check the breaking "
+ "changes documentation for removed settings",
ex.getMessage()
);

// Test that the settings updates correctly with the feature flag
FeatureFlagSetter.set(FeatureFlags.CONCURRENT_SEGMENT_SEARCH);
boolean settingValue = randomBoolean();
Settings settingsWithFeatureFlag = Settings.builder()
.put(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), settingValue)
.build();
SettingsModule settingsModule = new SettingsModule(settingsWithFeatureFlag);
assertEquals(settingValue, SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.get(settingsModule.getSettings()));
}

public void testConcurrentSegmentSearchIndexSettings() {
Settings.Builder target = Settings.builder().put(Settings.EMPTY);
Settings.Builder update = Settings.builder();

// Test that we throw an exception without the feature flag
SettingsModule module = new SettingsModule(Settings.EMPTY);
IndexScopedSettings indexScopedSettings = module.getIndexScopedSettings();
expectThrows(
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SettingsException.class,
() -> indexScopedSettings.updateDynamicSettings(
Settings.builder().put(IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build(),
target,
update,
"node"
)
);

// Test that the settings updates correctly with the feature flag
FeatureFlagSetter.set(FeatureFlags.CONCURRENT_SEGMENT_SEARCH);
SettingsModule moduleWithFeatureFlag = new SettingsModule(Settings.EMPTY);
IndexScopedSettings indexScopedSettingsWithFeatureFlag = moduleWithFeatureFlag.getIndexScopedSettings();
indexScopedSettingsWithFeatureFlag.updateDynamicSettings(
Settings.builder().put(IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build(),
target,
update,
"node"
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.opensearch.search.query.ConcurrentQueryPhaseSearcher;
import org.opensearch.search.query.QueryPhase;
import org.opensearch.search.query.QueryPhaseSearcher;
import org.opensearch.search.query.QueryPhaseSearcherWrapper;
import org.opensearch.search.rescore.QueryRescorerBuilder;
import org.opensearch.search.rescore.RescoreContext;
import org.opensearch.search.rescore.RescorerBuilder;
Expand Down Expand Up @@ -425,7 +426,7 @@ public void testDefaultQueryPhaseSearcher() {
SearchModule searchModule = new SearchModule(Settings.EMPTY, Collections.emptyList());
TestSearchContext searchContext = new TestSearchContext(null);
QueryPhase queryPhase = searchModule.getQueryPhase();
assertTrue(queryPhase.getQueryPhaseSearcher() instanceof QueryPhase.DefaultQueryPhaseSearcher);
assertTrue(queryPhase.getQueryPhaseSearcher() instanceof QueryPhaseSearcherWrapper);
assertTrue(queryPhase.getQueryPhaseSearcher().aggregationProcessor(searchContext) instanceof DefaultAggregationProcessor);
}

Expand All @@ -434,8 +435,9 @@ public void testConcurrentQueryPhaseSearcher() {
FeatureFlags.initializeFeatureFlags(settings);
SearchModule searchModule = new SearchModule(settings, Collections.emptyList());
TestSearchContext searchContext = new TestSearchContext(null);
searchContext.setConcurrentSegmentSearchEnabled(true);
QueryPhase queryPhase = searchModule.getQueryPhase();
assertTrue(queryPhase.getQueryPhaseSearcher() instanceof ConcurrentQueryPhaseSearcher);
assertTrue(queryPhase.getQueryPhaseSearcher() instanceof QueryPhaseSearcherWrapper);
assertTrue(queryPhase.getQueryPhaseSearcher().aggregationProcessor(searchContext) instanceof ConcurrentAggregationProcessor);
FeatureFlags.initializeFeatureFlags(Settings.EMPTY);
}
Expand Down
Loading