Limit maximum size of dynamic filter collected by coordinator#12963

Merged
arhimondr merged 7 commits into trinodb:master from arhimondr:df-limits-coordinator
Jul 15, 2022

Conversation

@arhimondr
Contributor

Description

With fault tolerant execution enabled, the number of tasks producing dynamic filters can be much larger than the total number of nodes available in the cluster. The limit is introduced to avoid pushing the coordinator out of memory by collecting too many distinct values from multiple tasks.

Is this change a fix, improvement, new feature, refactoring, or other?

Improvement

Is this a change to the core query engine, a connector, client library, or the SPI interfaces? (be specific)

Core engine

How would you describe this change to a non-technical end user or system administrator?

Not user visible (dynamic filtering is not yet enabled for fault tolerant execution)

Related issues, pull requests, and links

#12753
#12152

Documentation

(x) No documentation is needed.
( ) Sufficient documentation is included in this PR.
( ) Documentation PR is available with #prnumber.
( ) Documentation issue #issuenumber is filed, and can be handled later.

Release notes

(x) No release notes entries required.
( ) Release notes entries required with the following suggested text:

# Section
* Fix some things. ({issue}`issuenumber`)

@arhimondr arhimondr requested a review from raunaqmorarka June 23, 2022 20:00
@cla-bot cla-bot bot added the cla-signed label Jun 23, 2022
@arhimondr arhimondr force-pushed the df-limits-coordinator branch 6 times, most recently from 71f46b2 to 5b7ceea Compare June 24, 2022 22:11
@arhimondr arhimondr changed the title [WIP] Limit maximum size of dynamic filter collected by coordinator Limit maximum size of dynamic filter collected by coordinator Jun 24, 2022
@arhimondr arhimondr force-pushed the df-limits-coordinator branch from 5b7ceea to 70d5cfd Compare June 25, 2022 03:27
Member

@sopel39 sopel39 Jun 27, 2022

Why is this extensive refactor needed for "Limit maximum size of dynamic filter collected by coordinator"?

Contributor Author

The refactor is needed to merge dynamic filters incrementally. Enforcing the maximum filter size before domains are merged together may result in hitting the limit prematurely.
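As an illustration (a sketch with hypothetical names and integer sets standing in for the actual Domain/ValueSet types): under fault tolerant execution many tasks scan uniform slices of the same data, so their domains overlap heavily, and the sum of the raw per-task sizes vastly overstates the size of the merged filter.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch, not Trino's actual API: why enforcing the size
// limit on unmerged per-task domains can trip the limit prematurely.
public class PrematureLimitSketch
{
    // sum of per-task domain sizes, before any merging
    static int rawSize(List<Set<Integer>> perTaskDomains)
    {
        return perTaskDomains.stream().mapToInt(Set::size).sum();
    }

    // size of the merged (union) domain
    static int mergedSize(List<Set<Integer>> perTaskDomains)
    {
        Set<Integer> merged = new HashSet<>();
        perTaskDomains.forEach(merged::addAll);
        return merged.size();
    }

    public static void main(String[] args)
    {
        // three tasks scanning uniform slices of the same data report
        // nearly identical distinct values
        List<Set<Integer>> domains = List.of(
                Set.of(1, 2, 3),
                Set.of(1, 2, 4),
                Set.of(2, 3, 4));
        // raw size is 9, which would trip a limit of, say, 5 prematurely,
        // while the merged domain contains only 4 distinct values
        System.out.println(rawSize(domains) + " vs " + mergedSize(domains));
    }
}
```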

Member

@sopel39 sopel39 Jun 27, 2022

I think this refactor is probably overkill for capping max DF size. I think it actually introduces quadratic computational complexity too (merging multiple domains at once is more efficient).

I would rather just keep track of overall DF size.

Do we really need to cap DF size since tasks already cap DF and number of tasks is limited?
For partitioned join, each domain is separate so union is not needed. For broadcast join, we simply skip collecting subsequent domains once we get a first one.

Contributor Author

I think it actually introduces quadratic computational complexity too (merging multiple domains at once is more efficient).

The ValueSet is currently being merged iteratively anyway: https://github.com/trinodb/trino/blob/master/core/trino-spi/src/main/java/io/trino/spi/predicate/ValueSet.java#L135, https://github.com/trinodb/trino/blob/master/core/trino-spi/src/main/java/io/trino/spi/predicate/SortedRangeSet.java#L580, https://github.com/trinodb/trino/blob/master/core/trino-spi/src/main/java/io/trino/spi/predicate/EquatableValueSet.java#L277. It may still result in additional object allocations along the way, but the overhead shouldn't be significant.
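A simplified model of that pairwise fold (integer sets standing in for ValueSet; names are illustrative, and the real code merges Ranges and typed values):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified model of ValueSet.union(Collection) folding pairwise
// (sketch only, not the actual Trino implementation).
public class IterativeUnionSketch
{
    // each pairwise union allocates an intermediate result object
    static Set<Integer> unionPair(Set<Integer> a, Set<Integer> b)
    {
        Set<Integer> result = new HashSet<>(a);
        result.addAll(b);
        return result;
    }

    // left fold over the inputs, mirroring the iterative merge
    static Set<Integer> unionAll(List<Set<Integer>> sets)
    {
        return sets.stream().reduce(Set.of(), IterativeUnionSketch::unionPair);
    }
}
```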

I would rather just keep track of overall DF size.

It may result in hitting the limit prematurely.

Do we really need to cap DF size since tasks already cap DF and number of tasks is limited?

In fault tolerant execution we may create thousands and thousands of tasks as the total number of tasks is no longer limited by the cluster size

For partitioned join, each domain is separate so union is not needed.

This is no longer true for fault tolerant execution, as the filters are collected before the exchange.

Member

@sopel39 sopel39 Jun 27, 2022

According to 4fb9bb7, union is now more efficient than O(N^2), but there is still room for improvement.

It may result in hitting the limit prematurely.

Is that really that big of a problem?

In fault tolerant execution we may create thousands and thousands of tasks as the total number of tasks is no longer limited by the cluster size

That really stresses the issue with quadratic computation, if every new task adds a tiny bit of information that the coordinator needs to union over and over again.

This is no longer true for fault tolerant execution, as the filters are collected before the exchange.

I'm worried that we are repurposing the DF mechanism for use-cases that won't really be that relevant for Tardigrade, and that the proper long-term solution is really adaptive planning. DF in Tardigrade was supposed to be easy, but it exploded into a rather significant effort. It might be best to just continue, but the complexity will remain even if the new code isn't used much by the community.

cc @martint

Contributor Author

@arhimondr arhimondr Jun 27, 2022

Is that really that big of a problem?

For fault tolerant execution it is. There's a high chance the domains collected by different tasks contain almost identical sets of values, as the values are distributed uniformly across the tasks.

That really stresses the issue with quadratic computation. If every new task adds a tiny bit of information that coordinator needs to union over and over again.

I can add an optimization and only union when the size limit is reached.
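Roughly, as a sketch with hypothetical names and integer domains (the actual DynamicFilterCollectionContext works on Domain objects and differs in detail): buffer incoming domains and only pay for the union when the optimistic raw size crosses the limit.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

// Hypothetical sketch of the "union only when needed" optimization.
public class LazyUnionCollector
{
    private final int maxDistinctValues;
    private final Queue<Set<Integer>> pending = new ArrayDeque<>();
    private int pendingRawSize;
    private Set<Integer> merged = new HashSet<>();
    private boolean limitExceeded;

    public LazyUnionCollector(int maxDistinctValues)
    {
        this.maxDistinctValues = maxDistinctValues;
    }

    public void add(Set<Integer> domain)
    {
        if (limitExceeded) {
            return; // filter already abandoned; drop further input
        }
        pending.add(domain);
        pendingRawSize += domain.size();
        // union lazily: only when the optimistic (raw) size crosses the limit
        if (merged.size() + pendingRawSize > maxDistinctValues) {
            drainPending();
            limitExceeded = merged.size() > maxDistinctValues;
        }
    }

    public Set<Integer> finish()
    {
        drainPending();
        return limitExceeded ? null : merged; // null models an "all" (no-op) filter
    }

    private void drainPending()
    {
        while (!pending.isEmpty()) {
            merged.addAll(pending.remove());
        }
        pendingRawSize = 0;
    }
}
```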

I'm worried that we are repurposing the DF mechanism for use-cases that won't really be that relevant for Tardigrade, and that the proper long-term solution is really adaptive planning. DF in Tardigrade was supposed to be easy, but it exploded into a rather significant effort. It might be best to just continue, but the complexity will remain even if the new code isn't used much by the community.

For now we see this as a long-term solution. For adaptive re-planning we are thinking about starting with something simple, relying only on the per-partition "data size" metric that is "free" to collect. Collecting more advanced statistics (such as NDV, etc.) at the shuffle boundary is expensive, and we have to be careful to make sure that the extra optimizations that are possible with the advanced statistics pay off the stats collection cost.

Member

The long term goal is to unify both execution models. We should be able to take advantage of fault-tolerant execution while still being able to do speculative or quasi-pipelined execution where appropriate. Unless we're planning to get rid of DF, it does make sense to make it work with fault tolerant execution.

Also, keep in mind that fault tolerant execution is not just for long running batch queries. It can be useful for interactive queries when running clusters on unreliable hardware or ephemeral instances (e.g., spot instances in AWS).

Member

@sopel39 sopel39 Jul 6, 2022

The long term goal is to unify both execution models.

Unification can also mean hybrid model, where short queries (few mins) execute with query-level restarts (maybe speculatively), while long running queries execute with task level retries.

Unless we're planning to get rid of DF, it does make sense to make it work with fault tolerant execution.

I don't think it's that simple. IMO the complexity of Tardigrade + "new" DFs is not worth it for interactive queries:

  1. there is a performance penalty
  2. there is a higher cumulative cost too, as data needs to be kept in memory
  3. the overall system/setup is much more complicated

I think we should try to make Tardigrade the default execution mode for long running or memory intensive queries first (that is actually the intended Tardigrade use-case). However, I'm not sure this can be done without having a better (non-S3) shuffle service in OSS.

Also, keep in mind that fault tolerant execution is not just for long running batch queries. It can be useful for interactive queries when running clusters in unreliable hardware or ephemeral instances

Why can't we start by making Tardigrade the default execution mode for large/long queries in OSS? I don't think Tardigrade is that beneficial for interactive queries. The time window for failures is much shorter, so it seems that query restarts are sufficient.

Member

@sopel39 sopel39 Jul 6, 2022

I would also add that using shuffle service for every query is a big paradigm shift in how Trino runs queries and manages resources. Shuffle service is not native to Trino. There are setups where setting up shuffle service won't be possible (e.g. native deployments). It's also something that Trino users don't have operational experience with.

Contributor Author

DF is not only beneficial for interactive queries. It helps with cutting computational costs for long running queries as well. While we don't care too much about low latency queries yet, we do care about overall system efficiency for fault tolerant execution (currently there's less than a 15% regression in CPU efficiency with fault tolerant execution enabled; the current efficiency regression comes from applying encryption and compression to data exchanges, which in theory could be disabled).

@arhimondr arhimondr force-pushed the df-limits-coordinator branch 2 times, most recently from 974f6b6 to d82c691 Compare June 27, 2022 23:23
@arhimondr
Contributor Author

Added optimization to avoid unnecessary unions. Now the union is done only when the size limit is reached.

Member

@raunaqmorarka raunaqmorarka left a comment

minor comments

Member

Should this condition be a checkState, or is this a valid scenario?

Contributor Author

With the current implementation it is possible for this method to be called more than once.

To be honest I'm not a huge fan of the current flow (stageDynamicFilters + stageNumberOfTasks), and with future iterations of fault tolerant execution, when fragments and stages no longer correspond one-to-one, it might not even work. So in the future we might want to make this explicit (have the scheduler set the expected dynamic filters and the expected number of tasks directly). But I decided to postpone this refactor until necessary.

Member

In the previous implementation we had tried to deliver all the dynamic filters that a consumer was interested in within a single batch. This is because the consumer here is going to send dynamic filters to all worker nodes via HttpRemoteTask#sendUpdate. Is it possible to do something like that here as well, or do you feel the overheads are small enough that it doesn't matter?

Contributor Author

I assume in practice all the filters will be delivered in one shot anyway, as it is more likely for the same thread to add more filters before the update is scheduled asynchronously. However, even if it's split into 2 requests it shouldn't be a big deal. We should probably run a benchmark, but I would be surprised if it had any meaningful impact on performance.

Member

Should this be a checkState, or is it valid for stageCannotScheduleMoreTasks to get called multiple times for a StageId?

Contributor Author

With the current implementation it is possible for this method to be called more than once. I'm not sure if that's the right approach though. I guess we can change it later as a separate refactor.

Member

Since we know the expected task count before collection begins, could we just create a dense bitmap ourselves? Roaring could still be more compact due to using RLE, but I'm not sure we really need to squeeze space that much here.

Contributor Author

The expected number of tasks is supplied later on, asynchronously. Also, while implementing an efficient bitmap is not extremely difficult, it is non-trivial, so I would probably stick with the library.
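For reference, a minimal dense-bitmap variant using the JDK's java.util.BitSet (sketch only with assumed names, not Trino code; the PR uses the Roaring library, which can additionally run-length encode dense runs). A Set of boxed Integers costs tens of bytes per entry, while a dense bitmap costs one bit per possible task id.

```java
import java.util.BitSet;

// Hypothetical sketch: tracking which task ids have already reported
// a domain, using a dense bitmap instead of a Set<Integer>.
public class CollectedTasks
{
    private final BitSet collected = new BitSet();

    // returns true if this is the first report from the given task
    public boolean collect(int taskId)
    {
        if (collected.get(taskId)) {
            return false; // duplicate report, e.g. from a retried task
        }
        collected.set(taskId);
        return true;
    }

    public int collectedCount()
    {
        return collected.cardinality();
    }
}
```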

Commits:

* Union collected domains iteratively: a set of collected tasks is stored for each dynamic filter; when the number of dynamic filters collected is high and there are thousands of tasks, a naive way of storing collected tasks (via a Set of Integer) could create a significant memory overhead.
* Union domains only when the size limit is exceeded: in DynamicFilterCollectionContext the new domain is optimistically added to the queue; if the collection is finished in the meantime, the queue has to be properly cleaned.
* Union domains only when the size limit is exceeded
@arhimondr arhimondr force-pushed the df-limits-coordinator branch from d82c691 to ad5b2c0 Compare July 14, 2022 20:43
@arhimondr
Contributor Author

Updated

Member

@raunaqmorarka raunaqmorarka left a comment

I've run TPC benchmarks and confirmed no regressions

@arhimondr arhimondr merged commit 145ec31 into trinodb:master Jul 15, 2022
@arhimondr arhimondr deleted the df-limits-coordinator branch July 15, 2022 16:44
@github-actions github-actions bot added this to the 391 milestone Jul 15, 2022