Skip to content
Merged
22 changes: 14 additions & 8 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
Expand Down Expand Up @@ -170,7 +170,7 @@ protected static boolean isMergedSegment(LeafReader reader) {
return IndexWriter.SOURCE_MERGE.equals(source);
}

protected Searcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) {
protected Searcher newSearcher(String source, IndexSearcher searcher, ReferenceManager<IndexSearcher> manager) {
return new EngineSearcher(source, searcher, manager, store, logger);
}

Expand Down Expand Up @@ -531,7 +531,7 @@ public final Searcher acquireSearcher(String source, SearcherScope scope) throws
* the searcher is acquired. */
store.incRef();
try {
final SearcherManager manager = getSearcherManager(source, scope); // can never be null
final ReferenceManager<IndexSearcher> manager = getSearcherManager(source, scope); // can never be null
/* This might throw NPE but that's fine we will run ensureOpen()
* in the catch block and throw the right exception */
final IndexSearcher searcher = manager.acquire();
Expand Down Expand Up @@ -585,7 +585,7 @@ public CommitStats commitStats() {
/**
* Read the last segments info from the commit pointed to by the searcher manager
*/
protected static SegmentInfos readLastCommittedSegmentInfos(final SearcherManager sm, final Store store) throws IOException {
protected static SegmentInfos readLastCommittedSegmentInfos(final ReferenceManager<IndexSearcher> sm, final Store store) throws IOException {
IndexSearcher searcher = sm.acquire();
try {
IndexCommit latestCommit = ((DirectoryReader) searcher.getIndexReader()).getIndexCommit();
Expand Down Expand Up @@ -787,13 +787,19 @@ public int compare(Segment o1, Segment o2) {
public final boolean refreshNeeded() {
if (store.tryIncRef()) {
/*
we need to inc the store here since searcherManager.isSearcherCurrent()
acquires a searcher internally and that might keep a file open on the
we need to inc the store here since we acquire a searcher and that might keep a file open on the
store. this violates the assumption that all files are closed when
the store is closed so we need to make sure we increment it here
*/
try {
return getSearcherManager("refresh_needed", SearcherScope.EXTERNAL).isSearcherCurrent() == false;
ReferenceManager<IndexSearcher> manager = getSearcherManager("refresh_needed", SearcherScope.EXTERNAL);
final IndexSearcher searcher = manager.acquire();
try {
final IndexReader r = searcher.getIndexReader();
return ((DirectoryReader) r).isCurrent() == false;
} finally {
manager.release(searcher);
}
} catch (IOException e) {
logger.error("failed to access searcher manager", e);
failEngine("failed to access searcher manager", e);
Expand Down Expand Up @@ -1331,7 +1337,7 @@ public void release() {
}
}

protected abstract SearcherManager getSearcherManager(String source, SearcherScope scope);
protected abstract ReferenceManager<IndexSearcher> getSearcherManager(String source, SearcherScope scope);

/**
* Method to close the engine while the write lock is held.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.index.store.Store;
Expand All @@ -32,12 +33,12 @@
* Searcher for an Engine
*/
public class EngineSearcher extends Engine.Searcher {
private final SearcherManager manager;
private final ReferenceManager<IndexSearcher> manager;
private final AtomicBoolean released = new AtomicBoolean(false);
private final Store store;
private final Logger logger;

public EngineSearcher(String source, IndexSearcher searcher, SearcherManager manager, Store store, Logger logger) {
public EngineSearcher(String source, IndexSearcher searcher, ReferenceManager<IndexSearcher> manager, Store store, Logger logger) {
super(source, searcher);
this.manager = manager;
this.store = store;
Expand Down
111 changes: 90 additions & 21 deletions core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lucene.LoggerInfoStream;
Expand All @@ -57,7 +58,6 @@
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
Expand Down Expand Up @@ -108,7 +108,7 @@ public class InternalEngine extends Engine {

private final IndexWriter indexWriter;

private final SearcherManager externalSearcherManager;
private final ExternalSearcherManager externalSearcherManager;
private final SearcherManager internalSearcherManager;

private final Lock flushLock = new ReentrantLock();
Expand Down Expand Up @@ -172,7 +172,7 @@ public InternalEngine(EngineConfig engineConfig) {
store.incRef();
IndexWriter writer = null;
Translog translog = null;
SearcherManager externalSearcherManager = null;
ExternalSearcherManager externalSearcherManager = null;
SearcherManager internalSearcherManager = null;
EngineMergeScheduler scheduler = null;
boolean success = false;
Expand Down Expand Up @@ -224,8 +224,8 @@ public InternalEngine(EngineConfig engineConfig) {
throw e;
}
}
internalSearcherManager = createSearcherManager(new SearcherFactory(), false);
externalSearcherManager = createSearcherManager(new SearchFactory(logger, isClosed, engineConfig), true);
externalSearcherManager = createSearcherManager(new SearchFactory(logger, isClosed, engineConfig));
internalSearcherManager = externalSearcherManager.internalSearcherManager;
this.internalSearcherManager = internalSearcherManager;
this.externalSearcherManager = externalSearcherManager;
internalSearcherManager.addListener(versionMap);
Expand All @@ -238,7 +238,7 @@ public InternalEngine(EngineConfig engineConfig) {
success = true;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(writer, translog, externalSearcherManager, internalSearcherManager, scheduler);
IOUtils.closeWhileHandlingException(writer, translog, internalSearcherManager, externalSearcherManager, scheduler);
if (isClosed.get() == false) {
// failure we need to dec the store reference
store.decRef();
Expand All @@ -248,6 +248,75 @@ public InternalEngine(EngineConfig engineConfig) {
logger.trace("created new InternalEngine");
}

/**
* This reference manager delegates all it's refresh calls to another (internal) SearcherManager
* The main purpose for this is that if we have external refreshes happening we don't issue extra
* refreshes to clear version map memory etc. this can cause excessive segment creation if heavy indexing
* is happening and the refresh interval is low (ie. 1 sec)
*
* This also prevents segment starvation where an internal reader holds on to old segments literally forever
* since no indexing is happening and refreshes are only happening to the external reader manager, while with
* this specialized implementation an external refresh will immediately be reflected on the internal reader
* and old segments can be released in the same way previous version did this (as a side-effect of _refresh)
*/
@SuppressForbidden(reason = "reference counting is required here")
private static final class ExternalSearcherManager extends ReferenceManager<IndexSearcher> {
private final SearcherFactory searcherFactory;
private final SearcherManager internalSearcherManager;

ExternalSearcherManager(SearcherManager internalSearcherManager, SearcherFactory searcherFactory) throws IOException {
IndexSearcher acquire = internalSearcherManager.acquire();
try {
IndexReader indexReader = acquire.getIndexReader();
assert indexReader instanceof ElasticsearchDirectoryReader:
"searcher's IndexReader should be an ElasticsearchDirectoryReader, but got " + indexReader;
indexReader.incRef(); // steal the reader - getSearcher will decrement if it fails
current = SearcherManager.getSearcher(searcherFactory, indexReader, null);
} finally {
internalSearcherManager.release(acquire);
}
this.searcherFactory = searcherFactory;
this.internalSearcherManager = internalSearcherManager;
}

@Override
protected IndexSearcher refreshIfNeeded(IndexSearcher referenceToRefresh) throws IOException {
// we simply run a blocking refresh on the internal reference manager and then steal it's reader
// it's a save operation since we acquire the reader which incs it's reference but then down the road
// steal it by calling incRef on the "stolen" reader
internalSearcherManager.maybeRefreshBlocking();
IndexSearcher acquire = internalSearcherManager.acquire();
final IndexReader previousReader = referenceToRefresh.getIndexReader();
assert previousReader instanceof ElasticsearchDirectoryReader:
"searcher's IndexReader should be an ElasticsearchDirectoryReader, but got " + previousReader;
try {
final IndexReader newReader = acquire.getIndexReader();
if (newReader == previousReader) {
// nothing has changed - both ref managers share the same instance so we can use reference equality
return null;
} else {
newReader.incRef(); // steal the reader - getSearcher will decrement if it fails
return SearcherManager.getSearcher(searcherFactory, newReader, previousReader);
}
} finally {
internalSearcherManager.release(acquire);
}
}

@Override
protected boolean tryIncRef(IndexSearcher reference) {
return reference.getIndexReader().tryIncRef();
}

@Override
protected int getRefCount(IndexSearcher reference) {
return reference.getIndexReader().getRefCount();
}

@Override
protected void decRef(IndexSearcher reference) throws IOException { reference.getIndexReader().decRef(); }
}

@Override
public void restoreLocalCheckpointFromTranslog() throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
Expand Down Expand Up @@ -456,18 +525,18 @@ private String loadOrGenerateHistoryUUID(final IndexWriter writer, boolean force
return uuid;
}

private SearcherManager createSearcherManager(SearcherFactory searcherFactory, boolean readSegmentsInfo) throws EngineException {
private ExternalSearcherManager createSearcherManager(SearchFactory externalSearcherFactory) throws EngineException {
boolean success = false;
SearcherManager searcherManager = null;
SearcherManager internalSearcherManager = null;
try {
try {
final DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(indexWriter), shardId);
searcherManager = new SearcherManager(directoryReader, searcherFactory);
if (readSegmentsInfo) {
lastCommittedSegmentInfos = readLastCommittedSegmentInfos(searcherManager, store);
}
internalSearcherManager = new SearcherManager(directoryReader, new SearcherFactory());
lastCommittedSegmentInfos = readLastCommittedSegmentInfos(internalSearcherManager, store);
ExternalSearcherManager externalSearcherManager = new ExternalSearcherManager(internalSearcherManager,
externalSearcherFactory);
success = true;
return searcherManager;
return externalSearcherManager;
} catch (IOException e) {
maybeFailEngine("start", e);
try {
Expand All @@ -479,7 +548,7 @@ private SearcherManager createSearcherManager(SearcherFactory searcherFactory, b
}
} finally {
if (success == false) { // release everything we created on a failure
IOUtils.closeWhileHandlingException(searcherManager, indexWriter);
IOUtils.closeWhileHandlingException(internalSearcherManager, indexWriter);
}
}
}
Expand Down Expand Up @@ -1229,24 +1298,24 @@ public void refresh(String source) throws EngineException {
}

final void refresh(String source, SearcherScope scope) throws EngineException {
long bytes = 0;
// we obtain a read lock here, since we don't want a flush to happen while we are refreshing
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
// both refresh types will result in an internal refresh but only the external will also
// pass the new reader reference to the external reader manager.

// this will also cause version map ram to be freed hence we always account for it.
final long bytes = indexWriter.ramBytesUsed() + versionMap.ramBytesUsedForRefresh();
writingBytes.addAndGet(bytes);
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
bytes = indexWriter.ramBytesUsed();
switch (scope) {
case EXTERNAL:
// even though we maintain 2 managers we really do the heavy-lifting only once.
// the second refresh will only do the extra work we have to do for warming caches etc.
writingBytes.addAndGet(bytes);
externalSearcherManager.maybeRefreshBlocking();
// the break here is intentional we never refresh both internal / external together
break;
case INTERNAL:
final long versionMapBytes = versionMap.ramBytesUsedForRefresh();
bytes += versionMapBytes;
writingBytes.addAndGet(bytes);
internalSearcherManager.maybeRefreshBlocking();
break;
default:
Expand Down Expand Up @@ -1709,7 +1778,7 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
}

@Override
protected SearcherManager getSearcherManager(String source, SearcherScope scope) {
protected ReferenceManager<IndexSearcher> getSearcherManager(String source, SearcherScope scope) {
switch (scope) {
case INTERNAL:
return internalSearcherManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3882,7 +3882,7 @@ public void assertSameReader(Searcher left, Searcher right) {
List<LeafReaderContext> rightLeaves = ElasticsearchDirectoryReader.unwrap(right.getDirectoryReader()).leaves();
assertEquals(rightLeaves.size(), leftLeaves.size());
for (int i = 0; i < leftLeaves.size(); i++) {
assertSame(leftLeaves.get(i).reader(), rightLeaves.get(0).reader());
assertSame(leftLeaves.get(i).reader(), rightLeaves.get(i).reader());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sneaky :)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah suddenly we have a test that triggered it 💃

}
}

Expand All @@ -3891,7 +3891,7 @@ public void assertNotSameReader(Searcher left, Searcher right) {
List<LeafReaderContext> rightLeaves = ElasticsearchDirectoryReader.unwrap(right.getDirectoryReader()).leaves();
if (rightLeaves.size() == leftLeaves.size()) {
for (int i = 0; i < leftLeaves.size(); i++) {
if (leftLeaves.get(i).reader() != rightLeaves.get(0).reader()) {
if (leftLeaves.get(i).reader() != rightLeaves.get(i).reader()) {
return; // all is well
}
}
Expand Down Expand Up @@ -3919,7 +3919,6 @@ public void testRefreshScopedSearcher() throws IOException {
assertEquals(0, searchSearcher.reader().numDocs());
assertNotSameReader(getSearcher, searchSearcher);
}

engine.refresh("test", Engine.SearcherScope.EXTERNAL);

try (Searcher getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
Expand All @@ -3928,6 +3927,36 @@ public void testRefreshScopedSearcher() throws IOException {
assertEquals(10, searchSearcher.reader().numDocs());
assertSameReader(getSearcher, searchSearcher);
}

// now ensure external refreshes are reflected on the internal reader
final String docId = Integer.toString(10);
final ParsedDocument doc =
testParsedDocument(docId, null, testDocumentWithTextField(), SOURCE, null);
Engine.Index primaryResponse = indexForDoc(doc);
engine.index(primaryResponse);

engine.refresh("test", Engine.SearcherScope.EXTERNAL);

try (Searcher getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
Searcher searchSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)){
assertEquals(11, getSearcher.reader().numDocs());
assertEquals(11, searchSearcher.reader().numDocs());
assertSameReader(getSearcher, searchSearcher);
}

try (Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)){
engine.refresh("test", Engine.SearcherScope.INTERNAL);
try (Searcher nextSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)){
assertSame(searcher.searcher(), nextSearcher.searcher());
}
}

try (Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)){
engine.refresh("test", Engine.SearcherScope.EXTERNAL);
try (Searcher nextSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)){
assertSame(searcher.searcher(), nextSearcher.searcher());
}
}
}

public void testSeqNoGenerator() throws IOException {
Expand Down
Loading