Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
1a130aa
Skip backing indices with a disjoint range on @timestamp field.
martijnvg Mar 21, 2022
9db31c1
fix mistake
martijnvg Mar 22, 2022
2b0e37c
Merge remote-tracking branch 'es/master' into skip_backing_indices
martijnvg Mar 23, 2022
ac5cb16
added first test
martijnvg Mar 23, 2022
e0fc357
checkstyle
martijnvg Mar 23, 2022
82c41e2
Merge remote-tracking branch 'es/master' into skip_backing_indices
martijnvg May 4, 2022
a90bd47
fix test compile error after merging in master
martijnvg May 4, 2022
025e05e
Merge remote-tracking branch 'es/master' into skip_backing_indices
martijnvg May 9, 2022
74277ba
iter
martijnvg May 9, 2022
8314534
spotless
martijnvg May 9, 2022
7e9a877
iter
martijnvg May 10, 2022
b5d0d7d
Merge remote-tracking branch 'es/master' into skip_backing_indices
martijnvg May 10, 2022
641cecf
refactor
martijnvg May 10, 2022
a26c720
fix bug, in case of IndexLongFieldRange.EMPTY the hasTimestampData() …
martijnvg May 10, 2022
4fcec95
added test and
martijnvg May 10, 2022
a83cf41
RemoveTimeSeriesRange with IndexLongFieldRange
martijnvg May 10, 2022
34ec679
re-order
martijnvg May 10, 2022
714f629
Merge remote-tracking branch 'es/master' into skip_backing_indices
martijnvg Jun 17, 2022
de2e3ed
fix test compile error after merging in master
martijnvg Jun 17, 2022
c78845e
Merge remote-tracking branch 'es/master' into skip_backing_indices
martijnvg Jun 27, 2022
a4845e3
Merge remote-tracking branch 'es/master' into skip_backing_indices
martijnvg Jun 27, 2022
0034d7a
use start and end time from IndexMetadata instead of parsing it from …
martijnvg Jun 28, 2022
b475761
Add unit like test that verifies that for tsdb data stream
martijnvg Jun 28, 2022
68539e1
Merge remote-tracking branch 'es/master' into skip_backing_indices
martijnvg Jun 28, 2022
c5ecf62
spotless
martijnvg Jun 28, 2022
ffb33ec
fix bug
martijnvg Jun 28, 2022
710bb28
Merge remote-tracking branch 'es/master' into skip_backing_indices
martijnvg Jun 28, 2022
25b3bb3
handle when index mode is provided in upper case
martijnvg Jun 29, 2022
ca5893b
Merge remote-tracking branch 'es/master' into skip_backing_indices
martijnvg Jun 29, 2022
679e699
spotless
martijnvg Jun 29, 2022
f09ffe7
Merge branch 'master' into skip_backing_indices
elasticmachine Jul 4, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.action.admin.indices.template.put.PutComponentTemplateAction;
import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.cluster.metadata.ComponentTemplate;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.Template;
Expand All @@ -21,12 +22,16 @@
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.time.FormatNames;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.indices.InvalidIndexTemplateException;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
import org.elasticsearch.xcontent.XContentType;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
Expand All @@ -36,6 +41,18 @@

