Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import org.opensearch.tsdb.utils.TSDBTestUtils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -61,6 +64,7 @@ public record RestTSDBEngineIngestor(RestClient restClient) {
* Ingest time series data from multiple input data configurations into their respective indices.
* Each InputDataConfig specifies which index its data should go to.
* Uses bulk API with batching for efficient ingestion.
* Sorts all time series samples by timestamp within each index before ingestion, to avoid out-of-order (OOO) event cutoff rejections.
*
* @param inputDataConfigs List of input data configurations with their target indices
* @throws IOException If ingestion fails
Expand All @@ -70,12 +74,24 @@ public void ingestDataToMultipleIndices(List<InputDataConfig> inputDataConfigs)
return;
}

// Collect all samples per index, then sort each index's samples by timestamp.
// Sorting ensures that samples across multiple time series are interleaved chronologically
// before ingestion. Without this, series are emitted one-at-a-time: a shard may process
// series1 up to its maxTime (e.g. 07:00) and then receive series2's first sample (e.g. 00:00),
// which falls outside a short OOO cutoff window and gets rejected.
Map<String, List<TimeSeriesSample>> samplesByIndex = new HashMap<>();
for (InputDataConfig inputDataConfig : inputDataConfigs) {
if (inputDataConfig.indexName() == null || inputDataConfig.indexName().isEmpty()) {
throw new IllegalArgumentException("InputDataConfig must specify an index_name for data ingestion");
}
List<TimeSeriesSample> samples = TimeSeriesSampleGenerator.generateSamples(inputDataConfig);
ingestInBulk(samples, inputDataConfig.indexName(), DEFAULT_BULK_SIZE);
samplesByIndex.computeIfAbsent(inputDataConfig.indexName(), k -> new ArrayList<>()).addAll(samples);
}

