Limit maximum size of dynamic filter collected by coordinator#12963
arhimondr merged 7 commits into trinodb:master
Conversation
71f46b2 to 5b7ceea
5b7ceea to 70d5cfd
why this extensive refactor is needed for Limit maximum size of dynamic filter collected by coordinator?
The refactor is needed to merge dynamic filters incrementally. Enforcing the maximum filter size before domains are merged together may result in hitting the limit prematurely.
I think this refactor is probably overkill for capping max DF size. I think it actually introduces quadratic computational complexity too (merging multiple domains at once is more efficient).
I would rather just keep track of overall DF size.
Do we really need to cap DF size since tasks already cap DF and number of tasks is limited?
For partitioned join, each domain is separate so union is not needed. For broadcast join, we simply skip collecting subsequent domains once we get a first one.
I think it actually introduces quadratic computational complexity too (merging multiple domains at once is more efficient).
The ValueSet is currently being merged iteratively anyway: https://github.com/trinodb/trino/blob/master/core/trino-spi/src/main/java/io/trino/spi/predicate/ValueSet.java#L135, https://github.com/trinodb/trino/blob/master/core/trino-spi/src/main/java/io/trino/spi/predicate/SortedRangeSet.java#L580, https://github.com/trinodb/trino/blob/master/core/trino-spi/src/main/java/io/trino/spi/predicate/EquatableValueSet.java#L277. It may still result in additional objects allocation along the way, but the overhead shouldn't be significant.
I would rather just keep track of overall DF size.
It may result in hitting the limit prematurely.
Do we really need to cap DF size since tasks already cap DF and number of tasks is limited?
In fault tolerant execution we may create thousands and thousands of tasks as the total number of tasks is no longer limited by the cluster size
For partitioned join, each domain is separate so union is not needed.
This is no longer true for fault tolerant execution, as the filters are collected before the exchange.
According to 4fb9bb7, the union is now more efficient than O(N^2), but there is still room for improvement.
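To make the complexity concern concrete, here is a toy illustration over sorted integer lists (not Trino's actual `SortedRangeSet`): folding immutable sorted domains pairwise re-copies the accumulated result on every step, while a single k-way union only pays once. All names below are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class DomainUnion {
    // Union of two immutable sorted value lists, the shape of a pairwise
    // SortedRangeSet-style union: a fresh merged copy is allocated every call.
    static List<Integer> union(List<Integer> a, List<Integer> b) {
        List<Integer> merged = new ArrayList<>(a.size() + b.size());
        int i = 0;
        int j = 0;
        while (i < a.size() || j < b.size()) {
            int next;
            if (j == b.size() || (i < a.size() && a.get(i) <= b.get(j))) {
                next = a.get(i++);
            }
            else {
                next = b.get(j++);
            }
            // skip duplicates so the result stays a sorted set
            if (merged.isEmpty() || merged.get(merged.size() - 1) != next) {
                merged.add(next);
            }
        }
        return merged;
    }

    // Folding k domains pairwise re-copies the accumulated result on every
    // step: O(k * n) copies for n total values, i.e. quadratic in the number
    // of tasks when every task contributes a small increment.
    static List<Integer> unionIteratively(List<List<Integer>> domains) {
        List<Integer> result = List.of();
        for (List<Integer> domain : domains) {
            result = union(result, domain);
        }
        return result;
    }

    // Unioning all k domains in one pass sorts and dedupes the values once,
    // independent of how many tasks delivered them.
    static List<Integer> unionAtOnce(List<List<Integer>> domains) {
        TreeSet<Integer> all = new TreeSet<>();
        domains.forEach(all::addAll);
        return new ArrayList<>(all);
    }
}
```

Both strategies produce the same merged domain; only the amount of intermediate copying differs.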
It may result in hitting the limit prematurely.
Is that really that big of a problem?
In fault tolerant execution we may create thousands and thousands of tasks as the total number of tasks is no longer limited by the cluster size
That really stresses the issue with quadratic computation: every new task adds a tiny bit of information that the coordinator needs to union over and over again.
This is no longer true for fault tolerant execution, as the filters are collected before the exchange.
I'm worried that we are repurposing the DF mechanism for use-cases that won't be really that relevant for Tardigrade, and that the proper long-term solution is really adaptive planning. DF in Tardigrade was supposed to be easy, but it exploded into a rather significant effort. It might be best to just continue, but the complexity will remain even if the new code won't be used much by the community.
cc @martint
Is that really that big of a problem?
For fault tolerant execution it is. There's a high chance the domains collected by different tasks may contain an almost identical set of values, as the values are going to be distributed uniformly across the tasks.
That really stresses the issue with quadratic computation: every new task adds a tiny bit of information that the coordinator needs to union over and over again.
I can add an optimization and only union when the size limit is reached.
I'm worried that we are repurposing the DF mechanism for use-cases that won't be really that relevant for Tardigrade, and that the proper long-term solution is really adaptive planning. DF in Tardigrade was supposed to be easy, but it exploded into a rather significant effort. It might be best to just continue, but the complexity will remain even if the new code won't be used much by the community.
For now we see this as a long term solution. For adaptive re-planning we are thinking about starting with something simple and relying only on the "data size" metric for each partition, which is "free". Collecting more advanced statistics (such as NDV, etc.) at the shuffle boundary is expensive, and we have to be careful to make sure that the extra optimizations made possible by the advanced statistics are going to pay off the stats collection cost.
The long term goal is to unify both execution models. We should be able to take advantage of fault-tolerant execution while still being able to do speculative or quasi-pipelined execution where appropriate. Unless we're planning to get rid of DF, it does make sense to make it work with fault tolerant execution.
Also, keep in mind that fault tolerant execution is not just for long running batch queries. It can be useful for interactive queries when running clusters on unreliable hardware or ephemeral instances (e.g., spot instances in AWS)
The long term goal is to unify both execution models.
Unification can also mean a hybrid model, where short queries (a few minutes) execute with query-level restarts (maybe speculatively), while long running queries execute with task-level retries.
Unless we're planning to get rid of DF, it does make sense to make it work with fault tolerant execution.
I don't think it's that simple. IMO the complexity of Tardigrade + "new" DFs is not worth it for interactive queries:
- there is a performance penalty
- there is a higher cumulative cost too, as data needs to be kept in memory
- the overall system/setup is much more complicated
I think we should try to make Tardigrade the default execution mode for long running or memory intensive queries first (that is actually the intended Tardigrade use-case). However, I'm not sure this can be done without having a better (non-S3) shuffle service in OS
Also, keep in mind that fault tolerant execution is not just for long running batch queries. It can be useful for interactive queries when running clusters on unreliable hardware or ephemeral instances
Why can't we start by making Tardigrade a default execution mode for large/long queries in OS? I don't think Tardigrade is that beneficial for interactive queries. The time window for failures is much shorter; query restarts seem sufficient.
I would also add that using shuffle service for every query is a big paradigm shift in how Trino runs queries and manages resources. Shuffle service is not native to Trino. There are setups where setting up shuffle service won't be possible (e.g. native deployments). It's also something that Trino users don't have operational experience with.
DF is not only beneficial for interactive queries. It helps with cutting computational costs for long running queries as well. While we don't care too much about low latency queries yet, we do care about overall system efficiency for fault tolerant execution (currently there's less than a 15% regression in CPU efficiency with fault tolerant execution enabled; the current efficiency regression comes from applying encryption and compression to data exchanges, which in theory could be disabled)
974f6b6 to d82c691
Added an optimization to avoid unnecessary unions. Now the union is done only when the size limit is reached.
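The lazy-union idea can be sketched as a buffer that accumulates per-task domains and only performs the (expensive) union when the combined estimated size crosses the limit, abandoning the filter only if the *merged* result still exceeds it. This is a minimal, self-contained sketch; all class and parameter names are illustrative, not Trino's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.ToLongFunction;

// Buffers incoming per-task domains and unions them only when their combined
// estimated size crosses the limit, instead of after every task update.
public class LazyDomainBuffer<T> {
    private final List<T> pending = new ArrayList<>();
    private final ToLongFunction<T> sizeOf;       // estimated size of a domain
    private final Function<List<T>, T> unionAll;  // k-way union of domains
    private final long sizeLimit;
    private long pendingSize;
    private boolean overflowed;

    public LazyDomainBuffer(ToLongFunction<T> sizeOf, Function<List<T>, T> unionAll, long sizeLimit) {
        this.sizeOf = sizeOf;
        this.unionAll = unionAll;
        this.sizeLimit = sizeLimit;
    }

    public synchronized void add(T domain) {
        if (overflowed) {
            return; // the filter was already abandoned; drop further updates
        }
        pending.add(domain);
        pendingSize += sizeOf.applyAsLong(domain);
        if (pendingSize > sizeLimit) {
            // Union compacts overlapping values; only if the merged result
            // still exceeds the limit is the filter abandoned.
            T merged = unionAll.apply(pending);
            pending.clear();
            pending.add(merged);
            pendingSize = sizeOf.applyAsLong(merged);
            overflowed = pendingSize > sizeLimit;
        }
    }

    public synchronized Optional<T> result() {
        if (overflowed || pending.isEmpty()) {
            return Optional.empty(); // no usable filter
        }
        return Optional.of(unionAll.apply(pending));
    }
}
```

Because tasks with largely overlapping values compact well under union, the buffer only overflows when the merged domain is genuinely large, avoiding the premature-limit problem discussed above.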
core/trino-main/src/main/java/io/trino/execution/DynamicFilterConfig.java
Should this condition be a checkState or is this a valid scenario ?
With the current implementation it is possible for this method to be called more than once.
To be honest I'm not a huge fan of the current flow (stageDynamicFilters + stageNumberOfTasks), and with future iterations of fault tolerant execution, when fragments and stages no longer have a 1:1 relationship, it might not even work. So in the future we might want to make this explicit (the scheduler sets the expected dynamic filters and the expected number of tasks directly). But I decided to postpone this refactor until necessary.
In the previous implementation we had tried to deliver all the dynamic filters that a consumer was interested in within a single batch. This is because the consumer here is going to send dynamic filters to all worker nodes via HttpRemoteTask#sendUpdate. Is it possible to do something like that here as well or do you feel the overheads are small enough that it doesn't matter ?
I assume in practice all the filters will be delivered in one shot anyway, as it is more likely for the same thread to add more filters before the update is scheduled asynchronously. However, even if it's split into 2 requests it shouldn't be a big deal. We should probably run a benchmark, though I would be surprised if it had any meaningful impact on performance.
Should this be a checkState, or is it valid for stageCannotScheduleMoreTasks to get called multiple times for a StageId?
With the current implementation it is possible for this method to be called more than once. I'm not sure if that's the right approach though. I guess we can change it later as a separate refactor.
Since we know the expected tasks count before collection begins, could we just create a dense bitmap ourselves? Roaring could still be more compact due to using RLE, but I'm not sure if we really need to squeeze space that much here.
The expected number of tasks is supplied later on asynchronously. Also, while implementing an efficient bitmap is not extremely difficult, it is non-trivial, thus I would probably stick with the library.
core/trino-main/src/main/java/io/trino/sql/planner/LocalDynamicFilterConsumer.java
Union collected domains iteratively
A set of collected tasks is stored for each dynamic filter. When the number of dynamic filters collected is high and there are thousands of tasks, a naive way of storing collected tasks (via a Set<Integer>) could create a significant memory overhead.
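The memory argument can be sketched with a dense bitmap: a boxed `Set<Integer>` costs tens of bytes per entry (boxed Integer plus hash-table node), while a bitmap costs roughly one bit per possible task ID. The PR uses the RoaringBitmap library, which additionally run-length-encodes dense runs; the sketch below uses the JDK's `java.util.BitSet` as a stand-in, and the class name is illustrative.

```java
import java.util.BitSet;

// Tracks which task IDs have already reported a given dynamic filter.
public class CollectedTasks {
    private final BitSet collected = new BitSet();

    // Returns true if this task's domain was seen for the first time,
    // false for a duplicate report (e.g. a retried task).
    public boolean add(int taskId) {
        if (collected.get(taskId)) {
            return false;
        }
        collected.set(taskId);
        return true;
    }

    public int count() {
        return collected.cardinality();
    }
}
```

With thousands of tasks per dynamic filter, the bitmap stays in the kilobyte range where a boxed set would take hundreds of kilobytes.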
Union domains only when the size limit is exceeded
In DynamicFilterCollectionContext the new domain is optimistically added to the queue. If the collection is finished in the meantime, the queue has to be properly cleaned.
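The "optimistically enqueue, then re-check" pattern described in this commit message can be sketched as follows; the class and method names are illustrative, not the actual DynamicFilterCollectionContext code. The domain is enqueued without holding a lock, and the finished flag is checked afterwards so that a concurrent finish cannot leave stale domains behind.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class OptimisticCollector<T> {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean finished = new AtomicBoolean();

    public void add(T domain) {
        queue.add(domain); // optimistic: no lock around the finished check
        if (finished.get()) {
            // Collection completed concurrently; clean up so the
            // optimistically added domain is not retained.
            queue.clear();
        }
    }

    public void finish() {
        finished.set(true);
        queue.clear(); // drop anything enqueued before the flag became visible
    }

    public boolean isEmpty() {
        return queue.isEmpty();
    }
}
```

Whichever interleaving occurs, either the `add` or the `finish` side observes the other's effect and clears the queue, so no domain outlives a finished collection.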
Union domains only when the size limit is exceeded
d82c691 to ad5b2c0
Updated
raunaqmorarka left a comment
I've run TPC benchmarks and confirmed no regressions
Description
With fault tolerant execution enabled, the number of tasks producing dynamic filters could be much larger than the total number of nodes available in the cluster. The limit is introduced to avoid pushing the coordinator out of memory by collecting too many distinct values from multiple tasks.
Improvement
Core engine
Not user visible (dynamic filtering is not yet enabled for fault tolerant execution)
Related issues, pull requests, and links
#12753
#12152
Documentation
(x) No documentation is needed.
( ) Sufficient documentation is included in this PR.
( ) Documentation PR is available with #prnumber.
( ) Documentation issue #issuenumber is filed, and can be handled later.
Release notes
(x) No release notes entries required.
( ) Release notes entries required with the following suggested text: