Skip to content
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Introduce SecureHttpTransportParameters experimental API (to complement SecureTransportParameters counterpart) ([#18572](https://github.com/opensearch-project/OpenSearch/issues/18572))
- Create equivalents of JSM's AccessController in the java agent ([#18346](https://github.com/opensearch-project/OpenSearch/issues/18346))
- Introduced a new cluster-level API to fetch remote store metadata (segments and translogs) for each shard of an index. ([#18257](https://github.com/opensearch-project/OpenSearch/pull/18257))
- Add last index request timestamp columns to the `_cat/indices` API. ([10766](https://github.com/opensearch-project/OpenSearch/issues/10766))
- Introduce a new pull-based ingestion plugin for file-based indexing (for local testing) ([#18591](https://github.com/opensearch-project/OpenSearch/pull/18591))
- Add support for search pipeline in search and msearch template ([#18564](https://github.com/opensearch-project/OpenSearch/pull/18564))

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.metrics;

import java.util.concurrent.atomic.LongAccumulator;

/**
* A metric for tracking the maximum value seen.
*
* @opensearch.internal
*/
public class MaxMetric implements Metric {
private final LongAccumulator max = new LongAccumulator(Long::max, Long.MIN_VALUE);

public void collect(long value) {
max.accumulate(value);
}

public long get() {
return max.get();
}

public void clear() {
max.reset();
}

Check warning on line 31 in server/src/main/java/org/opensearch/common/metrics/MaxMetric.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/metrics/MaxMetric.java#L30-L31

Added lines #L30 - L31 were not covered by tests
}
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ public IndexShard(
);
this.mapperService = mapperService;
this.indexCache = indexCache;
this.internalIndexingStats = new InternalIndexingStats();
this.internalIndexingStats = new InternalIndexingStats(threadPool);
final List<IndexingOperationListener> listenersList = new ArrayList<>(listeners);
listenersList.add(internalIndexingStats);
this.indexingOperationListeners = new IndexingOperationListener.CompositeListener(listenersList, logger);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@
private long throttleTimeInMillis;
private boolean isThrottled;
private final DocStatusStats docStatusStats;
private long maxLastIndexRequestTimestamp;

Stats() {
docStatusStats = new DocStatusStats();
Expand All @@ -175,7 +176,11 @@
noopUpdateCount = in.readVLong();
isThrottled = in.readBoolean();
throttleTimeInMillis = in.readLong();

if (in.getVersion().onOrAfter(Version.V_3_2_0)) {
maxLastIndexRequestTimestamp = in.readLong();
} else {
maxLastIndexRequestTimestamp = 0L;
}
if (in.getVersion().onOrAfter(Version.V_2_11_0)) {
docStatusStats = in.readOptionalWriteable(DocStatusStats::new);
} else {
Expand All @@ -195,6 +200,36 @@
boolean isThrottled,
long throttleTimeInMillis,
DocStatusStats docStatusStats
) {
this(

Check warning on line 204 in server/src/main/java/org/opensearch/index/shard/IndexingStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/IndexingStats.java#L204

Added line #L204 was not covered by tests
indexCount,
indexTimeInMillis,
indexCurrent,
indexFailedCount,
deleteCount,
deleteTimeInMillis,
deleteCurrent,
noopUpdateCount,
isThrottled,
throttleTimeInMillis,
docStatusStats,
0L
);
}

Check warning on line 218 in server/src/main/java/org/opensearch/index/shard/IndexingStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/IndexingStats.java#L218

Added line #L218 was not covered by tests

public Stats(
long indexCount,
long indexTimeInMillis,
long indexCurrent,
long indexFailedCount,
long deleteCount,
long deleteTimeInMillis,
long deleteCurrent,
long noopUpdateCount,
boolean isThrottled,
long throttleTimeInMillis,
DocStatusStats docStatusStats,
long maxLastIndexRequestTimestamp
) {
this.indexCount = indexCount;
this.indexTimeInMillis = indexTimeInMillis;
Expand All @@ -207,6 +242,7 @@
this.isThrottled = isThrottled;
this.throttleTimeInMillis = throttleTimeInMillis;
this.docStatusStats = docStatusStats;
this.maxLastIndexRequestTimestamp = maxLastIndexRequestTimestamp;
}

public void add(Stats stats) {
Expand All @@ -226,6 +262,8 @@
if (getDocStatusStats() != null) {
getDocStatusStats().add(stats.getDocStatusStats());
}

maxLastIndexRequestTimestamp = Math.max(maxLastIndexRequestTimestamp, stats.maxLastIndexRequestTimestamp);
}

/**
Expand Down Expand Up @@ -299,6 +337,10 @@
return docStatusStats;
}

public long getMaxLastIndexRequestTimestamp() {
return maxLastIndexRequestTimestamp;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(indexCount);
Expand All @@ -311,7 +353,9 @@
out.writeVLong(noopUpdateCount);
out.writeBoolean(isThrottled);
out.writeLong(throttleTimeInMillis);

if (out.getVersion().onOrAfter(Version.V_3_2_0)) {
out.writeLong(maxLastIndexRequestTimestamp);
}
if (out.getVersion().onOrAfter(Version.V_2_11_0)) {
out.writeOptionalWriteable(docStatusStats);
}
Expand All @@ -337,6 +381,8 @@
getDocStatusStats().toXContent(builder, params);
}

builder.field(Fields.MAX_LAST_INDEX_REQUEST_TIMESTAMP, maxLastIndexRequestTimestamp);

return builder;
}

Expand Down Expand Up @@ -410,6 +456,7 @@
static final String THROTTLED_TIME_IN_MILLIS = "throttle_time_in_millis";
static final String THROTTLED_TIME = "throttle_time";
static final String DOC_STATUS = "doc_status";
static final String MAX_LAST_INDEX_REQUEST_TIMESTAMP = "max_last_index_request_timestamp";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@
package org.opensearch.index.shard;

import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.metrics.MaxMetric;
import org.opensearch.common.metrics.MeanMetric;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.engine.Engine;
import org.opensearch.threadpool.ThreadPool;

import java.util.concurrent.TimeUnit;

Expand All @@ -47,6 +49,11 @@
*/
final class InternalIndexingStats implements IndexingOperationListener {
private final StatsHolder totalStats = new StatsHolder();
private final ThreadPool threadPool;

InternalIndexingStats(ThreadPool threadPool) {
this.threadPool = threadPool;
}

/**
* Returns the stats, including type specific stats. If the types are null/0 length, then nothing
Expand Down Expand Up @@ -74,6 +81,8 @@ public void postIndex(ShardId shardId, Engine.Index index, Engine.IndexResult re
long took = result.getTook();
totalStats.indexMetric.inc(took);
totalStats.indexCurrent.dec();
long now = threadPool.absoluteTimeInMillis();
totalStats.maxLastIndexRequestTimestamp.collect(now);
}
break;
case FAILURE:
Expand Down Expand Up @@ -142,6 +151,7 @@ static class StatsHolder {
private final CounterMetric indexFailed = new CounterMetric();
private final CounterMetric deleteCurrent = new CounterMetric();
private final CounterMetric noopUpdates = new CounterMetric();
private final MaxMetric maxLastIndexRequestTimestamp = new MaxMetric();

IndexingStats.Stats stats(boolean isThrottled, long currentThrottleMillis) {
return new IndexingStats.Stats(
Expand All @@ -155,7 +165,8 @@ IndexingStats.Stats stats(boolean isThrottled, long currentThrottleMillis) {
noopUpdates.count(),
isThrottled,
TimeUnit.MILLISECONDS.toMillis(currentThrottleMillis),
new IndexingStats.Stats.DocStatusStats()
new IndexingStats.Stats.DocStatusStats(),
maxLastIndexRequestTimestamp.get()
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,15 @@ protected Table getTableWithHeader(final RestRequest request, final PageToken pa

table.addCell("search.throttled", "alias:sth;default:false;desc:indicates if the index is search throttled");

table.addCell(
"last_index_request_timestamp",
"alias:last_index_ts,lastIndexRequestTimestamp;default:false;text-align:right;desc:timestamp of the last processed index request (epoch millis)"
);
table.addCell(
"last_index_request_timestamp_string",
"alias:last_index_ts_string,lastIndexRequestTimestampString;default:false;text-align:right;desc:timestamp of the last processed index request (ISO8601 string)"
);

table.endHeaders();
return table;
}
Expand Down Expand Up @@ -1058,8 +1067,13 @@ protected Table buildTable(

table.addCell(searchThrottled);

table.endRow();
table.addCell(totalStats.getIndexing() == null ? null : totalStats.getIndexing().getTotal().getMaxLastIndexRequestTimestamp());
Long ts = totalStats.getIndexing() == null ? null : totalStats.getIndexing().getTotal().getMaxLastIndexRequestTimestamp();
table.addCell(
ts == null || ts == 0 ? null : STRICT_DATE_TIME_FORMATTER.format(Instant.ofEpochMilli(ts).atZone(ZoneOffset.UTC))
);

table.endRow();
}

return table;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ public void testToXContentForIndexingStats() throws IOException {
+ counter[3]
+ ",\"5xx\":"
+ counter[4]
+ "}}}";
+ "},\"max_last_index_request_timestamp\":"
+ totalStats.getMaxLastIndexRequestTimestamp()
+ "}}";

XContentBuilder xContentBuilder = MediaTypeRegistry.contentBuilder(MediaTypeRegistry.JSON);
xContentBuilder.startObject();
Expand All @@ -115,6 +117,64 @@ public void testToXContentForIndexingStats() throws IOException {
assertEquals(expected, xContentBuilder.toString());
}

/**
* Tests aggregation logic for maxLastIndexRequestTimestamp in IndexingStats.Stats.
* Uses reflection because the field is private and not settable via public API.
* This ensures that aggregation (add) always surfaces the maximum value, even across multiple adds and random values.
*/
public void testMaxLastIndexRequestTimestampAggregation() throws Exception {
// Use explicit values for all fields except the timestamp
IndexingStats.Stats.DocStatusStats docStatusStats = new IndexingStats.Stats.DocStatusStats();
long ts1 = randomLongBetween(0, 1000000);
long ts2 = randomLongBetween(0, 1000000);
long ts3 = randomLongBetween(0, 1000000);
IndexingStats.Stats stats1 = new IndexingStats.Stats(1, 2, 3, 4, 5, 6, 7, 8, false, 9, docStatusStats, ts1);
IndexingStats.Stats stats2 = new IndexingStats.Stats(1, 2, 3, 4, 5, 6, 7, 8, false, 9, docStatusStats, ts2);
IndexingStats.Stats stats3 = new IndexingStats.Stats(1, 2, 3, 4, 5, 6, 7, 8, false, 9, docStatusStats, ts3);

// Aggregate stats1 + stats2
stats1.add(stats2);
assertEquals(Math.max(ts1, ts2), stats1.getMaxLastIndexRequestTimestamp());

// Aggregate stats1 + stats3
stats1.add(stats3);
assertEquals(Math.max(Math.max(ts1, ts2), ts3), stats1.getMaxLastIndexRequestTimestamp());

// Test with zero and negative values
IndexingStats.Stats statsZero = new IndexingStats.Stats(1, 2, 3, 4, 5, 6, 7, 8, false, 9, docStatusStats, 0L);
IndexingStats.Stats statsNeg = new IndexingStats.Stats(1, 2, 3, 4, 5, 6, 7, 8, false, 9, docStatusStats, -100L);
statsZero.add(statsNeg);
assertEquals(0L, statsZero.getMaxLastIndexRequestTimestamp());

IndexingStats.Stats statsNeg2 = new IndexingStats.Stats(1, 2, 3, 4, 5, 6, 7, 8, false, 9, docStatusStats, -50L);
statsNeg.add(statsNeg2);
assertEquals(-50L, statsNeg.getMaxLastIndexRequestTimestamp());
}

public void testMaxLastIndexRequestTimestampBackwardCompatibility() throws IOException {
IndexingStats.Stats.DocStatusStats docStatusStats = new IndexingStats.Stats.DocStatusStats();
long ts = randomLongBetween(0, 1000000);
IndexingStats.Stats stats = new IndexingStats.Stats(1, 2, 3, 4, 5, 6, 7, 8, false, 9, docStatusStats, ts);

// Serialize with V_3_1_0 (should include the field)
BytesStreamOutput outNew = new BytesStreamOutput();
outNew.setVersion(org.opensearch.Version.V_3_2_0);
stats.writeTo(outNew);
StreamInput inNew = outNew.bytes().streamInput();
inNew.setVersion(org.opensearch.Version.V_3_2_0);
IndexingStats.Stats deserializedNew = new IndexingStats.Stats(inNew);
assertEquals(ts, deserializedNew.getMaxLastIndexRequestTimestamp());

// Serialize with V_2_11_0 (should NOT include the field, should default to 0)
BytesStreamOutput outOld = new BytesStreamOutput();
outOld.setVersion(org.opensearch.Version.V_2_11_0);
stats.writeTo(outOld);
StreamInput inOld = outOld.bytes().streamInput();
inOld.setVersion(org.opensearch.Version.V_2_11_0);
IndexingStats.Stats deserializedOld = new IndexingStats.Stats(inOld);
assertEquals(0L, deserializedOld.getMaxLastIndexRequestTimestamp());
}

private IndexingStats createTestInstance() {
IndexingStats.Stats.DocStatusStats docStatusStats = new IndexingStats.Stats.DocStatusStats();
for (int i = 1; i < 6; ++i) {
Expand All @@ -132,7 +192,8 @@ private IndexingStats createTestInstance() {
randomNonNegativeLong(),
randomBoolean(),
randomNonNegativeLong(),
docStatusStats
docStatusStats,
randomLong()
);

return new IndexingStats(stats);
Expand Down
Loading
Loading