Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
c7feddf
expand test coverage
rjzamora Jul 21, 2025
477d57f
Merge branch 'branch-25.08' into base-stats-traversal
rjzamora Jul 21, 2025
1d404c6
Merge branch 'branch-25.10' into base-stats-traversal
rjzamora Jul 21, 2025
9f51936
consolidate stats tests
rjzamora Jul 22, 2025
cb3ff97
add distinct coverage
rjzamora Jul 22, 2025
0e80add
don't need coverage for multi-child fall-back for now
rjzamora Jul 22, 2025
1cd6bc7
Merge remote-tracking branch 'upstream/branch-25.10' into base-stats-…
rjzamora Jul 22, 2025
7d6ea69
drop unnecessary _extract_base_stats_preserve
rjzamora Jul 22, 2025
8bea521
Fix Join behavior
rjzamora Jul 22, 2025
74ad4f1
Merge remote-tracking branch 'upstream/branch-25.10' into base-stats-…
rjzamora Jul 22, 2025
4627854
drop unnecessary dispatch functions
rjzamora Jul 22, 2025
19eff91
update docs and change 'rename' to 'copy'
rjzamora Jul 23, 2025
7ee10ef
update names
rjzamora Jul 23, 2025
03bfdb2
update test coverage
rjzamora Jul 23, 2025
41e0ec4
Merge remote-tracking branch 'upstream/branch-25.10' into base-stats-…
rjzamora Jul 23, 2025
f02fbaf
add back union test needed for coverage
rjzamora Jul 24, 2025
ff39a1c
Merge remote-tracking branch 'upstream/branch-25.10' into base-stats-…
rjzamora Jul 24, 2025
e8fa8d6
track child-ColumnStats in ColumnStats
rjzamora Jul 24, 2025
08402fb
Merge remote-tracking branch 'upstream/branch-25.10' into base-stats-…
rjzamora Jul 24, 2025
b17b1c3
Merge branch 'branch-25.10' into base-stats-traversal
rjzamora Jul 28, 2025
fc959ec
Merge branch 'branch-25.10' into base-stats-traversal
rjzamora Jul 28, 2025
1c83d45
Merge branch 'branch-25.10' into base-stats-traversal
rjzamora Jul 28, 2025
7b0befa
Merge branch 'branch-25.10' into base-stats-traversal
rjzamora Aug 14, 2025
9a3b8a9
Merge remote-tracking branch 'upstream/branch-25.10' into base-stats-…
rjzamora Aug 15, 2025
b35cbaa
update overview.md
rjzamora Aug 15, 2025
6cce566
Merge branch 'branch-25.10' into base-stats-traversal
rjzamora Aug 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 49 additions & 2 deletions python/cudf_polars/cudf_polars/experimental/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from collections.abc import Generator, Iterator

from cudf_polars.dsl.expr import NamedExpr
from cudf_polars.dsl.ir import IR
from cudf_polars.dsl.nodebase import Node


