-
Notifications
You must be signed in to change notification settings - Fork 1k
Add API to "initialize" column statistics #19447
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
c7feddf
477d57f
1d404c6
9f51936
cb3ff97
0e80add
1cd6bc7
7d6ea69
8bea521
74ad4f1
4627854
19eff91
7ee10ef
03bfdb2
41e0ec4
f02fbaf
ff39a1c
e8fa8d6
08402fb
b17b1c3
fc959ec
1c83d45
7b0befa
9a3b8a9
b35cbaa
6cce566
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,6 +11,7 @@ | |
| from collections.abc import Generator, Iterator | ||
|
|
||
| from cudf_polars.dsl.expr import NamedExpr | ||
| from cudf_polars.dsl.ir import IR | ||
| from cudf_polars.dsl.nodebase import Node | ||
|
|
||
|
|
||
|
|
@@ -124,17 +125,20 @@ class ColumnStats: | |
| ---------- | ||
| name | ||
| Column name. | ||
| source | ||
| children | ||
| Child ColumnStats objects. | ||
| source_info | ||
| Datasource information. | ||
| source_name | ||
| Source-column name. | ||
| unique_stats | ||
| Unique-value statistics. | ||
| """ | ||
|
|
||
| __slots__ = ("name", "source_info", "source_name", "unique_stats") | ||
| __slots__ = ("children", "name", "source_info", "source_name", "unique_stats") | ||
|
|
||
| name: str | ||
| children: tuple[ColumnStats, ...] | ||
| source_info: DataSourceInfo | ||
| source_name: str | ||
| unique_stats: UniqueStats | ||
|
|
@@ -143,11 +147,54 @@ def __init__( | |
| self, | ||
| name: str, | ||
| *, | ||
| children: tuple[ColumnStats, ...] = (), | ||
| source_info: DataSourceInfo | None = None, | ||
| source_name: str | None = None, | ||
| unique_stats: UniqueStats | None = None, | ||
| ) -> None: | ||
| self.name = name | ||
| self.children = children | ||
| self.source_info = source_info or DataSourceInfo() | ||
| self.source_name = source_name or name | ||
| self.unique_stats = unique_stats or UniqueStats() | ||
|
|
||
| def new_parent( | ||
| self, | ||
| *, | ||
| name: str | None = None, | ||
| ) -> ColumnStats: | ||
| """ | ||
| Initialize a new parent ColumnStats object. | ||
|
|
||
| Parameters | ||
| ---------- | ||
| name | ||
| The new column name. | ||
|
|
||
| Returns | ||
| ------- | ||
| A new ColumnStats object. | ||
|
|
||
| Notes | ||
| ----- | ||
| This API preserves the original DataSourceInfo reference. | ||
| """ | ||
| return ColumnStats( | ||
| name=name or self.name, | ||
| children=(self,), | ||
| # Want to reference the same DataSourceInfo | ||
| source_info=self.source_info, | ||
TomAugspurger marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| source_name=self.source_name, | ||
| # Want fresh UniqueStats so we can mutate in place | ||
| unique_stats=UniqueStats(), | ||
| ) | ||
|
|
||
|
|
||
| class StatsCollector: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Have we discussed tracking statistics for expression nodes? Curious if you see any value there
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For now, we only care about tracking statistics for pre-lowered IR nodes. In the near term, we are mostly interested in accounting for Join and GroupBy. However, it is true that we may need to traverse an |
||
| """Column statistics collector.""" | ||
|
|
||
| __slots__ = ("column_stats", "row_count") | ||
|
|
||
| def __init__(self) -> None: | ||
| self.row_count: dict[IR, ColumnStat[int]] = {} | ||
| self.column_stats: dict[IR, dict[str, ColumnStats]] = {} | ||
rjzamora marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,208 @@ | ||
| # SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| """Utilities for tracking column statistics.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import itertools | ||
| from typing import TYPE_CHECKING | ||
|
|
||
| from cudf_polars.dsl.ir import ( | ||
| IR, | ||
| DataFrameScan, | ||
| Distinct, | ||
| GroupBy, | ||
| HConcat, | ||
| Join, | ||
| Scan, | ||
| Union, | ||
| ) | ||
| from cudf_polars.dsl.traversal import post_traversal | ||
| from cudf_polars.experimental.base import ( | ||
| ColumnStats, | ||
| StatsCollector, | ||
| ) | ||
| from cudf_polars.experimental.dispatch import initialize_column_stats | ||
|
|
||
| if TYPE_CHECKING: | ||
| from collections.abc import Sequence | ||
|
|
||
| from cudf_polars.utils.config import ConfigOptions | ||
|
|
||
|
|
||
| def collect_base_stats(root: IR, config_options: ConfigOptions) -> StatsCollector: | ||
| """ | ||
| Collect base datasource statistics. | ||
|
|
||
| Parameters | ||
| ---------- | ||
| root | ||
| Root IR node for collecting base datasource statistics. | ||
| config_options | ||
| GPUEngine configuration options. | ||
|
|
||
| Returns | ||
| ------- | ||
| A new StatsCollector object with populated datasource statistics. | ||
| """ | ||
| stats: StatsCollector = StatsCollector() | ||
| for node in post_traversal([root]): | ||
| stats.column_stats[node] = initialize_column_stats(node, stats, config_options) | ||
| return stats | ||
|
|
||
|
|
||
| def _update_unique_stats_columns( | ||
| child_column_stats: dict[str, ColumnStats], | ||
| key_names: Sequence[str], | ||
| config_options: ConfigOptions, | ||
| ) -> None: | ||
| """Update set of unique-stats columns in datasource.""" | ||
| assert config_options.executor.name == "streaming", ( | ||
| "'in-memory' executor not supported in 'add_source_stats'" | ||
| ) | ||
| unique_fraction = config_options.executor.unique_fraction | ||
| for name in key_names: | ||
| if ( | ||
| name not in unique_fraction | ||
| and (column_stats := child_column_stats.get(name)) is not None | ||
| and (source_stats := column_stats.source_info) is not None | ||
| ): | ||
| source_stats.add_unique_stats_column(column_stats.source_name or name) | ||
|
|
||
|
|
||
| @initialize_column_stats.register(IR) | ||
| def _default_initialize_column_stats( | ||
| ir: IR, stats: StatsCollector, config_options: ConfigOptions | ||
| ) -> dict[str, ColumnStats]: | ||
| # Default `initialize_column_stats` implementation. | ||
| if len(ir.children) == 1: | ||
| (child,) = ir.children | ||
| child_column_stats = stats.column_stats.get(child, {}) | ||
| return { | ||
| name: child_column_stats.get(name, ColumnStats(name=name)).new_parent() | ||
| for name in ir.schema | ||
| } | ||
| else: # pragma: no cover | ||
| # Multi-child nodes loose all information by default. | ||
| return {name: ColumnStats(name=name) for name in ir.schema} | ||
|
|
||
|
|
||
| @initialize_column_stats.register(Distinct) | ||
| def _( | ||
| ir: Distinct, stats: StatsCollector, config_options: ConfigOptions | ||
| ) -> dict[str, ColumnStats]: | ||
| # Use default initialize_column_stats after updating | ||
| # the known unique-stats columns. | ||
| (child,) = ir.children | ||
| child_column_stats = stats.column_stats.get(child, {}) | ||
| key_names = ir.subset or ir.schema | ||
| _update_unique_stats_columns(child_column_stats, list(key_names), config_options) | ||
| return _default_initialize_column_stats(ir, stats, config_options) | ||
|
|
||
|
|
||
| @initialize_column_stats.register(Join) | ||
| def _( | ||
| ir: Join, stats: StatsCollector, config_options: ConfigOptions | ||
| ) -> dict[str, ColumnStats]: | ||
| # Copy column statistics from both the left and right children. | ||
| # Special cases to consider: | ||
| # - If a column name appears in both sides of the join, | ||
| # we take it from the "primary" column (right for "Right" | ||
| # joins, left for all other joins). | ||
| # - If a column name doesn't appear in either child, it | ||
| # corresponds to a non-"primary" column with a suffix. | ||
|
|
||
| children, on = ir.children, (ir.left_on, ir.right_on) | ||
| how = ir.options[0] | ||
| suffix = ir.options[3] | ||
| if how == "Right": | ||
| children, on = children[::-1], on[::-1] | ||
| primary, other = children | ||
| primary_child_stats = stats.column_stats.get(primary, {}) | ||
| other_child_stats = stats.column_stats.get(other, {}) | ||
|
|
||
| # Build output column statistics | ||
| column_stats: dict[str, ColumnStats] = {} | ||
| for name in ir.schema: | ||
| if name in primary.schema: | ||
| # "Primary" child stats take preference. | ||
| column_stats[name] = primary_child_stats[name].new_parent() | ||
| elif name in other.schema: | ||
| # "Other" column stats apply to everything else. | ||
| column_stats[name] = other_child_stats[name].new_parent() | ||
| else: | ||
| # If the column name was not in either child table, | ||
| # a suffix was added to a column in "other". | ||
| _name = name.removesuffix(suffix) | ||
| column_stats[name] = other_child_stats[_name].new_parent(name=name) | ||
|
|
||
| # Update children | ||
| for p_key, o_key in zip(*on, strict=True): | ||
| column_stats[p_key.name].children = ( | ||
| primary_child_stats[p_key.name], | ||
| other_child_stats[o_key.name], | ||
| ) | ||
|
|
||
| return column_stats | ||
|
|
||
|
|
||
| @initialize_column_stats.register(GroupBy) | ||
| def _( | ||
| ir: GroupBy, stats: StatsCollector, config_options: ConfigOptions | ||
| ) -> dict[str, ColumnStats]: | ||
| (child,) = ir.children | ||
| child_column_stats = stats.column_stats.get(child, {}) | ||
|
|
||
| # Update set of source columns we may lazily sample | ||
| _update_unique_stats_columns( | ||
| child_column_stats, [n.name for n in ir.keys], config_options | ||
| ) | ||
| return _default_initialize_column_stats(ir, stats, config_options) | ||
|
|
||
|
|
||
| @initialize_column_stats.register(HConcat) | ||
| def _( | ||
| ir: HConcat, stats: StatsCollector, config_options: ConfigOptions | ||
| ) -> dict[str, ColumnStats]: | ||
| child_column_stats = dict( | ||
| itertools.chain.from_iterable( | ||
| stats.column_stats.get(c, {}).items() for c in ir.children | ||
| ) | ||
| ) | ||
| return { | ||
| name: child_column_stats.get(name, ColumnStats(name=name)).new_parent() | ||
| for name in ir.schema | ||
| } | ||
|
|
||
|
|
||
| @initialize_column_stats.register(Union) | ||
| def _( | ||
| ir: IR, stats: StatsCollector, config_options: ConfigOptions | ||
| ) -> dict[str, ColumnStats]: | ||
| # Union looses source information for now. | ||
| return { | ||
| name: ColumnStats( | ||
| name=name, | ||
| children=tuple(stats.column_stats[child][name] for child in ir.children), | ||
| ) | ||
| for name in ir.schema | ||
| } | ||
|
|
||
|
|
||
| @initialize_column_stats.register(Scan) | ||
| def _( | ||
| ir: Scan, stats: StatsCollector, config_options: ConfigOptions | ||
| ) -> dict[str, ColumnStats]: | ||
| from cudf_polars.experimental.io import _extract_scan_stats | ||
|
|
||
| return _extract_scan_stats(ir, config_options) | ||
|
|
||
|
|
||
| @initialize_column_stats.register(DataFrameScan) | ||
| def _( | ||
| ir: DataFrameScan, stats: StatsCollector, config_options: ConfigOptions | ||
| ) -> dict[str, ColumnStats]: | ||
| from cudf_polars.experimental.io import _extract_dataframescan_stats | ||
|
|
||
| return _extract_dataframescan_stats(ir) |
Uh oh!
There was an error while loading. Please reload this page.