Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,10 @@ public void addMetricField(Metric m, NumberFieldMapper.NumberFieldType subfield)
metricFields.put(m, subfield);
}

/**
 * Returns the {@link NumberFieldMapper.NumberFieldType} sub-fields registered for
 * this aggregate-metric field, one per metric added via {@code addMetricField}.
 *
 * NOTE(review): this returns a live view of the internal map's values, not a
 * defensive copy — callers should treat the returned collection as read-only.
 */
public Collection<NumberFieldMapper.NumberFieldType> getMetricFields() {
    return metricFields.values();
}

/**
 * Sets the default metric for this aggregate-metric field.
 *
 * @param defaultMetric the metric to record as the default; stored as-is
 *                      (no validation is performed here)
 */
public void setDefaultMetric(Metric defaultMetric) {
    this.defaultMetric = defaultMetric;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.index.fielddata.FormattedDocValues;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.mapper.DocCountFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper;

import java.io.IOException;
import java.math.BigInteger;
Expand Down Expand Up @@ -48,6 +51,7 @@ protected FieldValueFetcher(String name,

FormattedDocValues getLeaf(LeafReaderContext context) {
final FormattedDocValues delegate = fieldData.load(context).getFormattedValues(DocValueFormat.RAW);

return new FormattedDocValues() {
@Override
public boolean advanceExact(int docId) throws IOException {
Expand Down Expand Up @@ -86,9 +90,17 @@ static List<FieldValueFetcher> build(SearchExecutionContext context, String[] fi
MappedFieldType fieldType = context.getFieldType(field);
if (fieldType == null) {
throw new IllegalArgumentException("Unknown field: [" + field + "]");
} else if (fieldType instanceof AggregateDoubleMetricFieldMapper.AggregateDoubleMetricFieldType) {
AggregateDoubleMetricFieldMapper.AggregateDoubleMetricFieldType aggMetricFieldType =
(AggregateDoubleMetricFieldMapper.AggregateDoubleMetricFieldType) fieldType;
for (NumberFieldMapper.NumberFieldType metricSubField : aggMetricFieldType.getMetricFields()) {
IndexFieldData<?> fieldData = context.getForField(metricSubField);
fetchers.add(new FieldValueFetcher(field, fieldType, fieldData, getValidator(field)));
}
} else {
IndexFieldData<?> fieldData = context.getForField(fieldType);
fetchers.add(new FieldValueFetcher(field, fieldType, fieldData, getValidator(field)));
}
IndexFieldData<?> fieldData = context.getForField(fieldType);
fetchers.add(new FieldValueFetcher(field, fieldType, fieldData, getValidator(field)));
}
return Collections.unmodifiableList(fetchers);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.bucket.DocCountProvider;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.xpack.core.rollup.RollupActionConfig;
Expand Down Expand Up @@ -96,6 +97,7 @@ class RollupShardIndexer {

private final List<FieldValueFetcher> groupFieldFetchers;
private final List<FieldValueFetcher> metricsFieldFetchers;
private final DocCountProvider docCountProvider;

private final CompressingOfflineSorter sorter;

Expand Down Expand Up @@ -173,6 +175,8 @@ public void deleteFile(String name) throws IOException {
this.metricsFieldFetchers = Collections.emptyList();
}

this.docCountProvider = new DocCountProvider();

this.sorter = new CompressingOfflineSorter(dir, "rollup-", keyComparator(), ramBufferSizeMB);
toClose = null;
} finally {
Expand Down Expand Up @@ -311,6 +315,10 @@ private Long computeBucket(long lastRounding) throws IOException {
producer.reset();
}
}

// read doc count
docCount += in.readVInt();

for (FieldMetricsProducer field : fieldsMetrics) {
int size = in.readVInt();
for (int i = 0; i < size; i++) {
Expand All @@ -320,7 +328,7 @@ private Long computeBucket(long lastRounding) throws IOException {
}
}
}
++ docCount;

lastKey = key;
}
next = it.next();
Expand Down Expand Up @@ -434,6 +442,7 @@ private BucketCollector(long timestamp,
public LeafCollector getLeafCollector(LeafReaderContext context) {
final List<FormattedDocValues> groupFieldLeaves = leafFetchers(context, groupFieldFetchers);
final List<FormattedDocValues> metricsFieldLeaves = leafFetchers(context, metricsFieldFetchers);

return new LeafCollector() {
@Override
public void setScorer(Scorable scorer) {
Expand Down Expand Up @@ -472,11 +481,16 @@ public void collect(int docID) throws IOException {
}
valueBytes = out.bytes().toBytesRef();
}

docCountProvider.setLeafReaderContext(context);
final int docCount = docCountProvider.getDocCount(docID);

for (List<Object> groupFields : cartesianProduct(combinationKeys)) {
try (BytesStreamOutput out = new BytesStreamOutput()) {
BytesRef keyBytes = encodeKey(timestamp, groupFields);
out.writeInt(keyBytes.length);
out.writeBytes(keyBytes.bytes, keyBytes.offset, keyBytes.length);
out.writeVInt(docCount);
out.writeBytes(valueBytes.bytes, valueBytes.offset, valueBytes.length);
externalSorter.add(out.bytes().toBytesRef());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,29 @@ public void testRollupDatastream() throws Exception {
assertRollupIndex(config, oldIndexName, rollupIndexName + "-2");
}

/**
 * Rolls raw documents up into a daily (1d) rollup index, then rolls that index
 * up again into a monthly (30d) rollup-of-rollup index, and verifies the final
 * index against the monthly config.
 */
public void testRollupOfRollup() throws Exception {
    String rollupOfRollupIndex = "rollup2-" + rollupIndex;
    RollupActionDateHistogramGroupConfig monthlyDateHistogramGroupConfig =
        new RollupActionDateHistogramGroupConfig.FixedInterval("date_1", new DateHistogramInterval("30d"), "UTC");
    RollupActionDateHistogramGroupConfig dailyDateHistogramGroupConfig =
        new RollupActionDateHistogramGroupConfig.FixedInterval("date_1", new DateHistogramInterval("1d"), "UTC");
    SourceSupplier sourceSupplier = () -> XContentFactory.jsonBuilder().startObject()
        .field("date_1", randomDateForInterval(dailyDateHistogramGroupConfig.getInterval()))
        .field("numeric_1", randomInt())
        .endObject();
    RollupActionConfig monthlyConfig = new RollupActionConfig(
        new RollupActionGroupConfig(monthlyDateHistogramGroupConfig, null, null),
        Collections.singletonList(new MetricConfig("numeric_1", Collections.singletonList("max"))));
    // FIX: the daily config must be built from the daily (1d) date-histogram group
    // config; the original mistakenly reused the monthly one, making both passes
    // roll up at the same interval.
    RollupActionConfig dailyConfig = new RollupActionConfig(
        new RollupActionGroupConfig(dailyDateHistogramGroupConfig, null, null),
        Collections.singletonList(new MetricConfig("numeric_1", Collections.singletonList("max"))));
    bulkIndex(sourceSupplier);
    rollup(index, rollupIndex, dailyConfig);
    assertRollupIndex(dailyConfig, index, rollupIndex);
    // FIX: the second (rollup-of-rollup) pass must apply the monthly config so it
    // matches the monthlyConfig assertion below; the original passed dailyConfig.
    rollup(rollupIndex, rollupOfRollupIndex, monthlyConfig);
    assertRollupIndex(monthlyConfig, index, rollupOfRollupIndex);
}

private RollupActionDateHistogramGroupConfig randomRollupActionDateHistogramGroupConfig(String field) {
RollupActionDateHistogramGroupConfig randomConfig = ConfigTestHelpers.randomRollupActionDateHistogramGroupConfig(random());
if (randomConfig instanceof RollupActionDateHistogramGroupConfig.FixedInterval) {
Expand Down