Expand Down Expand Up @@ -124,17 +125,20 @@ class ColumnStats:
----------
name
Column name.
source
children
Child ColumnStats objects.
source_info
Datasource information.
source_name
Source-column name.
unique_stats
Unique-value statistics.
"""

__slots__ = ("name", "source_info", "source_name", "unique_stats")
__slots__ = ("children", "name", "source_info", "source_name", "unique_stats")

name: str
children: tuple[ColumnStats, ...]
source_info: DataSourceInfo
source_name: str
unique_stats: UniqueStats
Expand All @@ -143,11 +147,54 @@ def __init__(
self,
name: str,
*,
children: tuple[ColumnStats, ...] = (),
source_info: DataSourceInfo | None = None,
source_name: str | None = None,
unique_stats: UniqueStats | None = None,
) -> None:
self.name = name
self.children = children
self.source_info = source_info or DataSourceInfo()
self.source_name = source_name or name
self.unique_stats = unique_stats or UniqueStats()

def new_parent(
    self,
    *,
    name: str | None = None,
) -> ColumnStats:
    """
    Create a ColumnStats object that has this object as its only child.

    Parameters
    ----------
    name
        Column name for the new object. When omitted (or otherwise
        falsy), the name of this object is reused.

    Returns
    -------
    A new ColumnStats object.

    Notes
    -----
    The new object shares this object's ``source_info`` reference
    and ``source_name``, but is given a brand-new ``UniqueStats``.
    """
    parent = ColumnStats(
        name or self.name,
        children=(self,),
        # Share (do not copy) the DataSourceInfo with the child.
        source_info=self.source_info,
        source_name=self.source_name,
        # A separate UniqueStats lets the parent be mutated in place
        # without touching the child's unique-value statistics.
        unique_stats=UniqueStats(),
    )
    return parent


class StatsCollector:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have we discussed tracking statistics for expression nodes? Curious if you see any value there

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now, we only care about tracking statistics for pre-lowered IR nodes. In the near term, we are mostly interested in accounting for Join and GroupBy. However, it is true that we may need to traverse an Expr graph within a Select node to estimate changes in cardinality and unique count. We shouldn't need to keep track of anything in StatsCollector for this, but the exact design is TBD.

"""Column statistics collector."""

__slots__ = ("column_stats", "row_count")

def __init__(self) -> None:
self.row_count: dict[IR, ColumnStat[int]] = {}
self.column_stats: dict[IR, dict[str, ColumnStats]] = {}
37 changes: 36 additions & 1 deletion python/cudf_polars/cudf_polars/experimental/dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@

from cudf_polars.dsl import ir
from cudf_polars.dsl.ir import IR
from cudf_polars.experimental.base import PartitionInfo
from cudf_polars.experimental.base import (
ColumnStats,
PartitionInfo,
StatsCollector,
)
from cudf_polars.utils.config import ConfigOptions


Expand Down Expand Up @@ -97,3 +101,34 @@ def generate_ir_tasks(
task_graph
"""
raise AssertionError(f"Unhandled type {type(ir)}") # pragma: no cover


@singledispatch
def initialize_column_stats(
    ir: IR, stats: StatsCollector, config_options: ConfigOptions
) -> dict[str, ColumnStats]:
    """
    Build the base column statistics for a single IR node.

    This is the ``singledispatch`` entry point; concrete behaviour is
    supplied by handlers registered for specific IR types.

    Parameters
    ----------
    ir
        The IR node to initialize column statistics for.
    stats
        The `StatsCollector` object containing known source statistics.
    config_options
        GPUEngine configuration options.

    Returns
    -------
    base_stats_mapping
        Mapping between column names and base ``ColumnStats`` objects.

    Raises
    ------
    AssertionError
        Always, when no registered handler matches ``type(ir)``.

    Notes
    -----
    "Base" column stats are ``ColumnStats`` objects whose
    ``unique_stats`` have **not** been populated. Handlers are meant
    to propagate ``DataSourceInfo`` references and to set the
    ``children`` attribute for each column of each IR node.
    """
    unhandled = type(ir)
    raise AssertionError(f"Unhandled type {unhandled}")  # pragma: no cover
208 changes: 208 additions & 0 deletions python/cudf_polars/cudf_polars/experimental/statistics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0

"""Utilities for tracking column statistics."""

from __future__ import annotations

import itertools
from typing import TYPE_CHECKING

from cudf_polars.dsl.ir import (
IR,
DataFrameScan,
Distinct,
GroupBy,
HConcat,
Join,
Scan,
Union,
)
from cudf_polars.dsl.traversal import post_traversal
from cudf_polars.experimental.base import (
ColumnStats,
StatsCollector,
)
from cudf_polars.experimental.dispatch import initialize_column_stats

if TYPE_CHECKING:
from collections.abc import Sequence

from cudf_polars.utils.config import ConfigOptions


def collect_base_stats(root: IR, config_options: ConfigOptions) -> StatsCollector:
    """
    Build a StatsCollector with base column statistics for an IR graph.

    Parameters
    ----------
    root
        Root IR node of the graph to traverse.
    config_options
        GPUEngine configuration options.

    Returns
    -------
    A new StatsCollector whose ``column_stats`` mapping holds an entry
    for every node produced by ``post_traversal([root])``.
    """
    collector = StatsCollector()
    for node in post_traversal([root]):
        # Handlers receive the collector as populated so far, so each
        # result is stored before the next node is processed.
        node_stats = initialize_column_stats(node, collector, config_options)
        collector.column_stats[node] = node_stats
    return collector


