Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.index.remote.RemoteRefreshSegmentTracker;
import org.opensearch.index.remote.RemoteSegmentTransferTracker;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.snapshots.mockstore.MockRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand Down Expand Up @@ -92,7 +92,7 @@ private void validateBackpressure(
assertTrue(ex.getMessage().contains("rejected execution on primary shard"));
assertTrue(ex.getMessage().contains(breachMode));

RemoteRefreshSegmentTracker.Stats stats = stats();
RemoteSegmentTransferTracker.Stats stats = stats();
assertTrue(stats.bytesLag > 0);
assertTrue(stats.refreshTimeLagMs > 0);
assertTrue(stats.localRefreshNumber - stats.remoteRefreshNumber > 0);
Expand All @@ -102,7 +102,7 @@ private void validateBackpressure(
.setRandomControlIOExceptionRate(0d);

assertBusy(() -> {
RemoteRefreshSegmentTracker.Stats finalStats = stats();
RemoteSegmentTransferTracker.Stats finalStats = stats();
assertEquals(0, finalStats.bytesLag);
assertEquals(0, finalStats.refreshTimeLagMs);
assertEquals(0, finalStats.localRefreshNumber - finalStats.remoteRefreshNumber);
Expand All @@ -115,11 +115,11 @@ private void validateBackpressure(
deleteRepo();
}

private RemoteRefreshSegmentTracker.Stats stats() {
private RemoteSegmentTransferTracker.Stats stats() {
String shardId = "0";
RemoteStoreStatsResponse response = client().admin().cluster().prepareRemoteStoreStats(INDEX_NAME, shardId).get();
final String indexShardId = String.format(Locale.ROOT, "[%s][%s]", INDEX_NAME, shardId);
List<RemoteStoreStats> matches = Arrays.stream(response.getShards())
List<RemoteStoreStats> matches = Arrays.stream(response.getRemoteStoreStats())
.filter(stat -> indexShardId.equals(stat.getStats().shardId.toString()))
.collect(Collectors.toList());
assertEquals(1, matches.size());
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.remote.RemoteRefreshSegmentTracker;
import org.opensearch.index.remote.RemoteSegmentTransferTracker;

import java.io.IOException;

Expand All @@ -24,72 +28,128 @@
*/
public class RemoteStoreStats implements Writeable, ToXContentFragment {

private final RemoteRefreshSegmentTracker.Stats remoteSegmentUploadShardStats;
private final RemoteSegmentTransferTracker.Stats remoteSegmentShardStats;

public RemoteStoreStats(RemoteRefreshSegmentTracker.Stats remoteSegmentUploadShardStats) {
this.remoteSegmentUploadShardStats = remoteSegmentUploadShardStats;
private final ShardRouting shardRouting;

public RemoteStoreStats(RemoteSegmentTransferTracker.Stats remoteSegmentUploadShardStats, ShardRouting shardRouting) {
this.remoteSegmentShardStats = remoteSegmentUploadShardStats;
this.shardRouting = shardRouting;
}

public RemoteStoreStats(StreamInput in) throws IOException {
remoteSegmentUploadShardStats = in.readOptionalWriteable(RemoteRefreshSegmentTracker.Stats::new);
this.remoteSegmentShardStats = in.readOptionalWriteable(RemoteSegmentTransferTracker.Stats::new);
this.shardRouting = new ShardRouting(in);
}

public RemoteSegmentTransferTracker.Stats getStats() {
return remoteSegmentShardStats;
}

public RemoteRefreshSegmentTracker.Stats getStats() {
return remoteSegmentUploadShardStats;
public ShardRouting getShardRouting() {
return shardRouting;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject()
.field(Fields.SHARD_ID, remoteSegmentUploadShardStats.shardId)
.field(Fields.LOCAL_REFRESH_TIMESTAMP, remoteSegmentUploadShardStats.localRefreshClockTimeMs)
.field(Fields.REMOTE_REFRESH_TIMESTAMP, remoteSegmentUploadShardStats.remoteRefreshClockTimeMs)
.field(Fields.REFRESH_TIME_LAG_IN_MILLIS, remoteSegmentUploadShardStats.refreshTimeLagMs)
.field(Fields.REFRESH_LAG, remoteSegmentUploadShardStats.localRefreshNumber - remoteSegmentUploadShardStats.remoteRefreshNumber)
.field(Fields.BYTES_LAG, remoteSegmentUploadShardStats.bytesLag)

.field(Fields.BACKPRESSURE_REJECTION_COUNT, remoteSegmentUploadShardStats.rejectionCount)
.field(Fields.CONSECUTIVE_FAILURE_COUNT, remoteSegmentUploadShardStats.consecutiveFailuresCount);

builder.startObject(Fields.TOTAL_REMOTE_REFRESH);
builder.field(SubFields.STARTED, remoteSegmentUploadShardStats.totalUploadsStarted)
.field(SubFields.SUCCEEDED, remoteSegmentUploadShardStats.totalUploadsSucceeded)
.field(SubFields.FAILED, remoteSegmentUploadShardStats.totalUploadsFailed);
builder.startObject();
buildShardRouting(builder);
builder.startObject(Fields.SEGMENT);
builder.startObject(SubFields.DOWNLOAD);
// Ensuring that we are not showing 0 metrics to the user
if (remoteSegmentShardStats.directoryFileTransferTrackerStats.transferredBytesStarted != 0) {
buildDownloadStats(builder);
}
builder.endObject();
builder.startObject(SubFields.UPLOAD);
// Ensuring that we are not showing 0 metrics to the user
if (remoteSegmentShardStats.totalUploadsStarted != 0) {
buildUploadStats(builder);
}
builder.endObject();

builder.startObject(Fields.TOTAL_UPLOADS_IN_BYTES);
builder.field(SubFields.STARTED, remoteSegmentUploadShardStats.uploadBytesStarted)
.field(SubFields.SUCCEEDED, remoteSegmentUploadShardStats.uploadBytesSucceeded)
.field(SubFields.FAILED, remoteSegmentUploadShardStats.uploadBytesFailed);
builder.endObject();
return builder.endObject();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(remoteSegmentShardStats);
shardRouting.writeTo(out);
}

builder.startObject(Fields.REMOTE_REFRESH_SIZE_IN_BYTES);
builder.field(SubFields.LAST_SUCCESSFUL, remoteSegmentUploadShardStats.lastSuccessfulRemoteRefreshBytes);
builder.field(SubFields.MOVING_AVG, remoteSegmentUploadShardStats.uploadBytesMovingAverage);
private void buildUploadStats(XContentBuilder builder) throws IOException {
builder.field(UploadStatsFields.LOCAL_REFRESH_TIMESTAMP, remoteSegmentShardStats.localRefreshClockTimeMs)
.field(UploadStatsFields.REMOTE_REFRESH_TIMESTAMP, remoteSegmentShardStats.remoteRefreshClockTimeMs)
.field(UploadStatsFields.REFRESH_TIME_LAG_IN_MILLIS, remoteSegmentShardStats.refreshTimeLagMs)
.field(UploadStatsFields.REFRESH_LAG, remoteSegmentShardStats.localRefreshNumber - remoteSegmentShardStats.remoteRefreshNumber)
.field(UploadStatsFields.BYTES_LAG, remoteSegmentShardStats.bytesLag)
.field(UploadStatsFields.BACKPRESSURE_REJECTION_COUNT, remoteSegmentShardStats.rejectionCount)
.field(UploadStatsFields.CONSECUTIVE_FAILURE_COUNT, remoteSegmentShardStats.consecutiveFailuresCount);
builder.startObject(UploadStatsFields.TOTAL_SYNCS_TO_REMOTE)
.field(SubFields.STARTED, remoteSegmentShardStats.totalUploadsStarted)
.field(SubFields.SUCCEEDED, remoteSegmentShardStats.totalUploadsSucceeded)
.field(SubFields.FAILED, remoteSegmentShardStats.totalUploadsFailed);
builder.endObject();
builder.startObject(UploadStatsFields.TOTAL_UPLOADS_IN_BYTES)
.field(SubFields.STARTED, remoteSegmentShardStats.uploadBytesStarted)
.field(SubFields.SUCCEEDED, remoteSegmentShardStats.uploadBytesSucceeded)
.field(SubFields.FAILED, remoteSegmentShardStats.uploadBytesFailed);
builder.endObject();
builder.startObject(UploadStatsFields.REMOTE_REFRESH_SIZE_IN_BYTES)
.field(SubFields.LAST_SUCCESSFUL, remoteSegmentShardStats.lastSuccessfulRemoteRefreshBytes)
.field(SubFields.MOVING_AVG, remoteSegmentShardStats.uploadBytesMovingAverage);
builder.endObject();
builder.startObject(UploadStatsFields.UPLOAD_LATENCY_IN_BYTES_PER_SEC)
.field(SubFields.MOVING_AVG, remoteSegmentShardStats.uploadBytesPerSecMovingAverage);
builder.endObject();
builder.startObject(UploadStatsFields.REMOTE_REFRESH_LATENCY_IN_MILLIS)
.field(SubFields.MOVING_AVG, remoteSegmentShardStats.uploadTimeMovingAverage);
builder.endObject();
}

builder.startObject(Fields.UPLOAD_LATENCY_IN_BYTES_PER_SEC);
builder.field(SubFields.MOVING_AVG, remoteSegmentUploadShardStats.uploadBytesPerSecMovingAverage);
private void buildDownloadStats(XContentBuilder builder) throws IOException {
builder.field(
DownloadStatsFields.LAST_SYNC_TIMESTAMP,
remoteSegmentShardStats.directoryFileTransferTrackerStats.lastTransferTimestampMs
);
builder.startObject(DownloadStatsFields.TOTAL_DOWNLOADS_IN_BYTES)
.field(SubFields.STARTED, remoteSegmentShardStats.directoryFileTransferTrackerStats.transferredBytesStarted)
.field(SubFields.SUCCEEDED, remoteSegmentShardStats.directoryFileTransferTrackerStats.transferredBytesSucceeded)
.field(SubFields.FAILED, remoteSegmentShardStats.directoryFileTransferTrackerStats.transferredBytesFailed);
builder.endObject();
builder.startObject(Fields.REMOTE_REFRESH_LATENCY_IN_MILLIS);
builder.field(SubFields.MOVING_AVG, remoteSegmentUploadShardStats.uploadTimeMovingAverage);
builder.startObject(DownloadStatsFields.DOWNLOAD_SIZE_IN_BYTES)
.field(SubFields.LAST_SUCCESSFUL, remoteSegmentShardStats.directoryFileTransferTrackerStats.lastSuccessfulTransferInBytes)
.field(SubFields.MOVING_AVG, remoteSegmentShardStats.directoryFileTransferTrackerStats.transferredBytesMovingAverage);
builder.endObject();
builder.startObject(DownloadStatsFields.DOWNLOAD_SPEED_IN_BYTES_PER_SEC)
.field(SubFields.MOVING_AVG, remoteSegmentShardStats.directoryFileTransferTrackerStats.transferredBytesPerSecMovingAverage);
builder.endObject();
}

return builder;
private void buildShardRouting(XContentBuilder builder) throws IOException {
builder.startObject(Fields.ROUTING);
builder.field(RoutingFields.STATE, shardRouting.state());
builder.field(RoutingFields.PRIMARY, shardRouting.primary());
builder.field(RoutingFields.NODE_ID, shardRouting.currentNodeId());
builder.endObject();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(remoteSegmentUploadShardStats);
static final class Fields {
static final String ROUTING = "routing";
static final String SEGMENT = "segment";
static final String TRANSLOG = "translog";
}

static final class RoutingFields {
static final String STATE = "state";
static final String PRIMARY = "primary";
static final String NODE_ID = "node";
}

/**
* Fields for remote store stats response
*/
static final class Fields {
static final String SHARD_ID = "shard_id";

static final class UploadStatsFields {
/**
* Lag in terms of bytes b/w local and remote store
*/
Expand Down Expand Up @@ -128,7 +188,7 @@ static final class Fields {
/**
* Represents the number of remote refreshes
*/
static final String TOTAL_REMOTE_REFRESH = "total_remote_refresh";
static final String TOTAL_SYNCS_TO_REMOTE = "total_syncs_to_remote";

/**
* Represents the total uploads to remote store in bytes
Expand All @@ -151,21 +211,46 @@ static final class Fields {
static final String REMOTE_REFRESH_LATENCY_IN_MILLIS = "remote_refresh_latency_in_millis";
}

static final class DownloadStatsFields {
/**
* Last successful sync from remote in milliseconds
*/
static final String LAST_SYNC_TIMESTAMP = "last_sync_timestamp";

/**
* Total bytes of segment files downloaded from the remote store for a specific shard
*/
static final String TOTAL_DOWNLOADS_IN_BYTES = "total_downloads_in_bytes";

/**
* Size of each segment file downloaded from the remote store
*/
static final String DOWNLOAD_SIZE_IN_BYTES = "download_size_in_bytes";

/**
* Speed (in bytes/sec) for segment file downloads
*/
static final String DOWNLOAD_SPEED_IN_BYTES_PER_SEC = "download_speed_in_bytes_per_sec";
}

/**
* Reusable sub fields for {@link Fields}
* Reusable sub fields for {@link UploadStatsFields} and {@link DownloadStatsFields}
*/
static final class SubFields {
static final String STARTED = "started";
static final String SUCCEEDED = "succeeded";
static final String FAILED = "failed";

static final String DOWNLOAD = "download";
static final String UPLOAD = "upload";

/**
* Moving avg over last N values stat for a {@link Fields}
* Moving avg over last N values stat
*/
static final String MOVING_AVG = "moving_avg";

/**
* Most recent successful attempt stat for a {@link Fields}
* Most recent successful attempt stat
*/
static final String LAST_SUCCESSFUL = "last_successful";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Remote Store stats response
Expand All @@ -26,49 +29,71 @@
*/
public class RemoteStoreStatsResponse extends BroadcastResponse {

private final RemoteStoreStats[] shards;
private final RemoteStoreStats[] remoteStoreStats;

public RemoteStoreStatsResponse(StreamInput in) throws IOException {
super(in);
shards = in.readArray(RemoteStoreStats::new, RemoteStoreStats[]::new);
remoteStoreStats = in.readArray(RemoteStoreStats::new, RemoteStoreStats[]::new);
}

public RemoteStoreStatsResponse(
RemoteStoreStats[] shards,
RemoteStoreStats[] remoteStoreStats,
int totalShards,
int successfulShards,
int failedShards,
List<DefaultShardOperationFailedException> shardFailures
) {
super(totalShards, successfulShards, failedShards, shardFailures);
this.shards = shards;
this.remoteStoreStats = remoteStoreStats;
}

public RemoteStoreStats[] getShards() {
return this.shards;
public RemoteStoreStats[] getRemoteStoreStats() {
return this.remoteStoreStats;
}

public RemoteStoreStats getAt(int position) {
return shards[position];
public Map<String, Map<Integer, List<RemoteStoreStats>>> groupByIndexAndShards() {
Map<String, Map<Integer, List<RemoteStoreStats>>> indexWiseStats = new HashMap<>();
for (RemoteStoreStats shardStat : remoteStoreStats) {
indexWiseStats.computeIfAbsent(shardStat.getShardRouting().getIndexName(), k -> new HashMap<>())
.computeIfAbsent(shardStat.getShardRouting().getId(), k -> new ArrayList<>())
.add(shardStat);
}
return indexWiseStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeArray(shards);
out.writeArray(remoteStoreStats);
}

@Override
protected void addCustomXContentFields(XContentBuilder builder, Params params) throws IOException {
builder.startArray("stats");
for (RemoteStoreStats shard : shards) {
shard.toXContent(builder, params);
Map<String, Map<Integer, List<RemoteStoreStats>>> indexWiseStats = groupByIndexAndShards();
builder.startObject(Fields.INDICES);
for (String indexName : indexWiseStats.keySet()) {
builder.startObject(indexName);
builder.startObject(Fields.SHARDS);
for (int shardId : indexWiseStats.get(indexName).keySet()) {
builder.startArray(Integer.toString(shardId));
for (RemoteStoreStats shardStat : indexWiseStats.get(indexName).get(shardId)) {
shardStat.toXContent(builder, params);
}
builder.endArray();
}
builder.endObject();
builder.endObject();
}
builder.endArray();
builder.endObject();
}

@Override
public String toString() {
return Strings.toString(XContentType.JSON, this, true, false);
}

static final class Fields {
static final String SHARDS = "shards";
static final String INDICES = "indices";
}
}
Loading