Is your feature request related to a problem? Please describe.
While serving search workloads from an OpenSearch cluster, the current protection mechanisms on data nodes, such as ThreadPoolQueueSize and CircuitBreakers, are not sufficient to protect the cluster against traffic surges, partial failures, slow nodes, or a single rogue (resource-guzzling) query.
Problem Statement in Detail
1. The count of requests in the search queue is not an accurate reflection of the load on the nodes.
Search queue sizes are effectively fixed, and the count of requests in the search queue is not an accurate reflection of the actual load on the node. We need to estimate the cost of a query (based on its query context) and map it against the available resources on the node in order to make an admission decision. This applies to individual search tasks (shard search requests) on the data nodes as well. Essentially, we need to model the queue sizes into resource maps to selectively control the inflow.
2. A single rogue query can adversely impact data nodes and can create a cascading effect of node failures.
A single bad query today can potentially end up tripping data nodes and create a cascading effect of failures across the cluster. We need a fail-safe mechanism where problematic queries are proactively identified, blocked, or terminated when a data node is in duress. This is an extension of point 1 above: we need checkpointing, and the ability to terminate a rogue query if it has breached its original estimate by some degree.
3. A single slow path can bottleneck coordinators and impact the overall service quality and availability.
Each fan-out transport request from the coordinator to the data nodes, for transport actions such as indices:data/read/search[phase/query], uses async I/O from the coordinator. While the coordinator is still aggregating or awaiting responses from data nodes, the ongoing execution is not accounted for in the search queue size on the coordinator node. As a result, the coordinator continues to accept more incoming requests, creating more pressure on the downstream data nodes. This lack of backpressure often results in a huge number of in-flight requests on the coordinator node when data nodes take longer to respond. Since the coordinator manages request contexts, partial results, and failure retries, a single slow path in the cluster can bottleneck coordinators and impact overall service quality and availability.
4. Force retries under heavy workload can worsen the situation.
Once the coordinator receives responses for shard requests from data nodes, the response handling part (execute-next) or the failure handling part (retry-failed-shard) is executed on the search thread pool. If an individual shard search request failed, for example due to a connection or GC issue, the subsequent retry on another shard copy (a replica) happens by forking the request and submitting it forcefully to the search thread pool. In these scenarios, the current search thread pool size (load) is not honoured, by design, to reduce wastage of work already done. In isolated failure scenarios this is desirable; however, if the node is already in duress under heavy workload, these forced retries actually worsen the situation.
Describe the solution you'd like
Aligned with the principles of how we are addressing Back-pressure in the Indexing Path (#478), the new back-pressure model in the search/query path will need to account for the in-flight requests in the system and their associated cost, based on the query contexts. Considerations need to be given to the query attributes, the search phases, and the cardinality of requests across their lifecycle at the different node roles, i.e. coordinator and data. Based on these parameters, the cost of a new request can be evaluated against the existing workload on the node, and a throttling decision can be made under duress. This should allow shard search requests to fail fast on a problematic path (node/shard) based on recent/outstanding requests, so that these requests can instead be retried on another active path (one of the replicas) or returned with partial results (best effort), without melting down the nodes.
In addition to smart rejections as a prevention mechanism, the new system should also leverage the search cancellation model extensively, to checkpoint and cancel the tasks (and sub-tasks) associated with problematic or stuck queries, in order to provide a predictable recovery. This will allow the cluster to continue doing as much useful work as possible.
Proposal in detail
To manage resource utilization effectively across the cluster
Instead of limiting requests based on queue occupancy, we will use a Cost Model to identify how cheap or expensive a query is in terms of its resource consumption. The query cost on a coordinator node will be a function of the query type, its constructs (contexts and clauses), the aggregation type, the number of buckets, the cardinality/fan-out (number of shards to search), and the number of documents to be returned in the response. The query cost on the data nodes will be a function of the amount of data to be scanned, document sizes, etc. This will help estimate and compute the cost of a query at intermediate stages and map it to the key resources on the node, such as memory, compute, and disk/network IO. Resource availability will be modelled as different TokenBuckets, and a request is admitted if the total available tokens across the buckets are greater than the estimated cost of the request. The total available tokens on a node will always be finite, based on the instance type, and token exhaustion will be treated as a soft-limit breach. Burst support will be provided by the accounting model to absorb transient spikes, while also confirming the state of duress by observing additional metrics before taking a throttling decision for a request.
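To make the token-bucket admission idea concrete, here is a minimal, illustrative-only sketch. All names (`AdmissionControllerSketch`, `ResourceType`, the fixed three buckets) are assumptions for this example, not existing OpenSearch APIs, and real refill/burst logic is elided:

```java
import java.util.EnumMap;
import java.util.Map;

public class AdmissionControllerSketch {

    enum ResourceType { MEMORY, CPU, IO }

    /** A simple token bucket for one resource dimension. */
    static final class TokenBucket {
        private final long capacity;
        private long available;

        TokenBucket(long capacity) {
            this.capacity = capacity;
            this.available = capacity;
        }

        void release(long tokens) {
            available = Math.min(capacity, available + tokens);
        }
    }

    private final Map<ResourceType, TokenBucket> buckets = new EnumMap<>(ResourceType.class);

    AdmissionControllerSketch(long memoryTokens, long cpuTokens, long ioTokens) {
        buckets.put(ResourceType.MEMORY, new TokenBucket(memoryTokens));
        buckets.put(ResourceType.CPU, new TokenBucket(cpuTokens));
        buckets.put(ResourceType.IO, new TokenBucket(ioTokens));
    }

    /** Admit only if every resource bucket can cover the estimated cost. */
    synchronized boolean admit(Map<ResourceType, Long> estimatedCost) {
        // Check all buckets first, so a rejected request consumes nothing.
        for (Map.Entry<ResourceType, Long> e : estimatedCost.entrySet()) {
            if (e.getValue() > buckets.get(e.getKey()).available) {
                return false; // soft-limit breach; real logic may still allow a burst
            }
        }
        estimatedCost.forEach((type, cost) -> buckets.get(type).available -= cost);
        return true;
    }

    public static void main(String[] args) {
        AdmissionControllerSketch ac = new AdmissionControllerSketch(100, 100, 100);
        Map<ResourceType, Long> cheap = Map.of(ResourceType.MEMORY, 10L, ResourceType.CPU, 5L, ResourceType.IO, 1L);
        Map<ResourceType, Long> huge  = Map.of(ResourceType.MEMORY, 500L, ResourceType.CPU, 5L, ResourceType.IO, 1L);
        System.out.println(ac.admit(cheap)); // admitted: all buckets can cover it
        System.out.println(ac.admit(huge));  // rejected: memory bucket cannot cover it
    }
}
```

In the actual system the per-resource costs would come from the query cost model, and rejection would feed into the burst/duress evaluation rather than being an immediate hard failure.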
To recover from duress situations
In addition to the original estimation, the intermediate resource consumption of a query will be calculated at different stages, such as during partial query/fetch result aggregation on the coordinator. This will help compare and checkpoint the original estimates against the actual utilisation, and allow burst-vs-throttle decisions based on the overall load on the node. Highly resource-guzzling queries can be proactively cancelled under duress to prevent a complete meltdown of the host. The intermediate checkpointing and cancellation will allow recovery and continuation of useful work.
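The checkpoint decision described above could look roughly like the following sketch. The names (`QueryCheckpointSketch`, `Decision`, `burstFactor`) and the specific thresholds are hypothetical placeholders, not part of any existing implementation:

```java
public class QueryCheckpointSketch {

    enum Decision { CONTINUE, ALLOW_BURST, CANCEL }

    /**
     * Compare a query's actual resource usage against its original estimate
     * at a checkpoint. A query that overshoots the estimate by more than
     * burstFactor is cancelled only when the node itself is in duress;
     * otherwise the overshoot is tolerated as a burst.
     */
    static Decision checkpoint(long estimatedCost, long actualCost,
                               double burstFactor, boolean nodeInDuress) {
        if (actualCost <= estimatedCost) {
            return Decision.CONTINUE;    // within the original estimate
        }
        if (actualCost <= estimatedCost * burstFactor) {
            return Decision.ALLOW_BURST; // transient spike, tolerated
        }
        // Breached the estimate by more than the allowed degree.
        return nodeInDuress ? Decision.CANCEL : Decision.ALLOW_BURST;
    }

    public static void main(String[] args) {
        System.out.println(checkpoint(100, 80, 1.5, true));   // CONTINUE
        System.out.println(checkpoint(100, 140, 1.5, true));  // ALLOW_BURST
        System.out.println(checkpoint(100, 300, 1.5, true));  // CANCEL
        System.out.println(checkpoint(100, 300, 1.5, false)); // ALLOW_BURST: node is healthy
    }
}
```

The key design point is that an estimate breach alone is not grounds for cancellation; cancellation is reserved for the combination of a large breach and a node confirmed to be in duress.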
To build backpressure for node-to-node resiliency
We need dedicated shard tracking structures, maintained per primary and replica shard on every coordinator node, to track the complete life cycle of search operation tasks across those shards. These have to account for every request, at the granularity of the different phases, such as the query and fetch phases. Similarly, each data node will have tracking structures per shard on the node, maintained and updated at the transport action layer, to track the in-flight search operations on the node. This will allow maintaining an end-to-end view of each request, along with its current state, enabling easy identification of stuck or slow tasks in the system. The system can track lingering requests on a node, whether caused by a single black/grey shard or by issues across multiple shards on the node. This will allow back-pressure to propagate early, from the data nodes to the coordinator nodes, while preventing build-up on the coordinator. Additionally, coordinators can prevent new requests from taking the problematic path: in such cases they can take early decisions to either fail fast (partial results) or retry on another shard, until the system recovers.
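A minimal sketch of such a per-shard, per-phase in-flight tracker on the coordinator follows. `ShardSearchTrackerSketch`, the string shard key, and the fixed threshold are illustrative assumptions (the real structures would use the actual ShardId and task abstractions):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

public class ShardSearchTrackerSketch {

    enum SearchPhase { QUERY, FETCH }

    /** Key: "shardId#phase", e.g. "logs[2]#QUERY". */
    private final Map<String, LongAdder> inflight = new ConcurrentHashMap<>();

    private static String key(String shardId, SearchPhase phase) {
        return shardId + "#" + phase;
    }

    void onPhaseStart(String shardId, SearchPhase phase) {
        inflight.computeIfAbsent(key(shardId, phase), k -> new LongAdder()).increment();
    }

    void onPhaseEnd(String shardId, SearchPhase phase) {
        inflight.get(key(shardId, phase)).decrement();
    }

    long inflightCount(String shardId, SearchPhase phase) {
        LongAdder adder = inflight.get(key(shardId, phase));
        return adder == null ? 0 : adder.sum();
    }

    /** A shard path with too many outstanding requests is treated as slow/grey. */
    boolean isProblematicPath(String shardId, SearchPhase phase, long threshold) {
        return inflightCount(shardId, phase) > threshold;
    }

    public static void main(String[] args) {
        ShardSearchTrackerSketch tracker = new ShardSearchTrackerSketch();
        // Simulate five query-phase requests to one shard whose responses never arrive.
        for (int i = 0; i < 5; i++) {
            tracker.onPhaseStart("logs[2]", SearchPhase.QUERY);
        }
        System.out.println(tracker.inflightCount("logs[2]", SearchPhase.QUERY));     // 5
        System.out.println(tracker.isProblematicPath("logs[2]", SearchPhase.QUERY, 3)); // true
    }
}
```

With a structure like this on both coordinator and data nodes, a build-up of outstanding requests against one shard copy becomes visible early, and the coordinator can route new requests to a replica or fail fast with partial results instead of piling more work onto the slow path.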