Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
b9bb173
start with lawrences doodle
rjzamora Jun 5, 2025
8699a9e
save work
rjzamora Jun 6, 2025
2d590dc
revise basic class structure
rjzamora Jun 9, 2025
6620a6f
Merge remote-tracking branch 'upstream/branch-25.08' into column-stats
rjzamora Jun 9, 2025
511d059
tests passing
rjzamora Jun 10, 2025
2cf53d2
change the config name
rjzamora Jun 10, 2025
4505d7d
Merge remote-tracking branch 'upstream/branch-25.08' into column-stats
rjzamora Jun 10, 2025
49ce228
Merge branch 'branch-25.08' into column-stats
rjzamora Jun 10, 2025
4fee62c
remove TableSourceStats
rjzamora Jun 11, 2025
88247f4
Merge remote-tracking branch 'upstream/branch-25.08' into column-stats
rjzamora Jun 11, 2025
2d0c43d
minor cleanup
rjzamora Jun 11, 2025
455ad2d
Merge branch 'column-stats' of github.com:rjzamora/cudf into column-s…
rjzamora Jun 11, 2025
e4f284c
Merge remote-tracking branch 'upstream/branch-25.08' into column-stats
rjzamora Jun 12, 2025
63e650a
test coverage
rjzamora Jun 12, 2025
c076ec4
Update python/cudf_polars/cudf_polars/dsl/traversal.py
rjzamora Jun 12, 2025
6672aa3
use LRU instead of FIFO
rjzamora Jun 12, 2025
2def2df
Merge branch 'column-stats' of github.com:rjzamora/cudf into column-s…
rjzamora Jun 12, 2025
daedd2d
Merge remote-tracking branch 'upstream/branch-25.08' into column-stats
rjzamora Jun 12, 2025
adce001
avoid key errors
rjzamora Jun 12, 2025
b7f52e4
Merge remote-tracking branch 'upstream/branch-25.08' into column-stats
rjzamora Jun 12, 2025
49802a9
Merge remote-tracking branch 'upstream/branch-25.08' into column-stats
rjzamora Jun 26, 2025
afd1a9b
rename column_statistics to column_stats
rjzamora Jun 26, 2025
c6af3b4
more renaming of statistics to stats
rjzamora Jun 26, 2025
6b162b3
pull config_options back out of StatsCollector
rjzamora Jun 26, 2025
6d168eb
Merge remote-tracking branch 'upstream/branch-25.08' into column-stats
rjzamora Jun 26, 2025
2208fad
fix typo
rjzamora Jun 26, 2025
ab981c5
change _get_unique_fractions input types
rjzamora Jun 26, 2025
7cfd6bf
Merge branch 'branch-25.08' into column-stats
rjzamora Jun 26, 2025
46486a2
Merge branch 'branch-25.08' into column-stats
rjzamora Jun 27, 2025
ba7a0b2
Merge remote-tracking branch 'upstream/branch-25.08' into column-stats
rjzamora Jun 27, 2025
fb71946
tweak coverage
rjzamora Jun 27, 2025
c79b16b
more coverage tweaks
rjzamora Jun 27, 2025
baf9581
Merge remote-tracking branch 'upstream/branch-25.08' into column-stats
rjzamora Jun 30, 2025
a71dce0
add back post_traversal to fix coverage
rjzamora Jun 30, 2025
22ef2e2
add post_traversal test
rjzamora Jun 30, 2025
8b7340d
save experiment
rjzamora Jun 30, 2025
db8c2b2
Merge remote-tracking branch 'upstream/branch-25.08' into column-stats
rjzamora Jul 1, 2025
f974f23
Merge remote-tracking branch 'upstream/branch-25.08' into column-stat…
rjzamora Jul 1, 2025
4d3b6e9
fix groupby
rjzamora Jul 1, 2025
7319ab1
remove custom caching
rjzamora Jul 1, 2025
2c59639
Merge branch 'column-stats-v2' into column-stats
rjzamora Jul 1, 2025
c5a5669
Merge remote-tracking branch 'upstream/branch-25.08' into column-stats
rjzamora Jul 1, 2025
9e4a464
rename
rjzamora Jul 1, 2025
03e7c96
adjust coverage
rjzamora Jul 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion python/cudf_polars/cudf_polars/dsl/traversal.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0

"""Traversal and visitor utilities for nodes."""
Expand Down Expand Up @@ -49,6 +49,40 @@ def traversal(nodes: Sequence[NodeT]) -> Generator[NodeT, None, None]:
lifo.append(child)


