Skip to content
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Use env variable (OPENSEARCH_FIPS_MODE) to enable opensearch to run in FIPS enforced mode instead of checking for existence of bcFIPS jars ([#20625](https://github.com/opensearch-project/OpenSearch/pull/20625))
- Update streaming flag to use search request context ([#20530](https://github.com/opensearch-project/OpenSearch/pull/20530))
- Move pull-based ingestion classes from experimental to publicAPI ([#20704](https://github.com/opensearch-project/OpenSearch/pull/20704))
- Use caching to avoid excessive creation of NonClosingReaderWrapper instances ([#20921](https://github.com/opensearch-project/OpenSearch/pull/20921))

### Fixed
- Relax index template pattern overlap check to use minimum-string heuristic, allowing distinguishable multi-wildcard patterns at the same priority ([#20702](https://github.com/opensearch-project/OpenSearch/pull/20702))
Expand Down
36 changes: 31 additions & 5 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorCompletionService;
Expand Down Expand Up @@ -321,6 +322,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl

private final IndexingOperationListener indexingOperationListeners;
private final Runnable globalCheckpointSyncer;
private final ConcurrentHashMap<DirectoryReader, DirectoryReader> readerWrapperCache = new ConcurrentHashMap<>();
Comment thread
kkewwei marked this conversation as resolved.
Outdated

Runnable getGlobalCheckpointSyncer() {
return globalCheckpointSyncer;
Expand Down Expand Up @@ -2226,7 +2228,9 @@ private Engine.Searcher wrapSearcher(Engine.Searcher searcher) {
: "DirectoryReader must be an instance or OpenSearchDirectoryReader";
boolean success = false;
try {
final Engine.Searcher newSearcher = readerWrapper == null ? searcher : wrapSearcher(searcher, readerWrapper);
final Engine.Searcher newSearcher = readerWrapper == null
? searcher
: wrapSearcher(searcher, readerWrapper, readerWrapperCache);
assert newSearcher != null;
success = true;
return newSearcher;
Expand All @@ -2245,15 +2249,36 @@ private Engine.Searcher wrapSearcher(Engine.Searcher searcher) {
public static Engine.Searcher wrapSearcher(
Engine.Searcher engineSearcher,
CheckedFunction<DirectoryReader, DirectoryReader, IOException> readerWrapper
) throws IOException {
return wrapSearcher(engineSearcher, readerWrapper, null);
}

public static Engine.Searcher wrapSearcher(
Engine.Searcher engineSearcher,
CheckedFunction<DirectoryReader, DirectoryReader, IOException> readerWrapper,
ConcurrentHashMap<DirectoryReader, DirectoryReader> readerWrapperCache
Comment thread
kkewwei marked this conversation as resolved.
Outdated
) throws IOException {
assert readerWrapper != null;
final OpenSearchDirectoryReader openSearchDirectoryReader = OpenSearchDirectoryReader.getOpenSearchDirectoryReader(
engineSearcher.getDirectoryReader()
);
DirectoryReader directoryReader = engineSearcher.getDirectoryReader();
final OpenSearchDirectoryReader openSearchDirectoryReader = OpenSearchDirectoryReader.getOpenSearchDirectoryReader(directoryReader);
if (openSearchDirectoryReader == null) {
throw new IllegalStateException("Can't wrap non opensearch directory reader");
}
NonClosingReaderWrapper nonClosingReaderWrapper = new NonClosingReaderWrapper(engineSearcher.getDirectoryReader());

DirectoryReader nonClosingReaderWrapper;
if (readerWrapperCache == null) {
Comment thread
kkewwei marked this conversation as resolved.
Outdated
nonClosingReaderWrapper = new NonClosingReaderWrapper(directoryReader);
} else {
nonClosingReaderWrapper = readerWrapperCache.computeIfAbsent(directoryReader, key -> {
try {
OpenSearchDirectoryReader.addReaderCloseListener(key, cacheKey -> readerWrapperCache.remove(key));
return new NonClosingReaderWrapper(directoryReader);
} catch (IOException e) {
readerWrapperCache.remove(key);
Comment thread
kkewwei marked this conversation as resolved.
Outdated
throw new OpenSearchException("failed to wrap searcher", e);
}
});
}
DirectoryReader reader = readerWrapper.apply(nonClosingReaderWrapper);
if (reader != nonClosingReaderWrapper) {
if (reader.getReaderCacheHelper() != openSearchDirectoryReader.getReaderCacheHelper()) {
Expand Down Expand Up @@ -2350,6 +2375,7 @@ public void close(String reason, boolean flushEngine, boolean deleted) throws IO
changeState(IndexShardState.CLOSED, reason);
}
} finally {
readerWrapperCache.clear();
final Indexer engine = this.currentEngineReference.getAndSet(null);
try {
if (engine != null && flushEngine) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,62 @@ public void testNoWrap() throws IOException {
IOUtils.close(writer, dir);
}

public void testCacheWrapperReader() throws IOException {
Directory dir = newDirectory();
IndexWriterConfig iwc = newIndexWriterConfig();
IndexWriter writer = new IndexWriter(dir, iwc);
Document doc = new Document();
doc.add(new StringField("id", "1", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
doc.add(new TextField("field", "doc", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
writer.addDocument(doc);
DirectoryReader open = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "_na_", 1));
IndexSearcher searcher = new IndexSearcher(open);
assertEquals(1, searcher.search(new TermQuery(new Term("field", "doc")), 1).totalHits.value());
searcher.setSimilarity(iwc.getSimilarity());
final AtomicInteger closeCalls = new AtomicInteger(0);
CheckedFunction<DirectoryReader, DirectoryReader, IOException> wrapper = reader -> new FieldMaskingReader(
"field",
reader,
closeCalls
);
ConcurrentHashMap<DirectoryReader, DirectoryReader> readerWrapperCache = new ConcurrentHashMap<>();
Engine.Searcher wrap = IndexShard.wrapSearcher(
new Engine.Searcher(
"foo",
open,
IndexSearcher.getDefaultSimilarity(),
IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(),
() -> {}
),
wrapper,
readerWrapperCache
);
wrap.close();
assertEquals(1, readerWrapperCache.size());
DirectoryReader nonClosingReaderWrapper = readerWrapperCache.get(open);
assertNotNull(nonClosingReaderWrapper);
wrap = IndexShard.wrapSearcher(
new Engine.Searcher(
"foo",
open,
IndexSearcher.getDefaultSimilarity(),
IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(),
() -> {}
),
wrapper,
readerWrapperCache
);
wrap.close();
assertEquals(1, readerWrapperCache.size());
DirectoryReader newNonClosingReaderWrapper = readerWrapperCache.get(open);
assertEquals(newNonClosingReaderWrapper, nonClosingReaderWrapper);
IOUtils.close(open, writer, dir);

assertTrue(readerWrapperCache.isEmpty());
}

private static class FieldMaskingReader extends FilterDirectoryReader {
private final String field;
private final AtomicInteger closeCalls;
Expand Down
Loading