def _update_unique_stats_columns(
    child_column_stats: dict[str, ColumnStats],
    key_names: Sequence[str],
    config_options: ConfigOptions,
) -> None:
    """
    Register key columns as unique-stats columns on their datasource.

    Parameters
    ----------
    child_column_stats
        Mapping from column name to the child's ``ColumnStats``.
    key_names
        Names of the key columns to consider.
    config_options
        GPUEngine configuration options. The executor name must be
        ``"streaming"``.

    Notes
    -----
    A key is skipped when it is already present in the executor's
    ``unique_fraction`` option, when ``child_column_stats`` has no
    entry for it, or when the entry's ``source_info`` is ``None``.
    """
    executor = config_options.executor
    assert executor.name == "streaming", (
        "'in-memory' executor not supported in 'add_source_stats'"
    )
    configured = executor.unique_fraction
    for key in key_names:
        if key in configured:
            continue
        key_stats = child_column_stats.get(key)
        if key_stats is None:
            continue
        source = key_stats.source_info
        if source is None:
            continue
        # Fall back to the key name when the source-column name is falsy.
        source.add_unique_stats_column(key_stats.source_name or key)


@initialize_column_stats.register(IR)
def _default_initialize_column_stats(
    ir: IR, stats: StatsCollector, config_options: ConfigOptions
) -> dict[str, ColumnStats]:
    """
    Fallback ``initialize_column_stats`` handler for generic IR nodes.

    For a node with exactly one child, every output column becomes a
    new parent of the child's same-named ``ColumnStats`` (or of an
    empty ``ColumnStats`` when the child has no entry for that name).
    For any other number of children, every output column gets an
    empty ``ColumnStats``.
    """
    if len(ir.children) != 1:  # pragma: no cover
        # Multi-child nodes lose all information by default.
        return {name: ColumnStats(name=name) for name in ir.schema}

    (child,) = ir.children
    known = stats.column_stats.get(child, {})
    result: dict[str, ColumnStats] = {}
    for name in ir.schema:
        base = known.get(name, ColumnStats(name=name))
        result[name] = base.new_parent()
    return result


@initialize_column_stats.register(Distinct)
def _(
    ir: Distinct, stats: StatsCollector, config_options: ConfigOptions
) -> dict[str, ColumnStats]:
    """
    Initialize column statistics for a ``Distinct`` node.

    The key columns are first recorded as unique-stats columns on
    their datasource; the result itself comes from the default handler.
    """
    (child,) = ir.children
    # A falsy ``subset`` means the names in ``ir.schema`` are the keys.
    keys = list(ir.subset or ir.schema)
    _update_unique_stats_columns(
        stats.column_stats.get(child, {}),
        keys,
        config_options,
    )
    return _default_initialize_column_stats(ir, stats, config_options)


@initialize_column_stats.register(Join)
def _(
    ir: Join, stats: StatsCollector, config_options: ConfigOptions
) -> dict[str, ColumnStats]:
    """
    Initialize column statistics for a ``Join`` node.

    Statistics are taken from both children. The "primary" child is
    the right child for ``"Right"`` joins and the left child for every
    other join type:

    - A name found in the primary child's schema uses the primary stats.
    - Otherwise, a name found in the other child's schema uses those.
    - Otherwise, the name is treated as a column of the other child
      that had the join suffix appended.

    Finally, each primary join-key column gets both the primary and
    the other key column's stats as its ``children``.
    """
    left, right = ir.children
    left_on, right_on = ir.left_on, ir.right_on
    how = ir.options[0]
    suffix = ir.options[3]
    if how == "Right":
        primary, other = right, left
        primary_on, other_on = right_on, left_on
    else:
        primary, other = left, right
        primary_on, other_on = left_on, right_on
    primary_stats = stats.column_stats.get(primary, {})
    other_stats = stats.column_stats.get(other, {})

    # Build output column statistics
    result: dict[str, ColumnStats] = {}
    for name in ir.schema:
        if name in primary.schema:
            # The primary child wins when both sides have the name.
            parent = primary_stats[name].new_parent()
        elif name in other.schema:
            parent = other_stats[name].new_parent()
        else:
            # Not in either child: strip the suffix to find the
            # originating column in "other", but keep the output name.
            unsuffixed = name.removesuffix(suffix)
            parent = other_stats[unsuffixed].new_parent(name=name)
        result[name] = parent

    # Join keys depend on the key columns of both children.
    for primary_key, other_key in zip(primary_on, other_on, strict=True):
        key_children = (
            primary_stats[primary_key.name],
            other_stats[other_key.name],
        )
        result[primary_key.name].children = key_children

    return result