def post_traversal(nodes: Sequence[NodeT]) -> Generator[NodeT, None, None]:
    """
    Post-order traversal of nodes in an expression.

    Parameters
    ----------
    nodes
        Roots of expressions to traverse.

    Yields
    ------
    Unique nodes in the expressions, child before parent, children
    in-order from left to right.
    """
    visited = set()
    stack = []

    # Seed the stack with unique roots, preserving input order.
    for root in nodes:
        if root not in visited:
            visited.add(root)
            stack.append(root)

    while stack:
        top = stack[-1]
        # Find the left-most child we have not yet visited.
        pending = next((c for c in top.children if c not in visited), None)
        if pending is not None:
            # Descend into the child first (post-order: children precede parent).
            visited.add(pending)
            stack.append(pending)
        else:
            # All children emitted; the node itself is next.
            stack.pop()
            yield top


def reuse_if_unchanged(node: NodeT, fn: GenericTransformer[NodeT, NodeT]) -> NodeT:
"""
Recipe for transforming nodes that returns the old object if unchanged.
Expand Down
133 changes: 132 additions & 1 deletion python/cudf_polars/cudf_polars/experimental/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
from collections.abc import Generator, Iterator
from collections.abc import Callable, Generator, Iterator

from cudf_polars.dsl.expr import NamedExpr
from cudf_polars.dsl.ir import IR
from cudf_polars.dsl.nodebase import Node


Expand Down Expand Up @@ -44,3 +45,133 @@ def __rich_repr__(self) -> Generator[Any, None, None]:
def get_key_name(node: Node) -> str:
"""Generate the key name for a Node."""
return f"{type(node).__name__.lower()}-{hash(node)}"


class UniqueSourceStats:
    """
    Unique source statistics.

    Parameters
    ----------
    count
        Unique-value count, or ``None`` if unknown.
    fraction
        Unique-value fraction (unique count divided by row
        count), or ``None`` if unknown.
    """

    __slots__ = ("count", "fraction")

    def __init__(
        self,
        *,
        count: int | None = None,
        fraction: float | None = None,
    ) -> None:
        # -> None annotation added for consistency with
        # ColumnStats.__init__ and StatsCollector.__init__.
        self.count = count
        self.fraction = fraction


class ColumnSourceStats:
"""
Column source statistics.

Parameters
----------
cardinality
Cardinality (row count).
unique_stats
Unique-value statistics.
storage_size_per_file
Average un-compressed storage size for this
column in a single file. This value is used to
calculate the partition count for an IR node.
    exact
        Tuple of attributes that have not been estimated
        by partial sampling, and are known exactly.

Notes
-----
Source statistics are statistics coming from "source"
    nodes like ``Scan`` and ``DataFrameScan``.
