Skip to content

Commit 40320b8

Browse files
committed
Support first and last parameter for missing bucket ordering in composite aggregation (#1942)
Add support for "first" and "last" parameters for missing bucket ordering in composite aggregation. By default, the missing bucket is placed first if the order is asc, and last if the order is desc. If missing_order is "first" or "last", the missing bucket is placed first or last respectively, regardless of the order. Signed-off-by: Peng Huo <penghuo@gmail.com>
1 parent 6eda740 commit 40320b8

21 files changed

Lines changed: 850 additions & 77 deletions

server/src/main/java/org/opensearch/search/aggregations/bucket/composite/BinaryValuesSource.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,10 @@
4747
import org.opensearch.index.mapper.StringFieldType;
4848
import org.opensearch.search.DocValueFormat;
4949
import org.opensearch.search.aggregations.LeafBucketCollector;
50+
import org.opensearch.search.aggregations.bucket.missing.MissingOrder;
5051

5152
import java.io.IOException;
53+
import java.util.Objects;
5254
import java.util.function.LongConsumer;
5355

5456
/**
@@ -68,10 +70,11 @@ class BinaryValuesSource extends SingleDimensionValuesSource<BytesRef> {
6870
CheckedFunction<LeafReaderContext, SortedBinaryDocValues, IOException> docValuesFunc,
6971
DocValueFormat format,
7072
boolean missingBucket,
73+
MissingOrder missingOrder,
7174
int size,
7275
int reverseMul
7376
) {
74-
super(bigArrays, format, fieldType, missingBucket, size, reverseMul);
77+
super(bigArrays, format, fieldType, missingBucket, missingOrder, size, reverseMul);
7578
this.breakerConsumer = breakerConsumer;
7679
this.docValuesFunc = docValuesFunc;
7780
this.values = bigArrays.newObjectArray(Math.min(size, 100));
@@ -101,10 +104,9 @@ void copyCurrent(int slot) {
101104
@Override
102105
int compare(int from, int to) {
103106
if (missingBucket) {
104-
if (values.get(from) == null) {
105-
return values.get(to) == null ? 0 : -1 * reverseMul;
106-
} else if (values.get(to) == null) {
107-
return reverseMul;
107+
int result = missingOrder.compare(() -> Objects.isNull(values.get(from)), () -> Objects.isNull(values.get(to)), reverseMul);
108+
if (MissingOrder.unknownOrder(result) == false) {
109+
return result;
108110
}
109111
}
110112
return compareValues(values.get(from), values.get(to));
@@ -113,10 +115,9 @@ int compare(int from, int to) {
113115
@Override
114116
int compareCurrent(int slot) {
115117
if (missingBucket) {
116-
if (currentValue == null) {
117-
return values.get(slot) == null ? 0 : -1 * reverseMul;
118-
} else if (values.get(slot) == null) {
119-
return reverseMul;
118+
int result = missingOrder.compare(() -> Objects.isNull(currentValue), () -> Objects.isNull(values.get(slot)), reverseMul);
119+
if (MissingOrder.unknownOrder(result) == false) {
120+
return result;
120121
}
121122
}
122123
return compareValues(currentValue, values.get(slot));
@@ -125,10 +126,9 @@ int compareCurrent(int slot) {
125126
@Override
126127
int compareCurrentWithAfter() {
127128
if (missingBucket) {
128-
if (currentValue == null) {
129-
return afterValue == null ? 0 : -1 * reverseMul;
130-
} else if (afterValue == null) {
131-
return reverseMul;
129+
int result = missingOrder.compare(() -> Objects.isNull(currentValue), () -> Objects.isNull(afterValue), reverseMul);
130+
if (MissingOrder.unknownOrder(result) == false) {
131+
return result;
132132
}
133133
}
134134
return compareValues(currentValue, afterValue);

server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
import org.opensearch.search.aggregations.MultiBucketCollector;
7171
import org.opensearch.search.aggregations.MultiBucketConsumerService;
7272
import org.opensearch.search.aggregations.bucket.BucketsAggregator;
73+
import org.opensearch.search.aggregations.bucket.missing.MissingOrder;
7374
import org.opensearch.search.internal.SearchContext;
7475
import org.opensearch.search.searchafter.SearchAfterBuilder;
7576
import org.opensearch.search.sort.SortAndFormats;
@@ -89,6 +90,7 @@ final class CompositeAggregator extends BucketsAggregator {
8990
private final int size;
9091
private final List<String> sourceNames;
9192
private final int[] reverseMuls;
93+
private final MissingOrder[] missingOrders;
9294
private final List<DocValueFormat> formats;
9395
private final CompositeKey rawAfterKey;
9496

@@ -117,6 +119,7 @@ final class CompositeAggregator extends BucketsAggregator {
117119
this.size = size;
118120
this.sourceNames = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::name).collect(Collectors.toList());
119121
this.reverseMuls = Arrays.stream(sourceConfigs).mapToInt(CompositeValuesSourceConfig::reverseMul).toArray();
122+
this.missingOrders = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::missingOrder).toArray(MissingOrder[]::new);
120123
this.formats = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::format).collect(Collectors.toList());
121124
this.sources = new SingleDimensionValuesSource[sourceConfigs.length];
122125
// check that the provided size is not greater than the search.max_buckets setting
@@ -189,7 +192,15 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
189192
CompositeKey key = queue.toCompositeKey(slot);
190193
InternalAggregations aggs = subAggsForBuckets[slot];
191194
int docCount = queue.getDocCount(slot);
192-
buckets[queue.size()] = new InternalComposite.InternalBucket(sourceNames, formats, key, reverseMuls, docCount, aggs);
195+
buckets[queue.size()] = new InternalComposite.InternalBucket(
196+
sourceNames,
197+
formats,
198+
key,
199+
reverseMuls,
200+
missingOrders,
201+
docCount,
202+
aggs
203+
);
193204
}
194205
CompositeKey lastBucket = num > 0 ? buckets[num - 1].getRawKey() : null;
195206
return new InternalAggregation[] {
@@ -201,14 +212,26 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
201212
Arrays.asList(buckets),
202213
lastBucket,
203214
reverseMuls,
215+
missingOrders,
204216
earlyTerminated,
205217
metadata()
206218
) };
207219
}
208220

209221
@Override
210222
public InternalAggregation buildEmptyAggregation() {
211-
return new InternalComposite(name, size, sourceNames, formats, Collections.emptyList(), null, reverseMuls, false, metadata());
223+
return new InternalComposite(
224+
name,
225+
size,
226+
sourceNames,
227+
formats,
228+
Collections.emptyList(),
229+
null,
230+
reverseMuls,
231+
missingOrders,
232+
false,
233+
metadata()
234+
);
212235
}
213236

214237
private void finishLeaf() {

server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesSourceBuilder.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,15 @@
3333
package org.opensearch.search.aggregations.bucket.composite;
3434

3535
import org.opensearch.LegacyESVersion;
36+
import org.opensearch.Version;
3637
import org.opensearch.common.io.stream.StreamInput;
3738
import org.opensearch.common.io.stream.StreamOutput;
3839
import org.opensearch.common.io.stream.Writeable;
3940
import org.opensearch.common.xcontent.ToXContentFragment;
4041
import org.opensearch.common.xcontent.XContentBuilder;
4142
import org.opensearch.index.query.QueryShardContext;
4243
import org.opensearch.script.Script;
44+
import org.opensearch.search.aggregations.bucket.missing.MissingOrder;
4345
import org.opensearch.search.aggregations.support.ValueType;
4446
import org.opensearch.search.aggregations.support.ValuesSource;
4547
import org.opensearch.search.aggregations.support.ValuesSourceConfig;
@@ -50,6 +52,8 @@
5052
import java.time.ZoneId;
5153
import java.util.Objects;
5254

55+
import static org.opensearch.search.aggregations.bucket.missing.MissingOrder.fromString;
56+
5357
/**
5458
* A {@link ValuesSource} builder for {@link CompositeAggregationBuilder}
5559
*/
@@ -60,6 +64,7 @@ public abstract class CompositeValuesSourceBuilder<AB extends CompositeValuesSou
6064
private Script script = null;
6165
private ValueType userValueTypeHint = null;
6266
private boolean missingBucket = false;
67+
private MissingOrder missingOrder = MissingOrder.DEFAULT;
6368
private SortOrder order = SortOrder.ASC;
6469
private String format = null;
6570

@@ -85,6 +90,9 @@ public abstract class CompositeValuesSourceBuilder<AB extends CompositeValuesSou
8590
// skip missing value for BWC
8691
in.readGenericValue();
8792
}
93+
if (in.getVersion().onOrAfter(Version.V_1_3_0)) {
94+
this.missingOrder = MissingOrder.readFromStream(in);
95+
}
8896
this.order = SortOrder.readFromStream(in);
8997
if (in.getVersion().onOrAfter(LegacyESVersion.V_6_3_0)) {
9098
this.format = in.readOptionalString();
@@ -114,6 +122,9 @@ public final void writeTo(StreamOutput out) throws IOException {
114122
// write missing value for BWC
115123
out.writeGenericValue(null);
116124
}
125+
if (out.getVersion().onOrAfter(Version.V_1_3_0)) {
126+
missingOrder.writeTo(out);
127+
}
117128
order.writeTo(out);
118129
if (out.getVersion().onOrAfter(LegacyESVersion.V_6_3_0)) {
119130
out.writeOptionalString(format);
@@ -141,6 +152,9 @@ public final XContentBuilder toXContent(XContentBuilder builder, Params params)
141152
if (format != null) {
142153
builder.field("format", format);
143154
}
155+
if (MissingOrder.isDefault(missingOrder) == false) {
156+
builder.field(MissingOrder.NAME, missingOrder.toString());
157+
}
144158
builder.field("order", order);
145159
doXContentBody(builder, params);
146160
builder.endObject();
@@ -163,6 +177,7 @@ public boolean equals(Object o) {
163177
&& Objects.equals(script, that.script())
164178
&& Objects.equals(userValueTypeHint, that.userValuetypeHint())
165179
&& Objects.equals(missingBucket, that.missingBucket())
180+
&& Objects.equals(missingOrder, that.missingOrder())
166181
&& Objects.equals(order, that.order())
167182
&& Objects.equals(format, that.format());
168183
}
@@ -247,6 +262,29 @@ public boolean missingBucket() {
247262
return missingBucket;
248263
}
249264

265+
/**
266+
* Sets the {@link MissingOrder} used to order missing values.
267+
*/
268+
public AB missingOrder(MissingOrder missingOrder) {
269+
this.missingOrder = missingOrder;
270+
return (AB) this;
271+
}
272+
273+
/**
274+
* Sets the {@link MissingOrder} used to order missing values.
275+
* @param missingOrder "first", "last" or "default".
276+
*/
277+
public AB missingOrder(String missingOrder) {
278+
return missingOrder(fromString(missingOrder));
279+
}
280+
281+
/**
282+
* Returns the {@link MissingOrder} used to order missing values.
283+
*/
284+
public MissingOrder missingOrder() {
285+
return missingOrder;
286+
}
287+
250288
/**
251289
* Sets the {@link SortOrder} to use to sort values produced this source
252290
*/
@@ -307,6 +345,9 @@ protected abstract CompositeValuesSourceConfig innerBuild(QueryShardContext quer
307345
protected abstract ValuesSourceType getDefaultValuesSourceType();
308346

309347
public final CompositeValuesSourceConfig build(QueryShardContext queryShardContext) throws IOException {
348+
if (missingBucket == false && missingOrder != MissingOrder.DEFAULT) {
349+
throw new IllegalArgumentException(MissingOrder.NAME + " require missing_bucket is true");
350+
}
310351
ValuesSourceConfig config = ValuesSourceConfig.resolve(
311352
queryShardContext,
312353
userValueTypeHint,

server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesSourceConfig.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.opensearch.common.util.BigArrays;
3838
import org.opensearch.index.mapper.MappedFieldType;
3939
import org.opensearch.search.DocValueFormat;
40+
import org.opensearch.search.aggregations.bucket.missing.MissingOrder;
4041
import org.opensearch.search.aggregations.support.ValuesSource;
4142
import org.opensearch.search.sort.SortOrder;
4243

@@ -62,6 +63,7 @@ SingleDimensionValuesSource<?> createValuesSource(
6263
private final DocValueFormat format;
6364
private final int reverseMul;
6465
private final boolean missingBucket;
66+
private final MissingOrder missingOrder;
6567
private final boolean hasScript;
6668
private final SingleDimensionValuesSourceProvider singleDimensionValuesSourceProvider;
6769

@@ -83,6 +85,7 @@ SingleDimensionValuesSource<?> createValuesSource(
8385
DocValueFormat format,
8486
SortOrder order,
8587
boolean missingBucket,
88+
MissingOrder missingOrder,
8689
boolean hasScript,
8790
SingleDimensionValuesSourceProvider singleDimensionValuesSourceProvider
8891
) {
@@ -94,6 +97,7 @@ SingleDimensionValuesSource<?> createValuesSource(
9497
this.missingBucket = missingBucket;
9598
this.hasScript = hasScript;
9699
this.singleDimensionValuesSourceProvider = singleDimensionValuesSourceProvider;
100+
this.missingOrder = missingOrder;
97101
}
98102

99103
/**
@@ -132,6 +136,13 @@ boolean missingBucket() {
132136
return missingBucket;
133137
}
134138

139+
/**
140+
* Return the {@link MissingOrder} for the config.
141+
*/
142+
MissingOrder missingOrder() {
143+
return missingOrder;
144+
}
145+
135146
/**
136147
* Returns true if the source contains a script that can change the value.
137148
*/

server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesSourceParserHelper.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.opensearch.common.xcontent.XContentBuilder;
4444
import org.opensearch.common.xcontent.XContentParser;
4545
import org.opensearch.script.Script;
46+
import org.opensearch.search.aggregations.bucket.missing.MissingOrder;
4647
import org.opensearch.search.aggregations.support.ValueType;
4748

4849
import java.io.IOException;
@@ -54,6 +55,7 @@ public class CompositeValuesSourceParserHelper {
5455
static <VB extends CompositeValuesSourceBuilder<VB>, T> void declareValuesSourceFields(AbstractObjectParser<VB, T> objectParser) {
5556
objectParser.declareField(VB::field, XContentParser::text, new ParseField("field"), ObjectParser.ValueType.STRING);
5657
objectParser.declareBoolean(VB::missingBucket, new ParseField("missing_bucket"));
58+
objectParser.declareString(VB::missingOrder, new ParseField(MissingOrder.NAME));
5759

5860
objectParser.declareField(VB::userValuetypeHint, p -> {
5961
ValueType valueType = ValueType.lenientParse(p.text());

server/src/main/java/org/opensearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.opensearch.search.aggregations.bucket.histogram.DateIntervalConsumer;
5353
import org.opensearch.search.aggregations.bucket.histogram.DateIntervalWrapper;
5454
import org.opensearch.search.aggregations.bucket.histogram.Histogram;
55+
import org.opensearch.search.aggregations.bucket.missing.MissingOrder;
5556
import org.opensearch.search.aggregations.support.CoreValuesSourceType;
5657
import org.opensearch.search.aggregations.support.ValuesSource;
5758
import org.opensearch.search.aggregations.support.ValuesSourceConfig;
@@ -81,6 +82,7 @@ CompositeValuesSourceConfig apply(
8182
boolean hasScript, // probably redundant with the config, but currently we check this two different ways...
8283
String format,
8384
boolean missingBucket,
85+
MissingOrder missingOrder,
8486
SortOrder order
8587
);
8688
}
@@ -288,7 +290,7 @@ public static void register(ValuesSourceRegistry.Builder builder) {
288290
builder.register(
289291
REGISTRY_KEY,
290292
org.opensearch.common.collect.List.of(CoreValuesSourceType.DATE, CoreValuesSourceType.NUMERIC),
291-
(valuesSourceConfig, rounding, name, hasScript, format, missingBucket, order) -> {
293+
(valuesSourceConfig, rounding, name, hasScript, format, missingBucket, missingOrder, order) -> {
292294
ValuesSource.Numeric numeric = (ValuesSource.Numeric) valuesSourceConfig.getValuesSource();
293295
// TODO once composite is plugged in to the values source registry or at least understands Date values source types use it
294296
// here
@@ -304,6 +306,7 @@ public static void register(ValuesSourceRegistry.Builder builder) {
304306
docValueFormat,
305307
order,
306308
missingBucket,
309+
missingOrder,
307310
hasScript,
308311
(
309312
BigArrays bigArrays,
@@ -319,6 +322,7 @@ public static void register(ValuesSourceRegistry.Builder builder) {
319322
roundingValuesSource::round,
320323
compositeValuesSourceConfig.format(),
321324
compositeValuesSourceConfig.missingBucket(),
325+
compositeValuesSourceConfig.missingOrder(),
322326
size,
323327
compositeValuesSourceConfig.reverseMul()
324328
);
@@ -339,6 +343,6 @@ protected CompositeValuesSourceConfig innerBuild(QueryShardContext queryShardCon
339343
Rounding rounding = dateHistogramInterval.createRounding(timeZone(), offset);
340344
return queryShardContext.getValuesSourceRegistry()
341345
.getAggregator(REGISTRY_KEY, config)
342-
.apply(config, rounding, name, config.script() != null, format(), missingBucket(), order());
346+
.apply(config, rounding, name, config.script() != null, format(), missingBucket(), missingOrder(), order());
343347
}
344348
}

0 commit comments

Comments
 (0)