@initialize_column_stats.register(GroupBy)
def _(
    ir: GroupBy, stats: StatsCollector, config_options: ConfigOptions
) -> dict[str, ColumnStats]:
    """
    Initialize column statistics for a ``GroupBy`` node.

    The group-by key columns are first recorded as unique-stats columns
    on their datasource; the result itself comes from the default
    handler.
    """
    (child,) = ir.children
    key_names = [key.name for key in ir.keys]
    # Mark the key columns as ones we may lazily sample later.
    _update_unique_stats_columns(
        stats.column_stats.get(child, {}),
        key_names,
        config_options,
    )
    return _default_initialize_column_stats(ir, stats, config_options)


@initialize_column_stats.register(HConcat)
def _(
    ir: HConcat, stats: StatsCollector, config_options: ConfigOptions
) -> dict[str, ColumnStats]:
    """
    Initialize column statistics for an ``HConcat`` node.

    The children's column statistics are merged into one mapping, and
    every output column becomes a new parent of its same-named entry
    (or of an empty ``ColumnStats`` when no child provides one).
    """
    merged: dict[str, ColumnStats] = {}
    for child in ir.children:
        # On a name clash, the entry from the later child is kept.
        merged.update(stats.column_stats.get(child, {}))

    result: dict[str, ColumnStats] = {}
    for name in ir.schema:
        base = merged.get(name, ColumnStats(name=name))
        result[name] = base.new_parent()
    return result


@initialize_column_stats.register(Union)
def _(
    ir: IR, stats: StatsCollector, config_options: ConfigOptions
) -> dict[str, ColumnStats]:
    """
    Initialize column statistics for a ``Union`` node.

    Each output column gets a fresh ``ColumnStats`` whose ``children``
    are the same-named ``ColumnStats`` of every child node. No
    ``source_info`` is passed on, so Union loses source information
    for now.
    """
    result: dict[str, ColumnStats] = {}
    for name in ir.schema:
        per_child = tuple(stats.column_stats[child][name] for child in ir.children)
        result[name] = ColumnStats(name=name, children=per_child)
    return result


@initialize_column_stats.register(Scan)
def _(
    ir: Scan, stats: StatsCollector, config_options: ConfigOptions
) -> dict[str, ColumnStats]:
    """
    Initialize column statistics for a ``Scan`` node.

    Delegates to ``_extract_scan_stats``; the ``stats`` argument is
    not used.
    """
    # NOTE(review): imported locally, presumably to avoid an import
    # cycle with cudf_polars.experimental.io -- confirm.
    from cudf_polars.experimental.io import _extract_scan_stats

    scan_stats = _extract_scan_stats(ir, config_options)
    return scan_stats


@initialize_column_stats.register(DataFrameScan)
def _(
    ir: DataFrameScan, stats: StatsCollector, config_options: ConfigOptions
) -> dict[str, ColumnStats]:
    """
    Initialize column statistics for a ``DataFrameScan`` node.

    Delegates to ``_extract_dataframescan_stats``; neither ``stats``
    nor ``config_options`` is used.
    """
    # NOTE(review): imported locally, presumably to avoid an import
    # cycle with cudf_polars.experimental.io -- confirm.
    from cudf_polars.experimental.io import _extract_dataframescan_stats

    frame_stats = _extract_dataframescan_stats(ir)
    return frame_stats
Loading