for (Map.Entry<String, List<TimeSeriesSample>> entry : samplesByIndex.entrySet()) {
List<TimeSeriesSample> samples = entry.getValue();
samples.sort(Comparator.comparing(TimeSeriesSample::timestamp));
ingestInBulk(samples, entry.getKey(), DEFAULT_BULK_SIZE);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,31 @@ public class TSDBStatsRestIT extends RestTimeSeriesTestFramework {

private static final String TEST_DATA_YAML = "test_cases/tsdb_stats_rest_it.yaml";
private boolean dataLoaded = false;
// OVERRIDE max_closeable_chunks_per_chunk_range_percentage = 100; otherwise only 15% of closed chunks would be dropped.
// This is to test numChunks in HeadStats
private static final String CUSTOMIZED_INDEX_SETTINGS_YAML = """
index.refresh_interval: "1s"
index.tsdb_engine.enabled: true
index.tsdb_engine.labels.storage_type: binary
index.tsdb_engine.lang.m3.default_step_size: "10s"
index.tsdb_engine.max_closeable_chunks_per_chunk_range_percentage: 100
index.queries.cache.enabled: false
index.requests.cache.enable: false
index.translog.durability: async
index.translog.sync_interval: "1s"
""";

/**
* Lazily initialize test data on first use for each test.
* This ensures the REST client is ready before we try to create indices.
*/
private void ensureDataLoaded() throws Exception {
if (!dataLoaded) {
initializeTest(TEST_DATA_YAML);
initializeTest(TEST_DATA_YAML, CUSTOMIZED_INDEX_SETTINGS_YAML);
setupTest();
// Force CCI creation and MemChunk drop
Request flushRequest = new Request("POST", "/tsdb_stats_test/_flush?force=true");
client().performRequest(flushRequest);
dataLoaded = true;
}
}
Expand All @@ -53,8 +69,15 @@ private void ensureDataLoaded() throws Exception {
public void testBasicEndpoint() throws Exception {
ensureDataLoaded();

// "numChunks": 20 — 10 series x 2 chunks each: 1 MemChunk retained by the 20m OOO cutoff window + 1 open MemChunk
String expectedJson = """
{
"headStats": {
"numSeries": 10,
"numChunks": 20,
"minTime": 1735713600000,
"maxTime": 9223372036854775807
},
"labelStats": {
"numSeries": 10,
"name": {
Expand Down Expand Up @@ -128,7 +151,7 @@ public void testBasicEndpoint() throws Exception {
// Test POST request returns same result
Request postRequest = new Request("POST", "/_tsdb/stats");
postRequest.addParameter("start", "1735689600000");
postRequest.addParameter("end", "1735707600000");
postRequest.addParameter("end", "1735714800000");
postRequest.setJsonEntity("{\"query\": \"fetch name:*\"}");

Response postResponse = client().performRequest(postRequest);
Expand Down Expand Up @@ -274,6 +297,12 @@ public void testFormatOptions() throws Exception {
// Grouped format - same as default/testTSDBStatsEndpointExists
String expectedGroupedJson = """
{
"headStats": {
"numSeries": 10,
"numChunks": 20,
"minTime": 1735713600000,
"maxTime": 9223372036854775807
},
"labelStats": {
"numSeries": 10,
"name": {
Expand Down Expand Up @@ -348,6 +377,12 @@ public void testFormatOptions() throws Exception {
// Flat format has arrays sorted by count descending, then name ascending
String expectedFlatJson = """
{
"headStats": {
"numSeries": 10,
"numChunks": 20,
"minTime": 1735713600000,
"maxTime": 9223372036854775807
},
"seriesCountByMetricName": [
{"name": "http_requests_total", "value": 6},
{"name": "db_connections", "value": 2},
Expand Down Expand Up @@ -410,6 +445,12 @@ public void testQueryFiltering() throws Exception {
// Filtered to service:api AND name:http_* (5 series - all api series have http_* names)
String expectedJson = """
{
"headStats": {
"numSeries": 5,
"numChunks": 10,
"minTime": 1735713600000,
"maxTime": 9223372036854775807
},
"labelStats": {
"numSeries": 5,
"name": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,31 @@ public List<ChunkIterator> chunksForDoc(int docId, TSDBDocValues tsdbDocValues)
return chunkIterators;
}

/**
* Returns the number of MemChunks(open+OOO_cutoff) for the series corresponding to the given document.
*
* @param docId the document ID to look up
* @param tsdbDocValues the doc values reader for this leaf
* @return number of MemChunks for the series, or 0 if the doc has no series reference
* @throws IOException if an I/O error occurs
*/
public int numChunksForDoc(int docId, TSDBDocValues tsdbDocValues) throws IOException {
NumericDocValues seriesRefValue = tsdbDocValues.getChunkRefDocValues();
if (!seriesRefValue.advanceExact(docId)) {
return 0;
}
long seriesRef = seriesRefValue.longValue();
int numChunks = 0;
Set<MemChunk> chunksToFilter = mMappedChunks.getOrDefault(seriesRef, Collections.emptySet());
List<MemChunk> memChunks = memChunkReader.getChunks(seriesRef); // get all memchunks for the series
for (MemChunk memChunk : memChunks) {
if (!chunksToFilter.contains(memChunk)) {
numChunks++;
}
}
return numChunks;
}

@Override
public Labels labelsForDoc(int docId, TSDBDocValues tsdbDocValues) throws IOException {
return tsdbDocValues.getLabelsStorage().readLabels(docId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,33 +61,21 @@ public class InternalTSDBStats extends InternalAggregation {
* Statistics for the head (in-memory time series).
*
* @param numSeries the number of active time series in the head
* @param chunkCount the total number of memory chunks currently held
* @param numChunks total MemChunks across all series (≥ numSeries due to OOO or not-yet-dropped chunks)
* @param minTime the minimum sample timestamp present in the head
* @param maxTime the maximum sample timestamp present in the head
*/
public record HeadStats(long numSeries, long chunkCount, long minTime, long maxTime) {
public record HeadStats(long numSeries, long numChunks, long minTime, long maxTime) {

/**
* Deserializes a {@code HeadStats} instance from a stream.
*
* @param in the stream input to read from
* @throws IOException if an I/O error occurs during reading
*/
public HeadStats(StreamInput in) throws IOException {
this(in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong());
this(in.readVLong(), in.readVLong(), in.readLong(), in.readLong());
}

/**
* Serializes this {@code HeadStats} instance to a stream.
*
* @param out the stream output to write to
* @throws IOException if an I/O error occurs during writing
*/
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(numSeries);
out.writeVLong(chunkCount);
out.writeVLong(minTime);
out.writeVLong(maxTime);
out.writeVLong(numChunks);
out.writeLong(minTime);
out.writeLong(maxTime);
}
}

Expand Down Expand Up @@ -280,6 +268,7 @@ public void writeTo(StreamOutput out) throws IOException {
* Factory method for creating shard-level stats (with seriesIds ).
*
* @param name the name of the aggregation
* @param headStats the head statistics (null if not populated)
* @param shardStats the shard-level statistics with seriesIds
* @param metadata the aggregation metadata
* @return InternalTSDBStats instance for shard-level phase
Expand Down Expand Up @@ -442,6 +431,9 @@ private InternalTSDBStats reduceShardLevel(List<InternalAggregation> aggregation
Set<Long> mergedSeriesIds = new HashSet<>();
Map<String, Map<String, Set<Long>>> mergedLabelStats = new HashMap<>();

// Merge HeadStats from all shard-level aggregations
HeadStats mergedHeadStats = mergeHeadStats(aggregations);

// Capture global includeValueStats flag from first shard (all shards have same value)
boolean includeValueStats = false;
if (!aggregations.isEmpty()) {
Expand Down Expand Up @@ -538,7 +530,7 @@ private InternalTSDBStats reduceShardLevel(List<InternalAggregation> aggregation

// Return coordinator-level stats (seriesId converted to counts to save network bandwidth)
CoordinatorLevelStats coordinatorStats = new CoordinatorLevelStats(totalSeries, finalLabelStats);
return forCoordinatorLevel(name, null, coordinatorStats, metadata);
return forCoordinatorLevel(name, mergedHeadStats, coordinatorStats, metadata);
}

/**
Expand All @@ -548,7 +540,7 @@ private InternalTSDBStats reduceShardLevel(List<InternalAggregation> aggregation
* is needed because each time series exists on only one shard (guaranteed by routing).</p>
*/
private InternalTSDBStats reduceCoordinatorLevel(List<InternalAggregation> aggregations) {
HeadStats mergedHeadStats = null; // TODO: Merge HeadStats in future when populated
HeadStats mergedHeadStats = mergeHeadStats(aggregations);
Long totalSeries = null;
Map<String, LabelStatsBuilder> builders = new HashMap<>();

Expand Down Expand Up @@ -610,6 +602,38 @@ private static class LabelStatsBuilder {
Map<String, Long> valueCounts = new LinkedHashMap<>();
}

/**
* Merges HeadStats from multiple aggregations by summing numSeries,
* taking the minimum minTime, and taking the maximum maxTime.
*
* @param aggregations the list of aggregations containing HeadStats to merge
* @return merged HeadStats, or null if no aggregation had HeadStats
*/
static HeadStats mergeHeadStats(List<InternalAggregation> aggregations) {
long totalNumSeries = 0;
long totalNumChunks = 0;
long minTime = Long.MAX_VALUE;
long maxTime = Long.MIN_VALUE;
boolean hasAny = false;

for (InternalAggregation agg : aggregations) {
InternalTSDBStats stats = (InternalTSDBStats) agg;
if (stats.headStats != null) {
hasAny = true;
totalNumSeries += stats.headStats.numSeries();
totalNumChunks += stats.headStats.numChunks();
if (stats.headStats.minTime() < minTime) {
minTime = stats.headStats.minTime();
}
if (stats.headStats.maxTime() > maxTime) {
maxTime = stats.headStats.maxTime();
}
}
}

return hasAny ? new HeadStats(totalNumSeries, totalNumChunks, minTime, maxTime) : null;
}

/**
* Retrieves a property value based on the given path.
*
Expand Down Expand Up @@ -680,7 +704,7 @@ public XContentBuilder doXContentBody(XContentBuilder builder, Params params) th
if (headStats != null) {
builder.startObject("headStats");
builder.field("numSeries", headStats.numSeries());
builder.field("chunkCount", headStats.chunkCount());
builder.field("numChunks", headStats.numChunks());
builder.field("minTime", headStats.minTime());
builder.field("maxTime", headStats.maxTime());
builder.endObject();
Expand Down
Loading
Loading