Add resource stats to task framework#1555
Add resource stats to task framework#1555sruti1312 wants to merge 5 commits intoopensearch-project:mainfrom sruti1312:feature/stat-task-framework
Conversation
|
Can one of the admins verify this patch? |
|
✅ Gradle Wrapper Validation success c16586137b6ffe154ad6677f95b380a4d8974019 |
|
✅ Gradle Precommit success c16586137b6ffe154ad6677f95b380a4d8974019 |
|
❌ Gradle Check failure c16586137b6ffe154ad6677f95b380a4d8974019 |
Signed-off-by: Sruti Parthiban <partsrut@amazon.com>
|
✅ Gradle Wrapper Validation success 4744ffe |
|
✅ Gradle Precommit success 4744ffe |
|
LGTM, how do you plan to use it? |
|
@dblock - The resource stats can be used to get top N resource consuming tasks. We can use the |
| /** | ||
| * This listener is notified whenever an task is completed and has stats present | ||
| */ | ||
| public interface TaskStatConsumer { |
There was a problem hiding this comment.
May be TaskStatsConsumer would be better name ?
| import java.util.HashMap; | ||
| import java.util.Map; | ||
|
|
||
| public class StatCollectorTask extends CancellableTask { |
There was a problem hiding this comment.
StatsCollectorTask, since it collect all stats?
| private final SetOnce<TaskCancellationService> cancellationService = new SetOnce<>(); | ||
|
|
||
| /** Consumers that are notified of the stats */ | ||
| private List<TaskStatConsumer> statConsumers; |
There was a problem hiding this comment.
I believe we should use the thread-safe data structure here, TaskManager is a singleton and addTaskStatConsumer could be called any time. Alternatively, we could make it final and add the constructor argument (instead of addTaskStatConsumer) to exclude any races.
Thanks @dblock , I have a few comments primarily regarding naming, it looks inconsistent (to me), there is a mix of |
Thanks! @sruti1312 care to address naming? |
server/src/main/java/org/opensearch/tasks/consumer/TaskStatConsumer.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/opensearch/tasks/consumer/TaskStatsLogger.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/opensearch/tasks/StatCollectorTask.java
Outdated
Show resolved
Hide resolved
| table.addCell(FORMATTER.format(Instant.ofEpochMilli(taskInfo.getStartTime()))); | ||
| table.addCell(taskInfo.getRunningTimeNanos()); | ||
| table.addCell(TimeValue.timeValueNanos(taskInfo.getRunningTimeNanos())); | ||
| table.addCell(taskInfo.getStatsInfo()); |
There was a problem hiding this comment.
I think we should use a request parameter (similar to detailed flag) to control if task API will return the status_info in results or not. This will help to avoid breaking the backward compatibility where old client are accessing the cluster
There was a problem hiding this comment.
@dblock : When a new field is added in any REST API response object, will that break backward compatibility from client perspective ? Or adding a new field is fine and client doesn't perform any strict schema validation on response object ? Based on that info we can decide if a new parameter is needed to control the response payload with new field or not.
There was a problem hiding this comment.
I think new field are ok, @nknize can you confirm please?
| builder.field(attribute.getKey(), attribute.getValue()); | ||
| } | ||
| builder.endObject(); | ||
| if (statsInfo != null) { |
There was a problem hiding this comment.
In what case statsInfo will be null here ? I see it is always being setting as Collections.emptyMap() in callers if not supported by task
| }, | ||
| "stats_info": { | ||
| "type" : "object", | ||
| "enabled" : false |
There was a problem hiding this comment.
how this new field will be handled for existing task result indices ?
| parentTask, | ||
| headers | ||
| headers, | ||
| this instanceof StatCollectorTask ? ((StatCollectorTask) this).getStats() : Collections.emptyMap() |
There was a problem hiding this comment.
We are using empty map as default for statsInfo in TaskInfo, that means for all the API like CancelTask as well it will end up returning an empty object, which is not needed. Instead, we will need to have some mechanism similar to description field which supports nullability and is only returned if detailed flag is set in request. For cancel type API that is always treated as false and the field in not returned in the response. You can refer the caller of this method
| TaskId parentTaskId, | ||
| Map<String, String> headers | ||
| Map<String, String> headers, | ||
| Map<String, Long> statsInfo |
There was a problem hiding this comment.
We should instead use some stats object as value rather than Long to help with adding other fields in that object in future for each stats type
|
@sruti1312 Want to get this over the finishing line? Rebase, etc.? Adding @andrross to help out on the CRs. |
Signed-off-by: Sruti Parthiban <partsrut@amazon.com>
Signed-off-by: Sruti Parthiban <partsrut@amazon.com>
| MEMORY("memory"), | ||
| CPU("cpu"); |
There was a problem hiding this comment.
What do these stats mean? Like is the memory value a percentage or bytes? Maybe these answers are obvious to everyone but me :)
|
start gradle check |
|
Leaving this to @andrross to final/review/merge. |
client/rest-high-level/src/main/java/org/opensearch/client/tasks/TaskInfo.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/opensearch/tasks/TaskStatsContext.java
Outdated
Show resolved
Hide resolved
|
|
||
| private final Map<String, String> headers; | ||
|
|
||
| private final Map<String, List<TaskResourceStatsUtil>> resourceStats; |
There was a problem hiding this comment.
Current class names are confusing to me specially ones with Util and Helper as suffix in it. Does are usually used for static utility classes. How about below names:
TaskResourceStatsUtil --> TaskCompleteResourceInfo
TaskResourceStatsHelper: ---> TaskResourceInfo
TaskResourceMetric: --> TaskStatsInfo
This will make the different classes like below which shows the relation between them.
TaskCompleteResourceInfo:
Map<TaskStatsType, TaskResourceInfo>
TaskResourceInfo:
Map<TaskStats, TaskStatsInfo>
TaskStatsInfo:
private final TaskStats stats;
private final boolean absolute;
private long startValue;
private long endValue;
| private TaskId parentTaskId; | ||
| private final Map<String, Object> status = new HashMap<>(); | ||
| private final Map<String, String> headers = new HashMap<>(); | ||
| private final List<Object> resourceStats = new ArrayList<>(); |
There was a problem hiding this comment.
Probably we should have it as Map<String, Object> where initially the entry will be for total resources consumed at task level. That way the serialization can also be of below format:
resource_stats: {
"total": {
"cpu_time": 183930,
"memory_in_bytes": 1010293
}
}
| appender.search_fatlog_rolling.type = Console | ||
| appender.search_fatlog_rolling.name = search_fatlog_rolling | ||
| appender.search_fatlog_rolling.layout.type = OpenSearchJsonLayout | ||
| appender.search_fatlog_rolling.layout.type_name = search_fatlog |
There was a problem hiding this comment.
let's name it something else other than search_fatlog like tasks_info.log or tasks_details.log. Considering it can be used for other actions except search let's not add search in the log name.
| /** Consumers that are notified of the stats */ | ||
| private final List<Consumer<TaskStatsContext>> statsConsumers = new ArrayList<Consumer<TaskStatsContext>>() { | ||
| { | ||
| add(new TaskSearchStatsLogger()); |
There was a problem hiding this comment.
Is this consumer enabled by default ?
|
✅ Gradle Check success 46e533ca57b8be2da523897514b9dcf45b95972b |
|
✅ Gradle Check success e314321a11d478d177a3e7ae061ff1294e009385 |
|
❌ Gradle Check failure 1432f54c93aef302454f3f2e3a64e3c926edfa46 |
Signed-off-by: sruti1312 <srutiparthiban@gmail.com>
| } | ||
|
|
||
| public long getTotalResourceUtilization(TaskStats taskStats) { | ||
| AtomicLong totalResourceConsumption = new AtomicLong(); |
There was a problem hiding this comment.
Using an AtomicLong just to be able to capture side effects from inside lambda functions seems like an anti-pattern to me. It implies there's some sort of concurrency here and also seems a bit like a hack to get around the compiler. I think you can rewrite this pretty simply to use regular for loops and a primitive long, like:
long totalResourceConsumption = 0L;
for (List<TaskCompleteResourceInfo> statsUtilList : resourceStats.values()) {
for (TaskCompleteResourceInfo statsUtil : statsUtilList) {
for (Map.Entry<TaskStatsType, TaskResourceInfo> entry : statsUtil.getResourceInfo().entrySet()) {
if (!entry.getKey().isOnlyForAnalysis()) {
totalResourceConsumption += entry.getValue().getStatsInfo().get(taskStats).getTotalValue();
}
}
}
}
return totalResourceConsumption;
I think you can also write this using the Java stream API, but I find it a bit hard to read and might prefer the more verbose style above:
return resourceStats.values().stream().mapToLong(
statsUtilList -> statsUtilList.stream().mapToLong(
statsUtil -> statsUtil.getResourceInfo().entrySet().stream()
.filter(e -> !e.getKey().isOnlyForAnalysis())
.mapToLong(e -> e.getValue().getStatsInfo().get(taskStats).getTotalValue())
.sum())
.sum())
.sum();
| this.startTime = startTime; | ||
| this.startTimeNanos = startTimeNanos; | ||
| this.headers = headers; | ||
| this.resourceStats = resourceStats; |
There was a problem hiding this comment.
The constructor above creates a ConcurrentHashMap, but this constructor allows a caller to inject any kind of map it wants. If thread safety is important then usage of a thread safe map should be required (either copy the provided map into a ConcurrentHashMap or make the type ConcurrentMap). See my comment below about thread safety though.
| TaskStatsType statsType, | ||
| TaskStatsInfo... taskResourceMetrics | ||
| ) { | ||
| List<TaskCompleteResourceInfo> taskStatsUtilList = resourceStats.getOrDefault(threadId, new ArrayList<>()); |
There was a problem hiding this comment.
resourceStats is a ConcurrentHashMap and therefore is thread safe, but this method isn't. Two threads could race in this method and one would end up overwriting the effects of the other. Is thread safety actually required?
Signed-off-by: Sruti Parthiban partsrut@amazon.com
Description
Add resource stats to task framework. The task expose interfaces to allow updating the resource stats. On task completion, the resource usage stats are emitted to registered consumers (or sinks).
Sample task response:
Issues Resolved
This is used for Search Memory tracking.
#1009
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.