
Introduce classes for collecting source statistics#19276

Merged
rapids-bot[bot] merged 35 commits into rapidsai:branch-25.08 from
rjzamora:stats-classes
Jul 16, 2025

Conversation

@rjzamora
Member

@rjzamora rjzamora commented Jul 2, 2025

Description

Probably supersedes #19130

The goal of this PR is to define the classes needed to store column statistics for an IR node. Some criteria:

  • We need the statistics for a column to contain a reference to the underlying datasource information (e.g. unique-value statistics, row-count, and average storage/file size).
  • We want caching for each datasource and column.
  • We want the option to perform metadata/data sampling lazily on the datasource.
  • We want our Parquet partitioning logic to use the same infrastructure (to avoid redundant sampling).
  • We want to record when a specific statistic is "exact" (rather than estimated).

Also related:

Checklist

  • I am familiar with the Contributing Guidelines.
  • New or existing tests cover these changes.
  • The documentation is up to date with these changes.

@rjzamora rjzamora self-assigned this Jul 2, 2025
@rjzamora rjzamora added 2 - In Progress Currently a work in progress improvement Improvement / enhancement to an existing function non-breaking Non-breaking change cudf.polars labels Jul 2, 2025
@copy-pr-bot

copy-pr-bot bot commented Jul 2, 2025

Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually.

Contributors can view more details about this message here.

@github-actions github-actions bot added Python Affects Python cuDF API. cudf-polars Issues specific to cudf-polars labels Jul 2, 2025
@GPUtester GPUtester moved this to In Progress in cuDF Python Jul 2, 2025
@rjzamora
Member Author

rjzamora commented Jul 2, 2025

/ok to test

@rjzamora rjzamora marked this pull request as ready for review July 3, 2025 13:34
@rjzamora rjzamora requested a review from a team as a code owner July 3, 2025 13:34
@rjzamora rjzamora requested review from mroeschke and wence- July 3, 2025 13:34
Member Author

Note that I am adding the "base" stats/info classes to the base module, because these classes should not have any type dependencies (and should be available to use in other modules without any circular-dependency worries). We may want to add a dedicated statistics module to implement the IR-specific logic for populating/propagating this logic, but I'll leave that decision for a follow-up.

return f"{type(node).__name__.lower()}-{hash(node)}"


class RowCountInfo:
Member Author

I've decided to create a class for each individual type of statistic that we may want to keep track of. Right now, these are: RowCountInfo, UniqueInfo, and StorageSizeInfo. Each of these classes includes an exact attribute so that we can record/track when some piece of information is known "exactly".
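As a rough illustration of the pattern described above, each per-statistic class can be a small dataclass carrying a value plus an `exact` flag. This is a hypothetical sketch (the class names come from the comment, but the field names are assumptions, not the PR's exact API):

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class RowCountInfo:
    """Table row-count statistic (hypothetical sketch)."""

    value: int | None = None
    exact: bool = False  # True only when the count is known exactly


@dataclass
class UniqueInfo:
    """Unique-value statistic for a single column (hypothetical sketch)."""

    count: int | None = None
    exact: bool = False


@dataclass
class StorageSizeInfo:
    """Average storage/file-size statistic (hypothetical sketch)."""

    value: int | None = None
    exact: bool = False
```

Defaulting every field to "unknown, not exact" lets a statistic exist before any sampling has happened, which fits the lazy-collection design discussed below.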

self.exact = exact


class DataSourceInfo:
Member Author

I believe we will want to record/track RowCountInfo, UniqueInfo, and StorageSizeInfo for the underlying datasources used in our query. In order to manage this "source" information in one place, I'm introducing a DataSourceInfo class. This class is designed with "lazy" metadata/data sampling in mind.

We define Parquet- and DataFrame-specific sub-classes for DataSourceInfo in experimental.io.
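A minimal sketch of the lazy pattern: the expensive sampling step only runs on first access and the result is cached on the instance, with subclasses overriding just the sampling step. Only the lazy-caching idea is from the discussion above; the method names and the `sample_calls` counter are illustrative assumptions:

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class RowCountInfo:
    value: int | None = None
    exact: bool = False


class DataSourceInfo:
    """Base datasource info with lazy, cached sampling (illustrative sketch)."""

    def __init__(self) -> None:
        self._row_count: RowCountInfo | None = None
        self.sample_calls = 0  # for illustration only

    def _sample_metadata(self) -> RowCountInfo:
        # The base class (used for e.g. CSV/JSON sources) has nothing
        # cheap to sample, so it reports an unknown, inexact count.
        self.sample_calls += 1
        return RowCountInfo()

    @property
    def row_count(self) -> RowCountInfo:
        # Sampling is deferred to the first access, then cached.
        if self._row_count is None:
            self._row_count = self._sample_metadata()
        return self._row_count


class ParquetSourceInfo(DataSourceInfo):
    """Parquet-specific subclass overriding the sampling step (hypothetical)."""

    def __init__(self, row_count: int) -> None:
        super().__init__()
        self._known = row_count

    def _sample_metadata(self) -> RowCountInfo:
        # A real implementation would read Parquet footer metadata here.
        self.sample_calls += 1
        return RowCountInfo(value=self._known, exact=True)
```

The key property of this shape is that constructing a `DataSourceInfo` is always cheap; cost is only paid if and when a statistic is actually requested.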

Contributor

I think DataSourceInfo can be an abstract base class

Member Author

You will get a DataSourceInfo object for CSV and Json data, so it can't be an abstract base class unless I add another EmptySourceInfo class for them to use (or duplicate the logic in CsvSourceInfo and JsonSourceInfo).

Contributor

Oh right makes sense

self.name = name
self.source = source or DataSourceInfo()
self.source_name = source_name or name
self.unique_info = unique_info or UniqueInfo()
Member Author

This attribute is not populated in this PR (we only focus on populating source for now). When we actually use the statistics for more than partitioning (in a follow-up), we will use this attribute to account for changes in the unique-value information as IR nodes "modify" the data.

Notes:

  • We will not modify the source or source_name attributes when we copy/transfer a ColumnStats object for a parent IR node. We will either preserve the source information, or lose it. The unique_info attribute, however, can certainly change.
  • The row-count (cardinality) estimate is not stored at the ColumnStats level, because it technically "belongs" to the table, rather than the individual column. We do have access to the source RowCountInfo through ColumnStats.source.row_count. However, I'm expecting us to track the cardinality estimate for each IR node outside the ColumnStats dictionary for each IR node.
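The copy/transfer semantics in the notes above can be sketched as follows. This is a hypothetical illustration (the attribute names come from the diff context; the `new_parent` helper and its signature are assumptions):

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class UniqueInfo:
    count: int | None = None
    exact: bool = False


class DataSourceInfo:
    """Stand-in for the shared datasource-statistics object."""


@dataclass
class ColumnStats:
    """Per-column statistics attached to an IR node (hypothetical sketch)."""

    name: str
    source: DataSourceInfo | None = None
    source_name: str | None = None
    unique_info: UniqueInfo = field(default_factory=UniqueInfo)

    def new_parent(self, *, keep_source: bool = True) -> ColumnStats:
        # Copy for a parent IR node: source info is preserved or dropped,
        # never modified. unique_info starts fresh because the parent node
        # may change the unique-value distribution.
        return ColumnStats(
            name=self.name,
            source=self.source if keep_source else None,
            source_name=self.source_name if keep_source else None,
            unique_info=UniqueInfo(),
        )
```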

self._num_row_groups_per_file = num_row_groups_per_sampled_file
self._row_count = RowCountInfo(value=row_count, exact=exact)

def _sample_row_groups(self) -> None:
Member Author

Note that the row-group sampling logic is separate from the metadata sampling logic. Sampling "real" row-groups is probably more expensive. Therefore, we want to avoid doing it until absolutely necessary.

We also want to use the add_unique_stats_column method when we implement "full" statistics collection to make sure we collect unique-value statistics for all necessary columns at once. I'm thinking that we can call this method for IR nodes like GroupBy and Distinct in the initial IR-graph traversal to collect "base" statistics information. The follow-up traversal would then sample/cache the unique-value statistics for all necessary columns at once on the first call to _get_unique_fractions or ColumnStats.source.unique(source_name).
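The "register first, sample everything at once on first request" idea can be sketched like this. The method names `add_unique_stats_column` and `unique` follow the comment above; the internals (an in-memory column dict and a `sample_calls` counter) are purely illustrative:

```python
from __future__ import annotations


class DataSourceInfo:
    """Sketch of aggregated lazy unique-value sampling (hypothetical internals)."""

    def __init__(self, data: dict[str, list]) -> None:
        self._data = data
        self._want_unique: set[str] = set()
        self._unique_cache: dict[str, float] = {}
        self.sample_calls = 0  # for illustration only

    def add_unique_stats_column(self, name: str) -> None:
        # Register a column during the initial IR traversal; no sampling yet.
        self._want_unique.add(name)

    def unique(self, name: str) -> float:
        # The first uncached request triggers one sampling pass over *all*
        # registered columns, so subsequent requests hit the cache.
        self.add_unique_stats_column(name)
        if name not in self._unique_cache:
            self.sample_calls += 1
            for col in self._want_unique:
                values = self._data[col]
                self._unique_cache[col] = len(set(values)) / max(len(values), 1)
        return self._unique_cache[name]
```

This is what makes the laziness pay off: registering GroupBy/Distinct keys up front means a single (expensive) sampling pass serves every column that needs statistics.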

Contributor

Therefore, we want to avoid doing it until absolutely necessary.

I'll be pedantic (sorry) and suggest / ask whether this should be "avoid doing it unless absolutely necessary". Is there any difference between doing all the row group sampling at the start of IR translation / lowering, vs. doing it while lowering a specific node? (And presumably it has to be done before we're actually executing an IR node, since we'll use the information gathered to pick what IR node to use).

I should have read ahead, but I think your second paragraph is getting to this.

Member Author

@rjzamora rjzamora Jul 7, 2025

Is there any difference between doing all the row group sampling at the start of IR translation / lowering, vs. doing it while lowering a specific node?

Yeah, my thinking is that there is not a real difference as long as we sample all the necessary columns from the same datasource at once. The benefit of making the row-group sampling lazy is that we enable this kind of "aggregated" sampling. I'm expecting the final statistics-collection procedure to look something like this:

  1. We use post_traversal to collect/propagate "base" datasource information to all IR nodes.
    • Since the underlying DataSourceInfo objects collect Parquet statistics lazily, we don't need to do any expensive row-group sampling during this traversal.
    • As we post-traverse the IR graph, we can use DataSourceInfo.add_unique_stats_column to populate the set of datasource column names needing unique-value statistics (e.g. group_by and Distinct keys).
    • We can also use this traversal to populate data-structures that record Join keys (not included in this PR)
  2. We use post_traversal to estimate the intermediate cardinality and UniqueInfo for each IR node.
    • This traversal will trigger metadata sampling (to estimate/collect datasource cardinality), and may trigger row-group sampling.
  3. We lower the IR graph as usual.
    • This traversal will trigger row-group sampling if any unique-value statistics are needed (and the relevant UniqueInfo wasn't already cached).
    • We can use the intermediate cardinality estimates from traversal (2) to inject Repartition optimizations.
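All three steps above rely on post-order traversal, which visits children before parents so each IR node can combine statistics already computed for its inputs. A generic sketch (the `Node` class and `post_traversal` helper here are hypothetical stand-ins, not the PR's exact API):

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class Node:
    """Minimal stand-in for an IR node with child references."""

    name: str
    children: tuple["Node", ...] = ()


def post_traversal(root: Node, visit: Callable[[Node], None]) -> None:
    # Post-order walk: recurse into children first, then visit the node,
    # so a node's statistics can be derived from its inputs' statistics.
    for child in root.children:
        post_traversal(child, visit)
    visit(root)
```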

Comment on lines +594 to +601
@functools.lru_cache(maxsize=10)
def _sample_pq_stats(
paths: tuple[str, ...],
max_file_samples: int,
max_rg_samples: int,
) -> PqSourceInfo:
"""Return Parquet datasource information."""
return PqSourceInfo(paths, max_file_samples, max_rg_samples)
Contributor

I kind of think that this cache should be per query.

Rationale: between queries I don't think we can guarantee that the statistics are valid, because the files might be overwritten.

WDYT?

Member Author

Sorry I didn't comment on this explicitly yet.

You are correct that we need to cache a bit more than a tuple of paths if we want to account for over-written data. Dask tries to do this with a checksum. We can start by clearing the cache for every new query, but I'd love to find a way to preserve the cache between queries (very useful for remote data in practice).

Contributor

I'd prefer to avoid caching (at least beyond single query) until we have a way of knowing if the contents might have changed through something like an etag from the filesystem giving us the files.

Member Author

Yeah, the current implementation will clear the caches every time we call evaluate_streaming. Hopefully that is reasonable for now?

Contributor

@TomAugspurger TomAugspurger left a comment

(I forgot to hit submit on this earlier so some of the comments might be out of date / answered elsewhere. I'll go through and close them out if they are).

@rjzamora
Member Author

Update: I think this PR is now accomplishing what we need, but I'll be happy to revise.

Next steps:

  1. (Maybe this PR?) Make the number of sampled parquet files and parquet row-groups configurable.
  2. Implement a post_traversal pass over the un-lowered IR graph to populate dict[IR, dict[str, ColumnStats]] and dict[IR, RowCount] data structures with base (i.e. source) statistics.
    • This traversal will not update the ColumnStats.unique_count attribute for each column yet.
    • The goal of this traversal is to make sure DataSourceInfo and source-based row-count estimates are fully propagated.
    • We can also use this traversal to call add_unique_stats_column for known GroupBy and Distinct key columns. This way, the first call to DataSourceInfo.unique_* (expected during lowering) will collect row-group information for all known GroupBy/Distinct keys.
  3. We leverage DataSourceInfo.unique_* statistics during lowering to avoid the need for the unique_fraction user configuration.
    • This will require us to attach the dict[IR, dict[str, ColumnStats]] data structure to the caching-visitor state (so it is accessible to the lowering logic for each IR node).
  4. We update the traversal in (1) to also collect join and join-key information in other data structures.
  5. Implement a second post_traversal pass over the un-lowered IR graph to leverage the join heuristics in (3) to adjust the dict[IR, RowCount] values and populate the ColumnStats.unique_count attributes.
  6. Use the features from (1)-(5) to inject repartitioning after operations that lead to a drop in the cardinality estimate.

@Matt711 Matt711 self-requested a review July 11, 2025 15:20
Contributor

@TomAugspurger TomAugspurger left a comment

This is coming together nicely. Left some comments about implementation details inline.

In the future, I'd love to have a table that lists

  1. What statistics do we use (table row count, column unique count, etc.)
  2. Where do we collect each statistic from (X from parquet file metadata, Y from sampling row groups, etc.)
  3. When do we collect each statistic (some discussion at https://github.com/rapidsai/cudf/pull/19276/files#r2190645784)
  4. When do we use them, and why

But I recognize we might not be there yet.

(Maybe this PR?) Make the number of sampled parquet files and parquet row-groups configurable.

That's easy enough to do here or separately I think. That shouldn't be too complex either way.

Contributor

@TomAugspurger TomAugspurger left a comment

A couple non-blocking comments, but overall this looks nice.

Contributor

@Matt711 Matt711 left a comment

Still looking through the PR...

Contributor

@Matt711 Matt711 left a comment

Thanks @rjzamora, I left some minor suggestions. I'm also curious where you're going to use these statistics first?

@rjzamora rjzamora added 5 - Ready to Merge Testing and reviews complete, ready to merge and removed 3 - Ready for Review Ready for review by team labels Jul 16, 2025
@rjzamora
Member Author

/merge

@rapids-bot rapids-bot bot merged commit 3a8a00e into rapidsai:branch-25.08 Jul 16, 2025
92 checks passed
@github-project-automation github-project-automation bot moved this from In Progress to Done in cuDF Python Jul 16, 2025
@rapids-bot
Contributor

rapids-bot bot commented Jul 16, 2025

Failed to merge PR using squash strategy.
