
[Story] Use datasource statistics in cudf-polars #19388

@rjzamora

Description

In streaming cudf-polars, we do not yet use datasource statistics (row-count and unique-value estimates) to inform the physical plan. We use the average file size (per column) to set the partition size for Scan/DataFrameScan operations, but we do not leverage sampled statistics to choose between shuffling and tree reductions, or to repartition after Join or GroupBy operations.

Now that #19276 is in, we have the necessary classes to store and track the statistics needed for these potential optimizations.
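
To make the discussion concrete, here is a minimal sketch of what the statistics containers referenced below might look like. The class names `ColumnStats` and `RowCount` come from this issue, but the fields shown (`UniqueStats`, `count`, `fraction`, `value`, `exact`) are illustrative assumptions, not the actual cudf-polars API:

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class UniqueStats:
    """Hypothetical unique-value estimate: absolute count and
    fraction of the (estimated) row count."""
    count: int
    fraction: float


@dataclass
class ColumnStats:
    """Per-column statistics. unique_stats stays None until the
    column is actually sampled (see step [1] below)."""
    name: str
    unique_stats: UniqueStats | None = None


@dataclass
class RowCount:
    """Row-count estimate for an IR node; exact=True would mean the
    count came directly from source metadata rather than sampling."""
    value: int
    exact: bool = False
```

The plan below populates `dict[IR, dict[str, ColumnStats]]` and `dict[IR, RowCount]` mappings keyed by IR node, so these containers intentionally carry no reference back to the IR graph.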

Next Steps:

  • [0] Make the number of sampled Parquet files and row-groups configurable ([FEA] Make max_file_samples and max_rg_samples configurable in cudf-polars #19389).
  • [1] Implement a post_traversal pass over the un-lowered IR graph to populate dict[IR, dict[str, ColumnStats]] and dict[IR, RowCount] data structures with base (i.e. source) statistics ([FEA] Use post_traversal to populate "base" column statistics #19390).
    • This traversal will not update the ColumnStats.unique_stats attribute for each column yet.
    • The goal of this traversal is to make sure DataSourceInfo and source-based row-count estimates are fully propagated.
    • We can also use this traversal to call add_unique_stats_column for known GroupBy and Distinct key columns. This way, the first call to DataSourceInfo.unique_stats(*) (expected during a later traversal) will collect row-group information for all known GroupBy/Distinct keys.
  • [2] Leverage DataSourceInfo.unique_stats(*) statistics during lowering to avoid the need for the unique_fraction user configuration ([FEA] Leverage column statistics to replace unique_fraction configuration #19391).
    • NOTE: This step can also be implemented after (4). Steps (3)-(5) do not depend on this feature.
    • This will require us to attach the dict[IR, dict[str, ColumnStats]] data structure to the caching-visitor state (so it is accessible to the lowering logic for each IR node).
  • [3] Update the traversal in (1) to also collect join and join-key information in other data structures ([FEA] Collect join-key information while gathering statistics in cudf-polars #19392).
  • [4] Implement a second post_traversal pass over the un-lowered IR graph to leverage the join heuristics in (3) to adjust the dict[IR, RowCount] values and populate the ColumnStats.unique_stats attributes ([FEA] Add a second IR-statistics traversal for cardinality estimates #19393).
  • [5] Use the features from (1)-(4) to inject repartitioning after operations that lead to a drop in the cardinality estimate. To do this well, we will probably need to refine the changes made in (0)-(4). If row-count estimates are insufficient, this optimization may be ineffective and/or risky.
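
The core mechanic in step [1] is a post-order walk that propagates source statistics from leaf (Scan/DataFrameScan) nodes up to their parents. A minimal sketch of that idea follows; Node, its children tuple, and the max-of-children propagation rule are stand-ins for illustration, not the real cudf-polars IR or its cardinality heuristics:

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Node:
    """Stand-in for an un-lowered IR node (hashable, so it can key
    the per-node statistics dictionaries)."""
    name: str
    children: tuple["Node", ...] = ()


def post_traversal(root: Node) -> list[Node]:
    """Return nodes in post order (every child before its parent),
    computed as the reverse of an iterative pre-order walk."""
    stack, preorder = [root], []
    while stack:
        node = stack.pop()
        preorder.append(node)
        stack.extend(node.children)
    return list(reversed(preorder))


def populate_base_row_counts(
    root: Node, source_rows: dict[str, int]
) -> dict[Node, int]:
    """Populate a dict[Node, int] of row-count estimates: leaves take
    the sampled source estimate; internal nodes use a crude upper
    bound (max of child estimates) as a placeholder heuristic."""
    counts: dict[Node, int] = {}
    for node in post_traversal(root):
        if not node.children:
            counts[node] = source_rows.get(node.name, 0)
        else:
            counts[node] = max(counts[c] for c in node.children)
    return counts
```

Step [4] would then be a second pass of the same shape that revisits these estimates once unique-value statistics are available, adjusting the counts after Join/GroupBy/Distinct nodes instead of taking a simple maximum.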

Labels

Python (Affects Python cuDF API) · cudf-polars (Issues specific to cudf-polars) · feature request (New feature or request)