"""

__slots__ = (
"_unique_stats",
"cardinality",
"exact",
"storage_size_per_file",
)

def __init__(
self,
*,
cardinality: int | None = None,
storage_size_per_file: int | None = None,
exact: tuple[str, ...] = (),
unique_stats: Any = None,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
unique_stats: Any = None,
unique_stats: Callable[[], UniqueSourceStats] | UniqueSourceStats | None = None,

):
self.cardinality = cardinality
self.storage_size_per_file = storage_size_per_file
self.exact = exact
self._unique_stats: Callable[..., UniqueSourceStats] | UniqueSourceStats
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the motivation for making unique_stats a callable? Delayed evaluation of something?

If possible, I'd prefer the simplicity of just having a plain value here.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this seems to be from functools.partial(UniquePqSampler.__call__, name). If I'm reading this correctly, we build one of these per column in the file. And perhaps we don't want to collect these types of statistics, which seems to involve a reading some row groups and doing a distinct_count, if we don't actually use it. Hence the delayed collection. Makes sense.

I wonder if we have enough information here to know what columns we actually want statistics for (based on which columns we do operations on that would make use of those statistics). I see _get_unique_fractions uses unique_fraction.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think your understanding is correct. To be completely honest, I've wasted quite a bit of time struggling to find an "elegant" way to do everything I want:

  • Whenever we need to estimate the number/fraction of unique values in a column, we should sample the metadata/data to make an estimate.
  • Right now we rely on the user-provided "cardinality_factor" config value (renamed to "unique_fraction" here). In my opinion, this config value is a "hack".
  • It is rare for unique-value statistics to be in the parquet metadata, so this PR focuses on reading a small number of real row-groups
  • This PR does centralize unique-value queries in one location: _get_unique_fractions
  • It is relatively common to query unique-value statistics for multiple columns in the same dataset. Therefore, we definitely want to collect these statistics lazily (especially important for remote data).
  • It would be nice to avoid traversing the IR graph multiple times to collect source data, because we will ultimately need to traverse the graph again multiple times (to apply join heuristics and "lower" the graph).

The current implementation "works" and largely handles caching pretty well. However, I'm still pretty unhappy with the design.

I wonder if the "key" to cleaning this up is to roll back the decision to store the column-specific source statistics under ColumnStats.source_stats instead of storing a reference to the general table-source statistics. For example, we probably want something like:

class UniqueStats:
    # Simple class to hold materialized unique-value stats
    count: int | None
    fraction: float | None

class SourceSampler:
    # Lazy source sampler.
    #   - We would use a `PqSourceSampler` for parquet.
    #   - These samplers can be generated by a helper function with lru caching.
    def file_storage_size(self, column: str) -> int
    def cardinality(self) -> int
    def add_shuffle_keys(self, columns: Sequence[str]) -> None
    def unique_stats(self, column: str) -> UniqueStats

class ColumnStats:
    name: str
    unique_count: int | None
    source_name: str | None
    source_sampler: SourceSampler | None

With this (or something like it), we can completely avoid sampling "real" source data until we are actually "using". As the source-stats are propagated through the graph, we could use SourceSampler.add_shuffle_keys to populate the set of source statistics that may require unique-value sampling.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll just note that I too don't have an elegant design for this in mind :/ It's a big feature and the requirements we need out of it are still a bit fuzzy (to me).

I think I mostly agree with your list of desires, maybe other than the "avoid traversing the IR graph multiple times" one, since the information we gather will affect the IR graph we produce. Maybe that can all be done in one shot, since the source stats will be gathered early on.

I do think it's worth spending the time to get the design right (easy for me to say, I know). But I'm also fine with getting something in sooner rather than later and iterating on it, especially if the "right" design isn't clear.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a working design in another branch that is closer to making me happy. I'm going to start breaking off pieces and opening separate PRs now.

if unique_stats is None:
self._unique_stats = UniqueSourceStats()
elif isinstance(unique_stats, UniqueSourceStats) or callable(unique_stats):
self._unique_stats = unique_stats
else: # pragma: no cover
raise TypeError(f"Unexpected unique_stats argument, got {unique_stats}")

@property
def unique_stats(self) -> UniqueSourceStats:
"""Get unique-value statistics."""
if callable(self._unique_stats):
return self._unique_stats()
return self._unique_stats

@property
def unique_count(self) -> int | None:
"""Get unique count."""
return self.unique_stats.count

@property
def unique_fraction(self) -> float | None:
"""Get unique fraction."""
return self.unique_stats.fraction


class ColumnStats:
    """
    Statistics tracked for a single column.

    Parameters
    ----------
    name
        Column name.
    unique_count
        Estimated number of unique values.
    source_stats
        Statistics gathered from the column's source node.
    """

    __slots__ = ("name", "source_stats", "unique_count")

    def __init__(
        self,
        *,
        name: str | None = None,
        unique_count: int | None = None,
        source_stats: ColumnSourceStats | None = None,
    ) -> None:
        self.source_stats = source_stats
        self.unique_count = unique_count
        self.name = name


class StatsCollector:
    """Column statistics collector."""

    __slots__ = ("cardinality", "column_stats")

    def __init__(self) -> None:
        # Row-count estimates, keyed by IR node.
        self.cardinality: dict[IR, int] = {}
        # Per-IR-node mapping of column name to its statistics.
        self.column_stats: dict[IR, dict[str, ColumnStats]] = {}
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ def get_executor_options(
and benchmark.__name__ == "PDSHQueries"
and run_config.executor == "streaming"
):
executor_options["cardinality_factor"] = {
executor_options["unique_fraction"] = {
"c_custkey": 0.05,
"l_orderkey": 1.0,
"l_partkey": 0.1,
Expand Down
25 changes: 23 additions & 2 deletions python/cudf_polars/cudf_polars/experimental/dispatch.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0
"""Multi-partition dispatch functions."""

Expand All @@ -12,8 +12,9 @@
from typing import TypeAlias

from cudf_polars.dsl.ir import IR
from cudf_polars.experimental.base import PartitionInfo
from cudf_polars.experimental.base import PartitionInfo, StatsCollector
from cudf_polars.typing import GenericTransformer
from cudf_polars.utils.config import ConfigOptions


LowerIRTransformer: TypeAlias = (
Expand Down Expand Up @@ -82,3 +83,23 @@ def generate_ir_tasks(
task_graph
"""
raise AssertionError(f"Unhandled type {type(ir)}") # pragma: no cover


