From f031019377ba032ec7251c45ec1a169489a5feb9 Mon Sep 17 00:00:00 2001 From: Ankit Jain Date: Tue, 14 Oct 2025 21:52:08 -0700 Subject: [PATCH 1/5] Handle deleted documents for filter rewrite subaggregation optimization Signed-off-by: Ankit Jain --- .../FilterRewriteOptimizationContext.java | 4 +++- .../rangecollector/SubAggRangeCollector.java | 15 +++++++++++++-- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteOptimizationContext.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteOptimizationContext.java index 639f3477b7868..15889538d60c0 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteOptimizationContext.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteOptimizationContext.java @@ -115,7 +115,9 @@ public boolean tryOptimize( return false; } - if (leafCtx.reader().hasDeletions()) return false; + // Since we explicitly create bitset of matching docIds for each bucket + // in case of sub-aggregations, deleted documents can be filtered out + if (leafCtx.reader().hasDeletions() && hasSubAgg == false) return false; PointValues values = leafCtx.reader().getPointValues(aggregatorBridge.fieldType.name()); if (values == null) return false; diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/rangecollector/SubAggRangeCollector.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/rangecollector/SubAggRangeCollector.java index 5c1f21b22e646..cd6b5a22fd0f7 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/rangecollector/SubAggRangeCollector.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/rangecollector/SubAggRangeCollector.java @@ -13,6 +13,7 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.util.BitDocIdSet; +import org.apache.lucene.util.Bits; import org.apache.lucene.util.FixedBitSet; import org.opensearch.search.aggregations.BucketCollector; import org.opensearch.search.aggregations.LeafBucketCollector; @@ -37,6 +38,7 @@ public class SubAggRangeCollector extends SimpleRangeCollector { private final BucketCollector collectableSubAggregators; private final LeafReaderContext leafCtx; + private final Bits liveDocs; private final FixedBitSet bitSet; private final BitDocIdSet bitDocIdSet; @@ -53,6 +55,7 @@ public SubAggRangeCollector( this.getBucketOrd = getBucketOrd; this.collectableSubAggregators = subAggCollectorParam.collectableSubAggregators(); this.leafCtx = subAggCollectorParam.leafCtx(); + this.liveDocs = leafCtx.reader().getLiveDocs(); int numDocs = leafCtx.reader().maxDoc(); bitSet = new FixedBitSet(numDocs); bitDocIdSet = new BitDocIdSet(bitSet); @@ -65,12 +68,20 @@ public boolean hasSubAgg() { @Override public void collectDocId(int docId) { - bitSet.set(docId); + if (liveDocs.get(docId)) { + bitSet.set(docId); + } } @Override public void collectDocIdSet(DocIdSetIterator iter) throws IOException { - bitSet.or(iter); + // Explicitly OR iter intoBitSet to filter out deleted docs + iter.nextDoc(); + for (int doc = iter.docID(); doc < DocIdSetIterator.NO_MORE_DOCS; doc = iter.nextDoc()) { + if (liveDocs.get(doc)) { + bitSet.set(doc); + } + } } @Override From ec999eabb5ddd8d3eaa65eeac1d8f1c5eb64439b Mon Sep 17 00:00:00 2001 From: Ankit Jain Date: Wed, 15 Oct 2025 13:59:36 -0700 Subject: [PATCH 2/5] Addressing test failures due to NPE Signed-off-by: Ankit Jain --- .../rangecollector/SubAggRangeCollector.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/rangecollector/SubAggRangeCollector.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/rangecollector/SubAggRangeCollector.java index cd6b5a22fd0f7..bedff03f8bc79 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/rangecollector/SubAggRangeCollector.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/rangecollector/SubAggRangeCollector.java @@ -66,9 +66,13 @@ public boolean hasSubAgg() { return true; } + private boolean isDocLive(int docId) { + return liveDocs == null || liveDocs.get(docId); + } + @Override public void collectDocId(int docId) { - if (liveDocs.get(docId)) { + if (isDocLive(docId)) { bitSet.set(docId); } } @@ -78,7 +82,7 @@ public void collectDocIdSet(DocIdSetIterator iter) throws IOException { // Explicitly OR iter intoBitSet to filter out deleted docs iter.nextDoc(); for (int doc = iter.docID(); doc < DocIdSetIterator.NO_MORE_DOCS; doc = iter.nextDoc()) { - if (liveDocs.get(doc)) { + if (isDocLive(doc)) { bitSet.set(doc); } } From f75012310dc5fbfa26193105caefe6f31776195b Mon Sep 17 00:00:00 2001 From: Ankit Jain Date: Tue, 4 Nov 2025 16:07:23 -0800 Subject: [PATCH 3/5] Adding unit test and fixing bugs Signed-off-by: Ankit Jain --- .../filterrewrite/PointTreeTraversal.java | 11 +++++-- .../rangecollector/SubAggRangeCollector.java | 12 +++++++ .../FilterRewriteSubAggTests.java | 31 +++++++++++++++++-- 3 files changed, 49 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/PointTreeTraversal.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/PointTreeTraversal.java index 075e17a695133..36cfb80d93f7b 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/PointTreeTraversal.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/PointTreeTraversal.java @@ -91,10 +91,13 @@ private static void intersectWithRanges(PointValues.IntersectVisitor visitor, Po switch (r) { case CELL_INSIDE_QUERY: - collector.countNode((int) pointTree.size()); if (collector.hasSubAgg()) { + // counter for top level agg is handled by sub agg collect pointTree.visitDocIDs(visitor); } else { + // count node should be invoked only in absence of + // sub agg to not include the delete documents + collector.countNode((int) pointTree.size()); collector.visitInner(); } break; @@ -128,9 +131,10 @@ public void visit(DocIdSetIterator iterator) throws IOException { @Override public void visit(int docID, byte[] packedValue) throws IOException { visitPoints(packedValue, () -> { - collector.count(); if (collector.hasSubAgg()) { collector.collectDocId(docID); + } else { + collector.count(); } }); } @@ -140,9 +144,10 @@ public void visit(DocIdSetIterator iterator, byte[] packedValue) throws IOExcept visitPoints(packedValue, () -> { // note: iterator can only iterate once for (int doc = iterator.nextDoc(); doc != NO_MORE_DOCS; doc = iterator.nextDoc()) { - collector.count(); if (collector.hasSubAgg()) { collector.collectDocId(doc); + } else { + collector.count(); } } }); diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/rangecollector/SubAggRangeCollector.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/rangecollector/SubAggRangeCollector.java index bedff03f8bc79..e23af22a698d1 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/rangecollector/SubAggRangeCollector.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/rangecollector/SubAggRangeCollector.java @@ -70,9 +70,20 @@ private boolean isDocLive(int docId) { return liveDocs == null || liveDocs.get(docId); } + @Override + public void countNode(int count) { + throw new UnsupportedOperationException("countNode should be unreachable"); + } + + @Override + public void count() { + throw new UnsupportedOperationException("countNode should be unreachable"); + } + @Override public void collectDocId(int docId) { if (isDocLive(docId)) { + counter++; bitSet.set(docId); } } @@ -83,6 +94,7 @@ public void collectDocIdSet(DocIdSetIterator iter) throws IOException { iter.nextDoc(); for (int doc = iter.docID(); doc < DocIdSetIterator.NO_MORE_DOCS; doc = iter.nextDoc()) { if (isDocLive(doc)) { + counter++; bitSet.set(doc); } } diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteSubAggTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteSubAggTests.java index 4221a2837387d..d7bb315efb27b 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteSubAggTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteSubAggTests.java @@ -10,9 +10,12 @@ import org.apache.lucene.document.Field; import org.apache.lucene.document.LongField; +import org.apache.lucene.document.LongPoint; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.Query; @@ -74,6 +77,7 @@ public class FilterRewriteSubAggTests extends AggregatorTestCase { new TestDoc(2, Instant.parse("2020-03-01T01:00:00Z")), new TestDoc(3, Instant.parse("2020-03-01T02:00:00Z")), new TestDoc(4, Instant.parse("2020-03-01T03:00:00Z")), + new TestDoc(4, Instant.parse("2020-03-01T04:00:00Z"), true), new TestDoc(5, Instant.parse("2020-03-01T04:00:00Z")), new TestDoc(6, Instant.parse("2020-03-01T04:00:00Z")) ); @@ -112,7 +116,7 @@ public void testDateHisto() throws IOException { dateFieldName ).calendarInterval(DateHistogramInterval.HOUR).subAggregation(AggregationBuilders.stats(statsAggName).field(longFieldName)); - InternalDateHistogram result = executeAggregation(DEFAULT_DATA, dateHistogramAggregationBuilder, true); + InternalDateHistogram result = executeAggregation(DEFAULT_DATA, dateHistogramAggregationBuilder, false); // Verify results List buckets = result.getBuckets(); @@ -337,11 +341,28 @@ private Directory setupIndex(List docs, boolean random) throws IOExcept for (TestDoc doc : docs) { indexWriter.addDocument(doc.toDocument()); } + + indexWriter.commit(); + } + + try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig().setCodec(TestUtil.getDefaultCodec()))) { + for (TestDoc doc : docs) { + if (doc.deleted) { + BooleanQuery.Builder booleanQueryBuilder = new BooleanQuery.Builder(); + booleanQueryBuilder.add(LongPoint.newRangeQuery(longFieldName, doc.metric, doc.metric), BooleanClause.Occur.MUST); + booleanQueryBuilder.add(LongField.newRangeQuery(dateFieldName, dateFieldType.parse(doc.timestamp.toString()), dateFieldType.parse(doc.timestamp.toString())), BooleanClause.Occur.MUST); + indexWriter.deleteDocuments(booleanQueryBuilder.build()); + } + } + + indexWriter.commit(); } } else { try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) { for (TestDoc doc : docs) { - indexWriter.addDocument(doc.toDocument()); + if (!doc.deleted) { + indexWriter.addDocument(doc.toDocument()); + } } } } @@ -413,10 +434,16 @@ private InternalAggregation.ReduceContext createReduceContext( private class TestDoc { private final long metric; private final Instant timestamp; + private final boolean deleted; public TestDoc(long metric, Instant timestamp) { + this(metric, timestamp, false); + } + + public TestDoc(long metric, Instant timestamp, boolean deleted) { this.metric = metric; this.timestamp = timestamp; + this.deleted = deleted; } public ParseContext.Document toDocument() { From e81603b7fc478b065ca5117fe88a7b82ffd5e46d Mon Sep 17 00:00:00 2001 From: Ankit Jain Date: Tue, 4 Nov 2025 16:09:25 -0800 Subject: [PATCH 4/5] Adding changelog entry Signed-off-by: Ankit Jain --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c75677224ac94..4baaf5625b473 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add pluggable gRPC interceptors with explicit ordering([#19005](https://github.com/opensearch-project/OpenSearch/pull/19005)) - Add BindableServices extension point to transport-grpc-spi ([#19304](https://github.com/opensearch-project/OpenSearch/pull/19304)) - Add metrics for the merged segment warmer feature ([#18929](https://github.com/opensearch-project/OpenSearch/pull/18929)) +- Handle deleted documents for filter rewrite subaggregation optimization ([#19643](https://github.com/opensearch-project/OpenSearch/pull/19643)) - Add pointer based lag metric in pull-based ingestion ([#19635](https://github.com/opensearch-project/OpenSearch/pull/19635)) - Introduced internal API for retrieving metadata about requested indices from transport actions ([#18523](https://github.com/opensearch-project/OpenSearch/pull/18523)) - Add cluster defaults for merge autoThrottle, maxMergeThreads, and maxMergeCount; Add segment size filter to the merged segment warmer ([#19629](https://github.com/opensearch-project/OpenSearch/pull/19629)) From 92a629fa76b31b5e0a5469382fbb5bc37492d1aa Mon Sep 17 00:00:00 2001 From: Ankit Jain Date: Tue, 4 Nov 2025 16:20:46 -0800 Subject: [PATCH 5/5] Apply spotless Signed-off-by: Ankit Jain --- .../bucket/filterrewrite/FilterRewriteSubAggTests.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteSubAggTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteSubAggTests.java index d7bb315efb27b..be5add530b406 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteSubAggTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteSubAggTests.java @@ -350,7 +350,14 @@ private Directory setupIndex(List docs, boolean random) throws IOExcept if (doc.deleted) { BooleanQuery.Builder booleanQueryBuilder = new BooleanQuery.Builder(); booleanQueryBuilder.add(LongPoint.newRangeQuery(longFieldName, doc.metric, doc.metric), BooleanClause.Occur.MUST); - booleanQueryBuilder.add(LongField.newRangeQuery(dateFieldName, dateFieldType.parse(doc.timestamp.toString()), dateFieldType.parse(doc.timestamp.toString())), BooleanClause.Occur.MUST); + booleanQueryBuilder.add( + LongField.newRangeQuery( + dateFieldName, + dateFieldType.parse(doc.timestamp.toString()), + dateFieldType.parse(doc.timestamp.toString()) + ), + BooleanClause.Occur.MUST + ); indexWriter.deleteDocuments(booleanQueryBuilder.build()); } }