Introduce classes for collecting source statistics #19276
rapids-bot[bot] merged 35 commits into rapidsai:branch-25.08 from
Conversation
Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually.
/ok to test
Note that I am adding the "base" stats/info classes to the base module, because these classes should not have any type dependencies (and should be available to use in other modules without any circular-dependency worries). We may want to add a dedicated statistics module to implement the IR-specific logic for populating/propagating this logic, but I'll leave that decision for a follow-up.
```python
return f"{type(node).__name__.lower()}-{hash(node)}"
```

```python
class RowCountInfo:
```
I've decided to create a class for each individual type of statistic that we may want to keep track of. Right now, these are: `RowCountInfo`, `UniqueInfo`, and `StorageSizeInfo`. Each of these classes includes an `exact` attribute so that we can record/track when some piece of information is known "exactly".
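A minimal sketch of how these per-statistic containers might look. Only the class names and the `exact` attribute come from the PR; the other field names and defaults here are assumptions for illustration:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class RowCountInfo:
    value: Optional[int] = None  # estimated (or exact) row count
    exact: bool = False          # True when the value is known exactly


@dataclass
class UniqueInfo:
    count: Optional[int] = None  # estimated unique-value count
    exact: bool = False


@dataclass
class StorageSizeInfo:
    value: Optional[int] = None  # estimated storage size in bytes
    exact: bool = False
```

The `exact` flag lets downstream consumers distinguish a metadata-derived exact value from a sampled estimate.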
```python
self.exact = exact
```

```python
class DataSourceInfo:
```
I believe we will want to record/track `RowCountInfo`, `UniqueInfo`, and `StorageSizeInfo` for the underlying datasources used in our query. In order to manage this "source" information in one place, I'm introducing a `DataSourceInfo` class. This class is designed with "lazy" metadata/data sampling in mind.
We define Parquet- and DataFrame-specific subclasses of `DataSourceInfo` in `experimental.io`.
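A rough sketch of the base class with a lazily-sampling subclass. Only `DataSourceInfo` and `add_unique_stats_column` appear in the PR; the subclass name, method signatures, and the placeholder sampling are invented here to illustrate the lazy design:

```python
from typing import Optional


class DataSourceInfo:
    """Base datasource statistics; usable directly for sources
    (e.g. CSV/JSON) that expose no cheap statistics."""

    def __init__(self) -> None:
        self._unique_stats_columns: set[str] = set()

    def add_unique_stats_column(self, column: str) -> None:
        # Cheap: just record that this column needs unique-value stats.
        self._unique_stats_columns.add(column)

    def row_count(self) -> Optional[int]:
        return None  # unknown by default


class ParquetSourceInfo(DataSourceInfo):
    """Hypothetical Parquet subclass with lazy metadata sampling."""

    def __init__(self, paths: tuple[str, ...]) -> None:
        super().__init__()
        self.paths = paths
        self._row_count: Optional[int] = None  # not sampled yet

    def row_count(self) -> Optional[int]:
        if self._row_count is None:
            # A real implementation would sample Parquet footers here;
            # use a placeholder estimate so the sketch is runnable.
            self._row_count = len(self.paths) * 1000
        return self._row_count
```

Nothing expensive happens until `row_count` is first accessed, and the result is cached on the instance afterwards.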
I think `DataSourceInfo` can be an abstract base class.
You will get a `DataSourceInfo` object for CSV and JSON data, so it can't be an abstract base class unless I add another `EmptySourceInfo` class for them to use (or duplicate the logic in `CsvSourceInfo` and `JsonSourceInfo`).
```python
self.name = name
self.source = source or DataSourceInfo()
self.source_name = source_name or name
self.unique_info = unique_info or UniqueInfo()
```
This attribute is not populated in this PR (we only focus on populating `source` for now). When we actually use the statistics for more than partitioning (in a follow-up), we will use this attribute to account for changes in the unique-value information as IR nodes "modify" the data.
Notes:
- We will not modify the `source` or `source_name` attributes when we copy/transfer a `ColumnStats` object for a parent IR node. We will either preserve the source information, or lose it. The `unique_info` attribute, however, can certainly change.
- The row-count (cardinality) estimate is not stored at the `ColumnStats` level, because it technically "belongs" to the table, rather than the individual column. We do have access to the source `RowCountInfo` through `ColumnStats.source.row_count`. However, I'm expecting us to track the cardinality estimate for each IR node outside the `ColumnStats` dictionary.
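Assuming the attributes shown above, the copy/transfer behavior described in these notes might be sketched like so (`new_parent` is a hypothetical helper name, not the PR's API):

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class UniqueInfo:
    count: Optional[int] = None
    exact: bool = False


@dataclass
class ColumnStats:
    name: str
    source: Any = None
    source_name: Optional[str] = None
    unique_info: UniqueInfo = field(default_factory=UniqueInfo)

    def new_parent(self, name: Optional[str] = None) -> "ColumnStats":
        # Preserve source/source_name for the parent IR node, but reset
        # unique_info, since it may change as IR nodes "modify" the data.
        return ColumnStats(
            name=name or self.name,
            source=self.source,
            source_name=self.source_name,
        )
```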
```python
self._num_row_groups_per_file = num_row_groups_per_sampled_file
self._row_count = RowCountInfo(value=row_count, exact=exact)

def _sample_row_groups(self) -> None:
```
Note that the row-group sampling logic is separate from the metadata-sampling logic. Sampling "real" row-groups is probably more expensive. Therefore, we want to avoid doing it until absolutely necessary.
We also want to use the `add_unique_stats_column` method when we implement "full" statistics collection, to make sure we collect unique-value statistics for all necessary columns at once. I'm thinking that we can call this method for IR nodes like `GroupBy` and `Distinct` in the initial IR-graph traversal to collect "base" statistics information. The follow-up traversal would then sample/cache the unique-value statistics for all necessary columns at once on the first call to `_get_unique_fractions` or `ColumnStats.source.unique(source_name)`.
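A toy illustration of this aggregated-lazy pattern. The class name and in-memory data layout are invented for the example; only `add_unique_stats_column` mirrors the PR's method:

```python
from typing import Optional


class LazyUniqueSampler:
    def __init__(self, columns: dict) -> None:
        self._columns = columns          # column name -> list of values
        self._requested: set[str] = set()
        self._unique_fractions: Optional[dict] = None

    def add_unique_stats_column(self, name: str) -> None:
        # Cheap: just record that this column needs unique-value stats.
        self._requested.add(name)

    def unique_fraction(self, name: str) -> float:
        if self._unique_fractions is None:
            # Expensive sampling happens once, for all requested columns
            # at the same time, on the first access.
            self._unique_fractions = {
                c: len(set(self._columns[c])) / max(len(self._columns[c]), 1)
                for c in self._requested
            }
        return self._unique_fractions[name]
```

Because registration is decoupled from sampling, one pass over the data serves every column that any IR node asked about.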
> Therefore, we want to avoid doing it until absolutely necessary.

I'll be pedantic (sorry) and suggest/ask whether this should be "avoid doing it unless absolutely necessary". Is there any difference between doing all the row-group sampling at the start of IR translation/lowering, vs. doing it while lowering a specific node? (Presumably it has to be done before we actually execute an IR node, since we'll use the gathered information to pick which IR node to use.)
I should have read ahead, but I think your second paragraph is getting to this.
> Is there any difference between doing all the row group sampling at the start of IR translation / lowering, vs. doing it while lowering a specific node?
Yeah, my thinking is that there is not a real difference as long as we sample all the necessary columns from the same datasource at once. The benefit of making the row-group sampling lazy is that we enable this kind of "aggregated" sampling. I'm expecting the final statistics-collection procedure to look something like this:
1. We use `post_traversal` to collect/propagate "base" datasource information to all IR nodes.
   - Since the underlying `DataSourceInfo` objects collect Parquet statistics lazily, we don't need to do any expensive row-group sampling during this traversal.
   - As we post-traverse the IR graph, we can use `DataSourceInfo.add_unique_stats_column` to populate the set of datasource column names needing unique-value statistics (e.g. `group_by` and `Distinct` keys).
   - We can also use this traversal to populate data structures that record Join keys (not included in this PR).
2. We use `post_traversal` to estimate the intermediate cardinality and `UniqueInfo` for each IR node.
   - This traversal will trigger metadata sampling (to estimate/collect datasource cardinality), and may trigger row-group sampling.
3. We lower the IR graph as usual.
   - This traversal will trigger row-group sampling if any unique-value statistics are needed (and the relevant `UniqueInfo` wasn't already cached).
   - We can use the intermediate cardinality estimates from traversal (2) to inject `Repartition` optimizations.
```python
@functools.lru_cache(maxsize=10)
def _sample_pq_stats(
    paths: tuple[str, ...],
    max_file_samples: int,
    max_rg_samples: int,
) -> PqSourceInfo:
    """Return Parquet datasource information."""
    return PqSourceInfo(paths, max_file_samples, max_rg_samples)
```
I kind of think that this cache should be per query.
Rationale: between queries I don't think we can guarantee that the statistics are valid, because the files might be overwritten.
WDYT?
Sorry I didn't comment on this explicitly yet.
You are correct that we need to cache a bit more than a tuple of paths if we want to account for overwritten data. Dask tries to do this with a checksum. We can start by clearing the cache for every new query, but I'd love to find a way to preserve the cache between queries (very useful for remote data in practice).
I'd prefer to avoid caching (at least beyond single query) until we have a way of knowing if the contents might have changed through something like an etag from the filesystem giving us the files.
Yeah, the current implementation will clear the caches every time we call evaluate_streaming. Hopefully that is reasonable for now?
TomAugspurger left a comment:
(I forgot to hit submit on this earlier so some of the comments might be out of date / answered elsewhere. I'll go through and close them out if they are).
Update: I think this PR is now accomplishing what we need, but I'll be happy to revise. Next steps:
This is coming together nicely. Left some comments about implementation details inline.
In the future, I'd love to have a table that lists:
- What statistics we use (table row count, column unique count, etc.)
- Where we get each statistic from (X from Parquet file metadata, Y from sampling row groups, etc.)
- When we collect each statistic (some discussion at https://github.com/rapidsai/cudf/pull/19276/files#r2190645784)
- When we use them, and why

But I recognize we might not be there yet.
> (Maybe this PR?) Make the number of sampled parquet files and parquet row-groups configurable.

That's easy enough to do here or separately, I think. That shouldn't be too complex either way.
TomAugspurger left a comment:
A couple non-blocking comments, but overall this looks nice.
Matt711 left a comment:
Still looking through the PR...
/merge

Failed to merge PR using squash strategy.
Description
Probably supersedes #19130
The goal of this PR is to define the classes needed to store column statistics for an `IR` node. Some criteria:

Also related:
- `post_traversal` API to cudf-polars #19258

Checklist