Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ def get_executor_options(
and benchmark.__name__ == "PDSHQueries"
and run_config.executor == "streaming"
):
executor_options["cardinality_factor"] = {
executor_options["unique_fraction"] = {
"c_custkey": 0.05,
"l_orderkey": 1.0,
"l_partkey": 0.1,
Expand Down
41 changes: 24 additions & 17 deletions python/cudf_polars/cudf_polars/experimental/distinct.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@
from cudf_polars.dsl.ir import Distinct
from cudf_polars.experimental.base import PartitionInfo
from cudf_polars.experimental.dispatch import lower_ir_node
from cudf_polars.experimental.utils import _fallback_inform, _lower_ir_fallback
from cudf_polars.experimental.utils import (
_fallback_inform,
_get_unique_fractions,
_lower_ir_fallback,
)

if TYPE_CHECKING:
from collections.abc import MutableMapping
Expand All @@ -29,7 +33,7 @@ def lower_distinct(
partition_info: MutableMapping[IR, PartitionInfo],
config_options: ConfigOptions,
*,
cardinality: float | None = None,
unique_fraction: float | None = None,
) -> tuple[IR, MutableMapping[IR, PartitionInfo]]:
"""
Lower a Distinct IR into partition-wise stages.
Expand All @@ -46,8 +50,9 @@ def lower_distinct(
associated partitioning information.
config_options
GPUEngine configuration options.
cardinality
Cardinality factor to use for algorithm selection.
unique_fraction
Fraction of unique values to total values. Used for algorithm selection.
A value of `1.0` means the column is unique.

Returns
-------
Expand Down Expand Up @@ -112,14 +117,14 @@ def lower_distinct(
# partitions. For now, we raise an error to fall back
# to one partition.
raise NotImplementedError("Unsupported slice for multiple partitions.")
elif cardinality is not None:
# Use cardinality to determine partitioningcardinality
n_ary = min(max(int(1.0 / cardinality), 2), child_count)
output_count = max(int(cardinality * child_count), 1)
elif unique_fraction is not None:
# Use unique_fraction to determine partitioning
n_ary = min(max(int(1.0 / unique_fraction), 2), child_count)
output_count = max(int(unique_fraction * child_count), 1)

if output_count > 1 and require_tree_reduction:
# Need to reduce down to a single partition even
# if the cardinality is large.
# if the unique_fraction is large.
output_count = 1
_fallback_inform(
"Unsupported unique options for multiple partitions.",
Expand Down Expand Up @@ -168,20 +173,22 @@ def _(
"'in-memory' executor not supported in 'lower_ir_node'"
)

subset: frozenset = ir.subset or frozenset(ir.schema)
cardinality_factor = {
c: max(min(f, 1.0), 0.00001)
for c, f in config_options.executor.cardinality_factor.items()
if c in subset
}
cardinality = max(cardinality_factor.values()) if cardinality_factor else None
subset: frozenset[str] = ir.subset or frozenset(ir.schema)
unique_fraction_dict = _get_unique_fractions(
tuple(subset),
config_options.executor.unique_fraction,
)
unique_fraction = (
max(unique_fraction_dict.values()) if unique_fraction_dict else None
)

try:
return lower_distinct(
ir,
child,
partition_info,
config_options,
cardinality=cardinality,
unique_fraction=unique_fraction,
)
except NotImplementedError as err:
return _lower_ir_fallback(ir, rec, msg=str(err))
19 changes: 10 additions & 9 deletions python/cudf_polars/cudf_polars/experimental/expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
from cudf_polars.dsl.utils.naming import unique_names
from cudf_polars.experimental.base import PartitionInfo
from cudf_polars.experimental.repartition import Repartition
from cudf_polars.experimental.utils import _leaf_column_names
from cudf_polars.experimental.utils import _get_unique_fractions, _leaf_column_names

if TYPE_CHECKING:
from collections.abc import Generator, MutableMapping, Sequence
Expand Down Expand Up @@ -190,13 +190,14 @@ def _decompose_unique(
"'in-memory' executor not supported in '_decompose_unique'"
)

cardinality: float | None = None
if cardinality_factor := {
max(min(v, 1.0), 0.00001)
for k, v in config_options.executor.cardinality_factor.items()
if k in _leaf_column_names(child)
}:
cardinality = max(cardinality_factor)
unique_fraction_dict = _get_unique_fractions(
_leaf_column_names(child),
config_options.executor.unique_fraction,
)

unique_fraction = (
max(unique_fraction_dict.values()) if unique_fraction_dict else None
)

input_ir, partition_info = lower_distinct(
Distinct(
Expand All @@ -210,7 +211,7 @@ def _decompose_unique(
input_ir,
partition_info,
config_options,
cardinality=cardinality,
unique_fraction=unique_fraction,
)

return column, input_ir, partition_info
Expand Down
26 changes: 9 additions & 17 deletions python/cudf_polars/cudf_polars/experimental/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from cudf_polars.experimental.dispatch import lower_ir_node
from cudf_polars.experimental.repartition import Repartition
from cudf_polars.experimental.shuffle import Shuffle
from cudf_polars.experimental.utils import _lower_ir_fallback
from cudf_polars.experimental.utils import _get_unique_fractions, _lower_ir_fallback

if TYPE_CHECKING:
from collections.abc import Generator, MutableMapping
Expand Down Expand Up @@ -188,25 +188,17 @@ def _(

config_options = rec.state["config_options"]
assert config_options.executor.name == "streaming", (
"'in-memory' executor not supported in 'generate_ir_tasks'"
"'in-memory' executor not supported in 'lower_ir_node'"
)

child_count = partition_info[child].count
cardinality_factor = {
c: min(f, 1.0)
for c, f in config_options.executor.cardinality_factor.items()
if c in groupby_key_columns
}
if cardinality_factor:
# The `cardinality_factor` dictionary can be used
# to specify a mapping between column names and
# cardinality "factors". Each factor estimates the
# fractional number of unique values in the column.
# Each value should be in the range (0, 1].
post_aggregation_count = max(
int(max(cardinality_factor.values()) * child_count),
1,
)
if unique_fraction_dict := _get_unique_fractions(
groupby_key_columns,
config_options.executor.unique_fraction,
):
# Use unique_fraction to determine output partitioning
unique_fraction = max(unique_fraction_dict.values())
post_aggregation_count = max(int(unique_fraction * child_count), 1)

new_node: IR
name_generator = unique_names(ir.schema.keys())
Expand Down
14 changes: 13 additions & 1 deletion python/cudf_polars/cudf_polars/experimental/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from cudf_polars.experimental.base import PartitionInfo

if TYPE_CHECKING:
from collections.abc import MutableMapping
from collections.abc import MutableMapping, Sequence

from cudf_polars.containers import DataFrame
from cudf_polars.dsl.expr import Expr
Expand Down Expand Up @@ -98,3 +98,15 @@ def _leaf_column_names(expr: Expr) -> tuple[str, ...]:
return (expr.name,)
else:
return ()


def _get_unique_fractions(
column_names: Sequence[str],
user_unique_fractions: dict[str, float],
) -> dict[str, float]:
"""Return unique-fraction statistics subset."""
return {
c: max(min(f, 1.0), 0.00001)
for c, f in user_unique_fractions.items()
if c in column_names
}
25 changes: 19 additions & 6 deletions python/cudf_polars/cudf_polars/utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,13 @@ class StreamingExecutor:
The maximum number of rows to process per partition. 1_000_000 by default.
When the number of rows exceeds this value, the query will be split into
multiple partitions and executed in parallel.
cardinality_factor
unique_fraction
A dictionary mapping column names to floats between 0 and 1 (inclusive
on the right).

Each factor estimates the fractional number of unique values in the
column. By default, ``1.0`` is used for any column not included in
``cardinality_factor``.
``unique_fraction``.
target_partition_size
Target partition size for IO tasks. This configuration currently
controls how large parquet files are split into multiple partitions.
Expand Down Expand Up @@ -197,7 +197,7 @@ class StreamingExecutor:
scheduler: Scheduler = Scheduler.SYNCHRONOUS
fallback_mode: StreamingFallbackMode = StreamingFallbackMode.WARN
max_rows_per_partition: int = 1_000_000
cardinality_factor: dict[str, float] = dataclasses.field(default_factory=dict)
unique_fraction: dict[str, float] = dataclasses.field(default_factory=dict)
target_partition_size: int = 0
groupby_n_ary: int = 32
broadcast_join_limit: int = 0
Expand Down Expand Up @@ -234,8 +234,8 @@ def __post_init__(self) -> None:
# Type / value check everything else
if not isinstance(self.max_rows_per_partition, int):
raise TypeError("max_rows_per_partition must be an int")
if not isinstance(self.cardinality_factor, dict):
raise TypeError("cardinality_factor must be a dict of column name to float")
if not isinstance(self.unique_fraction, dict):
raise TypeError("unique_fraction must be a dict of column name to float")
if not isinstance(self.target_partition_size, int):
raise TypeError("target_partition_size must be an int")
if not isinstance(self.groupby_n_ary, int):
Expand All @@ -249,7 +249,7 @@ def __hash__(self) -> int:
# cardinality factory, a dict, isn't natively hashable. We'll dump it
# to json and hash that.
d = dataclasses.asdict(self)
d["cardinality_factor"] = json.dumps(d["cardinality_factor"])
d["unique_fraction"] = json.dumps(d["unique_fraction"])
return hash(tuple(sorted(d.items())))


Expand Down Expand Up @@ -324,6 +324,19 @@ def from_polars_engine(cls, engine: pl.GPUEngine) -> Self:
user_parquet_options = engine.config.get("parquet_options", {})
user_raise_on_fail = engine.config.get("raise_on_fail", False)

# Backward compatibility for "cardinality_factor"
# TODO: Remove this in 25.10
if "cardinality_factor" in user_executor_options:
warnings.warn(
"The 'cardinality_factor' configuration is deprecated. "
"Please use 'unique_fraction' instead.",
FutureWarning,
stacklevel=2,
)
cardinality_factor = user_executor_options.pop("cardinality_factor")
if "unique_fraction" not in user_executor_options:
user_executor_options["unique_fraction"] = cardinality_factor

# These are user-provided options, so we need to actually validate
# them.

Expand Down
2 changes: 1 addition & 1 deletion python/cudf_polars/tests/experimental/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def test_groupby_agg_config_options(df, op, keys):
executor_options={
"max_rows_per_partition": 4,
# Trigger shuffle-based groupby
"cardinality_factor": {"z": 0.5},
"unique_fraction": {"z": 0.5},
# Check that we can change the n-ary factor
"groupby_n_ary": 8,
"scheduler": DEFAULT_SCHEDULER,
Expand Down
2 changes: 1 addition & 1 deletion python/cudf_polars/tests/experimental/test_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def test_preserve_partitioning():
"max_rows_per_partition": 2,
"scheduler": DEFAULT_SCHEDULER,
"broadcast_join_limit": 2,
"cardinality_factor": {"a": 1.0},
"unique_fraction": {"a": 1.0},
},
)
left = pl.LazyFrame({"a": [1, 2, 3, 4] * 5, "b": range(20)})
Expand Down
6 changes: 3 additions & 3 deletions python/cudf_polars/tests/experimental/test_unique.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def test_unique(df, keep, subset, maintain_order, cardinality):
executor_options={
"max_rows_per_partition": 50,
"scheduler": DEFAULT_SCHEDULER,
"cardinality_factor": cardinality,
"unique_fraction": cardinality,
"fallback_mode": "silent",
},
)
Expand All @@ -55,7 +55,7 @@ def test_unique_fallback(df):
executor_options={
"max_rows_per_partition": 50,
"scheduler": DEFAULT_SCHEDULER,
"cardinality_factor": {"y": 1.0},
"unique_fraction": {"y": 1.0},
"fallback_mode": "raise",
},
)
Expand All @@ -76,7 +76,7 @@ def test_unique_select(df, maintain_order, cardinality):
executor_options={
"max_rows_per_partition": 4,
"scheduler": DEFAULT_SCHEDULER,
"cardinality_factor": cardinality,
"unique_fraction": cardinality,
"fallback_mode": "silent",
},
)
Expand Down
12 changes: 11 additions & 1 deletion python/cudf_polars/tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ def test_validate_shuffle_method() -> None:
"option",
[
"max_rows_per_partition",
"cardinality_factor",
"unique_fraction",
"target_partition_size",
"groupby_n_ary",
"broadcast_join_limit",
Expand All @@ -254,6 +254,16 @@ def test_validate_max_rows_per_partition(option: str) -> None:
)


def test_cardinality_factor_compat() -> None:
    # The deprecated "cardinality_factor" option must still be accepted,
    # but translating it should emit a FutureWarning pointing users at
    # the new "unique_fraction" name.
    engine = pl.GPUEngine(
        executor="streaming",
        executor_options={"cardinality_factor": {}},
    )
    with pytest.warns(FutureWarning, match="configuration is deprecated"):
        ConfigOptions.from_polars_engine(engine)


@pytest.mark.parametrize("option", ["chunked", "chunk_read_limit", "pass_read_limit"])
def test_validate_parquet_options(option: str) -> None:
with pytest.raises(TypeError, match=f"{option} must be"):
Expand Down