diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupIndexerAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupIndexerAction.java index 5c882f713e96a..b0601ba9a66f2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupIndexerAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupIndexerAction.java @@ -44,11 +44,19 @@ public static class Request extends BroadcastRequest implements Indices private RollupAction.Request rollupRequest; private String[] dimensionFields; private String[] metricFields; - - public Request(RollupAction.Request rollupRequest, final String[] dimensionFields, final String[] metricFields) { + private String[] labelFields; + + public Request( + RollupAction.Request rollupRequest, + final String[] dimensionFields, + final String[] metricFields, + final String[] labelFields + ) { + super(rollupRequest.indices()); this.rollupRequest = rollupRequest; this.dimensionFields = dimensionFields; this.metricFields = metricFields; + this.labelFields = labelFields; } public Request() {} @@ -58,6 +66,7 @@ public Request(StreamInput in) throws IOException { this.rollupRequest = new RollupAction.Request(in); this.dimensionFields = in.readStringArray(); this.metricFields = in.readStringArray(); + this.labelFields = in.readStringArray(); } @Override @@ -82,6 +91,10 @@ public String[] getMetricFields() { return this.metricFields; } + public String[] getLabelFields() { + return labelFields; + } + @Override public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { return new RollupTask(id, type, action, parentTaskId, rollupRequest.getRollupIndex(), rollupRequest.getRollupConfig(), headers); @@ -93,6 +106,7 @@ public void writeTo(StreamOutput out) throws IOException { rollupRequest.writeTo(out); out.writeStringArray(dimensionFields); 
out.writeStringArray(metricFields); + out.writeStringArray(labelFields); } @Override @@ -106,6 +120,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field("rollup_request", rollupRequest); builder.array("dimension_fields", dimensionFields); builder.array("metric_fields", metricFields); + builder.array("label_fields", labelFields); builder.endObject(); return builder; } @@ -115,6 +130,7 @@ public int hashCode() { int result = rollupRequest.hashCode(); result = 31 * result + Arrays.hashCode(dimensionFields); result = 31 * result + Arrays.hashCode(metricFields); + result = 31 * result + Arrays.hashCode(labelFields); return result; } @@ -125,6 +141,7 @@ public boolean equals(Object o) { Request request = (Request) o; if (rollupRequest.equals(request.rollupRequest) == false) return false; if (Arrays.equals(dimensionFields, request.dimensionFields) == false) return false; + if (Arrays.equals(labelFields, request.labelFields) == false) return false; return Arrays.equals(metricFields, request.metricFields); } } @@ -225,6 +242,10 @@ public String[] getMetricFields() { return request.getMetricFields(); } + public String[] getLabelFields() { + return request.getLabelFields(); + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/x-pack/plugin/rollup/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/rollup/10_basic.yml b/x-pack/plugin/rollup/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/rollup/10_basic.yml index 4ae26ed0fcf9e..fee9a0ed0ed08 100644 --- a/x-pack/plugin/rollup/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/rollup/10_basic.yml +++ b/x-pack/plugin/rollup/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/rollup/10_basic.yml @@ -1,7 +1,7 @@ setup: - skip: - version: " - 8.2.99" - reason: tsdb indexing changed in 8.3.0 + version: " - 8.3.99" + reason: "rollup: labels support added in 8.4.0" - do: indices.create: @@ -32,8 +32,18 @@ setup: 
time_series_dimension: true name: type: keyword + created_at: + type: date_nanos + running: + type: boolean + number_of_containers: + type: integer ip: type: ip + tags: + type: keyword + values: + type: integer network: properties: tx: @@ -48,21 +58,21 @@ setup: index: test body: - '{"index": {}}' - - '{"@timestamp": "2021-04-28T18:50:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}' + - '{"@timestamp": "2021-04-28T18:50:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}, "created_at": "2021-04-28T19:34:00.000Z", "running": false, "number_of_containers": 2, "tags": ["backend", "prod"], "values": [2, 3, 6]}}}' - '{"index": {}}' - - '{"@timestamp": "2021-04-28T18:50:24.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2005177954, "rx": 801479970}}}}' + - '{"@timestamp": "2021-04-28T18:50:24.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.26", "network": {"tx": 2005177954, "rx": 801479970}, "created_at": "2021-04-28T19:35:00.000Z", "running": true, "number_of_containers": 2, "tags": ["backend", "prod", "us-west1"], "values": [1, 1, 3]}}}' - '{"index": {}}' - - '{"@timestamp": "2021-04-28T20:50:44.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2006223737, "rx": 802337279}}}}' + - '{"@timestamp": "2021-04-28T20:50:44.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.41", "network": {"tx": 2006223737, "rx": 802337279}, "created_at": "2021-04-28T19:36:00.000Z", "running": true, "number_of_containers": 
2, "tags": ["backend", "prod", "us-west2"], "values": [4, 1, 2]}}}' - '{"index": {}}' - - '{"@timestamp": "2021-04-28T20:51:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.2", "network": {"tx": 2012916202, "rx": 803685721}}}}' + - '{"@timestamp": "2021-04-28T20:51:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.22", "network": {"tx": 2012916202, "rx": 803685721}, "created_at": "2021-04-28T19:37:00.000Z", "running": true, "number_of_containers": 2, "tags": ["backend", "prod"], "values": [2, 3, 1]}}}' - '{"index": {}}' - - '{"@timestamp": "2021-04-28T18:50:03.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434521831, "rx": 530575198}}}}' + - '{"@timestamp": "2021-04-28T18:50:03.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.33", "network": {"tx": 1434521831, "rx": 530575198}, "created_at": "2021-04-28T19:42:00.000Z", "running": false, "number_of_containers": 1, "tags": ["backend", "test"], "values": [2, 3, 4]}}}' - '{"index": {}}' - - '{"@timestamp": "2021-04-28T18:50:23.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434577921, "rx": 530600088}}}}' + - '{"@timestamp": "2021-04-28T18:50:23.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.56", "network": {"tx": 1434577921, "rx": 530600088}, "created_at": "2021-04-28T19:43:00.000Z", "running": false, "number_of_containers": 1, "tags": ["backend", "test", "us-west2"], "values": [2, 1, 1]}}}' - '{"index": {}}' - - '{"@timestamp": "2021-04-28T19:50:53.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", 
"uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434587694, "rx": 530604797}}}}' + - '{"@timestamp": "2021-04-28T19:50:53.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.37", "network": {"tx": 1434587694, "rx": 530604797}, "created_at": "2021-04-28T19:44:00.000Z", "running": true, "number_of_containers": 1, "tags": ["backend", "test", "us-west1"], "values": [4, 5, 2]}}}' - '{"index": {}}' - - '{"@timestamp": "2021-04-28T19:51:03.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434595272, "rx": 530605511}}}}' + - '{"@timestamp": "2021-04-28T19:51:03.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.120", "network": {"tx": 1434595272, "rx": 530605511}, "created_at": "2021-04-28T19:45:00.000Z", "running": true, "number_of_containers": 1, "tags": ["backend", "test", "us-west1"], "values": [3, 2, 1]}}}' - do: indices.put_settings: @@ -99,7 +109,12 @@ setup: - match: { hits.hits.0._source.k8s\.pod\.network\.tx.min: 2001818691 } - match: { hits.hits.0._source.k8s\.pod\.network\.tx.max: 2005177954 } - match: { hits.hits.0._source.k8s\.pod\.network\.tx.value_count: 2 } - - is_false: hits.hits.0._source.k8s\.pod\.ip # k8s.pod.ip isn't a dimension and is not rolled up + - match: { hits.hits.0._source.k8s\.pod\.ip: "10.10.55.26" } + - match: { hits.hits.0._source.k8s\.pod\.created_at: "2021-04-28T19:35:00.000Z" } + - match: { hits.hits.0._source.k8s\.pod\.number_of_containers: 2 } + - match: { hits.hits.0._source.k8s\.pod\.tags: ["backend", "prod", "us-west1"] } + - match: { hits.hits.0._source.k8s\.pod\.values: [1, 1, 3] } + - is_true: hits.hits.0._source.k8s\.pod\.running # Assert rollup index settings - do: diff --git 
a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/AbstractRollupFieldProducer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/AbstractRollupFieldProducer.java new file mode 100644 index 0000000000000..1b1ff0189bdc8 --- /dev/null +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/AbstractRollupFieldProducer.java @@ -0,0 +1,52 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.rollup.v2; + +/** + * Base class for classes that read metric and label fields. + */ +abstract class AbstractRollupFieldProducer { + + protected final String name; + protected boolean isEmpty; + + AbstractRollupFieldProducer(String name) { + this.name = name; + this.isEmpty = true; + } + + /** + * Collect a value for the field applying the specific subclass collection strategy. + * @param value the value to collect. + */ + public abstract void collect(T value); + + /** + * @return the name of the field. + */ + public String name() { + return name; + } + + /** + * @return the value of the field. + */ + public abstract Object value(); + + /** + * Resets the collected value to the specific subclass reset value. + */ + public abstract void reset(); + + /** + * @return true if the field has not collected any value. 
+ */ + public boolean isEmpty() { + return isEmpty; + } +} diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/FieldValueFetcher.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/FieldValueFetcher.java index d299e4a1d01ad..e2073803f56bb 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/FieldValueFetcher.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/FieldValueFetcher.java @@ -18,9 +18,7 @@ import java.io.IOException; import java.math.BigInteger; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.function.Function; @@ -30,8 +28,20 @@ */ class FieldValueFetcher { - private static final Set> VALID_TYPES = Collections.unmodifiableSet( - new HashSet<>(Arrays.asList(Long.class, Double.class, BigInteger.class, String.class, BytesRef.class)) + private static final Set> VALID_METRIC_TYPES = Set.of( + Long.class, + Double.class, + BigInteger.class, + String.class, + BytesRef.class + ); + private static final Set> VALID_LABEL_TYPES = Set.of( + Long.class, + Double.class, + BigInteger.class, + String.class, + BytesRef.class, + Boolean.class ); private final String name; @@ -66,7 +76,7 @@ public IndexFieldData fieldData() { FormattedDocValues getLeaf(LeafReaderContext context) { - final FormattedDocValues delegate = fieldData.load(context).getFormattedValues(DocValueFormat.RAW); + final FormattedDocValues delegate = fieldData.load(context).getFormattedValues(format); return new FormattedDocValues() { @Override public boolean advanceExact(int docId) throws IOException { @@ -102,7 +112,7 @@ Object format(Object value) { /** * Retrieve field fetchers for a list of fields. 
*/ - static List build(SearchExecutionContext context, String[] fields) { + private static List build(SearchExecutionContext context, String[] fields, Set> validTypes) { List fetchers = new ArrayList<>(fields.length); for (String field : fields) { MappedFieldType fieldType = context.getFieldType(field); @@ -110,20 +120,28 @@ static List build(SearchExecutionContext context, String[] fi throw new IllegalArgumentException("Unknown field: [" + field + "]"); } IndexFieldData fieldData = context.getForField(fieldType); - fetchers.add(new FieldValueFetcher(field, fieldType, fieldData, getValidator(field))); + fetchers.add(new FieldValueFetcher(field, fieldType, fieldData, getValidator(field, validTypes))); } return Collections.unmodifiableList(fetchers); } - static Function getValidator(String field) { + static Function getValidator(String field, Set> validTypes) { return value -> { - if (VALID_TYPES.contains(value.getClass()) == false) { + if (validTypes.contains(value.getClass()) == false) { throw new IllegalArgumentException( - "Expected [" + VALID_TYPES + "] for field [" + field + "], " + "got [" + value.getClass() + "]" + "Expected [" + validTypes + "] for field [" + field + "], " + "got [" + value.getClass() + "]" ); } return value; }; } + static List forMetrics(SearchExecutionContext context, String[] metricFields) { + return build(context, metricFields, VALID_METRIC_TYPES); + } + + static List forLabels(SearchExecutionContext context, String[] labelFields) { + return build(context, labelFields, VALID_LABEL_TYPES); + } + } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/LabelFieldProducer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/LabelFieldProducer.java new file mode 100644 index 0000000000000..798cc785a0f19 --- /dev/null +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/LabelFieldProducer.java @@ -0,0 +1,130 @@ +/* + * Copyright Elasticsearch B.V. 
and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.rollup.v2; + +import org.elasticsearch.index.query.SearchExecutionContext; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * Class that produces values for a label field. + */ +abstract class LabelFieldProducer extends AbstractRollupFieldProducer { + + private final Label label; + + LabelFieldProducer(String name, Label label) { + super(name); + this.label = label; + } + + public String name() { + return name; + } + + /** Collect the value of a raw field */ + @Override + public void collect(Object value) { + label.collect(value); + isEmpty = false; + } + + public Label label() { + return this.label; + } + + public void reset() { + label.reset(); + isEmpty = true; + } + + /** + * Return the downsampled value as computed after collecting all raw values. + * @return the downsampled label value + */ + public abstract Object value(); + + abstract static class Label { + final String name; + + /** + * Abstract class that defines how a label value is computed. + * @param name the name of the label + */ + protected Label(String name) { + this.name = name; + } + + abstract void collect(Object value); + + abstract Object get(); + + abstract void reset(); + } + + /** + * Label implementation that stores the last value over time for a label. This implementation + * assumes that field values are collected sorted in descending order by time. In this case, + * it assumes that the last value in time is the first value collected. In effect, this + * implementation stores the first value it collects while empty and then + * ignores everything else.
+ */ + static class LastValueLabel extends Label { + private Object lastValue; + + LastValueLabel() { + super("last_value"); + } + + @Override + void collect(Object value) { + if (lastValue == null) { + lastValue = value; + } + } + + @Override + Object get() { + return lastValue; + } + + @Override + void reset() { + lastValue = null; + } + } + + /** + * {@link LabelFieldProducer} implementation for a last value label + */ + static class LabelLastValueFieldProducer extends LabelFieldProducer { + + LabelLastValueFieldProducer(String name) { + super(name, new LastValueLabel()); + } + + @Override + public Object value() { + return label().get(); + } + } + + /** + * Produce a collection of label field producers. + */ + static Map buildLabelFieldProducers(SearchExecutionContext context, String[] labelFields) { + final Map fields = new LinkedHashMap<>(); + for (String field : labelFields) { + LabelFieldProducer producer = new LabelLastValueFieldProducer(field); + fields.put(field, producer); + } + return Collections.unmodifiableMap(fields); + } +} diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/MetricFieldProducer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/MetricFieldProducer.java index 80d2de7d6401c..8fec3c7ab6e4c 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/MetricFieldProducer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/MetricFieldProducer.java @@ -22,32 +22,29 @@ * values. Based on the supported metric types, the subclasses of this class compute values for * gauge and metric types. 
*/ -abstract class MetricFieldProducer { - private final String field; - +abstract class MetricFieldProducer extends AbstractRollupFieldProducer { /** * a list of metrics that will be computed for the field */ private final List metrics; - private boolean isEmpty = true; - MetricFieldProducer(String field, List metrics) { - this.field = field; + MetricFieldProducer(String name, List metrics) { + super(name); this.metrics = metrics; } /** * Reset all values collected for the field */ - void reset() { + public void reset() { for (Metric metric : metrics) { metric.reset(); } isEmpty = true; } - public String field() { - return field; + public String name() { + return name; } /** return the list of metrics that are computed for the field */ @@ -56,19 +53,17 @@ public List metrics() { } /** Collect the value of a raw field and compute all downsampled metrics */ - public void collectMetric(Double value) { + @Override + public void collect(Number value) { for (MetricFieldProducer.Metric metric : metrics) { metric.collect(value); } isEmpty = false; } - public boolean isEmpty() { - return isEmpty; - } - /** * Return the downsampled value as computed after collecting all raw values. + * @return */ public abstract Object value(); @@ -83,7 +78,7 @@ protected Metric(String name) { this.name = name; } - abstract void collect(double number); + abstract void collect(Number number); abstract Number get(); @@ -101,8 +96,8 @@ static class Max extends Metric { } @Override - void collect(double value) { - this.max = max != null ? Math.max(value, max) : value; + void collect(Number value) { + this.max = max != null ? Math.max(value.doubleValue(), max) : value.doubleValue(); } @Override @@ -127,8 +122,8 @@ static class Min extends Metric { } @Override - void collect(double value) { - this.min = min != null ? Math.min(value, min) : value; + void collect(Number value) { + this.min = min != null ? 
Math.min(value.doubleValue(), min) : value.doubleValue(); } @Override @@ -153,8 +148,8 @@ static class Sum extends Metric { } @Override - void collect(double value) { - kahanSummation.add(value); + void collect(Number value) { + kahanSummation.add(value.doubleValue()); } @Override @@ -179,7 +174,7 @@ static class ValueCount extends Metric { } @Override - void collect(double value) { + void collect(Number value) { count++; } @@ -209,9 +204,9 @@ static class LastValue extends Metric { } @Override - void collect(double value) { + void collect(Number value) { if (lastValue == null) { - lastValue = value; + lastValue = value.doubleValue(); } } @@ -231,8 +226,8 @@ void reset() { */ static class CounterMetricFieldProducer extends MetricFieldProducer { - CounterMetricFieldProducer(String field) { - super(field, List.of(new LastValue())); + CounterMetricFieldProducer(String name) { + super(name, List.of(new LastValue())); } @Override @@ -247,8 +242,8 @@ public Object value() { */ static class GaugeMetricFieldProducer extends MetricFieldProducer { - GaugeMetricFieldProducer(String field) { - super(field, List.of(new Min(), new Max(), new Sum(), new ValueCount())); + GaugeMetricFieldProducer(String name) { + super(name, List.of(new Min(), new Max(), new Sum(), new ValueCount())); } @Override diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RollupShardIndexer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RollupShardIndexer.java index 17479e850bd5b..04a9c054b0332 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RollupShardIndexer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RollupShardIndexer.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.Maps; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.IOUtils; 
import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexService; @@ -54,7 +55,9 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.elasticsearch.core.Strings.format; @@ -83,7 +86,9 @@ class RollupShardIndexer { private final String[] dimensionFields; private final String[] metricFields; + private final String[] labelFields; private final List metricFieldFetchers; + private final List labelFieldFetchers; private final AtomicLong numSent = new AtomicLong(); private final AtomicLong numIndexed = new AtomicLong(); @@ -96,7 +101,8 @@ class RollupShardIndexer { String rollupIndex, RollupActionConfig config, String[] dimensionFields, - String[] metricFields + String[] metricFields, + String[] labelFields ) { this.client = client; this.indexShard = indexService.getShard(shardId.id()); @@ -104,6 +110,7 @@ class RollupShardIndexer { this.rollupIndex = rollupIndex; this.dimensionFields = dimensionFields; this.metricFields = metricFields; + this.labelFields = labelFields; this.searcher = indexShard.acquireSearcher("rollup"); Closeable toClose = searcher; @@ -119,7 +126,8 @@ class RollupShardIndexer { this.timestampField = searchExecutionContext.getFieldType(DataStreamTimestampFieldMapper.DEFAULT_PATH); this.timestampFormat = timestampField.docValueFormat(null, null); this.rounding = config.createRounding(); - this.metricFieldFetchers = FieldValueFetcher.build(searchExecutionContext, metricFields); + this.metricFieldFetchers = FieldValueFetcher.forMetrics(searchExecutionContext, metricFields); + this.labelFieldFetchers = FieldValueFetcher.forLabels(searchExecutionContext, labelFields); toClose = null; } finally { IOUtils.closeWhileHandlingException(toClose); @@ -224,8 +232,12 @@ public LeafBucketCollector getLeafCollector(final AggregationExecutionContext ag 
docCountProvider.setLeafReaderContext(ctx); final Map metricsFieldLeaves = new HashMap<>(); for (FieldValueFetcher fetcher : metricFieldFetchers) { - FormattedDocValues leafField = fetcher.getLeaf(ctx); - metricsFieldLeaves.put(fetcher.name(), leafField); + metricsFieldLeaves.put(fetcher.name(), fetcher.getLeaf(ctx)); + } + + final Map labelFieldLeaves = new HashMap<>(); + for (FieldValueFetcher fetcher : labelFieldFetchers) { + labelFieldLeaves.put(fetcher.name(), fetcher.getLeaf(ctx)); } return new LeafBucketCollector() { @@ -237,7 +249,10 @@ public void collect(int docId, long owningBucketOrd) throws IOException { boolean tsidChanged = tsid.equals(rollupBucketBuilder.tsid()) == false; if (tsidChanged || timestamp < lastHistoTimestamp) { - lastHistoTimestamp = rounding.round(timestamp); + lastHistoTimestamp = Math.max( + rounding.round(timestamp), + searchExecutionContext.getIndexSettings().getTimestampBounds().startTime() + ); } if (logger.isTraceEnabled()) { @@ -289,23 +304,23 @@ public void collect(int docId, long owningBucketOrd) throws IOException { final int docCount = docCountProvider.getDocCount(docId); rollupBucketBuilder.collectDocCount(docCount); - for (Map.Entry e : metricsFieldLeaves.entrySet()) { - String fieldName = e.getKey(); - FormattedDocValues leafField = e.getValue(); + for (Map.Entry e : Sets.union(metricsFieldLeaves.entrySet(), labelFieldLeaves.entrySet())) { + final String fieldName = e.getKey(); + final FormattedDocValues leafField = e.getValue(); if (leafField.advanceExact(docId)) { - for (int i = 0; i < leafField.docValueCount(); i++) { - // TODO: We should lazily load the doc_values for the metric. 
- // In cases such as counter metrics we only need the first (latest_value) - Object obj = leafField.nextValue(); - // TODO: Implement aggregate_metric_double for rollup of rollups - if (obj instanceof Number number) { - // Collect docs to rollup doc - rollupBucketBuilder.collectMetric(fieldName, number.doubleValue()); - } else { - throw new IllegalArgumentException("Expected [Number], got [" + obj.getClass() + "]"); + rollupBucketBuilder.collect(fieldName, leafField.docValueCount(), docValueCount -> { + final Object[] values = new Object[docValueCount]; + for (int i = 0; i < docValueCount; ++i) { + try { + values[i] = leafField.nextValue(); + } catch (IOException ex) { + throw new ElasticsearchException("Failed to read values for field [" + fieldName + "]"); + } + } - } + return values; + }); } } docsProcessed++; @@ -347,9 +362,11 @@ private class RollupBucketBuilder { private long timestamp; private int docCount; private final Map metricFieldProducers; + private final Map labelFieldProducers; RollupBucketBuilder() { this.metricFieldProducers = MetricFieldProducer.buildMetricFieldProducers(searchExecutionContext, metricFields); + this.labelFieldProducers = LabelFieldProducer.buildLabelFieldProducers(searchExecutionContext, labelFields); } /** @@ -366,7 +383,8 @@ public RollupBucketBuilder resetTsid(BytesRef tsid, long timestamp) { public RollupBucketBuilder resetTimestamp(long timestamp) { this.timestamp = timestamp; this.docCount = 0; - this.metricFieldProducers.values().stream().forEach(p -> p.reset()); + this.metricFieldProducers.values().forEach(MetricFieldProducer::reset); + this.labelFieldProducers.values().forEach(LabelFieldProducer::reset); if (logger.isTraceEnabled()) { logger.trace( "New bucket for _tsid: [{}], @timestamp: [{}]", @@ -374,12 +392,45 @@ public RollupBucketBuilder resetTimestamp(long timestamp) { timestampFormat.format(timestamp) ); } - return this; } - public void collectMetric(String field, double value) { - 
metricFieldProducers.get(field).collectMetric(value); + public void collect(final String field, int docValueCount, final Function fieldValues) { + final Object[] value = fieldValues.apply(docValueCount); + if (metricFieldProducers.containsKey(field)) { + // TODO: missing support for array metrics + collectMetric(field, value[0]); + } else if (labelFieldProducers.containsKey(field)) { + if (value.length == 1) { + collectLabel(field, value[0]); + } else { + collectLabel(field, value); + } + } else { + throw new IllegalArgumentException( + "Field '" + + field + + "' is not a label nor a metric, existing labels: [ " + + String.join(",", labelFieldProducers.keySet()) + + "], existing metrics: [" + + String.join(", ", metricFieldProducers.keySet()) + + "]" + ); + } + } + + private void collectLabel(final String field, final Object value) { + labelFieldProducers.get(field).collect(value); + } + + private void collectMetric(final String field, final Object value) { + if (value instanceof Number number) { + metricFieldProducers.get(field).collect(number); + } else { + throw new IllegalArgumentException( + "Expected numeric value for field '" + field + "' but got non numeric value: '" + value + "'" + ); + } } public void collectDocCount(int docCount) { @@ -394,7 +445,9 @@ public Map buildRollupDocument() { // Extract dimension values from _tsid field, so we avoid loading them from doc_values @SuppressWarnings("unchecked") Map dimensions = (Map) DocValueFormat.TIME_SERIES_ID.format(tsid); - Map doc = Maps.newLinkedHashMapWithExpectedSize(2 + dimensions.size() + metricFieldProducers.size()); + Map doc = Maps.newLinkedHashMapWithExpectedSize( + 2 + dimensions.size() + metricFieldProducers.size() + labelFieldProducers.size() + ); doc.put(timestampField.name(), timestampFormat.format(timestamp)); doc.put(DocCountFieldMapper.NAME, docCount); @@ -403,9 +456,16 @@ public Map buildRollupDocument() { doc.put(e.getKey(), e.getValue()); } - for (MetricFieldProducer fieldProducer : 
metricFieldProducers.values()) { + for (AbstractRollupFieldProducer fieldProducer : Stream.concat( + metricFieldProducers.values().stream(), + labelFieldProducers.values().stream() + ).toList()) { if (fieldProducer.isEmpty() == false) { - doc.put(fieldProducer.field(), fieldProducer.value()); + String field = fieldProducer.name(); + Object value = fieldProducer.value(); + if (value != null) { + doc.put(field, value); + } } } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TimeseriesFieldTypeHelper.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TimeseriesFieldTypeHelper.java new file mode 100644 index 0000000000000..0bb73a5c5e290 --- /dev/null +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TimeseriesFieldTypeHelper.java @@ -0,0 +1,71 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.rollup.v2; + +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.compress.CompressedXContent; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.mapper.TimeSeriesParams; +import org.elasticsearch.indices.IndicesService; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; + +import static org.elasticsearch.index.mapper.TimeSeriesParams.TIME_SERIES_DIMENSION_PARAM; +import static org.elasticsearch.index.mapper.TimeSeriesParams.TIME_SERIES_METRIC_PARAM; + +class TimeseriesFieldTypeHelper { + + private final MapperService mapperService; + private final String timestampField; + + private TimeseriesFieldTypeHelper(final MapperService mapperService, final String timestampField) { + this.mapperService = mapperService; + this.timestampField = timestampField; + } + + public boolean isTimeSeriesLabel(final String field, final Map unused) { + final MappedFieldType fieldType = mapperService.mappingLookup().getFieldType(field); + return fieldType != null + && (timestampField.equals(field) == false) + && (fieldType.isAggregatable()) + && (fieldType.isDimension() == false) + && (mapperService.isMetadataField(field) == false); + } + + public boolean isTimeSeriesMetric(final String unused, final Map fieldMapping) { + final String metricType = (String) fieldMapping.get(TIME_SERIES_METRIC_PARAM); + return metricType != null + && Arrays.asList(TimeSeriesParams.MetricType.values()).contains(TimeSeriesParams.MetricType.valueOf(metricType)); + } + + public boolean isTimeSeriesDimension(final String unused, final Map fieldMapping) { + return Boolean.TRUE.equals(fieldMapping.get(TIME_SERIES_DIMENSION_PARAM)); + } + + static class Builder { + private final IndicesService indicesService; + private final Map indexMapping; + private final IndexMetadata indexMetadata; + + Builder(final IndicesService 
indicesService, final Map indexMapping, final IndexMetadata indexMetadata) { + this.indicesService = indicesService; + this.indexMapping = indexMapping; + this.indexMetadata = indexMetadata; + } + + public TimeseriesFieldTypeHelper build(final String timestampField) throws IOException { + final MapperService mapperService = indicesService.createIndexMapperServiceForValidation(indexMetadata); + final CompressedXContent sourceIndexCompressedXContent = new CompressedXContent(indexMapping); + mapperService.merge(MapperService.SINGLE_MAPPING_NAME, sourceIndexCompressedXContent, MapperService.MergeReason.INDEX_TEMPLATE); + return new TimeseriesFieldTypeHelper(mapperService, timestampField); + } + } +} diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportRollupAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportRollupAction.java index e896e648d9583..36af383d42588 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportRollupAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportRollupAction.java @@ -10,15 +10,15 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.stats.MappingVisitor; import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import 
org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; -import org.elasticsearch.action.fieldcaps.FieldCapabilities; -import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction; @@ -37,6 +37,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Setting; @@ -47,7 +48,9 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.TimeSeriesParams; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; @@ -63,10 +66,12 @@ import org.elasticsearch.xpack.core.rollup.action.RollupIndexerAction; import java.io.IOException; -import java.util.HashMap; +import java.util.ArrayList; import java.util.List; import java.util.Map; +import static org.elasticsearch.index.mapper.TimeSeriesParams.TIME_SERIES_METRIC_PARAM; + /** * The master rollup action that coordinates * - creating the rollup index @@ -78,6 +83,7 @@ public class TransportRollupAction extends AcknowledgedTransportMasterNodeAction private static final Logger logger = LogManager.getLogger(TransportRollupAction.class); private final Client client; + private final IndicesService indicesService; private final ClusterService clusterService; private final 
MetadataCreateIndexService metadataCreateIndexService; private final IndexScopedSettings indexScopedSettings; @@ -104,6 +110,7 @@ public class TransportRollupAction extends AcknowledgedTransportMasterNodeAction @Inject public TransportRollupAction( Client client, + IndicesService indicesService, ClusterService clusterService, TransportService transportService, ThreadPool threadPool, @@ -123,6 +130,7 @@ public TransportRollupAction( ThreadPool.Names.SAME ); this.client = new OriginSettingClient(client, ClientHelper.ROLLUP_ORIGIN); + this.indicesService = indicesService; this.clusterService = clusterService; this.metadataCreateIndexService = metadataCreateIndexService; this.indexScopedSettings = indexScopedSettings; @@ -174,49 +182,53 @@ protected void masterOperation( MetadataCreateIndexService.validateIndexName(rollupIndexName, state); // Rollup will perform the following tasks: - // 1. Extract rollup config from source index field caps - // 2. Create the rollup index - // 3. Run rollup indexer - // 4. Make rollup index read-only and set replicas - // 5. Refresh rollup index - // 6. Mark rollup index as "completed successfully" - // 7. Force-merge the rollup index to a single segment + // 1. Extract source index mappings + // 2. Extract rollup config from index mappings + // 3. Create the rollup index + // 4. Run rollup indexer + // 5. Make rollup index read-only and set replicas + // 6. Refresh rollup index + // 7. Mark rollup index as "completed successfully" + // 8. Force-merge the rollup index to a single segment // At any point if there is an issue, delete the rollup index - // 1. Extract rollup config from source index field caps - FieldCapabilitiesRequest fieldCapsRequest = new FieldCapabilitiesRequest().indices(sourceIndexName).fields("*"); + // 1. 
Extract source index mappings final TaskId parentTask = new TaskId(clusterService.localNode().getId(), task.getId()); - fieldCapsRequest.setParentTask(parentTask); - client.fieldCaps(fieldCapsRequest, ActionListener.wrap(fieldCapsResponse -> { - final Map dimensionFieldCaps = new HashMap<>(); - final Map metricFieldCaps = new HashMap<>(); - for (Map.Entry> e : fieldCapsResponse.get().entrySet()) { - String field = e.getKey(); - /* - * Rollup runs on a single index, and we do not expect multiple mappings for the same - * field. So, it is safe to select the first and only value of the FieldCapsResponse - * by running: e.getValue().values().iterator().next() - */ - if (e.getValue().size() != 1) { - throw new IllegalStateException( - "Cannot parse mapping for field [" + field + "] at source index [" + sourceIndexName + "]" - ); - } - FieldCapabilities fieldCaps = e.getValue().values().iterator().next(); - if (fieldCaps.isDimension()) { - dimensionFieldCaps.put(field, fieldCaps); - } else if (e.getValue().values().iterator().next().getMetricType() != null) { - metricFieldCaps.put(field, fieldCaps); - } else { - // TODO: Field is not a dimension or a metric. Treat it as a tag + final GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(sourceIndexName); + getMappingsRequest.setParentTask(parentTask); + client.admin().indices().getMappings(getMappingsRequest, ActionListener.wrap(getMappingsResponse -> { + final Map sourceIndexMappings = getMappingsResponse.mappings() + .entrySet() + .stream() + .filter(entry -> sourceIndexName.equals(entry.getKey())) + .findFirst() + .map(mappingMetadata -> mappingMetadata.getValue().sourceAsMap()) + .orElseThrow(() -> new IllegalArgumentException("No mapping found for rollup source index [" + sourceIndexName + "]")); + + // 2. 
Extract rollup config from index mappings + final List dimensionFields = new ArrayList<>(); + final List metricFields = new ArrayList<>(); + final List labelFields = new ArrayList<>(); + final TimeseriesFieldTypeHelper helper = new TimeseriesFieldTypeHelper.Builder( + indicesService, + sourceIndexMappings, + sourceIndexMetadata + ).build(request.getRollupConfig().getTimestampField()); + MappingVisitor.visitMapping(sourceIndexMappings, (field, mapping) -> { + if (helper.isTimeSeriesDimension(field, mapping)) { + dimensionFields.add(field); + } else if (helper.isTimeSeriesMetric(field, mapping)) { + metricFields.add(field); + } else if (helper.isTimeSeriesLabel(field, mapping)) { + labelFields.add(field); } - } + }); RollupActionRequestValidationException validationException = new RollupActionRequestValidationException(); - if (dimensionFieldCaps.isEmpty()) { + if (dimensionFields.isEmpty()) { validationException.addValidationError("Index [" + sourceIndexName + "] does not contain any dimension fields"); } - if (metricFieldCaps.isEmpty()) { + if (metricFields.isEmpty()) { validationException.addValidationError("Index [" + sourceIndexName + "] does not contain any metric fields"); } @@ -225,22 +237,26 @@ protected void masterOperation( return; } + final MapperService mapperService = indicesService.createIndexMapperServiceForValidation(sourceIndexMetadata); + final CompressedXContent sourceIndexCompressedXContent = new CompressedXContent(sourceIndexMappings); + mapperService.merge(MapperService.SINGLE_MAPPING_NAME, sourceIndexCompressedXContent, MapperService.MergeReason.INDEX_TEMPLATE); + final String mapping; try { - mapping = createRollupIndexMapping(request.getRollupConfig(), dimensionFieldCaps, metricFieldCaps); + mapping = createRollupIndexMapping(helper, request.getRollupConfig(), mapperService, sourceIndexMappings); } catch (IOException e) { listener.onFailure(e); return; } - - // 2. Create rollup index + // 3. 
Create rollup index createRollupIndex(rollupIndexName, sourceIndexMetadata, mapping, request, ActionListener.wrap(createIndexResp -> { if (createIndexResp.isAcknowledged()) { // 3. Rollup index created. Run rollup indexer RollupIndexerAction.Request rollupIndexerRequest = new RollupIndexerAction.Request( request, - dimensionFieldCaps.keySet().toArray(new String[0]), - metricFieldCaps.keySet().toArray(new String[0]) + dimensionFields.toArray(new String[0]), + metricFields.toArray(new String[0]), + labelFields.toArray(new String[0]) ); rollupIndexerRequest.setParentTask(parentTask); client.execute(RollupIndexerAction.INSTANCE, rollupIndexerRequest, ActionListener.wrap(indexerResp -> { @@ -269,8 +285,8 @@ protected void masterOperation( mergeIndexResp -> listener.onResponse(AcknowledgedResponse.TRUE), e -> { /* - * At this point rollup has been created successfully even if force-merge - * fails. So, we should not fail the rollup operation. + * At this point rollup has been created successfully even if + * force-merge fails. So, we should not fail the rollup operation. */ logger.error( "Failed to force-merge rollup index [" + rollupIndexName + "]", @@ -367,68 +383,87 @@ protected ClusterBlockException checkBlock(RollupAction.Request request, Cluster * rollup configuration. 
* * @param config the rollup configuration - * @param dimensionFieldCaps a map with the field name as key and the fields caps response as value - * for the dimension fields of the source index - * @param metricFieldCaps a map with the field name as key and the fields caps response as value - * for the metric fields of the source index - * + * @param sourceIndexMappings a map with the source index mapping * @return the mapping of the rollup index */ public static String createRollupIndexMapping( + final TimeseriesFieldTypeHelper helper, final RollupActionConfig config, - final Map dimensionFieldCaps, - final Map metricFieldCaps + final MapperService mapperService, + final Map sourceIndexMappings ) throws IOException { - XContentBuilder builder = XContentFactory.jsonBuilder().startObject(); - builder = getDynamicTemplates(builder); + final XContentBuilder builder = XContentFactory.jsonBuilder().startObject(); + + addDynamicTemplates(builder); builder.startObject("properties"); - String timestampField = config.getTimestampField(); - String dateIntervalType = config.getIntervalType(); - String dateInterval = config.getInterval().toString(); - String tz = config.getTimeZone(); + addTimestampField(config, builder); + addMetricFields(helper, sourceIndexMappings, builder); + + builder.endObject(); // match initial startObject + builder.endObject(); // match startObject("properties") + + final CompressedXContent rollupDiffXContent = CompressedXContent.fromJSON( + XContentHelper.convertToJson(BytesReference.bytes(builder), false, XContentType.JSON) + ); + return mapperService.merge(MapperService.SINGLE_MAPPING_NAME, rollupDiffXContent, MapperService.MergeReason.INDEX_TEMPLATE) + .mappingSource() + .uncompressed() + .utf8ToString(); + } + + private static void addMetricFields( + final TimeseriesFieldTypeHelper helper, + final Map sourceIndexMappings, + final XContentBuilder builder + ) { + MappingVisitor.visitMapping(sourceIndexMappings, (field, mapping) -> { + if 
(helper.isTimeSeriesMetric(field, mapping)) { + try { + addMetricFieldMapping(builder, field, mapping); + } catch (IOException e) { + throw new ElasticsearchException("Error while adding metric for field [" + field + "]"); + } + } + }); + } + + private static void addTimestampField(final RollupActionConfig config, final XContentBuilder builder) throws IOException { + final String timestampField = config.getTimestampField(); + final String dateIntervalType = config.getIntervalType(); + final String dateInterval = config.getInterval().toString(); + final String timezone = config.getTimeZone(); builder.startObject(timestampField) .field("type", DateFieldMapper.CONTENT_TYPE) .startObject("meta") .field(dateIntervalType, dateInterval) - .field(RollupActionConfig.TIME_ZONE, tz) + .field(RollupActionConfig.TIME_ZONE, timezone) .endObject() .endObject(); + } - for (Map.Entry e : dimensionFieldCaps.entrySet()) { - builder.startObject(e.getKey()) - .field("type", e.getValue().getType()) - .field(TimeSeriesParams.TIME_SERIES_DIMENSION_PARAM, true) + private static void addMetricFieldMapping(final XContentBuilder builder, final String field, final Map fieldProperties) + throws IOException { + final TimeSeriesParams.MetricType metricType = TimeSeriesParams.MetricType.valueOf( + fieldProperties.get(TIME_SERIES_METRIC_PARAM).toString() + ); + if (TimeSeriesParams.MetricType.counter.equals(metricType)) { + // For counters, we keep the same field type, because they store + // only one value (the last value of the counter) + builder.startObject(field).field("type", fieldProperties.get("type")).field(TIME_SERIES_METRIC_PARAM, metricType).endObject(); + } else { + final List supportedAggs = List.of(metricType.supportedAggs()); + // We choose max as the default metric + final String defaultMetric = supportedAggs.contains("max") ? 
"max" : supportedAggs.get(0); + builder.startObject(field) + .field("type", AggregateDoubleMetricFieldMapper.CONTENT_TYPE) + .stringListField(AggregateDoubleMetricFieldMapper.Names.METRICS, supportedAggs) + .field(AggregateDoubleMetricFieldMapper.Names.DEFAULT_METRIC, defaultMetric) + .field(TIME_SERIES_METRIC_PARAM, metricType) .endObject(); } - - for (Map.Entry e : metricFieldCaps.entrySet()) { - TimeSeriesParams.MetricType metricType = e.getValue().getMetricType(); - if (metricType == TimeSeriesParams.MetricType.counter) { - // For counters we keep the same field type, because they store - // only one value (the last value of the counter) - builder.startObject(e.getKey()) - .field("type", e.getValue().getType()) - .field(TimeSeriesParams.TIME_SERIES_METRIC_PARAM, metricType) - .endObject(); - } else { - List aggs = List.of(metricType.supportedAggs()); - // We choose max as the default metric - String defaultMetric = aggs.contains("max") ? "max" : aggs.get(0); - builder.startObject(e.getKey()) - .field("type", AggregateDoubleMetricFieldMapper.CONTENT_TYPE) - .stringListField(AggregateDoubleMetricFieldMapper.Names.METRICS, aggs) - .field(AggregateDoubleMetricFieldMapper.Names.DEFAULT_METRIC, defaultMetric) - .field(TimeSeriesParams.TIME_SERIES_METRIC_PARAM, metricType) - .endObject(); - } - } - - builder.endObject(); - builder.endObject(); - return XContentHelper.convertToJson(BytesReference.bytes(builder), false, XContentType.JSON); } /** @@ -473,8 +508,8 @@ private IndexMetadata.Builder copyIndexMetadata(IndexMetadata sourceIndexMetadat /** * Configure the dynamic templates to always map strings to the keyword field type. 
*/ - private static XContentBuilder getDynamicTemplates(XContentBuilder builder) throws IOException { - return builder.startArray("dynamic_templates") + private static void addDynamicTemplates(final XContentBuilder builder) throws IOException { + builder.startArray("dynamic_templates") .startObject() .startObject("strings") .field("match_mapping_type", "string") diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportRollupIndexerAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportRollupIndexerAction.java index 8a0729718e0a0..76d07e827a8a4 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportRollupIndexerAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportRollupIndexerAction.java @@ -131,7 +131,8 @@ protected RollupIndexerAction.ShardRollupResponse shardOperation(RollupIndexerAc request.getRollupIndex(), request.getRollupConfig(), request.getDimensionFields(), - request.getMetricFields() + request.getMetricFields(), + request.getLabelFields() ); return indexer.execute(); } diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/LabelFieldProducerTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/LabelFieldProducerTests.java new file mode 100644 index 0000000000000..8ac6df94131bf --- /dev/null +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/LabelFieldProducerTests.java @@ -0,0 +1,76 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.rollup.v2; + +import org.elasticsearch.search.aggregations.AggregatorTestCase; + +public class LabelFieldProducerTests extends AggregatorTestCase { + + public void testLastValueKeywordLabel() { + final LabelFieldProducer.Label label = new LabelFieldProducer.LastValueLabel(); + label.collect("aaa"); + label.collect("bbb"); + label.collect("ccc"); + assertEquals("aaa", label.get()); + label.reset(); + assertNull(label.get()); + } + + public void testLastValueDoubleLabel() { + final LabelFieldProducer.Label label = new LabelFieldProducer.LastValueLabel(); + label.collect(10.20D); + label.collect(17.30D); + label.collect(12.60D); + assertEquals(10.20D, label.get()); + label.reset(); + assertNull(label.get()); + } + + public void testLastValueIntegerLabel() { + final LabelFieldProducer.Label label = new LabelFieldProducer.LastValueLabel(); + label.collect(10); + label.collect(17); + label.collect(12); + assertEquals(10, label.get()); + label.reset(); + assertNull(label.get()); + } + + public void testLastValueLongLabel() { + final LabelFieldProducer.Label label = new LabelFieldProducer.LastValueLabel(); + label.collect(10L); + label.collect(17L); + label.collect(12L); + assertEquals(10L, label.get()); + label.reset(); + assertNull(label.get()); + } + + public void testLastValueBooleanLabel() { + final LabelFieldProducer.Label label = new LabelFieldProducer.LastValueLabel(); + label.collect(true); + label.collect(false); + label.collect(true); + assertEquals(true, label.get()); + label.reset(); + assertNull(label.get()); + } + + public void testLabelFieldProducer() { + final LabelFieldProducer producer = new LabelFieldProducer.LabelLastValueFieldProducer("dummy"); + assertTrue(producer.isEmpty()); + assertEquals("dummy", producer.name()); + assertEquals("last_value", producer.label().name); + producer.collect("aaaa"); + assertFalse(producer.isEmpty()); + assertEquals("aaaa", producer.value()); + producer.reset(); + 
assertTrue(producer.isEmpty()); + assertNull(producer.value()); + } +} diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/MetricFieldProducerTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/MetricFieldProducerTests.java index e0f97315b7082..670858486f3b0 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/MetricFieldProducerTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/MetricFieldProducerTests.java @@ -122,22 +122,22 @@ public void testLastValueMetric() { public void testCounterMetricFieldProducer() { MetricFieldProducer producer = new MetricFieldProducer.CounterMetricFieldProducer("field"); assertTrue(producer.isEmpty()); - producer.collectMetric(55.0); - producer.collectMetric(12.2); - producer.collectMetric(5.5); + producer.collect(55.0); + producer.collect(12.2); + producer.collect(5.5); assertFalse(producer.isEmpty()); Object o = producer.value(); assertEquals(55.0, o); - assertEquals("field", producer.field()); + assertEquals("field", producer.name()); } public void testGaugeMetricFieldProducer() { MetricFieldProducer producer = new MetricFieldProducer.GaugeMetricFieldProducer("field"); assertTrue(producer.isEmpty()); - producer.collectMetric(55.0); - producer.collectMetric(12.2); - producer.collectMetric(5.5); + producer.collect(55.0); + producer.collect(12.2); + producer.collect(5.5); assertFalse(producer.isEmpty()); Object o = producer.value(); @@ -148,7 +148,7 @@ public void testGaugeMetricFieldProducer() { } else { fail("Value is not a Map"); } - assertEquals("field", producer.field()); + assertEquals("field", producer.name()); } public void testBuildMetricProducers() { diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/RollupActionSingleNodeTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/RollupActionSingleNodeTests.java index 4378280cfdb17..245784b21f369 
100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/RollupActionSingleNodeTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/RollupActionSingleNodeTests.java @@ -6,13 +6,15 @@ */ package org.elasticsearch.xpack.rollup.v2; -import org.apache.lucene.tests.util.LuceneTestCase; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.admin.cluster.stats.MappingVisitor; import org.elasticsearch.action.admin.indices.get.GetIndexResponse; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.admin.indices.rollover.RolloverRequest; import org.elasticsearch.action.admin.indices.rollover.RolloverResponse; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; @@ -22,15 +24,15 @@ import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.datastreams.CreateDataStreamAction; import org.elasticsearch.action.datastreams.GetDataStreamAction; -import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Template; import org.elasticsearch.common.compress.CompressedXContent; +import org.elasticsearch.common.document.DocumentField; +import org.elasticsearch.common.network.NetworkAddress; import 
org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.time.DateFormatter; import org.elasticsearch.datastreams.DataStreamsPlugin; @@ -42,17 +44,26 @@ import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper; import org.elasticsearch.index.mapper.TimeSeriesParams; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder; -import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder; -import org.elasticsearch.search.aggregations.bucket.composite.DateHistogramValuesSourceBuilder; -import org.elasticsearch.search.aggregations.bucket.composite.InternalComposite; -import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.aggregations.Aggregation; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram; +import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; +import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.InternalTopHits; +import org.elasticsearch.search.aggregations.metrics.Max; import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.MinAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder; import 
org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder; +import org.elasticsearch.search.sort.SortBuilders; +import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentFactory; @@ -72,17 +83,22 @@ import java.io.IOException; import java.time.Instant; import java.time.ZoneId; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.Collection; +import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import static org.elasticsearch.index.mapper.TimeSeriesParams.TIME_SERIES_METRIC_PARAM; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; public class RollupActionSingleNodeTests extends ESSingleNodeTestCase { @@ -92,6 +108,18 @@ public class RollupActionSingleNodeTests extends ESSingleNodeTestCase { public static final String FIELD_DIMENSION_2 = "dimension_long"; public static final String FIELD_NUMERIC_1 = "numeric_1"; public static final String FIELD_NUMERIC_2 = "numeric_2"; + public static final String FIELD_METRIC_LABEL_DOUBLE = "metric_label_double"; + public static final String FIELD_LABEL_DOUBLE = "label_double"; + public static final String FIELD_LABEL_INTEGER = "label_integer"; + public static final String FIELD_LABEL_KEYWORD = "label_keyword"; + public static final String FIELD_LABEL_TEXT = "label_text"; + public static final String FIELD_LABEL_BOOLEAN = "label_boolean"; + public static final String FIELD_LABEL_IPv4_ADDRESS = "label_ipv4_address"; + public static final String FIELD_LABEL_IPv6_ADDRESS = "label_ipv6_address"; + public static final String 
FIELD_LABEL_DATE = "label_date"; + public static final String FIELD_LABEL_UNMAPPED = "label_unmapped"; + public static final String FIELD_LABEL_KEYWORD_ARRAY = "label_keyword_array"; + public static final String FIELD_LABEL_DOUBLE_ARRAY = "label_double_array"; private static final int MAX_DIM_VALUES = 5; private static final long MAX_NUM_BUCKETS = 10; @@ -128,6 +156,13 @@ public void setup() { dimensionValues.add(randomAlphaOfLength(6)); } + /** + * NOTE: here we map each numeric label field also as a (counter) metric. + * This is done for testing purposes. There is no easy way to test + * that labels are collected using the last value. The idea is to + * check that the value of the label (last value) matches the value + * of the corresponding metric which uses a last_value metric type. + */ client().admin() .indices() .prepareCreate(sourceIndex) @@ -137,7 +172,10 @@ public void setup() { .put("index.number_of_replicas", numOfReplicas) .put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES) .putList(IndexMetadata.INDEX_ROUTING_PATH.getKey(), List.of(FIELD_DIMENSION_1)) - .put(IndexSettings.TIME_SERIES_START_TIME.getKey(), Instant.ofEpochMilli(startTime).toString()) + .put( + IndexSettings.TIME_SERIES_START_TIME.getKey(), + DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(Instant.ofEpochMilli(startTime).toEpochMilli()) + ) .put(IndexSettings.TIME_SERIES_END_TIME.getKey(), "2106-01-08T23:40:53.384Z") .build() ) @@ -151,7 +189,29 @@ public void setup() { FIELD_NUMERIC_1, "type=long,time_series_metric=gauge", FIELD_NUMERIC_2, - "type=double,time_series_metric=counter" + "type=double,time_series_metric=counter", + FIELD_LABEL_DOUBLE, + "type=double", + FIELD_LABEL_INTEGER, + "type=integer", + FIELD_LABEL_KEYWORD, + "type=keyword", + FIELD_LABEL_TEXT, + "type=text", + FIELD_LABEL_BOOLEAN, + "type=boolean", + FIELD_METRIC_LABEL_DOUBLE, /* numeric label indexed as a metric */ + "type=double,time_series_metric=counter", + FIELD_LABEL_IPv4_ADDRESS, + "type=ip", + 
FIELD_LABEL_IPv6_ADDRESS, + "type=ip", + FIELD_LABEL_DATE, + "type=date,format=date_optional_time", + FIELD_LABEL_KEYWORD_ARRAY, + "type=keyword", + FIELD_LABEL_DOUBLE_ARRAY, + "type=double" ) .get(); } @@ -160,6 +220,22 @@ public void testRollupIndex() throws IOException { RollupActionConfig config = new RollupActionConfig(randomInterval()); SourceSupplier sourceSupplier = () -> { String ts = randomDateForInterval(config.getInterval()); + double labelDoubleValue = DATE_FORMATTER.parseMillis(ts); + int labelIntegerValue = randomInt(); + long labelLongValue = randomLong(); + String labelIpv4Address = NetworkAddress.format(randomIp(true)); + String labelIpv6Address = NetworkAddress.format(randomIp(false)); + Date labelDateValue = randomDate(); + int keywordArraySize = randomIntBetween(3, 10); + String[] keywordArray = new String[keywordArraySize]; + for (int i = 0; i < keywordArraySize; ++i) { + keywordArray[i] = randomAlphaOfLength(10); + } + int doubleArraySize = randomIntBetween(3, 10); + double[] doubleArray = new double[doubleArraySize]; + for (int i = 0; i < doubleArraySize; ++i) { + doubleArray[i] = randomDouble(); + } return XContentFactory.jsonBuilder() .startObject() .field(FIELD_TIMESTAMP, ts) @@ -167,6 +243,18 @@ public void testRollupIndex() throws IOException { // .field(FIELD_DIMENSION_2, randomIntBetween(1, 10)) //TODO: Fix _tsid format issue and then enable this .field(FIELD_NUMERIC_1, randomInt()) .field(FIELD_NUMERIC_2, DATE_FORMATTER.parseMillis(ts)) + .field(FIELD_LABEL_DOUBLE, labelDoubleValue) + .field(FIELD_METRIC_LABEL_DOUBLE, labelDoubleValue) + .field(FIELD_LABEL_INTEGER, labelIntegerValue) + .field(FIELD_LABEL_KEYWORD, ts) + .field(FIELD_LABEL_UNMAPPED, randomBoolean() ? 
labelLongValue : labelDoubleValue) + .field(FIELD_LABEL_TEXT, ts) + .field(FIELD_LABEL_BOOLEAN, randomBoolean()) + .field(FIELD_LABEL_IPv4_ADDRESS, labelIpv4Address) + .field(FIELD_LABEL_IPv6_ADDRESS, labelIpv6Address) + .field(FIELD_LABEL_DATE, labelDateValue) + .field(FIELD_LABEL_KEYWORD_ARRAY, keywordArray) + .field(FIELD_LABEL_DOUBLE_ARRAY, doubleArray) .endObject(); }; bulkIndex(sourceSupplier); @@ -175,6 +263,19 @@ public void testRollupIndex() throws IOException { assertRollupIndex(sourceIndex, rollupIndex, config); } + private Date randomDate() { + int randomYear = randomIntBetween(1970, 2020); + int randomMonth = randomIntBetween(1, 12); + int randomDayOfMonth = randomIntBetween(1, 28); + int randomHour = randomIntBetween(0, 23); + int randomMinute = randomIntBetween(0, 59); + int randomSecond = randomIntBetween(0, 59); + return Date.from( + ZonedDateTime.of(randomYear, randomMonth, randomDayOfMonth, randomHour, randomMinute, randomSecond, 0, ZoneOffset.UTC) + .toInstant() + ); + } + public void testCopyIndexSettings() throws IOException { Settings settings = Settings.builder() .put(LifecycleSettings.LIFECYCLE_NAME, randomAlphaOfLength(5)) @@ -233,7 +334,6 @@ public void testNullRollupConfig() { assertThat(exception.getMessage(), containsString("rollup configuration is missing")); } - @LuceneTestCase.AwaitsFix(bugUrl = "TODO: Fix this") public void testRollupSparseMetrics() throws IOException { RollupActionConfig config = new RollupActionConfig(randomInterval()); SourceSupplier sourceSupplier = () -> { @@ -268,7 +368,7 @@ public void testCannotRollupToExistingIndex() throws Exception { assertThat(exception.getMessage(), containsString(rollupIndex)); } - public void testRollupEmptyIndex() { + public void testRollupEmptyIndex() throws IOException { RollupActionConfig config = new RollupActionConfig(randomInterval()); // Source index has been created in the setup() method prepareSourceIndex(sourceIndex); @@ -439,69 +539,180 @@ private RolloverResponse 
rollover(String dataStreamName) throws ExecutionExcepti return response; } + private Aggregations aggregate(final String index, AggregationBuilder aggregationBuilder) { + return client().prepareSearch(index).addAggregation(aggregationBuilder).get().getAggregations(); + } + @SuppressWarnings("unchecked") - private void assertRollupIndex(String sourceIndex, String rollupIndex, RollupActionConfig config) { + private void assertRollupIndex(String sourceIndex, String rollupIndex, RollupActionConfig config) throws IOException { // Retrieve field information for the metric fields - FieldCapabilitiesResponse fieldCapsResponse = client().prepareFieldCaps(sourceIndex).setFields("*").get(); - Map metricFields = fieldCapsResponse.get() + final GetMappingsResponse getMappingsResponse = client().admin().indices().prepareGetMappings(sourceIndex).get(); + final Map sourceIndexMappings = getMappingsResponse.mappings() .entrySet() .stream() - .filter(e -> e.getValue().values().iterator().next().getMetricType() != null) - .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().values().iterator().next().getMetricType())); - - final CompositeAggregationBuilder aggregation = buildCompositeAggs("resp", config, metricFields); - long numBuckets = 0; - InternalComposite origResp = client().prepareSearch(sourceIndex).addAggregation(aggregation).get().getAggregations().get("resp"); - InternalComposite rollupResp = client().prepareSearch(rollupIndex).addAggregation(aggregation).get().getAggregations().get("resp"); - while (origResp.afterKey() != null) { - numBuckets += origResp.getBuckets().size(); - assertEquals(origResp, rollupResp); - aggregation.aggregateAfter(origResp.afterKey()); - origResp = client().prepareSearch(sourceIndex).addAggregation(aggregation).get().getAggregations().get("resp"); - rollupResp = client().prepareSearch(rollupIndex).addAggregation(aggregation).get().getAggregations().get("resp"); - } - assertEquals(origResp, rollupResp); + .filter(entry -> 
sourceIndex.equals(entry.getKey())) + .findFirst() + .map(mappingMetadata -> mappingMetadata.getValue().sourceAsMap()) + .orElseThrow(() -> new IllegalArgumentException("No mapping found for rollup source index [" + sourceIndex + "]")); + + IndexMetadata indexMetadata = client().admin().cluster().prepareState().get().getState().getMetadata().index(sourceIndex); + TimeseriesFieldTypeHelper helper = new TimeseriesFieldTypeHelper.Builder( + getInstanceFromNode(IndicesService.class), + sourceIndexMappings, + indexMetadata + ).build(config.getTimestampField()); + Map metricFields = new HashMap<>(); + Map labelFields = new HashMap<>(); + MappingVisitor.visitMapping(sourceIndexMappings, (field, fieldMapping) -> { + if (helper.isTimeSeriesMetric(field, fieldMapping)) { + metricFields.put(field, TimeSeriesParams.MetricType.valueOf(fieldMapping.get(TIME_SERIES_METRIC_PARAM).toString())); + } else if (helper.isTimeSeriesLabel(field, fieldMapping)) { + labelFields.put(field, fieldMapping.get("type").toString()); + } + }); - SearchResponse resp = client().prepareSearch(rollupIndex).setTrackTotalHits(true).get(); - assertThat(resp.getHits().getTotalHits().value, equalTo(numBuckets)); + assertRollupIndexAggregations(sourceIndex, rollupIndex, config, metricFields, labelFields); GetIndexResponse indexSettingsResp = client().admin().indices().prepareGetIndex().addIndices(sourceIndex, rollupIndex).get(); - // Assert rollup metadata are set in index settings - assertEquals("success", indexSettingsResp.getSetting(rollupIndex, "index.rollup.status")); - assertEquals( - indexSettingsResp.getSetting(sourceIndex, "index.uuid"), - indexSettingsResp.getSetting(rollupIndex, "index.rollup.source.uuid") - ); - assertEquals(sourceIndex, indexSettingsResp.getSetting(rollupIndex, "index.rollup.source.name")); - assertEquals(indexSettingsResp.getSetting(sourceIndex, "index.mode"), indexSettingsResp.getSetting(rollupIndex, "index.mode")); - assertEquals( - indexSettingsResp.getSetting(sourceIndex, 
"time_series.start_time"), - indexSettingsResp.getSetting(rollupIndex, "time_series.start_time") - ); - assertEquals( - indexSettingsResp.getSetting(sourceIndex, "time_series.end_time"), - indexSettingsResp.getSetting(rollupIndex, "time_series.end_time") - ); - assertEquals( - indexSettingsResp.getSetting(sourceIndex, "index.routing_path"), - indexSettingsResp.getSetting(rollupIndex, "index.routing_path") - ); - assertEquals( - indexSettingsResp.getSetting(sourceIndex, "index.number_of_shards"), - indexSettingsResp.getSetting(rollupIndex, "index.number_of_shards") - ); - assertEquals( - indexSettingsResp.getSetting(sourceIndex, "index.number_of_replicas"), - indexSettingsResp.getSetting(rollupIndex, "index.number_of_replicas") - ); - assertEquals("true", indexSettingsResp.getSetting(rollupIndex, "index.blocks.write")); + assertRollupIndexSettings(sourceIndex, rollupIndex, indexSettingsResp); - // Assert field mappings Map> mappings = (Map>) indexSettingsResp.getMappings() .get(rollupIndex) .getSourceAsMap() .get("properties"); + assertFieldMappings(config, metricFields, mappings); + + GetMappingsResponse indexMappings = client().admin() + .indices() + .getMappings(new GetMappingsRequest().indices(rollupIndex, sourceIndex)) + .actionGet(); + Map rollupIndexProperties = (Map) indexMappings.mappings() + .get(rollupIndex) + .sourceAsMap() + .get("properties"); + Map sourceIndexCloneProperties = (Map) indexMappings.mappings() + .get(sourceIndex) + .sourceAsMap() + .get("properties"); + List> labelFieldRollupIndexCloneProperties = (rollupIndexProperties.entrySet() + .stream() + .filter(entry -> labelFields.containsKey(entry.getKey())) + .toList()); + List> labelFieldSourceIndexProperties = (sourceIndexCloneProperties.entrySet() + .stream() + .filter(entry -> labelFields.containsKey(entry.getKey())) + .toList()); + assertEquals(labelFieldRollupIndexCloneProperties, labelFieldSourceIndexProperties); + } + + private void assertRollupIndexAggregations( + String sourceIndex, 
+ String rollupIndex, + RollupActionConfig config, + Map metricFields, + Map labelFields + ) { + final AggregationBuilder aggregations = buildAggregations(config, metricFields, labelFields, config.getTimestampField()); + Aggregations origResp = aggregate(sourceIndex, aggregations); + Aggregations rollupResp = aggregate(rollupIndex, aggregations); + assertEquals(origResp.asMap().keySet(), rollupResp.asMap().keySet()); + + StringTerms originalTsIdTermsAggregation = (StringTerms) origResp.getAsMap().values().stream().toList().get(0); + StringTerms rollupTsIdTermsAggregation = (StringTerms) rollupResp.getAsMap().values().stream().toList().get(0); + originalTsIdTermsAggregation.getBuckets().forEach(originalBucket -> { + + StringTerms.Bucket rollupBucket = rollupTsIdTermsAggregation.getBucketByKey(originalBucket.getKeyAsString()); + assertEquals(originalBucket.getAggregations().asList().size(), rollupBucket.getAggregations().asList().size()); + + InternalDateHistogram originalDateHistogram = (InternalDateHistogram) originalBucket.getAggregations().asList().get(0); + InternalDateHistogram rollupDateHistogram = (InternalDateHistogram) rollupBucket.getAggregations().asList().get(0); + List originalDateHistogramBuckets = originalDateHistogram.getBuckets(); + List rollupDateHistogramBuckets = rollupDateHistogram.getBuckets(); + assertEquals(originalDateHistogramBuckets.size(), rollupDateHistogramBuckets.size()); + assertEquals( + originalDateHistogramBuckets.stream().map(InternalDateHistogram.Bucket::getKeyAsString).collect(Collectors.toList()), + rollupDateHistogramBuckets.stream().map(InternalDateHistogram.Bucket::getKeyAsString).collect(Collectors.toList()) + ); + + for (int i = 0; i < originalDateHistogramBuckets.size(); ++i) { + InternalDateHistogram.Bucket originalDateHistogramBucket = originalDateHistogramBuckets.get(i); + InternalDateHistogram.Bucket rollupDateHistogramBucket = rollupDateHistogramBuckets.get(i); + 
assertEquals(originalDateHistogramBucket.getKeyAsString(), rollupDateHistogramBucket.getKeyAsString()); + + Aggregations originalAggregations = originalDateHistogramBucket.getAggregations(); + Aggregations rollupAggregations = rollupDateHistogramBucket.getAggregations(); + assertEquals(originalAggregations.asList().size(), rollupAggregations.asList().size()); + + List nonTopHitsOriginalAggregations = originalAggregations.asList() + .stream() + .filter(agg -> agg.getType().equals("top_hits") == false) + .toList(); + List nonTopHitsRollupAggregations = rollupAggregations.asList() + .stream() + .filter(agg -> agg.getType().equals("top_hits") == false) + .toList(); + assertEquals(nonTopHitsOriginalAggregations, nonTopHitsRollupAggregations); + + List topHitsOriginalAggregations = originalAggregations.asList() + .stream() + .filter(agg -> agg.getType().equals("top_hits")) + .toList(); + List topHitsRollupAggregations = rollupAggregations.asList() + .stream() + .filter(agg -> agg.getType().equals("top_hits")) + .toList(); + assertEquals(topHitsOriginalAggregations.size(), topHitsRollupAggregations.size()); + + for (int j = 0; j < topHitsRollupAggregations.size(); ++j) { + InternalTopHits originalTopHits = (InternalTopHits) topHitsOriginalAggregations.get(j); + InternalTopHits rollupTopHits = (InternalTopHits) topHitsRollupAggregations.get(j); + SearchHit[] originalHits = originalTopHits.getHits().getHits(); + SearchHit[] rollupHits = rollupTopHits.getHits().getHits(); + assertEquals(originalHits.length, rollupHits.length); + + for (int k = 0; k < originalHits.length; ++k) { + SearchHit originalHit = originalHits[k]; + SearchHit rollupHit = rollupHits[k]; + + Map originalHitDocumentFields = originalHit.getDocumentFields(); + Map rollupHitDocumentFields = rollupHit.getDocumentFields(); + List originalFields = originalHitDocumentFields.values().stream().toList(); + List rollupFields = rollupHitDocumentFields.values().stream().toList(); + List originalFieldsList = 
originalFields.stream().flatMap(x -> x.getValues().stream()).toList(); + List rollupFieldsList = rollupFields.stream().flatMap(x -> x.getValues().stream()).toList(); + if (originalFieldsList.isEmpty() == false && rollupFieldsList.isEmpty() == false) { + // NOTE: here we take advantage of the fact that a label field is indexed also as a metric of type + // `counter`. This way we can actually check that the label value stored in the rollup index + // is the last value (which is what we store for a metric of type counter) by comparing the metric + // field value to the label field value. + originalFieldsList.forEach(field -> assertTrue(rollupFieldsList.contains(field))); + rollupFieldsList.forEach(field -> assertTrue(originalFieldsList.contains(field))); + Object originalLabelValue = originalHit.getDocumentFields().values().stream().toList().get(0).getValue(); + Object rollupLabelValue = rollupHit.getDocumentFields().values().stream().toList().get(0).getValue(); + Optional labelAsMetric = nonTopHitsOriginalAggregations.stream() + .filter(agg -> agg.getName().equals("metric_" + rollupTopHits.getName())) + .findFirst(); + // NOTE: this check is possible only if the label can be indexed as a metric (the label is a numeric field) + if (labelAsMetric.isPresent()) { + double metricValue = ((Max) labelAsMetric.get()).value(); + assertEquals(metricValue, rollupLabelValue); + assertEquals(metricValue, originalLabelValue); + } + } + } + } + } + }); + } + + @SuppressWarnings("unchecked") + private void assertFieldMappings( + RollupActionConfig config, + Map metricFields, + Map> mappings + ) { + // Assert field mappings assertEquals(DateFieldMapper.CONTENT_TYPE, mappings.get(config.getTimestampField()).get("type")); Map dateTimeMeta = (Map) mappings.get(config.getTimestampField()).get("meta"); assertEquals(config.getTimeZone(), dateTimeMeta.get("time_zone")); @@ -517,38 +728,107 @@ private void assertRollupIndex(String sourceIndex, String rollupIndex, RollupAct }); } - private 
CompositeAggregationBuilder buildCompositeAggs( - String name, - RollupActionConfig config, - Map metricFields + private void assertRollupIndexSettings(String sourceIndex, String rollupIndex, GetIndexResponse indexSettingsResp) { + // Assert rollup metadata are set in index settings + assertEquals("success", indexSettingsResp.getSetting(rollupIndex, IndexMetadata.INDEX_ROLLUP_STATUS_KEY)); + + assertNotNull(indexSettingsResp.getSetting(sourceIndex, IndexMetadata.SETTING_INDEX_UUID)); + assertNotNull(indexSettingsResp.getSetting(rollupIndex, IndexMetadata.INDEX_ROLLUP_SOURCE_UUID_KEY)); + assertEquals( + indexSettingsResp.getSetting(sourceIndex, IndexMetadata.SETTING_INDEX_UUID), + indexSettingsResp.getSetting(rollupIndex, IndexMetadata.INDEX_ROLLUP_SOURCE_UUID_KEY) + ); + + assertEquals(sourceIndex, indexSettingsResp.getSetting(rollupIndex, IndexMetadata.INDEX_ROLLUP_SOURCE_NAME_KEY)); + assertEquals(indexSettingsResp.getSetting(sourceIndex, "index.mode"), indexSettingsResp.getSetting(rollupIndex, "index.mode")); + + assertNotNull(indexSettingsResp.getSetting(sourceIndex, IndexSettings.TIME_SERIES_START_TIME.getKey())); + assertNotNull(indexSettingsResp.getSetting(rollupIndex, IndexSettings.TIME_SERIES_START_TIME.getKey())); + assertEquals( + indexSettingsResp.getSetting(sourceIndex, IndexSettings.TIME_SERIES_START_TIME.getKey()), + indexSettingsResp.getSetting(rollupIndex, IndexSettings.TIME_SERIES_START_TIME.getKey()) + ); + + assertNotNull(indexSettingsResp.getSetting(sourceIndex, IndexSettings.TIME_SERIES_END_TIME.getKey())); + assertNotNull(indexSettingsResp.getSetting(rollupIndex, IndexSettings.TIME_SERIES_END_TIME.getKey())); + assertEquals( + indexSettingsResp.getSetting(sourceIndex, IndexSettings.TIME_SERIES_END_TIME.getKey()), + indexSettingsResp.getSetting(rollupIndex, IndexSettings.TIME_SERIES_END_TIME.getKey()) + ); + assertNotNull(indexSettingsResp.getSetting(sourceIndex, "index.routing_path")); + 
assertNotNull(indexSettingsResp.getSetting(rollupIndex, "index.routing_path")); + assertEquals( + indexSettingsResp.getSetting(sourceIndex, "index.routing_path"), + indexSettingsResp.getSetting(rollupIndex, "index.routing_path") + ); + + assertNotNull(indexSettingsResp.getSetting(sourceIndex, IndexMetadata.SETTING_NUMBER_OF_SHARDS)); + assertNotNull(indexSettingsResp.getSetting(rollupIndex, IndexMetadata.SETTING_NUMBER_OF_SHARDS)); + assertEquals( + indexSettingsResp.getSetting(sourceIndex, IndexMetadata.SETTING_NUMBER_OF_SHARDS), + indexSettingsResp.getSetting(rollupIndex, IndexMetadata.SETTING_NUMBER_OF_SHARDS) + ); + + assertNotNull(indexSettingsResp.getSetting(sourceIndex, IndexMetadata.SETTING_NUMBER_OF_REPLICAS)); + assertNotNull(indexSettingsResp.getSetting(rollupIndex, IndexMetadata.SETTING_NUMBER_OF_REPLICAS)); + assertEquals( + indexSettingsResp.getSetting(sourceIndex, IndexMetadata.SETTING_NUMBER_OF_REPLICAS), + indexSettingsResp.getSetting(rollupIndex, IndexMetadata.SETTING_NUMBER_OF_REPLICAS) + ); + assertEquals("true", indexSettingsResp.getSetting(rollupIndex, "index.blocks.write")); + } + + private AggregationBuilder buildAggregations( + final RollupActionConfig config, + final Map metrics, + final Map labels, + final String timestampField ) { - List> sources = new ArrayList<>(); - // For time series indices, we use the _tsid field for the terms aggregation - sources.add(new TermsValuesSourceBuilder("tsid").field(TimeSeriesIdFieldMapper.NAME)); - DateHistogramValuesSourceBuilder dateHisto = new DateHistogramValuesSourceBuilder(config.getTimestampField()); - dateHisto.field(config.getTimestampField()); + final TermsAggregationBuilder tsidAggregation = new TermsAggregationBuilder("tsid").field(TimeSeriesIdFieldMapper.NAME) + .size(10_000); + final DateHistogramAggregationBuilder dateHistogramAggregation = new DateHistogramAggregationBuilder("timestamp").field( + config.getTimestampField() + ).fixedInterval(config.getInterval()); if 
(config.getTimeZone() != null) { - dateHisto.timeZone(ZoneId.of(config.getTimeZone())); + dateHistogramAggregation.timeZone(ZoneId.of(config.getTimeZone())); } - dateHisto.fixedInterval(config.getInterval()); - sources.add(dateHisto); - - final CompositeAggregationBuilder composite = new CompositeAggregationBuilder(name, sources).size(10); - metricFields.forEach((fieldname, metricType) -> { - for (String agg : metricType.supportedAggs()) { - switch (agg) { - case "min" -> composite.subAggregation(new MinAggregationBuilder(fieldname + "_" + agg).field(fieldname)); - case "max", "last_value" -> composite.subAggregation(new MaxAggregationBuilder(fieldname + "_" + agg).field(fieldname)); - case "sum" -> composite.subAggregation(new SumAggregationBuilder(fieldname + "_" + agg).field(fieldname)); - case "value_count" -> composite.subAggregation( - new ValueCountAggregationBuilder(fieldname + "_" + agg).field(fieldname) + tsidAggregation.subAggregation(dateHistogramAggregation); + + metrics.forEach((fieldName, metricType) -> { + for (final String supportedAggregation : metricType.supportedAggs()) { + switch (supportedAggregation) { + case "min" -> dateHistogramAggregation.subAggregation( + new MinAggregationBuilder(fieldName + "_" + supportedAggregation).field(fieldName) + ); + case "max" -> dateHistogramAggregation.subAggregation( + new MaxAggregationBuilder(fieldName + "_" + supportedAggregation).field(fieldName) + ); + case "last_value" -> dateHistogramAggregation.subAggregation( + new TopHitsAggregationBuilder(fieldName + "_" + supportedAggregation).sort( + SortBuilders.fieldSort(timestampField).order(SortOrder.DESC) + ).size(1).fetchField(fieldName) ); - default -> throw new IllegalArgumentException("Unsupported metric type [" + agg + "]"); + case "sum" -> dateHistogramAggregation.subAggregation( + new SumAggregationBuilder(fieldName + "_" + supportedAggregation).field(fieldName) + ); + case "value_count" -> dateHistogramAggregation.subAggregation( + new 
ValueCountAggregationBuilder(fieldName + "_" + supportedAggregation).field(fieldName) + ); + default -> throw new IllegalArgumentException("Unsupported metric type [" + supportedAggregation + "]"); } } }); - return composite; + + labels.forEach((fieldName, type) -> { + dateHistogramAggregation.subAggregation( + new TopHitsAggregationBuilder(fieldName + "_last_value").sort(SortBuilders.fieldSort(timestampField).order(SortOrder.DESC)) + .size(1) + .fetchField(fieldName) + ); + }); + + return tsidAggregation; } @FunctionalInterface