@singledispatch
def add_source_stats(
    ir: IR, stats: StatsCollector, config_options: ConfigOptions
) -> None:
    """
    Add basic source statistics for an IR node.

    Parameters
    ----------
    ir
        The IR node to collect source statistics for.
    stats
        The `StatsCollector` object to update with new
        source statistics.
    config_options
        GPUEngine configuration options.
    """
    # Base overload: every supported IR type must register its own
    # implementation, so reaching this is a programming error.
    raise AssertionError(f"Unhandled type {type(ir)}")  # pragma: no cover
44 changes: 27 additions & 17 deletions python/cudf_polars/cudf_polars/experimental/distinct.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@
from cudf_polars.dsl.ir import Distinct
from cudf_polars.experimental.base import PartitionInfo
from cudf_polars.experimental.dispatch import lower_ir_node
from cudf_polars.experimental.utils import _fallback_inform, _lower_ir_fallback
from cudf_polars.experimental.utils import (
_fallback_inform,
_get_unique_fractions,
_lower_ir_fallback,
)

if TYPE_CHECKING:
from collections.abc import MutableMapping
Expand All @@ -29,7 +33,7 @@ def lower_distinct(
partition_info: MutableMapping[IR, PartitionInfo],
config_options: ConfigOptions,
*,
cardinality: float | None = None,
unique_fraction: float | None = None,
) -> tuple[IR, MutableMapping[IR, PartitionInfo]]:
"""
Lower a Distinct IR into partition-wise stages.
Expand All @@ -46,8 +50,8 @@ def lower_distinct(
associated partitioning information.
config_options
GPUEngine configuration options.
cardinality
Cardinality factor to use for algorithm selection.
unique_fraction
Fractional unique count to use for algorithm selection.

Returns
-------
Expand Down Expand Up @@ -112,14 +116,14 @@ def lower_distinct(
# partitions. For now, we raise an error to fall back
# to one partition.
raise NotImplementedError("Unsupported slice for multiple partitions.")
elif cardinality is not None:
# Use cardinality to determine partitioningcardinality
n_ary = min(max(int(1.0 / cardinality), 2), child_count)
output_count = max(int(cardinality * child_count), 1)
elif unique_fraction is not None:
# Use unique_fraction to determine partitioning
n_ary = min(max(int(1.0 / unique_fraction), 2), child_count)
output_count = max(int(unique_fraction * child_count), 1)

if output_count > 1 and require_tree_reduction:
# Need to reduce down to a single partition even
# if the cardinality is large.
# if the unique_fraction is large.
output_count = 1
_fallback_inform(
"Unsupported unique options for multiple partitions.",
Expand Down Expand Up @@ -164,24 +168,30 @@ def _(
# Extract child partitioning
child, partition_info = rec(ir.children[0])
config_options = rec.state["config_options"]
column_stats = rec.state["stats"].column_stats.get(ir.children[0], {})

assert config_options.executor.name == "streaming", (
"'in-memory' executor not supported in 'lower_ir_node'"
)

subset: frozenset = ir.subset or frozenset(ir.schema)
cardinality_factor = {
c: max(min(f, 1.0), 0.00001)
for c, f in config_options.executor.cardinality_factor.items()
if c in subset
}
cardinality = max(cardinality_factor.values()) if cardinality_factor else None
subset: frozenset[str] = ir.subset or frozenset(ir.schema)
unique_fraction_dict = _get_unique_fractions(
tuple(subset),
config_options.executor.unique_fraction,
column_stats,
)

unique_fraction = (
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a difference between {} and None here? I think that unique_fraction_dict could be {} here if both column_statistics and config_options.executor.unique_fraction are empty. Then we'll have bool(unique_fraction_dict) is False and get the None.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar question at https://github.com/rapidsai/cudf/pull/19130/files#diff-c93704a29e1c3263ea7267bf2028f187f43d40727bda1b8ffbcdd87076bac77bR187 in expressions.py.

The output here is float | None, which does have a very different meaning in lower_distinct. But the input max_fraction_dict seems to always be a dict (possibly empty).

So how about unique_fraction = max(unique_fraction_dict.values(), default=None)? Not a big deal either way.

max(unique_fraction_dict.values()) if unique_fraction_dict else None
)

try:
return lower_distinct(
ir,
child,
partition_info,
config_options,
cardinality=cardinality,
unique_fraction=unique_fraction,
)
except NotImplementedError as err:
return _lower_ir_fallback(ir, rec, msg=str(err))
Loading