Skip to content

Commit 6ab7538

Browse files
Add support for tracking failures at query group level (#15527) (#15539)
* add workload managementRequestFailureListener * add unit tests * add CHANGELOG * add missing javadoc * refactor * address comments * rename listener instance --------- Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
1 parent 4d8a1de commit 6ab7538

9 files changed

Lines changed: 328 additions & 74 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3333
- Add runAs to Subject interface and introduce IdentityAwarePlugin extension point ([#14630](https://github.com/opensearch-project/OpenSearch/pull/14630))
3434
- [Workload Management] Add rejection logic for co-ordinator and shard level requests ([#15428](https://github.com/opensearch-project/OpenSearch/pull/15428))
3535
- Adding translog durability validation in index templates ([#15494](https://github.com/opensearch-project/OpenSearch/pull/15494))
36+
- [Workload Management] Add query group level failure tracking ([#15527](https://github.com/opensearch-project/OpenSearch/pull/15527))
3637
- [Reader Writer Separation] Add searchOnly replica routing configuration ([#15410](https://github.com/opensearch-project/OpenSearch/pull/15410))
3738

3839
### Dependencies

server/src/main/java/org/opensearch/node/Node.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@
265265
import org.opensearch.watcher.ResourceWatcherService;
266266
import org.opensearch.wlm.QueryGroupService;
267267
import org.opensearch.wlm.WorkloadManagementTransportInterceptor;
268-
import org.opensearch.wlm.listeners.QueryGroupRequestRejectionOperationListener;
268+
import org.opensearch.wlm.listeners.QueryGroupRequestOperationListener;
269269

270270
import javax.net.ssl.SNIHostName;
271271

@@ -998,11 +998,12 @@ protected Node(
998998
List<IdentityAwarePlugin> identityAwarePlugins = pluginsService.filterPlugins(IdentityAwarePlugin.class);
999999
identityService.initializeIdentityAwarePlugins(identityAwarePlugins);
10001000

1001-
final QueryGroupRequestRejectionOperationListener queryGroupRequestRejectionListener =
1002-
new QueryGroupRequestRejectionOperationListener(
1003-
new QueryGroupService(), // We will need to replace this with actual instance of the queryGroupService
1004-
threadPool
1005-
);
1001+
final QueryGroupService queryGroupService = new QueryGroupService(); // We will need to replace this with actual instance of the
1002+
// queryGroupService
1003+
final QueryGroupRequestOperationListener queryGroupRequestOperationListener = new QueryGroupRequestOperationListener(
1004+
queryGroupService,
1005+
threadPool
1006+
);
10061007

10071008
// register all standard SearchRequestOperationsCompositeListenerFactory to the SearchRequestOperationsCompositeListenerFactory
10081009
final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory =
@@ -1012,7 +1013,7 @@ protected Node(
10121013
searchRequestStats,
10131014
searchRequestSlowLog,
10141015
searchTaskRequestOperationsListener,
1015-
queryGroupRequestRejectionListener
1016+
queryGroupRequestOperationListener
10161017
),
10171018
pluginComponents.stream()
10181019
.filter(p -> p instanceof SearchRequestOperationsListener)

server/src/main/java/org/opensearch/wlm/QueryGroupService.java

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,59 @@
99
package org.opensearch.wlm;
1010

1111
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
12+
import org.opensearch.wlm.stats.QueryGroupState;
13+
import org.opensearch.wlm.stats.QueryGroupStats;
14+
import org.opensearch.wlm.stats.QueryGroupStats.QueryGroupStatsHolder;
15+
16+
import java.util.HashMap;
17+
import java.util.Map;
1218

1319
/**
14-
* This is stub at this point in time and will be replace by an acutal one in couple of days
20+
* As of now this is a stub; the main implementation PR will be raised soon. The coming PR will collate these changes with the core QueryGroupService changes.
1521
*/
1622
public class QueryGroupService {
23+
// This map does not need to be concurrent since we will process the cluster state change serially and update
24+
// this map with new additions and deletions of entries. QueryGroupState is thread safe
25+
private final Map<String, QueryGroupState> queryGroupStateMap;
26+
27+
public QueryGroupService() {
28+
this(new HashMap<>());
29+
}
30+
31+
public QueryGroupService(Map<String, QueryGroupState> queryGroupStateMap) {
32+
this.queryGroupStateMap = queryGroupStateMap;
33+
}
34+
35+
/**
36+
* updates the failure stats for the query group
37+
* @param queryGroupId query group identifier
38+
*/
39+
public void incrementFailuresFor(final String queryGroupId) {
40+
QueryGroupState queryGroupState = queryGroupStateMap.get(queryGroupId);
41+
// This can happen if the request failed for a deleted query group
42+
// or new queryGroup is being created and has not been acknowledged yet
43+
if (queryGroupState == null) {
44+
return;
45+
}
46+
queryGroupState.failures.inc();
47+
}
48+
49+
/**
50+
*
51+
* @return node level query group stats
52+
*/
53+
public QueryGroupStats nodeStats() {
54+
final Map<String, QueryGroupStatsHolder> statsHolderMap = new HashMap<>();
55+
for (Map.Entry<String, QueryGroupState> queryGroupsState : queryGroupStateMap.entrySet()) {
56+
final String queryGroupId = queryGroupsState.getKey();
57+
final QueryGroupState currentState = queryGroupsState.getValue();
58+
59+
statsHolderMap.put(queryGroupId, QueryGroupStatsHolder.from(currentState));
60+
}
61+
62+
return new QueryGroupStats(statsHolderMap);
63+
}
64+
1765
/**
1866
*
1967
* @param queryGroupId query group identifier

server/src/main/java/org/opensearch/wlm/ResourceType.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.opensearch.tasks.Task;
1515

1616
import java.io.IOException;
17+
import java.util.List;
1718
import java.util.function.Function;
1819

1920
/**
@@ -31,6 +32,8 @@ public enum ResourceType {
3132
private final Function<Task, Long> getResourceUsage;
3233
private final boolean statsEnabled;
3334

35+
private static List<ResourceType> sortedValues = List.of(CPU, MEMORY);
36+
3437
ResourceType(String name, Function<Task, Long> getResourceUsage, boolean statsEnabled) {
3538
this.name = name;
3639
this.getResourceUsage = getResourceUsage;
@@ -72,4 +75,8 @@ public long getResourceUsage(Task task) {
7275
public boolean hasStatsEnabled() {
7376
return statsEnabled;
7477
}
78+
79+
public static List<ResourceType> getSortedValues() {
80+
return sortedValues;
81+
}
7582
}

server/src/main/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListener.java renamed to server/src/main/java/org/opensearch/wlm/listeners/QueryGroupRequestOperationListener.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,21 +8,22 @@
88

99
package org.opensearch.wlm.listeners;
1010

11+
import org.opensearch.action.search.SearchPhaseContext;
1112
import org.opensearch.action.search.SearchRequestContext;
1213
import org.opensearch.action.search.SearchRequestOperationsListener;
1314
import org.opensearch.threadpool.ThreadPool;
1415
import org.opensearch.wlm.QueryGroupService;
1516
import org.opensearch.wlm.QueryGroupTask;
1617

1718
/**
18-
* This listener is used to perform the rejections for incoming requests into a queryGroup
19+
* This listener is used to listen for request lifecycle events for a queryGroup
1920
*/
20-
public class QueryGroupRequestRejectionOperationListener extends SearchRequestOperationsListener {
21+
public class QueryGroupRequestOperationListener extends SearchRequestOperationsListener {
2122

2223
private final QueryGroupService queryGroupService;
2324
private final ThreadPool threadPool;
2425

25-
public QueryGroupRequestRejectionOperationListener(QueryGroupService queryGroupService, ThreadPool threadPool) {
26+
public QueryGroupRequestOperationListener(QueryGroupService queryGroupService, ThreadPool threadPool) {
2627
this.queryGroupService = queryGroupService;
2728
this.threadPool = threadPool;
2829
}
@@ -36,4 +37,10 @@ protected void onRequestStart(SearchRequestContext searchRequestContext) {
3637
final String queryGroupId = threadPool.getThreadContext().getHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER);
3738
queryGroupService.rejectIfNeeded(queryGroupId);
3839
}
40+
41+
@Override
42+
protected void onRequestFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
43+
final String queryGroupId = threadPool.getThreadContext().getHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER);
44+
queryGroupService.incrementFailuresFor(queryGroupId);
45+
}
3946
}

server/src/main/java/org/opensearch/wlm/stats/QueryGroupState.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public class QueryGroupState {
3131
/**
3232
* this will track the cumulative failures in a query group
3333
*/
34-
final CounterMetric failures = new CounterMetric();
34+
public final CounterMetric failures = new CounterMetric();
3535

3636
/**
3737
* This will track total number of cancellations in the query group due to all resource type breaches
@@ -95,9 +95,18 @@ public static class ResourceTypeState {
9595
final ResourceType resourceType;
9696
final CounterMetric cancellations = new CounterMetric();
9797
final CounterMetric rejections = new CounterMetric();
98+
private double lastRecordedUsage = 0;
9899

99100
public ResourceTypeState(ResourceType resourceType) {
100101
this.resourceType = resourceType;
101102
}
103+
104+
public void setLastRecordedUsage(double recordedUsage) {
105+
lastRecordedUsage = recordedUsage;
106+
}
107+
108+
public double getLastRecordedUsage() {
109+
return lastRecordedUsage;
110+
}
102111
}
103112
}

server/src/main/java/org/opensearch/wlm/stats/QueryGroupStats.java

Lines changed: 56 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,12 @@
1414
import org.opensearch.core.xcontent.ToXContentObject;
1515
import org.opensearch.core.xcontent.XContentBuilder;
1616
import org.opensearch.wlm.ResourceType;
17+
import org.opensearch.wlm.stats.QueryGroupState.ResourceTypeState;
1718

1819
import java.io.IOException;
20+
import java.util.ArrayList;
21+
import java.util.HashMap;
22+
import java.util.List;
1923
import java.util.Map;
2024
import java.util.Objects;
2125

@@ -52,7 +56,11 @@ public void writeTo(StreamOutput out) throws IOException {
5256
@Override
5357
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
5458
builder.startObject("query_groups");
55-
for (Map.Entry<String, QueryGroupStatsHolder> queryGroupStats : stats.entrySet()) {
59+
// to keep the toXContent consistent
60+
List<Map.Entry<String, QueryGroupStatsHolder>> entryList = new ArrayList<>(stats.entrySet());
61+
entryList.sort((k1, k2) -> k1.getKey().compareTo(k2.getKey()));
62+
63+
for (Map.Entry<String, QueryGroupStatsHolder> queryGroupStats : entryList) {
5664
builder.startObject(queryGroupStats.getKey());
5765
queryGroupStats.getValue().toXContent(builder, params);
5866
builder.endObject();
@@ -83,11 +91,14 @@ public static class QueryGroupStatsHolder implements ToXContentObject, Writeable
8391
public static final String REJECTIONS = "rejections";
8492
public static final String TOTAL_CANCELLATIONS = "total_cancellations";
8593
public static final String FAILURES = "failures";
86-
private final long completions;
87-
private final long rejections;
88-
private final long failures;
89-
private final long totalCancellations;
90-
private final Map<ResourceType, ResourceStats> resourceStats;
94+
private long completions;
95+
private long rejections;
96+
private long failures;
97+
private long totalCancellations;
98+
private Map<ResourceType, ResourceStats> resourceStats;
99+
100+
// this is needed to support the factory method
101+
public QueryGroupStatsHolder() {}
91102

92103
public QueryGroupStatsHolder(
93104
long completions,
@@ -111,6 +122,28 @@ public QueryGroupStatsHolder(StreamInput in) throws IOException {
111122
this.resourceStats = in.readMap((i) -> ResourceType.fromName(i.readString()), ResourceStats::new);
112123
}
113124

125+
/**
126+
* static factory method to convert {@link QueryGroupState} into {@link QueryGroupStatsHolder}
127+
* @param queryGroupState which needs to be converted
128+
* @return QueryGroupStatsHolder object
129+
*/
130+
public static QueryGroupStatsHolder from(QueryGroupState queryGroupState) {
131+
final QueryGroupStatsHolder statsHolder = new QueryGroupStatsHolder();
132+
133+
Map<ResourceType, ResourceStats> resourceStatsMap = new HashMap<>();
134+
135+
for (Map.Entry<ResourceType, ResourceTypeState> resourceTypeStateEntry : queryGroupState.getResourceState().entrySet()) {
136+
resourceStatsMap.put(resourceTypeStateEntry.getKey(), ResourceStats.from(resourceTypeStateEntry.getValue()));
137+
}
138+
139+
statsHolder.completions = queryGroupState.getCompletions();
140+
statsHolder.rejections = queryGroupState.getTotalRejections();
141+
statsHolder.failures = queryGroupState.getFailures();
142+
statsHolder.totalCancellations = queryGroupState.getTotalCancellations();
143+
statsHolder.resourceStats = resourceStatsMap;
144+
return statsHolder;
145+
}
146+
114147
/**
115148
* Writes the {@param statsHolder} to {@param out}
116149
* @param out StreamOutput
@@ -136,9 +169,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
136169
builder.field(REJECTIONS, rejections);
137170
builder.field(FAILURES, failures);
138171
builder.field(TOTAL_CANCELLATIONS, totalCancellations);
139-
for (Map.Entry<ResourceType, ResourceStats> resourceStat : resourceStats.entrySet()) {
140-
ResourceType resourceType = resourceStat.getKey();
141-
ResourceStats resourceStats1 = resourceStat.getValue();
172+
173+
for (ResourceType resourceType : ResourceType.getSortedValues()) {
174+
ResourceStats resourceStats1 = resourceStats.get(resourceType);
175+
if (resourceStats1 == null) continue;
142176
builder.startObject(resourceType.getName());
143177
resourceStats1.toXContent(builder, params);
144178
builder.endObject();
@@ -187,6 +221,19 @@ public ResourceStats(StreamInput in) throws IOException {
187221
this.rejections = in.readVLong();
188222
}
189223

224+
/**
225+
* static factory method to convert {@link ResourceTypeState} into {@link ResourceStats}
226+
* @param resourceTypeState which needs to be converted
227+
* @return ResourceStats object
228+
*/
229+
public static ResourceStats from(ResourceTypeState resourceTypeState) {
230+
return new ResourceStats(
231+
resourceTypeState.getLastRecordedUsage(),
232+
resourceTypeState.cancellations.count(),
233+
resourceTypeState.rejections.count()
234+
);
235+
}
236+
190237
/**
191238
* Writes the {@param stats} to {@param out}
192239
* @param out StreamOutput

0 commit comments

Comments
 (0)