Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add tracing support for StreamingRestChannel ([#20361](https://github.com/opensearch-project/OpenSearch/pull/20361))
- Introduce new libs/netty4 module to share common implementation between netty-based plugins and modules (transport-netty4, transport-reactor-netty4) ([#20447](https://github.com/opensearch-project/OpenSearch/pull/20447))
- Add validation to make crypto store settings immutable ([#20123](https://github.com/opensearch-project/OpenSearch/pull/20123))
- Adds search support for context aware segments ([#19558](https://github.com/opensearch-project/OpenSearch/pull/19558))
- Introduce concurrent translog recovery to accelerate segment replication primary promotion ([#20251](https://github.com/opensearch-project/OpenSearch/pull/20251))
- Update to `almalinux:10` ([#20482](https://github.com/opensearch-project/OpenSearch/pull/20482))

Expand Down

Large diffs are not rendered by default.

32 changes: 28 additions & 4 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -753,10 +754,18 @@ public final SearcherSupplier acquireSearcherSupplier(Function<Searcher, Searche
return acquireSearcherSupplier(wrapper, SearcherScope.EXTERNAL);
}

public SearcherSupplier acquireSearcherSupplier(Function<Searcher, Searcher> wrapper, SearcherScope scope) throws EngineException {
return acquireSearcherSupplier(wrapper, scope, Optional.empty());
}

/**
* Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand.
*/
public SearcherSupplier acquireSearcherSupplier(Function<Searcher, Searcher> wrapper, SearcherScope scope) throws EngineException {
public SearcherSupplier acquireSearcherSupplier(
Function<Searcher, Searcher> wrapper,
SearcherScope scope,
Optional<Set<String>> criteria
) throws EngineException {
/* Acquire order here is store -> manager since we need
* to make sure that the store is not closed before
* the searcher is acquired. */
Expand All @@ -766,7 +775,13 @@ public SearcherSupplier acquireSearcherSupplier(Function<Searcher, Searcher> wra
Releasable releasable = store::decRef;
try {
ReferenceManager<OpenSearchDirectoryReader> referenceManager = getReferenceManager(scope);
OpenSearchDirectoryReader acquire = referenceManager.acquire();
OpenSearchDirectoryReader acquire;
assert criteria != null;
if (criteria.isPresent() && !criteria.get().isEmpty()) {
acquire = referenceManager.acquire().getCriteriaBasedReader(criteria.get());
} else {
acquire = referenceManager.acquire();
}
SearcherSupplier reader = new SearcherSupplier(wrapper) {
@Override
public Searcher acquireSearcherInternal(String source) {
Expand Down Expand Up @@ -814,13 +829,22 @@ public final Searcher acquireSearcher(String source) throws EngineException {
}

public Searcher acquireSearcher(String source, SearcherScope scope) throws EngineException {
return acquireSearcher(source, scope, Function.identity());
return acquireSearcher(source, scope, Function.identity(), Optional.empty());
}

public Searcher acquireSearcher(String source, SearcherScope scope, Function<Searcher, Searcher> wrapper) throws EngineException {
return acquireSearcher(source, scope, wrapper, Optional.empty());
}

public Searcher acquireSearcher(
String source,
SearcherScope scope,
Function<Searcher, Searcher> wrapper,
Optional<Set<String>> contextAwareGroupingCriteria
) throws EngineException {
SearcherSupplier releasable = null;
try {
SearcherSupplier reader = releasable = acquireSearcherSupplier(wrapper, scope);
SearcherSupplier reader = releasable = acquireSearcherSupplier(wrapper, scope, contextAwareGroupingCriteria);
Searcher searcher = reader.acquireSearcher(source);
releasable = null;
return new Searcher(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.script.ContextAwareGroupingScript;
import org.opensearch.script.Script;
import org.opensearch.script.ScriptType;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -104,7 +105,7 @@ private static ContextAwareGroupingFieldMapper toType(FieldMapper in) {
}

Script s = Script.parse(o);
if (!s.getLang().equals(DEFAULT_SCRIPT_LANG)) {
if (s.getType() != ScriptType.STORED && !s.getLang().equals(DEFAULT_SCRIPT_LANG)) {
throw new MapperParsingException("context_aware_grouping only supports painless script");
}
return s;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.query;

/**
* Interface for query builders that contain a filter component. This interface enables
* context-aware searching by allowing plugin queries to expose their filter queries
* for criteria extraction.
*
* <p>Query builders implementing this interface indicate they contain a filter query
* that should be considered during context-aware criteria extraction. This allows
* custom query types to participate in segment-level filtering optimizations.
*
* @opensearch.internal
*/
public interface FilterAwareQueryBuilder {

/**
* Returns the filter query component of this query builder.
*
* @return The QueryBuilder representing the filter part of this query.
* This filter will be analyzed during context-aware criteria extraction.
*/
QueryBuilder filterQueryBuilder();
}
28 changes: 25 additions & 3 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -2186,29 +2186,51 @@ public Engine.SearcherSupplier acquireSearcherSupplier() {
return acquireSearcherSupplier(Engine.SearcherScope.EXTERNAL);
}

/**
* Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand.
*/
public Engine.SearcherSupplier acquireSearcherSupplier(Set<String> groupingCriterias) {
return acquireSearcherSupplier(Engine.SearcherScope.EXTERNAL, groupingCriterias);
}

/**
* Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand.
*/
public Engine.SearcherSupplier acquireSearcherSupplier(Engine.SearcherScope scope) {
return acquireSearcherSupplier(scope, null);
}

/**
* Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand.
*/
public Engine.SearcherSupplier acquireSearcherSupplier(Engine.SearcherScope scope, Set<String> groupingCriterias) {
readAllowed();
markSearcherAccessed();
final Engine engine = getEngine();
return engine.acquireSearcherSupplier(this::wrapSearcher, scope);
return engine.acquireSearcherSupplier(this::wrapSearcher, scope, Optional.ofNullable(groupingCriterias));
}

public Engine.Searcher acquireSearcher(String source) {
return acquireSearcher(source, Engine.SearcherScope.EXTERNAL);
return acquireSearcher(source, Engine.SearcherScope.EXTERNAL, null);
}

public Engine.Searcher acquireSearcher(String source, Set<String> contextAwareGroupingCriteria) {
return acquireSearcher(source, Engine.SearcherScope.EXTERNAL, contextAwareGroupingCriteria);
}

private void markSearcherAccessed() {
lastSearcherAccess.lazySet(threadPool.relativeTimeInMillis());
}

private Engine.Searcher acquireSearcher(String source, Engine.SearcherScope scope) {
return acquireSearcher(source, scope, null);
}

private Engine.Searcher acquireSearcher(String source, Engine.SearcherScope scope, Set<String> contextAwareGroupingCriteria) {
readAllowed();
markSearcherAccessed();
final Engine engine = getEngine();
return engine.acquireSearcher(source, scope, this::wrapSearcher);
return engine.acquireSearcher(source, scope, this::wrapSearcher, Optional.ofNullable(contextAwareGroupingCriteria));
}

private Engine.Searcher wrapSearcher(Engine.Searcher searcher) {
Expand Down
16 changes: 14 additions & 2 deletions server/src/main/java/org/opensearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
import org.opensearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.collapse.CollapseContext;
import org.opensearch.search.contextaware.ContextAwareCriteriaQueryExtraction;
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;
import org.opensearch.search.dfs.DfsPhase;
import org.opensearch.search.dfs.DfsSearchResult;
Expand Down Expand Up @@ -1084,7 +1085,7 @@ final ReaderContext createOrGetReaderContext(ShardSearchRequest request, boolean
}
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
IndexShard shard = indexService.getShard(request.shardId().id());
Engine.SearcherSupplier reader = shard.acquireSearcherSupplier();
Engine.SearcherSupplier reader = shard.acquireSearcherSupplier(getContextAwareGroupingCriteria(indexService, request));
return createAndPutReaderContext(request, indexService, shard, reader, keepStatesInContext);
}

Expand Down Expand Up @@ -1844,6 +1845,7 @@ private CanMatchResponse canMatch(ShardSearchRequest request, boolean checkRefre
final IndexService indexService;
final Engine.Searcher canMatchSearcher;
final boolean hasRefreshPending;

if (readerContext != null) {
indexService = readerContext.indexService();
canMatchSearcher = readerContext.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE);
Expand All @@ -1852,7 +1854,10 @@ private CanMatchResponse canMatch(ShardSearchRequest request, boolean checkRefre
indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
IndexShard indexShard = indexService.getShard(request.shardId().getId());
hasRefreshPending = indexShard.hasRefreshPending() && checkRefreshPending;
canMatchSearcher = indexShard.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE);
canMatchSearcher = indexShard.acquireSearcher(
Engine.CAN_MATCH_SEARCH_SOURCE,
getContextAwareGroupingCriteria(indexService, request)
);
}

try (Releasable ignored2 = canMatchSearcher) {
Expand Down Expand Up @@ -1883,6 +1888,13 @@ private CanMatchResponse canMatch(ShardSearchRequest request, boolean checkRefre
}
}

private Set<String> getContextAwareGroupingCriteria(IndexService indexService, ShardSearchRequest request) {
if (indexService.getIndexSettings().isContextAwareEnabled() && request.source() != null) {
return new ContextAwareCriteriaQueryExtraction(indexService.mapperService()).extractCriteria(request.source().query());
}
return null;
}

public static boolean canMatchSearchAfter(
FieldDoc searchAfter,
MinAndMax<?> minMax,
Expand Down
Loading
Loading