
Add resource stats to task framework#1555

Closed
sruti1312 wants to merge 5 commits into opensearch-project:main from sruti1312:feature/stat-task-framework


@sruti1312
Contributor

@sruti1312 sruti1312 commented Nov 16, 2021

Signed-off-by: Sruti Parthiban <partsrut@amazon.com>

Description

Add resource stats to the task framework. The task exposes interfaces that allow updating its resource stats. On task completion, the resource usage stats are emitted to registered consumers (or sinks).

Sample task response:

"rh6TkaWmTwCp3Tb0zrjHKQ:8120" : {
  "node" : "rh6TkaWmTwCp3Tb0zrjHKQ",
  "id" : 8120,
  "type" : "direct",
  "action" : "indices:data/read/search[phase/query]",
  "start_time_in_millis" : 1637004446880,
  "running_time_in_nanos" : 9010172401,
  "cancellable" : true,
  "parent_task_id" : "rh6TkaWmTwCp3Tb0zrjHKQ:8119",
  "headers" : { },
  "resource_stats" : {
    "total_resource_consumption" : {
      "cpu_time_in_nanos" : 0,
      "memory_in_bytes" : 0
    }
  }
}
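
The flow described above (a task exposes a way to update its stats, and the totals are handed to registered consumers on completion) could be sketched roughly as follows. This is only an illustration: `SketchTask`, `SketchTaskManager`, `addResourceUsage`, and `onTaskCompleted` are hypothetical names, not the classes or methods added by this PR.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;

// Hypothetical stand-in for a task that tracks its own resource usage.
final class SketchTask {
    private final long id;
    private final Map<String, LongAdder> stats = new ConcurrentHashMap<>();

    SketchTask(long id) {
        this.id = id;
    }

    long getId() {
        return id;
    }

    // Interface the task exposes for updating a stat, e.g. "memory_in_bytes".
    void addResourceUsage(String stat, long delta) {
        stats.computeIfAbsent(stat, k -> new LongAdder()).add(delta);
    }

    // Point-in-time copy of the totals recorded so far.
    Map<String, Long> snapshot() {
        Map<String, Long> copy = new TreeMap<>();
        stats.forEach((k, v) -> copy.put(k, v.sum()));
        return copy;
    }
}

// Hypothetical stand-in for the component that knows when tasks finish.
final class SketchTaskManager {
    private final List<Consumer<SketchTask>> consumers = new CopyOnWriteArrayList<>();

    void addConsumer(Consumer<SketchTask> consumer) {
        consumers.add(consumer);
    }

    // Called when a task finishes: every registered consumer sees the task.
    void onTaskCompleted(SketchTask task) {
        for (Consumer<SketchTask> consumer : consumers) {
            consumer.accept(task);
        }
    }
}
```

A consumer registered with `addConsumer` could, for example, log `task.snapshot()` or write it to an index.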

Issues Resolved

This is used for Search Memory tracking.
#1009

Check List

  • New functionality includes testing.
    • All tests pass
  • New functionality has been documented.
    • New functionality has javadoc added
  • Commits are signed per the DCO using --signoff

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@sruti1312 sruti1312 requested a review from a team as a code owner November 16, 2021 01:27
@opensearch-ci-bot
Collaborator

Can one of the admins verify this patch?

@opensearch-ci-bot
Collaborator

✅   Gradle Wrapper Validation success c16586137b6ffe154ad6677f95b380a4d8974019

@opensearch-ci-bot
Collaborator

✅   Gradle Precommit success c16586137b6ffe154ad6677f95b380a4d8974019

@opensearch-ci-bot
Collaborator

❌   Gradle Check failure c16586137b6ffe154ad6677f95b380a4d8974019
Log 1093

Reports 1093

Signed-off-by: Sruti Parthiban <partsrut@amazon.com>
@opensearch-ci-bot
Collaborator

✅   Gradle Wrapper Validation success 4744ffe

@opensearch-ci-bot
Collaborator

✅   Gradle Precommit success 4744ffe

@opensearch-ci-bot
Collaborator

✅   Gradle Check success 4744ffe
Log 1097

Reports 1097

@dblock
Member

dblock commented Nov 19, 2021

LGTM, how do you plan to use it?

@sruti1312 sruti1312 changed the title from "[WIP] Add stats to task framework" to "Add resource stats to task framework" Nov 19, 2021
@sruti1312
Contributor Author

@dblock - The resource stats can be used to find the top N resource-consuming tasks. We can use the TaskStatConsumer interface to add a log-based or system-index-based consumer (or sink) that provides additional insights.
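
As a rough illustration of the "top N" use case, a consumer could keep a bounded min-heap of completed tasks ordered by memory usage. The sketch below is not code from this PR: `CompletedTaskUsage` and `TopNMemoryConsumer` are hypothetical, and it uses `java.util.function.Consumer` rather than the PR's TaskStatConsumer interface, whose signature is not shown in this thread.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Consumer;

// Hypothetical record of one completed task's usage; not a class from this PR.
final class CompletedTaskUsage {
    final String taskId;
    final long memoryInBytes;

    CompletedTaskUsage(String taskId, long memoryInBytes) {
        this.taskId = taskId;
        this.memoryInBytes = memoryInBytes;
    }
}

// Keeps the N completed tasks with the highest memory usage seen so far.
final class TopNMemoryConsumer implements Consumer<CompletedTaskUsage> {
    private static final Comparator<CompletedTaskUsage> BY_MEMORY =
        Comparator.comparingLong((CompletedTaskUsage u) -> u.memoryInBytes);

    private final int n;
    // Min-heap: the smallest of the current top N sits at the head.
    private final PriorityQueue<CompletedTaskUsage> heap = new PriorityQueue<>(BY_MEMORY);

    TopNMemoryConsumer(int n) {
        this.n = n;
    }

    @Override
    public synchronized void accept(CompletedTaskUsage usage) {
        heap.offer(usage);
        if (heap.size() > n) {
            heap.poll(); // evict the smallest entry so at most n remain
        }
    }

    // Returns the current top N, largest first.
    synchronized List<CompletedTaskUsage> topN() {
        List<CompletedTaskUsage> result = new ArrayList<>(heap);
        result.sort(BY_MEMORY.reversed());
        return result;
    }
}
```

The heap keeps memory bounded at N entries no matter how many tasks complete, which matters for a sink that lives as long as the node.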

@dblock
Member

dblock commented Nov 22, 2021

I approved, but am not super familiar with this code. Maybe @nknize or @reta could take a quick second look?

/**
 * This listener is notified whenever a task is completed and has stats present
 */
public interface TaskStatConsumer {
Contributor

Maybe TaskStatsConsumer would be a better name?

import java.util.HashMap;
import java.util.Map;

public class StatCollectorTask extends CancellableTask {
Contributor

StatsCollectorTask, since it collects all stats?

private final SetOnce<TaskCancellationService> cancellationService = new SetOnce<>();

/** Consumers that are notified of the stats */
private List<TaskStatConsumer> statConsumers;
Contributor

I believe we should use a thread-safe data structure here: TaskManager is a singleton and addTaskStatConsumer could be called at any time. Alternatively, we could make the field final and pass the consumers as a constructor argument (instead of addTaskStatConsumer) to rule out any races.
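
The two alternatives in this comment might look roughly like the sketch below. Both registry classes are hypothetical stand-ins, not TaskManager itself; the first uses `CopyOnWriteArrayList`, the second fixes the consumers at construction time.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Option 1: consumers may be registered at any time, so use a thread-safe list.
final class MutableConsumerRegistry<T> {
    private final List<Consumer<T>> consumers = new CopyOnWriteArrayList<>();

    void addConsumer(Consumer<T> consumer) {
        consumers.add(consumer);
    }

    void publish(T event) {
        // Iteration works on a snapshot, so concurrent add() calls are safe.
        for (Consumer<T> consumer : consumers) {
            consumer.accept(event);
        }
    }
}

// Option 2: fix the consumers at construction time; no mutation, no races.
final class FixedConsumerRegistry<T> {
    private final List<Consumer<T>> consumers;

    FixedConsumerRegistry(List<Consumer<T>> consumers) {
        // Defensive copy, so later changes to the caller's list have no effect.
        this.consumers = Collections.unmodifiableList(new ArrayList<>(consumers));
    }

    void publish(T event) {
        for (Consumer<T> consumer : consumers) {
            consumer.accept(event);
        }
    }
}
```

`CopyOnWriteArrayList` suits this access pattern because registration is rare and notification is frequent; each `add` copies the backing array, which would be costly for a write-heavy list.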

@reta
Contributor

reta commented Nov 23, 2021

> I approved, but am not super familiar with this code. Maybe @nknize or @reta could take a quick second look?

Thanks @dblock, I have a few comments, primarily regarding naming: it looks inconsistent (to me), with a mix of StatXxx vs StatsXxx in different places. Otherwise it looks good; afaik it is part of a bigger picture and real stats collectors are going to be introduced soon.

@dblock
Member

dblock commented Nov 23, 2021

> I approved, but am not super familiar with this code. Maybe @nknize or @reta could take a quick second look?

> Thanks @dblock, I have a few comments, primarily regarding naming: it looks inconsistent (to me), with a mix of StatXxx vs StatsXxx in different places. Otherwise it looks good; afaik it is part of a bigger picture and real stats collectors are going to be introduced soon.

Thanks! @sruti1312 care to address naming?

table.addCell(FORMATTER.format(Instant.ofEpochMilli(taskInfo.getStartTime())));
table.addCell(taskInfo.getRunningTimeNanos());
table.addCell(TimeValue.timeValueNanos(taskInfo.getRunningTimeNanos()));
table.addCell(taskInfo.getStatsInfo());
Contributor

I think we should use a request parameter (similar to the detailed flag) to control whether the task API returns stats_info in its results. This will help avoid breaking backward compatibility when old clients access the cluster.

Contributor

@dblock: When a new field is added to a REST API response object, does that break backward compatibility from the client's perspective? Or is adding a new field fine because clients don't perform strict schema validation on the response object? Based on that, we can decide whether a new parameter is needed to control whether the response payload includes the new field.

Member

I think new fields are OK, @nknize can you confirm please?

builder.field(attribute.getKey(), attribute.getValue());
}
builder.endObject();
if (statsInfo != null) {
Contributor

In what case will statsInfo be null here? I see it is always set to Collections.emptyMap() by callers if the task does not support it.

},
"stats_info": {
"type" : "object",
"enabled" : false
Contributor

How will this new field be handled for existing task result indices?

  parentTask,
- headers
+ headers,
+ this instanceof StatCollectorTask ? ((StatCollectorTask) this).getStats() : Collections.emptyMap()
Contributor

We are using an empty map as the default for statsInfo in TaskInfo, which means that APIs like CancelTask will also end up returning an empty object, which is not needed. Instead, we need a mechanism similar to the description field, which is nullable and is only returned if the detailed flag is set in the request. For cancel-type APIs the flag is always treated as false and the field is not returned in the response. You can refer to the callers of this method.
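
One way to picture the suggestion is a nullable stats field that is populated only when the request is detailed and is skipped during serialization when absent. The sketch below is a simplified, hypothetical stand-in (`SketchTaskInfo`, `of`, `toFields`) and does not use the real TaskInfo or XContentBuilder APIs.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for TaskInfo; only the nullability idea is shown.
final class SketchTaskInfo {
    private final long id;
    private final Map<String, Long> resourceStats; // null means "not requested"

    private SketchTaskInfo(long id, Map<String, Long> resourceStats) {
        this.id = id;
        this.resourceStats = resourceStats;
    }

    // Stats are attached only when the request asked for details.
    static SketchTaskInfo of(long id, Map<String, Long> stats, boolean detailed) {
        return new SketchTaskInfo(id, detailed ? stats : null);
    }

    // Field map that a response builder would render; absent stats add no key.
    Map<String, Object> toFields() {
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("id", id);
        if (resourceStats != null) {
            fields.put("resource_stats", resourceStats);
        }
        return fields;
    }
}
```

With this shape, a cancel-style caller that always passes `detailed = false` never emits an empty `resource_stats` object.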

  TaskId parentTaskId,
- Map<String, String> headers
+ Map<String, String> headers,
+ Map<String, Long> statsInfo
Contributor

We should use a stats object as the value rather than Long, so that other fields can be added to that object in the future for each stats type.

Member

@dblock dblock left a comment

Thanks for another review @sohami, will leave this open for now.

@opensearch-ci-bot
Collaborator

✅   Gradle Check success 071e56b
Log 1513

Reports 1513

@dblock dblock requested a review from andrross December 18, 2021 16:06
@dblock
Member

dblock commented Dec 18, 2021

@sruti1312 Want to get this over the finishing line? Rebase, etc.? Adding @andrross to help out on the CRs.

Signed-off-by: Sruti Parthiban <partsrut@amazon.com>
Signed-off-by: Sruti Parthiban <partsrut@amazon.com>
@opensearch-ci-bot
Collaborator

✅   Gradle Check success 6c845dd
Log 1747

Reports 1747

Comment on lines +12 to +13
MEMORY("memory"),
CPU("cpu");
Member

What do these stats mean? Like is the memory value a percentage or bytes? Maybe these answers are obvious to everyone but me :)
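
One possible way to make the units self-describing is to carry them in the serialized field name. The names below mirror the sample response in the PR description (`memory_in_bytes`, `cpu_time_in_nanos`); whether the enum in the PR actually maps to those units is an assumption, and `SketchTaskStats` is a hypothetical variant of the enum under review.

```java
// Hypothetical variant of the enum under review, with the unit in each field name.
enum SketchTaskStats {
    MEMORY("memory_in_bytes"),
    CPU("cpu_time_in_nanos");

    private final String fieldName;

    SketchTaskStats(String fieldName) {
        this.fieldName = fieldName;
    }

    // Name used when the stat is serialized; the unit is part of the name.
    String fieldName() {
        return fieldName;
    }
}
```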

@opensearch-ci-bot
Collaborator

❌   Gradle Check failure 208bf3f
Log 2008

Reports 2008

@dblock
Member

dblock commented Feb 1, 2022

start gradle check

@dblock
Member

dblock commented Feb 1, 2022

Leaving this to @andrross for final review/merge.

@dblock dblock requested a review from andrross February 1, 2022 20:46
@opensearch-ci-bot
Collaborator

❌   Gradle Check failure 208bf3f
Log 2160

Reports 2160


private final Map<String, String> headers;

private final Map<String, List<TaskResourceStatsUtil>> resourceStats;
Contributor

The current class names are confusing to me, especially the ones with Util and Helper as a suffix. Those are usually used for static utility classes. How about the names below:

TaskResourceStatsUtil --> TaskCompleteResourceInfo
TaskResourceStatsHelper --> TaskResourceInfo
TaskResourceMetric --> TaskStatsInfo

The classes would then look like this, which shows the relation between them:

TaskCompleteResourceInfo:
    Map<TaskStatsType, TaskResourceInfo>

TaskResourceInfo:
    Map<TaskStats, TaskStatsInfo>

TaskStatsInfo:
    private final TaskStats stats;
    private final boolean absolute;
    private long startValue;
    private long endValue;
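
Written out as Java, the proposed hierarchy might look like the sketch below. The class and field names come from the comment, and the accessor names follow those that appear in a later review snippet in this thread; the enum constants, the constructors, and the way `getTotalValue` combines the start and end values are assumptions made only for illustration.

```java
import java.util.EnumMap;
import java.util.Map;

enum TaskStats { MEMORY, CPU }

// Assumed example constant; the real stats types are not listed in this thread.
enum TaskStatsType { WORKER_STATS }

final class TaskStatsInfo {
    private final TaskStats stats;
    private final boolean absolute;
    private long startValue;
    private long endValue;

    TaskStatsInfo(TaskStats stats, boolean absolute, long startValue) {
        this.stats = stats;
        this.absolute = absolute;
        this.startValue = startValue;
    }

    TaskStats getStats() {
        return stats;
    }

    void setEndValue(long endValue) {
        this.endValue = endValue;
    }

    // Assumption: absolute metrics report the end value, others the delta.
    long getTotalValue() {
        return absolute ? endValue : endValue - startValue;
    }
}

final class TaskResourceInfo {
    private final Map<TaskStats, TaskStatsInfo> statsInfo = new EnumMap<>(TaskStats.class);

    TaskResourceInfo(TaskStatsInfo... infos) {
        for (TaskStatsInfo info : infos) {
            statsInfo.put(info.getStats(), info);
        }
    }

    Map<TaskStats, TaskStatsInfo> getStatsInfo() {
        return statsInfo;
    }
}

final class TaskCompleteResourceInfo {
    private final Map<TaskStatsType, TaskResourceInfo> resourceInfo = new EnumMap<>(TaskStatsType.class);

    void put(TaskStatsType type, TaskResourceInfo info) {
        resourceInfo.put(type, info);
    }

    Map<TaskStatsType, TaskResourceInfo> getResourceInfo() {
        return resourceInfo;
    }
}
```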

private TaskId parentTaskId;
private final Map<String, Object> status = new HashMap<>();
private final Map<String, String> headers = new HashMap<>();
private final List<Object> resourceStats = new ArrayList<>();
Contributor

We should probably make this a Map<String, Object>, where initially the only entry is the total resources consumed at the task level. That way the serialization can use the format below:

"resource_stats": {
    "total": {
        "cpu_time": 183930,
        "memory_in_bytes": 1010293
    }
}
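
A minimal sketch of that shape, using plain maps: the outer map starts with a single "total" entry, and further entries could be added under other keys later without changing the field's type. `ResourceStatsSketch` and `totalsOnly` are hypothetical names, and the key names are the ones used in the suggested format.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ResourceStatsSketch {
    private ResourceStatsSketch() {}

    // Builds the value of "resource_stats" with only the task-level totals.
    static Map<String, Object> totalsOnly(long cpuTime, long memoryInBytes) {
        Map<String, Object> total = new LinkedHashMap<>();
        total.put("cpu_time", cpuTime);
        total.put("memory_in_bytes", memoryInBytes);

        Map<String, Object> resourceStats = new LinkedHashMap<>();
        resourceStats.put("total", total);
        return resourceStats;
    }
}
```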

appender.search_fatlog_rolling.type = Console
appender.search_fatlog_rolling.name = search_fatlog_rolling
appender.search_fatlog_rolling.layout.type = OpenSearchJsonLayout
appender.search_fatlog_rolling.layout.type_name = search_fatlog
Contributor

Let's name it something other than search_fatlog, like tasks_info.log or tasks_details.log. Since it can be used for actions other than search, let's not put search in the log name.

/** Consumers that are notified of the stats */
private final List<Consumer<TaskStatsContext>> statsConsumers = new ArrayList<Consumer<TaskStatsContext>>() {
{
add(new TaskSearchStatsLogger());
Contributor

Is this consumer enabled by default?

@opensearch-ci-bot
Collaborator

✅   Gradle Check success 46e533ca57b8be2da523897514b9dcf45b95972b
Log 2298

Reports 2298

@opensearch-ci-bot
Collaborator

✅   Gradle Check success e314321a11d478d177a3e7ae061ff1294e009385
Log 2315

Reports 2315

@opensearch-ci-bot
Collaborator

❌   Gradle Check failure 1432f54c93aef302454f3f2e3a64e3c926edfa46
Log 2316

Reports 2316

Signed-off-by: sruti1312 <srutiparthiban@gmail.com>
@opensearch-ci-bot
Collaborator

✅   Gradle Check success eb3e993
Log 2317

Reports 2317

}

public long getTotalResourceUtilization(TaskStats taskStats) {
AtomicLong totalResourceConsumption = new AtomicLong();
Member

Using an AtomicLong just to be able to capture side effects from inside lambda functions seems like an anti-pattern to me. It implies there's some sort of concurrency here and also seems a bit like a hack to get around the compiler. I think you can rewrite this pretty simply to use regular for loops and a primitive long, like:

long totalResourceConsumption = 0L;
for (List<TaskCompleteResourceInfo> statsUtilList : resourceStats.values()) {
    for (TaskCompleteResourceInfo statsUtil : statsUtilList) {
        for (Map.Entry<TaskStatsType, TaskResourceInfo> entry : statsUtil.getResourceInfo().entrySet()) {
            if (!entry.getKey().isOnlyForAnalysis()) {
                totalResourceConsumption += entry.getValue().getStatsInfo().get(taskStats).getTotalValue();
            }
        }
    }
}
return totalResourceConsumption;

I think you can also write this using the Java stream API, but I find it a bit hard to read and might prefer the more verbose style above:

return resourceStats.values().stream().mapToLong(
    statsUtilList -> statsUtilList.stream().mapToLong(
        statsUtil -> statsUtil.getResourceInfo().entrySet().stream()
            .filter(e -> !e.getKey().isOnlyForAnalysis())
            .mapToLong(e -> e.getValue().getStatsInfo().get(taskStats).getTotalValue())
            .sum())
        .sum())
    .sum();

this.startTime = startTime;
this.startTimeNanos = startTimeNanos;
this.headers = headers;
this.resourceStats = resourceStats;
Member

The constructor above creates a ConcurrentHashMap, but this constructor allows a caller to inject any kind of map it wants. If thread safety is important then usage of a thread safe map should be required (either copy the provided map into a ConcurrentHashMap or make the type ConcurrentMap). See my comment below about thread safety though.
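
The first option mentioned here (copying the provided map into a ConcurrentHashMap) could be sketched as below. `StatsHolderSketch` is a hypothetical stand-in for the class under review, and the value type is simplified to `List<Long>`.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for the class under review.
final class StatsHolderSketch {
    // Declaring the field as ConcurrentMap documents the thread-safety intent.
    private final ConcurrentMap<String, List<Long>> resourceStats;

    StatsHolderSketch() {
        this.resourceStats = new ConcurrentHashMap<>();
    }

    // Copies whatever map the caller passes, so the field is always a
    // ConcurrentHashMap regardless of the argument's concrete type.
    StatsHolderSketch(Map<String, List<Long>> initial) {
        this.resourceStats = new ConcurrentHashMap<>(initial);
    }

    ConcurrentMap<String, List<Long>> getResourceStats() {
        return resourceStats;
    }
}
```

Note that the copy is shallow: the lists inside the map are still shared with the caller. Also, `ConcurrentHashMap` rejects null keys and values, so the copy throws a `NullPointerException` if the provided map contains any.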

TaskStatsType statsType,
TaskStatsInfo... taskResourceMetrics
) {
List<TaskCompleteResourceInfo> taskStatsUtilList = resourceStats.getOrDefault(threadId, new ArrayList<>());
Member

resourceStats is a ConcurrentHashMap and therefore is thread safe, but this method isn't. Two threads could race in this method and one would end up overwriting the effects of the other. Is thread safety actually required?
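
If thread safety does turn out to be required, the race could be avoided with `computeIfAbsent`, which creates the per-key list at most once. The sketch below is an illustration under assumptions: `PerThreadStatsSketch` is hypothetical, the value type is simplified to `Long`, and the `put` in `addRacy` is a guess at the rest of the method, which is not shown in the excerpt.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in; not the class under review.
final class PerThreadStatsSketch {
    private final ConcurrentMap<String, List<Long>> resourceStats = new ConcurrentHashMap<>();

    // Racy check-then-act: two threads can both find the key absent, each
    // build its own list, and the second put() discards the first one's entry.
    void addRacy(String threadId, long value) {
        List<Long> list = resourceStats.getOrDefault(threadId, new CopyOnWriteArrayList<>());
        list.add(value);
        resourceStats.put(threadId, list);
    }

    // Atomic: computeIfAbsent installs the list at most once per key, and the
    // list itself is thread safe, so concurrent adds are not lost.
    void addAtomic(String threadId, long value) {
        resourceStats.computeIfAbsent(threadId, k -> new CopyOnWriteArrayList<>()).add(value);
    }

    int count(String threadId) {
        List<Long> list = resourceStats.get(threadId);
        return list == null ? 0 : list.size();
    }
}
```

If each key is only ever written by the thread it names, the simpler non-atomic version may be sufficient, which is presumably what the question about whether thread safety is required is getting at.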

@sruti1312
Contributor Author

Thanks @reta @sohami and @andrross for reviewing this PR. Will address the comments as part of this PR.

Closing this PR and opening multiple smaller PRs against a feature branch.

  1. Adding resource information into task framework.
  2. Integrating sinks to consume resource information.

@sruti1312 sruti1312 closed this Feb 11, 2022

6 participants