Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import time
import traceback
import uuid
import warnings
from collections import defaultdict
from datetime import UTC, datetime
from pathlib import Path
Expand Down Expand Up @@ -359,7 +358,6 @@ class RunConfig:
runtime: str
stream_policy: str | None
cluster: str
scheduler: str # Deprecated, kept for backward compatibility
n_workers: int
versions: PackageVersions = dataclasses.field(
default_factory=PackageVersions.collect
Expand Down Expand Up @@ -414,43 +412,16 @@ def from_args(cls, args: argparse.Namespace) -> RunConfig:
"""Create a RunConfig from command line arguments."""
executor: ExecutorType = args.executor
cluster = args.cluster
scheduler = args.scheduler
runtime = args.runtime
stream_policy = args.stream_policy

# Handle "auto" stream policy
if stream_policy == "auto":
stream_policy = None

# Deal with deprecated scheduler argument
# and non-streaming executors
# Deal with non-streaming executors
if executor == "in-memory" or executor == "cpu":
cluster = None
scheduler = None
elif scheduler is not None:
if cluster is not None:
raise ValueError(
"Cannot specify both -s/--scheduler and -c/--cluster. "
"Please use -c/--cluster only."
)
else:
warnings.warn(
"The -s/--scheduler argument is deprecated. Use -c/--cluster instead.",
FutureWarning,
stacklevel=2,
)
cluster = "single" if scheduler == "synchronous" else "distributed"
elif cluster is not None:
match cluster:
case "single":
scheduler = "synchronous"
case "distributed":
scheduler = "distributed"
case "spmd": # launched via rrun, not Dask
scheduler = None
else:
cluster = "single"
scheduler = "synchronous"

path = args.path
name = args.query_set
Expand Down Expand Up @@ -520,7 +491,6 @@ def from_args(cls, args: argparse.Namespace) -> RunConfig:
queries=args.query,
executor=executor,
cluster=cluster,
scheduler=scheduler,
runtime=runtime,
stream_policy=stream_policy,
n_workers=args.n_workers,
Expand Down Expand Up @@ -968,19 +938,6 @@ def build_parser(num_queries: int = 22) -> argparse.ArgumentParser:
- spmd : SPMD execution via rrun launcher
- ray : Ray actor-based multi-GPU execution"""),
)
parser.add_argument(
"-s",
"--scheduler",
default=None,
type=str,
choices=["synchronous", "distributed"],
help=textwrap.dedent("""\
*Deprecated*: Use --cluster instead.

Scheduler type to use with the 'streaming' executor.
- synchronous : Run locally in a single process
- distributed : Use Dask for multi-GPU execution"""),
)
parser.add_argument(
"--runtime",
type=str,
Expand Down
67 changes: 1 addition & 66 deletions python/cudf_polars/cudf_polars/utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import importlib.util
import json
import os
import warnings
from typing import TYPE_CHECKING, Any, Generic, Literal, TypeVar

from rmm.pylibrmm import CudaStreamFlags, CudaStreamPool
Expand Down Expand Up @@ -56,7 +55,6 @@
"RayContext",
"Runtime",
"SPMDContext",
"Scheduler", # Deprecated, kept for backward compatibility
"ShuffleMethod",
"StreamingExecutor",
"StreamingFallbackMode",
Expand Down Expand Up @@ -178,20 +176,6 @@ class Cluster(enum.StrEnum):
RAY = "ray"


class Scheduler(enum.StrEnum):
"""
**Deprecated**: Use :class:`Cluster` instead.

The scheduler to use for the task-based streaming executor.

* ``Scheduler.SYNCHRONOUS`` : Single-GPU execution (use ``Cluster.SINGLE`` instead)
* ``Scheduler.DISTRIBUTED`` : Multi-GPU execution (use ``Cluster.DISTRIBUTED`` instead)
"""

SYNCHRONOUS = "synchronous"
DISTRIBUTED = "distributed"


class ShuffleMethod(enum.StrEnum):
"""
The method to use for shuffling data between workers with the streaming executor.
Expand Down Expand Up @@ -591,12 +575,6 @@ class StreamingExecutor:
* ``Cluster.DISTRIBUTED``: Multi-GPU distributed execution (requires
an active Dask cluster)

scheduler
**Deprecated**: Use ``cluster`` instead.

For backward compatibility:
* ``Scheduler.SYNCHRONOUS`` maps to ``Cluster.SINGLE``
* ``Scheduler.DISTRIBUTED`` maps to ``Cluster.DISTRIBUTED``
fallback_mode
How to handle errors when the GPU engine fails to execute a query.
``StreamingFallbackMode.WARN`` by default.
Expand Down Expand Up @@ -707,13 +685,6 @@ class StreamingExecutor:
default=None,
)
)
scheduler: Scheduler | None = dataclasses.field(
default_factory=_make_default_factory(
f"{_env_prefix}__SCHEDULER",
Scheduler.__call__,
default=None,
)
)
fallback_mode: StreamingFallbackMode = dataclasses.field(
default_factory=_make_default_factory(
f"{_env_prefix}__FALLBACK_MODE",
Expand Down Expand Up @@ -796,30 +767,7 @@ def __post_init__(self) -> None: # noqa: D105
raise ValueError("The rapidsmpf streaming engine requires rapidsmpf.")
object.__setattr__(self, "shuffle_method", "rapidsmpf")

# Handle backward compatibility for deprecated scheduler parameter
if self.scheduler is not None:
if self.cluster is not None:
raise ValueError(
"Cannot specify both 'scheduler' and 'cluster'. "
"The 'scheduler' parameter is deprecated. "
"Please use only 'cluster' instead."
)
else:
warnings.warn(
"""The 'scheduler' parameter is deprecated. Please use 'cluster' instead.
Use 'cluster="single"' instead of 'scheduler="synchronous"' and "
'cluster="distributed"' instead of 'scheduler="distributed"'.""",
FutureWarning,
stacklevel=2,
)
# Map old scheduler values to new cluster values
if self.scheduler == "synchronous":
object.__setattr__(self, "cluster", Cluster.SINGLE)
elif self.scheduler == "distributed":
object.__setattr__(self, "cluster", Cluster.DISTRIBUTED)
# Clear scheduler to avoid confusion
object.__setattr__(self, "scheduler", None)
elif self.cluster is None:
if self.cluster is None:
object.__setattr__(self, "cluster", Cluster.SINGLE)
assert self.cluster is not None, "Expected cluster to be set."

Expand Down Expand Up @@ -1095,19 +1043,6 @@ def from_polars_engine(
**user_memory_resource_config
)

# Backward compatibility for "cardinality_factor"
# TODO: Remove this in 25.10
if "cardinality_factor" in user_executor_options:
warnings.warn(
"The 'cardinality_factor' configuration is deprecated. "
"Please use 'unique_fraction' instead.",
FutureWarning,
stacklevel=2,
)
cardinality_factor = user_executor_options.pop("cardinality_factor")
if "unique_fraction" not in user_executor_options:
user_executor_options["unique_fraction"] = cardinality_factor

# These are user-provided options, so we need to actually validate
# them.

Expand Down
50 changes: 0 additions & 50 deletions python/cudf_polars/tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,46 +345,6 @@ def test_validate_cluster() -> None:
)


def test_scheduler_deprecated() -> None:
# Test that using deprecated scheduler parameter emits warning
# and correctly maps to cluster parameter

# Test scheduler="synchronous" maps to cluster="single"
with pytest.warns(FutureWarning, match="'scheduler' parameter is deprecated"):
config = ConfigOptions.from_polars_engine(
pl.GPUEngine(
executor="streaming",
executor_options={"scheduler": "synchronous"},
)
)
assert config.executor.name == "streaming"
assert config.executor.cluster == "single"
assert config.executor.scheduler is None # Should be cleared after mapping

# Test scheduler="distributed" maps to cluster="distributed"
with pytest.warns(FutureWarning, match="'scheduler' parameter is deprecated"):
config = ConfigOptions.from_polars_engine(
pl.GPUEngine(
executor="streaming",
executor_options={"scheduler": "distributed"},
)
)
assert config.executor.name == "streaming"
assert config.executor.cluster == "distributed"
assert config.executor.scheduler is None # Should be cleared after mapping

# Test that specifying both cluster and scheduler raises an error
with pytest.raises(
ValueError, match="Cannot specify both 'scheduler' and 'cluster'"
):
ConfigOptions.from_polars_engine(
pl.GPUEngine(
executor="streaming",
executor_options={"cluster": "single", "scheduler": "synchronous"},
)
)


def test_validate_shuffle_method_defaults(
*,
rapidsmpf_distributed_available: bool,
Expand Down Expand Up @@ -557,16 +517,6 @@ def test_fallback_mode_default(monkeypatch: pytest.MonkeyPatch) -> None:
ConfigOptions.from_polars_engine(engine)


def test_cardinality_factor_compat() -> None:
with pytest.warns(FutureWarning, match="configuration is deprecated"):
ConfigOptions.from_polars_engine(
pl.GPUEngine(
executor="streaming",
executor_options={"cardinality_factor": {}},
)
)


@pytest.mark.parametrize(
"option",
[
Expand Down
Loading