From 293ee777da1e22e9db70a0d1949314d43f5e09e7 Mon Sep 17 00:00:00 2001 From: Christos Soulios <1561376+csoulios@users.noreply.github.com> Date: Tue, 19 Oct 2021 13:25:54 +0300 Subject: [PATCH 1/2] Fix rate agg with custom `_doc_count` (#79346) When running a rate aggregation without setting the field parameter, the result is computed based on the bucket doc_count. This PR adds support for a custom _doc_count field. Closes #77734 --- .../metrics/rate-aggregation.asciidoc | 8 +- .../rate/AbstractRateAggregator.java | 4 +- .../analytics/rate/NumericRateAggregator.java | 76 +++++++++++++------ .../analytics/rate/RateAggregatorTests.java | 13 ++++ .../rest-api-spec/test/analytics/rate.yml | 41 ++++++++++ 5 files changed, 114 insertions(+), 28 deletions(-) diff --git a/docs/reference/aggregations/metrics/rate-aggregation.asciidoc b/docs/reference/aggregations/metrics/rate-aggregation.asciidoc index cb1f903f6443c..c99dffc8d8b5c 100644 --- a/docs/reference/aggregations/metrics/rate-aggregation.asciidoc +++ b/docs/reference/aggregations/metrics/rate-aggregation.asciidoc @@ -7,7 +7,7 @@ ++++ A `rate` metrics aggregation can be used only inside a `date_histogram` or `composite` aggregation. It calculates a rate of documents -or a field in each bucket. The field values can be generated extracted from specific numeric or +or a field in each bucket. The field values can be extracted from specific numeric or <> in the documents. NOTE: For `composite` aggregations, there must be exactly one `date_histogram` source for the `rate` aggregation to be supported. @@ -27,7 +27,7 @@ A `rate` aggregation looks like this in isolation: -------------------------------------------------- // NOTCONSOLE -The following request will group all sales records into monthly bucket and than convert the number of sales transaction in each bucket +The following request will group all sales records into monthly buckets and then convert the number of sales transactions in each bucket into per annual sales rate. [source,console] @@ -56,8 +56,8 @@ GET sales/_search <1> Histogram is grouped by month. <2> But the rate is converted into annual rate. -The response will return the annual rate of transaction in each bucket. Since there are 12 months per year, the annual rate will -be automatically calculated by multiplying monthly rate by 12. +The response will return the annual rate of transactions in each bucket. Since there are 12 months per year, the annual rate will +be automatically calculated by multiplying the monthly rate by 12. [source,console-result] -------------------------------------------------- diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/AbstractRateAggregator.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/AbstractRateAggregator.java index 5062f9d788a18..5105ee73729ca 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/AbstractRateAggregator.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/AbstractRateAggregator.java @@ -29,6 +29,7 @@ public abstract class AbstractRateAggregator extends NumericMetricsAggregator.Si private final Rounding.DateTimeUnit rateUnit; protected final RateMode rateMode; private final SizedBucketAggregator sizedBucketAggregator; + protected final boolean computeWithDocCount; protected DoubleArray sums; protected DoubleArray compensations; @@ -55,6 +56,8 @@ public AbstractRateAggregator( this.rateUnit = rateUnit; this.rateMode = rateMode; this.sizedBucketAggregator = findSizedBucketAncestor(); + // If no fields or scripts have been defined in the agg, rate should be computed based on bucket doc_counts + this.computeWithDocCount = valuesSourceConfig.fieldContext() == null && valuesSourceConfig.script() == null; } private SizedBucketAggregator findSizedBucketAncestor() { @@ -112,5 +115,4 @@ public InternalAggregation buildEmptyAggregation() { public void doClose() { Releasables.close(sums, compensations); } - } diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/NumericRateAggregator.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/NumericRateAggregator.java index 04a59d7f95cd7..8f855d48f4866 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/NumericRateAggregator.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/NumericRateAggregator.java @@ -13,6 +13,7 @@ import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; +import org.elasticsearch.search.aggregations.bucket.DocCountProvider; import org.elasticsearch.search.aggregations.metrics.CompensatedSum; import org.elasticsearch.search.aggregations.support.AggregationContext; import org.elasticsearch.search.aggregations.support.ValuesSource; @@ -22,6 +23,9 @@ import java.util.Map; public class NumericRateAggregator extends AbstractRateAggregator { + + private final DocCountProvider docCountProvider; + public NumericRateAggregator( String name, ValuesSourceConfig valuesSourceConfig, @@ -32,42 +36,68 @@ public NumericRateAggregator( Map metadata ) throws IOException { super(name, valuesSourceConfig, rateUnit, rateMode, context, parent, metadata); + docCountProvider = computeWithDocCount ? new DocCountProvider() : null; } @Override public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException { final CompensatedSum kahanSummation = new CompensatedSum(0, 0); - final SortedNumericDoubleValues values = ((ValuesSource.Numeric) valuesSource).doubleValues(ctx); - return new LeafBucketCollectorBase(sub, values) { - @Override - public void collect(int doc, long bucket) throws IOException { - sums = bigArrays().grow(sums, bucket + 1); - compensations = bigArrays().grow(compensations, bucket + 1); - - if (values.advanceExact(doc)) { - final int valuesCount = values.docValueCount(); + if (computeWithDocCount) { + // No field or script has been set at the rate agg. So, rate will be computed based on the doc_counts. + // This implementation hard-wires the DocCountProvider and reads the _doc_count fields when available. + // A better approach would be to create a DOC_COUNT ValuesSource type and use that as valuesSource + // In that case the computeRateOnDocs variable and this branch of the if-statement are not required. + docCountProvider.setLeafReaderContext(ctx); + return new LeafBucketCollectorBase(sub, null) { + @Override + public void collect(int doc, long bucket) throws IOException { + sums = bigArrays().grow(sums, bucket + 1); + compensations = bigArrays().grow(compensations, bucket + 1); // Compute the sum of double values with Kahan summation algorithm which is more // accurate than naive summation. double sum = sums.get(bucket); double compensation = compensations.get(bucket); kahanSummation.reset(sum, compensation); - switch (rateMode) { - case SUM: - for (int i = 0; i < valuesCount; i++) { - kahanSummation.add(values.nextValue()); - } - break; - case VALUE_COUNT: - kahanSummation.add(valuesCount); - break; - default: - throw new IllegalArgumentException("Unsupported rate mode " + rateMode); - } + final int docCount = docCountProvider.getDocCount(doc); + kahanSummation.add(docCount); compensations.set(bucket, kahanSummation.delta()); sums.set(bucket, kahanSummation.value()); } - } - }; + }; + } else { + final SortedNumericDoubleValues values = ((ValuesSource.Numeric) valuesSource).doubleValues(ctx); + return new LeafBucketCollectorBase(sub, values) { + @Override + public void collect(int doc, long bucket) throws IOException { + sums = bigArrays().grow(sums, bucket + 1); + compensations = bigArrays().grow(compensations, bucket + 1); + + if (values.advanceExact(doc)) { + final int valuesCount = values.docValueCount(); + // Compute the sum of double values with Kahan summation algorithm which is more + // accurate than naive summation. + double sum = sums.get(bucket); + double compensation = compensations.get(bucket); + kahanSummation.reset(sum, compensation); + switch (rateMode) { + case SUM: + for (int i = 0; i < valuesCount; i++) { + kahanSummation.add(values.nextValue()); + } + break; + case VALUE_COUNT: + kahanSummation.add(valuesCount); + break; + default: + throw new IllegalArgumentException("Unsupported rate mode " + rateMode); + } + + compensations.set(bucket, kahanSummation.delta()); + sums.set(bucket, kahanSummation.value()); + } + } + }; + } } } diff --git a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/rate/RateAggregatorTests.java b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/rate/RateAggregatorTests.java index e21fa041828ac..86cd93edbd89f 100644 --- a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/rate/RateAggregatorTests.java +++ b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/rate/RateAggregatorTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.index.fielddata.ScriptDocValues; +import org.elasticsearch.index.mapper.CustomTermFreqField; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.mapper.KeywordFieldMapper; import org.elasticsearch.index.mapper.MappedFieldType; @@ -859,6 +860,18 @@ public void testModeWithoutField() { assertEquals("The mode parameter is only supported with field or script", ex.getMessage()); } + public void testWithCustomDocCount() throws IOException { + testCase(new MatchAllDocsQuery(), "month", true, "month", null, iw -> { + iw.addDocument(doc("2010-03-12T01:07:45", new CustomTermFreqField("_doc_count", "_doc_count", 10))); + iw.addDocument(doc("2010-04-01T03:43:34")); + iw.addDocument(doc("2010-04-27T03:43:34", new CustomTermFreqField("_doc_count", "_doc_count", 5))); + }, dh -> { + assertThat(dh.getBuckets(), hasSize(2)); + assertThat(((InternalRate) dh.getBuckets().get(0).getAggregations().asList().get(0)).value(), closeTo(10.0, 0.000001)); + assertThat(((InternalRate) dh.getBuckets().get(1).getAggregations().asList().get(0)).value(), closeTo(6.0, 0.000001)); + }); + } + private static AbstractAggregationBuilder randomValidMultiBucketAggBuilder( RateAggregationBuilder rateAggregationBuilder, DateHistogramInterval interval diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/analytics/rate.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/analytics/rate.yml index ea6e9c242fbe4..b3adcafced0cd 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/analytics/rate.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/analytics/rate.yml @@ -34,3 +34,44 @@ setup: - length: { aggregations.by_date.buckets: 2 } - match: { aggregations.by_date.buckets.0.rate.value: 1.0 } - match: { aggregations.by_date.buckets.1.rate.value: 2.0 } + + +--- +"rate with doc_count": + - skip: + version: " - 7.99.99" + reason: bug fixed in 8.0.0 + - do: + bulk: + index: test2 + refresh: true + body: + - '{"index": {}}' + - '{"timestamp": "2021-09-14T22:33:37.477Z", "_doc_count": 10}' + - '{"index": {}}' + - '{"timestamp": "2021-09-14T22:35:37.477Z", "_doc_count": 5}' + - '{"index": {}}' + - '{"timestamp": "2021-09-14T22:35:38.477Z", "_doc_count": 1}' + - '{"index": {}}' + - '{"timestamp": "2021-09-14T22:36:08.477Z"}' + - do: + search: + size: 0 + index: "test2" + body: + aggs: + by_date: + date_histogram: + field: timestamp + fixed_interval: 60s + aggs: + rate: + rate: + unit: minute + + - length: { aggregations.by_date.buckets: 4 } + - match: { aggregations.by_date.buckets.0.rate.value: 10.0 } + - match: { aggregations.by_date.buckets.1.rate.value: 0.0 } + - match: { aggregations.by_date.buckets.2.rate.value: 6.0 } + - match: { aggregations.by_date.buckets.3.rate.value: 1.0 } + From 34527dacf03d6d0da9f88a8e72ee385711b5dcab Mon Sep 17 00:00:00 2001 From: Christos Soulios Date: Tue, 19 Oct 2021 13:42:25 +0300 Subject: [PATCH 2/2] Enabled test for 7.16 --- .../resources/rest-api-spec/test/analytics/rate.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/analytics/rate.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/analytics/rate.yml index b3adcafced0cd..99b37c1d74652 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/analytics/rate.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/analytics/rate.yml @@ -39,8 +39,8 @@ setup: --- "rate with doc_count": - skip: - version: " - 7.99.99" - reason: bug fixed in 8.0.0 + version: " - 7.15.99" + reason: bug fixed in 7.16.0 - do: bulk: index: test2