diff --git a/CHANGELOG.md b/CHANGELOG.md index c75677224ac94..4baaf5625b473 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add pluggable gRPC interceptors with explicit ordering([#19005](https://github.com/opensearch-project/OpenSearch/pull/19005)) - Add BindableServices extension point to transport-grpc-spi ([#19304](https://github.com/opensearch-project/OpenSearch/pull/19304)) - Add metrics for the merged segment warmer feature ([#18929](https://github.com/opensearch-project/OpenSearch/pull/18929)) +- Handle deleted documents for filter rewrite subaggregation optimization ([#19643](https://github.com/opensearch-project/OpenSearch/pull/19643)) - Add pointer based lag metric in pull-based ingestion ([#19635](https://github.com/opensearch-project/OpenSearch/pull/19635)) - Introduced internal API for retrieving metadata about requested indices from transport actions ([#18523](https://github.com/opensearch-project/OpenSearch/pull/18523)) - Add cluster defaults for merge autoThrottle, maxMergeThreads, and maxMergeCount; Add segment size filter to the merged segment warmer ([#19629](https://github.com/opensearch-project/OpenSearch/pull/19629)) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteOptimizationContext.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteOptimizationContext.java index 639f3477b7868..15889538d60c0 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteOptimizationContext.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteOptimizationContext.java @@ -115,7 +115,9 @@ public boolean tryOptimize( return false; } - if (leafCtx.reader().hasDeletions()) return false; + // Since we explicitly create bitset of 
matching docIds for each bucket + // in case of sub-aggregations, deleted documents can be filtered out + if (leafCtx.reader().hasDeletions() && hasSubAgg == false) return false; PointValues values = leafCtx.reader().getPointValues(aggregatorBridge.fieldType.name()); if (values == null) return false; diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/PointTreeTraversal.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/PointTreeTraversal.java index 075e17a695133..36cfb80d93f7b 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/PointTreeTraversal.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/PointTreeTraversal.java @@ -91,10 +91,13 @@ private static void intersectWithRanges(PointValues.IntersectVisitor visitor, Po switch (r) { case CELL_INSIDE_QUERY: - collector.countNode((int) pointTree.size()); if (collector.hasSubAgg()) { + // counter for top level agg is handled by sub agg collect pointTree.visitDocIDs(visitor); } else { + // countNode should be invoked only in absence of + // sub agg to not include the deleted documents + collector.countNode((int) pointTree.size()); collector.visitInner(); } break; @@ -128,9 +131,10 @@ public void visit(DocIdSetIterator iterator) throws IOException { @Override public void visit(int docID, byte[] packedValue) throws IOException { visitPoints(packedValue, () -> { - collector.count(); if (collector.hasSubAgg()) { collector.collectDocId(docID); + } else { + collector.count(); } }); } @@ -140,9 +144,10 @@ public void visit(DocIdSetIterator iterator, byte[] packedValue) throws IOExcept visitPoints(packedValue, () -> { // note: iterator can only iterate once for (int doc = iterator.nextDoc(); doc != NO_MORE_DOCS; doc = iterator.nextDoc()) { - collector.count(); if (collector.hasSubAgg()) { collector.collectDocId(doc); + } else { + collector.count(); } } }); diff --git 
a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/rangecollector/SubAggRangeCollector.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/rangecollector/SubAggRangeCollector.java index 5c1f21b22e646..e23af22a698d1 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/rangecollector/SubAggRangeCollector.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/rangecollector/SubAggRangeCollector.java @@ -13,6 +13,7 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.util.BitDocIdSet; +import org.apache.lucene.util.Bits; import org.apache.lucene.util.FixedBitSet; import org.opensearch.search.aggregations.BucketCollector; import org.opensearch.search.aggregations.LeafBucketCollector; @@ -37,6 +38,7 @@ public class SubAggRangeCollector extends SimpleRangeCollector { private final BucketCollector collectableSubAggregators; private final LeafReaderContext leafCtx; + private final Bits liveDocs; private final FixedBitSet bitSet; private final BitDocIdSet bitDocIdSet; @@ -53,6 +55,7 @@ public SubAggRangeCollector( this.getBucketOrd = getBucketOrd; this.collectableSubAggregators = subAggCollectorParam.collectableSubAggregators(); this.leafCtx = subAggCollectorParam.leafCtx(); + this.liveDocs = leafCtx.reader().getLiveDocs(); int numDocs = leafCtx.reader().maxDoc(); bitSet = new FixedBitSet(numDocs); bitDocIdSet = new BitDocIdSet(bitSet); @@ -63,14 +66,38 @@ public boolean hasSubAgg() { return true; } + private boolean isDocLive(int docId) { + return liveDocs == null || liveDocs.get(docId); + } + + @Override + public void countNode(int count) { + throw new UnsupportedOperationException("countNode should be unreachable"); + } + + @Override + public void count() { + throw new UnsupportedOperationException("count should be unreachable"); + } + @Override public void 
collectDocId(int docId) { - bitSet.set(docId); + if (isDocLive(docId)) { + counter++; + bitSet.set(docId); + } } @Override public void collectDocIdSet(DocIdSetIterator iter) throws IOException { - bitSet.or(iter); + // Explicitly OR iter into bitSet to filter out deleted docs + iter.nextDoc(); + for (int doc = iter.docID(); doc < DocIdSetIterator.NO_MORE_DOCS; doc = iter.nextDoc()) { + if (isDocLive(doc)) { + counter++; + bitSet.set(doc); + } + } } @Override diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteSubAggTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteSubAggTests.java index 4221a2837387d..be5add530b406 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteSubAggTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteSubAggTests.java @@ -10,9 +10,12 @@ import org.apache.lucene.document.Field; import org.apache.lucene.document.LongField; +import org.apache.lucene.document.LongPoint; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.Query; @@ -74,6 +77,7 @@ public class FilterRewriteSubAggTests extends AggregatorTestCase { new TestDoc(2, Instant.parse("2020-03-01T01:00:00Z")), new TestDoc(3, Instant.parse("2020-03-01T02:00:00Z")), new TestDoc(4, Instant.parse("2020-03-01T03:00:00Z")), + new TestDoc(4, Instant.parse("2020-03-01T04:00:00Z"), true), new TestDoc(5, Instant.parse("2020-03-01T04:00:00Z")), new TestDoc(6, Instant.parse("2020-03-01T04:00:00Z")) ); @@ -112,7 +116,7 @@ public void testDateHisto() throws IOException { dateFieldName 
).calendarInterval(DateHistogramInterval.HOUR).subAggregation(AggregationBuilders.stats(statsAggName).field(longFieldName)); - InternalDateHistogram result = executeAggregation(DEFAULT_DATA, dateHistogramAggregationBuilder, true); + InternalDateHistogram result = executeAggregation(DEFAULT_DATA, dateHistogramAggregationBuilder, false); // Verify results List buckets = result.getBuckets(); @@ -337,11 +341,35 @@ private Directory setupIndex(List docs, boolean random) throws IOExcept for (TestDoc doc : docs) { indexWriter.addDocument(doc.toDocument()); } + + indexWriter.commit(); + } + + try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig().setCodec(TestUtil.getDefaultCodec()))) { + for (TestDoc doc : docs) { + if (doc.deleted) { + BooleanQuery.Builder booleanQueryBuilder = new BooleanQuery.Builder(); + booleanQueryBuilder.add(LongPoint.newRangeQuery(longFieldName, doc.metric, doc.metric), BooleanClause.Occur.MUST); + booleanQueryBuilder.add( + LongField.newRangeQuery( + dateFieldName, + dateFieldType.parse(doc.timestamp.toString()), + dateFieldType.parse(doc.timestamp.toString()) + ), + BooleanClause.Occur.MUST + ); + indexWriter.deleteDocuments(booleanQueryBuilder.build()); + } + } + + indexWriter.commit(); } } else { try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) { for (TestDoc doc : docs) { - indexWriter.addDocument(doc.toDocument()); + if (!doc.deleted) { + indexWriter.addDocument(doc.toDocument()); + } } } } @@ -413,10 +441,16 @@ private InternalAggregation.ReduceContext createReduceContext( private class TestDoc { private final long metric; private final Instant timestamp; + private final boolean deleted; public TestDoc(long metric, Instant timestamp) { + this(metric, timestamp, false); + } + + public TestDoc(long metric, Instant timestamp, boolean deleted) { this.metric = metric; this.timestamp = timestamp; + this.deleted = deleted; } public ParseContext.Document toDocument() {