-
Notifications
You must be signed in to change notification settings - Fork 1k
[WIP] Track column statistics in cuDF-Polars #19130
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
b9bb173
8699a9e
2d590dc
6620a6f
511d059
2cf53d2
4505d7d
49ce228
4fee62c
88247f4
2d0c43d
455ad2d
e4f284c
63e650a
c076ec4
6672aa3
2def2df
daedd2d
adce001
b7f52e4
49802a9
afd1a9b
c6af3b4
6b162b3
6d168eb
2208fad
ab981c5
7cfd6bf
46486a2
ba7a0b2
fb71946
c79b16b
baf9581
a71dce0
22ef2e2
8b7340d
db8c2b2
f974f23
4d3b6e9
7319ab1
2c59639
c5a5669
9e4a464
03e7c96
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,4 @@ | ||
| # SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. | ||
| # SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| """Traversal and visitor utilities for nodes.""" | ||
|
|
@@ -49,6 +49,40 @@ def traversal(nodes: Sequence[NodeT]) -> Generator[NodeT, None, None]: | |
| lifo.append(child) | ||
|
|
||
|
|
||
| def post_traversal(nodes: Sequence[NodeT]) -> Generator[NodeT, None, None]: | ||
rjzamora marked this conversation as resolved.
Show resolved
Hide resolved
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. NOTE: This was copied from wence-:more-tablestat-doodles. |
||
| """ | ||
| Post-order traversal of nodes in an expression. | ||
|
|
||
| Parameters | ||
| ---------- | ||
| nodes | ||
| Roots of expressions to traverse. | ||
|
|
||
| Yields | ||
| ------ | ||
| Unique nodes in the expressions, child before parent, children | ||
| in-order from left to right. | ||
| """ | ||
| seen = set() | ||
| lifo = [] | ||
rjzamora marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| for node in nodes: | ||
| if node not in seen: | ||
| lifo.append(node) | ||
| seen.add(node) | ||
|
|
||
| while lifo: | ||
| node = lifo[-1] | ||
| for child in node.children: | ||
| if child not in seen: | ||
rjzamora marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| lifo.append(child) | ||
| seen.add(child) | ||
| break | ||
| else: | ||
| yield node | ||
| lifo.pop() | ||
|
|
||
|
|
||
| def reuse_if_unchanged(node: NodeT, fn: GenericTransformer[NodeT, NodeT]) -> NodeT: | ||
| """ | ||
| Recipe for transforming nodes that returns the old object if unchanged. | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -7,9 +7,10 @@ | |||||
| from typing import TYPE_CHECKING, Any | ||||||
|
|
||||||
| if TYPE_CHECKING: | ||||||
| from collections.abc import Generator, Iterator | ||||||
| from collections.abc import Callable, Generator, Iterator | ||||||
|
|
||||||
| from cudf_polars.dsl.expr import NamedExpr | ||||||
| from cudf_polars.dsl.ir import IR | ||||||
| from cudf_polars.dsl.nodebase import Node | ||||||
|
|
||||||
|
|
||||||
|
|
@@ -44,3 +45,133 @@ def __rich_repr__(self) -> Generator[Any, None, None]: | |||||
| def get_key_name(node: Node) -> str: | ||||||
| """Generate the key name for a Node.""" | ||||||
| return f"{type(node).__name__.lower()}-{hash(node)}" | ||||||
|
|
||||||
|
|
||||||
| class UniqueSourceStats: | ||||||
| """ | ||||||
| Unique source statistics. | ||||||
|
|
||||||
| Parameters | ||||||
| ---------- | ||||||
| count | ||||||
| Unique-value count. | ||||||
| fraction | ||||||
| Unique-value fraction. | ||||||
| """ | ||||||
|
|
||||||
| __slots__ = ("count", "fraction") | ||||||
|
|
||||||
| def __init__( | ||||||
| self, | ||||||
| *, | ||||||
| count: int | None = None, | ||||||
| fraction: float | None = None, | ||||||
| ): | ||||||
| self.count = count | ||||||
| self.fraction = fraction | ||||||
|
|
||||||
|
|
||||||
| class ColumnSourceStats: | ||||||
| """ | ||||||
| Column source statistics. | ||||||
|
|
||||||
| Parameters | ||||||
| ---------- | ||||||
| cardinality | ||||||
| Cardinality (row count). | ||||||
| unique_stats | ||||||
| Unique-value statistics. | ||||||
| storage_size_per_file | ||||||
| Average un-compressed storage size for this | ||||||
| column in a single file. This value is used to | ||||||
| calculate the partition count for an IR node. | ||||||
| exact | ||||||
| Tuple of attributes that have not been estimated | ||||||
| by partial sampling, and are known exactly. | ||||||
|
|
||||||
| Notes | ||||||
| ----- | ||||||
| Source statistics are statistics coming from "source" | ||||||
| nodes like ``Scan`` and ``DataFrameScan``. | ||||||
| """ | ||||||
|
|
||||||
| __slots__ = ( | ||||||
| "_unique_stats", | ||||||
| "cardinality", | ||||||
| "exact", | ||||||
| "storage_size_per_file", | ||||||
| ) | ||||||
|
|
||||||
| def __init__( | ||||||
| self, | ||||||
| *, | ||||||
| cardinality: int | None = None, | ||||||
| storage_size_per_file: int | None = None, | ||||||
| exact: tuple[str, ...] = (), | ||||||
| unique_stats: Any = None, | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| ): | ||||||
| self.cardinality = cardinality | ||||||
| self.storage_size_per_file = storage_size_per_file | ||||||
| self.exact = exact | ||||||
| self._unique_stats: Callable[..., UniqueSourceStats] | UniqueSourceStats | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's the motivation for making If possible, I'd prefer the simplicity of just having a plain value here.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So this seems to be from I wonder if we have enough information here to know what columns we actually want statistics for (based on which columns we do operations on that would make use of those statistics). I see
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I think your understanding is correct. To be completely honest, I've wasted quite a bit of time struggling to find an "elegant" way to do everything I want:
The current implementation "works" and largely handles caching pretty well. However, I'm still pretty unhappy with the design. I wonder if the "key" to cleaning this up is to roll back the decision to store the column-specific source statistics under class UniqueStats:
# Simple class to hold materialized unique-value stats
count: int | None
fraction: float | None
class SourceSampler:
# Lazy source sampler.
# - We would use a `PqSourceSampler` for parquet.
# - These samplers can be generated by a helper function with lru caching.
def file_storage_size(self, column: str) -> int
def cardinality(self) -> int
def add_shuffle_keys(self, columns: Sequence[str]) -> None
def unique_stats(self, column: str) -> UniqueStats
class ColumnStats:
name: str
unique_count: int | None
source_name: str | None
source_sampler: SourceSampler | NoneWith this (or something like it), we can completely avoid sampling "real" source data until we are actually "using". As the source-stats are propagated through the graph, we could use
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll just note that I too don't have an elegant design for this in mind :/ It's a big feature and the requirements we need out of it are still a bit fuzzy (to me). I think I mostly agree with your list of desires, except perhaps for "avoid traversing the IR graph multiple times", since the information we gather will affect the IR graph we produce. Maybe that can all be done in one shot, since the source stats will be gathered early on. I do think it's worth spending the time to get the design right (easy for me to say, I know). But I'm also fine with getting something in sooner rather than later and iterating on it, especially if the "right" design isn't clear.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have a working design in another branch that is closer to making me happy. I'm going to start breaking off pieces and opening separate PRs now. |
||||||
| if unique_stats is None: | ||||||
| self._unique_stats = UniqueSourceStats() | ||||||
| elif isinstance(unique_stats, UniqueSourceStats) or callable(unique_stats): | ||||||
| self._unique_stats = unique_stats | ||||||
| else: # pragma: no cover | ||||||
| raise TypeError(f"Unexpected unique_stats argument, got {unique_stats}") | ||||||
|
|
||||||
| @property | ||||||
| def unique_stats(self) -> UniqueSourceStats: | ||||||
| """Get unique-value statistics.""" | ||||||
| if callable(self._unique_stats): | ||||||
| return self._unique_stats() | ||||||
| return self._unique_stats | ||||||
|
|
||||||
| @property | ||||||
| def unique_count(self) -> int | None: | ||||||
| """Get unique count.""" | ||||||
| return self.unique_stats.count | ||||||
|
|
||||||
| @property | ||||||
| def unique_fraction(self) -> float | None: | ||||||
| """Get unique fraction.""" | ||||||
| return self.unique_stats.fraction | ||||||
|
|
||||||
|
|
||||||
| class ColumnStats: | ||||||
| """ | ||||||
| Column statistics. | ||||||
|
|
||||||
| Parameters | ||||||
| ---------- | ||||||
| name | ||||||
| Column name. | ||||||
| unique_count | ||||||
| Unique-count estimate. | ||||||
| source_stats | ||||||
| Column-source statistics. | ||||||
| """ | ||||||
|
|
||||||
| __slots__ = ("name", "source_stats", "unique_count") | ||||||
|
|
||||||
| def __init__( | ||||||
| self, | ||||||
| *, | ||||||
| name: str | None = None, | ||||||
| unique_count: int | None = None, | ||||||
| source_stats: ColumnSourceStats | None = None, | ||||||
| ) -> None: | ||||||
| self.name = name | ||||||
| self.unique_count = unique_count | ||||||
| self.source_stats = source_stats | ||||||
|
|
||||||
|
|
||||||
| class StatsCollector: | ||||||
| """Column statistics collector.""" | ||||||
|
|
||||||
| __slots__ = ("cardinality", "column_stats") | ||||||
|
|
||||||
| def __init__(self) -> None: | ||||||
| self.cardinality: dict[IR, int] = {} | ||||||
| self.column_stats: dict[IR, dict[str, ColumnStats]] = {} | ||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,7 +13,11 @@ | |
| from cudf_polars.dsl.ir import Distinct | ||
| from cudf_polars.experimental.base import PartitionInfo | ||
| from cudf_polars.experimental.dispatch import lower_ir_node | ||
| from cudf_polars.experimental.utils import _fallback_inform, _lower_ir_fallback | ||
| from cudf_polars.experimental.utils import ( | ||
| _fallback_inform, | ||
| _get_unique_fractions, | ||
| _lower_ir_fallback, | ||
| ) | ||
|
|
||
| if TYPE_CHECKING: | ||
| from collections.abc import MutableMapping | ||
|
|
@@ -29,7 +33,7 @@ def lower_distinct( | |
| partition_info: MutableMapping[IR, PartitionInfo], | ||
| config_options: ConfigOptions, | ||
| *, | ||
| cardinality: float | None = None, | ||
| unique_fraction: float | None = None, | ||
| ) -> tuple[IR, MutableMapping[IR, PartitionInfo]]: | ||
| """ | ||
| Lower a Distinct IR into partition-wise stages. | ||
|
|
@@ -46,8 +50,8 @@ def lower_distinct( | |
| associated partitioning information. | ||
| config_options | ||
| GPUEngine configuration options. | ||
| cardinality | ||
| Cardinality factor to use for algorithm selection. | ||
| unique_fraction | ||
| Fractional unique count to use for algorithm selection. | ||
|
|
||
| Returns | ||
| ------- | ||
|
|
@@ -112,14 +116,14 @@ def lower_distinct( | |
| # partitions. For now, we raise an error to fall back | ||
| # to one partition. | ||
| raise NotImplementedError("Unsupported slice for multiple partitions.") | ||
| elif cardinality is not None: | ||
| # Use cardinality to determine partitioning | ||
| n_ary = min(max(int(1.0 / cardinality), 2), child_count) | ||
| output_count = max(int(cardinality * child_count), 1) | ||
| elif unique_fraction is not None: | ||
| # Use unique_fraction to determine partitioning | ||
| n_ary = min(max(int(1.0 / unique_fraction), 2), child_count) | ||
| output_count = max(int(unique_fraction * child_count), 1) | ||
|
|
||
| if output_count > 1 and require_tree_reduction: | ||
| # Need to reduce down to a single partition even | ||
| # if the cardinality is large. | ||
| # if the unique_fraction is large. | ||
| output_count = 1 | ||
| _fallback_inform( | ||
| "Unsupported unique options for multiple partitions.", | ||
|
|
@@ -164,24 +168,30 @@ def _( | |
| # Extract child partitioning | ||
| child, partition_info = rec(ir.children[0]) | ||
| config_options = rec.state["config_options"] | ||
| column_stats = rec.state["stats"].column_stats.get(ir.children[0], {}) | ||
|
|
||
| assert config_options.executor.name == "streaming", ( | ||
| "'in-memory' executor not supported in 'lower_ir_node'" | ||
| ) | ||
|
|
||
| subset: frozenset = ir.subset or frozenset(ir.schema) | ||
| cardinality_factor = { | ||
| c: max(min(f, 1.0), 0.00001) | ||
| for c, f in config_options.executor.cardinality_factor.items() | ||
| if c in subset | ||
| } | ||
| cardinality = max(cardinality_factor.values()) if cardinality_factor else None | ||
| subset: frozenset[str] = ir.subset or frozenset(ir.schema) | ||
| unique_fraction_dict = _get_unique_fractions( | ||
| tuple(subset), | ||
| config_options.executor.unique_fraction, | ||
| column_stats, | ||
| ) | ||
|
|
||
| unique_fraction = ( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a difference between
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar question at https://github.com/rapidsai/cudf/pull/19130/files#diff-c93704a29e1c3263ea7267bf2028f187f43d40727bda1b8ffbcdd87076bac77bR187 in The output here is So how about |
||
| max(unique_fraction_dict.values()) if unique_fraction_dict else None | ||
| ) | ||
|
|
||
| try: | ||
| return lower_distinct( | ||
| ir, | ||
| child, | ||
| partition_info, | ||
| config_options, | ||
| cardinality=cardinality, | ||
| unique_fraction=unique_fraction, | ||
| ) | ||
| except NotImplementedError as err: | ||
| return _lower_ir_fallback(ir, rec, msg=str(err)) | ||
Uh oh!
There was an error while loading. Please reload this page.