public class TSDBIndexingIT extends ESSingleNodeTestCase {

public static final String MAPPING_TEMPLATE = """
{
"_doc":{
"properties": {
"metricset": {
"type": "keyword",
"time_series_dimension": true
}
}
}
}""";

private static final String DOC = """
{
"@timestamp": "$time",
Expand Down Expand Up @@ -341,6 +358,76 @@ public void testInvalidTsdbTemplatesMissingSettings() throws Exception {
assertThat(e.getCause().getMessage(), equalTo("[index.routing_path] requires [index.mode=time_series]"));
}

public void testSkippingShards() throws Exception {
Instant time = Instant.now();
{
var templateSettings = Settings.builder().put("index.mode", "time_series").put("index.routing_path", "metricset").build();
var request = new PutComposableIndexTemplateAction.Request("id1");
request.indexTemplate(
new ComposableIndexTemplate(
List.of("pattern-1"),
new Template(templateSettings, new CompressedXContent(MAPPING_TEMPLATE), null),
null,
null,
null,
null,
new ComposableIndexTemplate.DataStreamTemplate(false, false),
null
)
);
client().execute(PutComposableIndexTemplateAction.INSTANCE, request).actionGet();
var indexRequest = new IndexRequest("pattern-1").opType(DocWriteRequest.OpType.CREATE).setRefreshPolicy("true");
indexRequest.source(DOC.replace("$time", formatInstant(time)), XContentType.JSON);
client().index(indexRequest).actionGet();
}
{
var request = new PutComposableIndexTemplateAction.Request("id2");
request.indexTemplate(
new ComposableIndexTemplate(
List.of("pattern-2"),
new Template(null, new CompressedXContent(MAPPING_TEMPLATE), null),
null,
null,
null,
null,
new ComposableIndexTemplate.DataStreamTemplate(false, false),
null
)
);
client().execute(PutComposableIndexTemplateAction.INSTANCE, request).actionGet();
var indexRequest = new IndexRequest("pattern-2").opType(DocWriteRequest.OpType.CREATE).setRefreshPolicy("true");
indexRequest.source(DOC.replace("$time", formatInstant(time)), XContentType.JSON);
client().index(indexRequest).actionGet();
}
{
var matchingRange = new SearchSourceBuilder().query(
new RangeQueryBuilder("@timestamp").from(time.minusSeconds(1).toEpochMilli()).to(time.plusSeconds(1).toEpochMilli())
);
var searchRequest = new SearchRequest("pattern-*");
searchRequest.setPreFilterShardSize(1);
searchRequest.source(matchingRange);
var searchResponse = client().search(searchRequest).actionGet();
ElasticsearchAssertions.assertHitCount(searchResponse, 2);
assertThat(searchResponse.getTotalShards(), equalTo(2));
assertThat(searchResponse.getSkippedShards(), equalTo(0));
assertThat(searchResponse.getSuccessfulShards(), equalTo(2));
}
{
var nonMatchingRange = new SearchSourceBuilder().query(
new RangeQueryBuilder("@timestamp").from(time.minus(2, ChronoUnit.DAYS).toEpochMilli())
.to(time.minus(1, ChronoUnit.DAYS).toEpochMilli())
);
var searchRequest = new SearchRequest("pattern-*");
searchRequest.setPreFilterShardSize(1);
searchRequest.source(nonMatchingRange);
var searchResponse = client().search(searchRequest).actionGet();
ElasticsearchAssertions.assertNoSearchHits(searchResponse);
assertThat(searchResponse.getTotalShards(), equalTo(2));
assertThat(searchResponse.getSkippedShards(), equalTo(1));
assertThat(searchResponse.getSuccessfulShards(), equalTo(2));
}
}

static String formatInstant(Instant instant) {
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.datastreams;

import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.time.FormatNames;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xcontent.XContentType;

import java.io.IOException;
import java.time.Instant;
import java.util.Collection;
import java.util.List;

import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

public class TimestampFieldMapperServiceTests extends ESSingleNodeTestCase {

private static final String DOC = """
{
"@timestamp": "$time",
"metricset": "pod",
"k8s": {
"pod": {
"name": "dog",
"uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9",
"ip": "10.10.55.3",
"network": {
"tx": 1434595272,
"rx": 530605511
}
}
}
}
""";

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return List.of(DataStreamsPlugin.class);
}

public void testGetTimestampFieldTypeForTsdbDataStream() throws IOException {
createTemplate(true);
IndexResponse indexResponse = indexDoc();

var indicesService = getInstanceFromNode(IndicesService.class);
var result = indicesService.getTimestampFieldType(indexResponse.getShardId().getIndex());
assertThat(result, notNullValue());
}

public void testGetTimestampFieldTypeForDataStream() throws IOException {
createTemplate(false);
IndexResponse indexResponse = indexDoc();

var indicesService = getInstanceFromNode(IndicesService.class);
var result = indicesService.getTimestampFieldType(indexResponse.getShardId().getIndex());
assertThat(result, nullValue());
}

private IndexResponse indexDoc() {
Instant time = Instant.now();
var indexRequest = new IndexRequest("k8s").opType(DocWriteRequest.OpType.CREATE);
indexRequest.source(DOC.replace("$time", formatInstant(time)), XContentType.JSON);
return client().index(indexRequest).actionGet();
}

private void createTemplate(boolean tsdb) throws IOException {
var mappingTemplate = """
{
"_doc":{
"properties": {
"metricset": {
"type": "keyword",
"time_series_dimension": true
}
}
}
}""";
var templateSettings = Settings.builder().put("index.mode", tsdb ? "time_series" : "standard");
var request = new PutComposableIndexTemplateAction.Request("id");
request.indexTemplate(
new ComposableIndexTemplate(
List.of("k8s*"),
new Template(templateSettings.build(), new CompressedXContent(mappingTemplate), null),
null,
null,
null,
null,
new ComposableIndexTemplate.DataStreamTemplate(false, false),
null
)
);
client().execute(PutComposableIndexTemplateAction.INSTANCE, request).actionGet();
}

private static String formatInstant(Instant instant) {
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,15 @@ public IndexLongFieldRange getTimestampRange() {
return timestampRange;
}

/**
* @return the time range this index represents if this index is in time series mode.
* Otherwise <code>null</code> is returned.
*/
@Nullable
public IndexLongFieldRange getTimeSeriesTimestampRange() {
return IndexSettings.MODE.get(settings).getConfiguredTimestampRange(this);
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down Expand Up @@ -1745,8 +1754,10 @@ public IndexMetadata build() {
}

final boolean isSearchableSnapshot = SearchableSnapshotsSettings.isSearchableSnapshotStore(settings);
final String indexMode = settings.get(IndexSettings.MODE.getKey());
final boolean isTsdb = IndexSettings.isTimeSeriesModeEnabled()
&& IndexMode.TIME_SERIES.getName().equals(settings.get(IndexSettings.MODE.getKey()));
&& indexMode != null
&& IndexMode.TIME_SERIES.getName().equals(indexMode.toLowerCase(Locale.ROOT));
return new IndexMetadata(
new Index(index, uuid),
version,
Expand Down
22 changes: 22 additions & 0 deletions server/src/main/java/org/elasticsearch/index/IndexMode.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.elasticsearch.index.mapper.RoutingFieldMapper;
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
import org.elasticsearch.index.mapper.TsidExtractingIdFieldMapper;
import org.elasticsearch.index.shard.IndexLongFieldRange;
import org.elasticsearch.index.shard.ShardLongFieldRange;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -104,6 +106,11 @@ public IdFieldMapper buildIdFieldMapper(BooleanSupplier fieldDataEnabled) {
public DocumentDimensions buildDocumentDimensions() {
return new DocumentDimensions.OnlySingleValueAllowed();
}

@Override
public IndexLongFieldRange getConfiguredTimestampRange(IndexMetadata indexMetadata) {
return null;
}
},
TIME_SERIES("time_series") {
@Override
Expand Down Expand Up @@ -185,6 +192,13 @@ public IdFieldMapper buildIdFieldMapper(BooleanSupplier fieldDataEnabled) {
public DocumentDimensions buildDocumentDimensions() {
return new TimeSeriesIdFieldMapper.TimeSeriesIdBuilder();
}

@Override
public IndexLongFieldRange getConfiguredTimestampRange(IndexMetadata indexMetadata) {
long min = indexMetadata.getTimeSeriesStart().toEpochMilli();
long max = indexMetadata.getTimeSeriesEnd().toEpochMilli();
return IndexLongFieldRange.NO_SHARDS.extendWithShardRange(0, 1, ShardLongFieldRange.of(min, max));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey. So. Like. I think I made getTimestampBound and it does like nearly the same thing, right? I feel like that method should be involved in this PR somehow. Or maybe IndexSettings.timestampBounds should be.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I think I can reuse getTimestampBound(...) and then IndexMetadata#getTimeSeriesTimestampRange(...) use that to build a IndexLongFieldRange instance.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TimestampBounds needs IndexScopedSettings which I don't have in the context In want to use it.
This technically only needed for dynamically updating endTime field, which is my case isn't needed.
But maybe with a minor change to TimestampBounds, I can use it too.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah! Would you be ok to merge this as is and then do some kind of refactoring to make combine the two paths somehow?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I can do that.

}
};

protected static String tsdbMode() {
Expand Down Expand Up @@ -286,6 +300,14 @@ public String getName() {
*/
public abstract DocumentDimensions buildDocumentDimensions();

/**
* @return the time range based on the provided index settings and index mode implementation.
* Otherwise <code>null</code> is returned.
* @param indexMetadata
*/
@Nullable
public abstract IndexLongFieldRange getConfiguredTimestampRange(IndexMetadata indexMetadata);

public static IndexMode fromString(String value) {
return switch (value) {
case "standard" -> IndexMode.STANDARD;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.shard.IndexLongFieldRange;
Expand All @@ -27,21 +26,18 @@
* don't hold queried data. See IndexMetadata#getTimestampRange() for more details
*/
public class CoordinatorRewriteContext extends QueryRewriteContext {
private final Index index;
private IndexLongFieldRange indexLongFieldRange;
private final IndexLongFieldRange indexLongFieldRange;
private final DateFieldMapper.DateFieldType timestampFieldType;

public CoordinatorRewriteContext(
XContentParserConfiguration parserConfig,
NamedWriteableRegistry writeableRegistry,
Client client,
LongSupplier nowInMillis,
Index index,
IndexLongFieldRange indexLongFieldRange,
DateFieldMapper.DateFieldType timestampFieldType
) {
super(parserConfig, writeableRegistry, client, nowInMillis);
this.index = index;
this.indexLongFieldRange = indexLongFieldRange;
this.timestampFieldType = timestampFieldType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.Index;
Expand Down Expand Up @@ -48,20 +47,26 @@ public CoordinatorRewriteContextProvider(

@Nullable
public CoordinatorRewriteContext getCoordinatorRewriteContext(Index index) {
ClusterState clusterState = clusterStateSupplier.get();
IndexMetadata indexMetadata = clusterState.metadata().index(index);
var clusterState = clusterStateSupplier.get();
var indexMetadata = clusterState.metadata().index(index);

if (indexMetadata == null || indexMetadata.getTimestampRange().containsAllShardRanges() == false) {
if (indexMetadata == null) {
return null;
}
IndexLongFieldRange timestampRange = indexMetadata.getTimestampRange();
if (timestampRange.containsAllShardRanges() == false) {
timestampRange = indexMetadata.getTimeSeriesTimestampRange();
if (timestampRange == null) {
return null;
}
}

DateFieldMapper.DateFieldType dateFieldType = mappingSupplier.apply(index);

if (dateFieldType == null) {
return null;
}

IndexLongFieldRange timestampRange = indexMetadata.getTimestampRange();
return new CoordinatorRewriteContext(parserConfig, writeableRegistry, client, nowInMillis, index, timestampRange, dateFieldType);
return new CoordinatorRewriteContext(parserConfig, writeableRegistry, client, nowInMillis, timestampRange, dateFieldType);
}
}
Loading