Skip to content

Commit 20df9ad

Browse files
authored
Merge branch 'master' into eoakes/fix-jobs-diagram
2 parents 1aacb02 + a1bfd6a commit 20df9ad

File tree

19 files changed

+457
-109
lines changed

19 files changed

+457
-109
lines changed

ci/raydepsets/cli.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,15 @@ def __init__(
9696
check: Optional[bool] = False,
9797
build_all_configs: Optional[bool] = False,
9898
):
99+
"""Initialize the dependency set manager.
100+
101+
Args:
102+
config_path: Path to the depsets config file.
103+
workspace_dir: Path to the workspace directory.
104+
uv_cache_dir: Directory to cache uv dependencies.
105+
check: Whether to check if lock files are up to date.
106+
build_all_configs: Whether to build all configs or just the specified one.
107+
"""
99108
self.workspace = Workspace(workspace_dir)
100109
self.config = self.workspace.load_configs(config_path)
101110
self.config_name = os.path.basename(config_path)
@@ -109,6 +118,7 @@ def __init__(
109118
self.copy_to_temp_dir()
110119

111120
def get_output_paths(self) -> List[Path]:
121+
"""Get all output paths for depset nodes in topological order."""
112122
output_paths = []
113123
for node in topological_sort(self.build_graph):
114124
if self.build_graph.nodes[node]["node_type"] == "depset":
@@ -126,6 +136,7 @@ def copy_to_temp_dir(self):
126136
)
127137

128138
def get_diffs(self) -> List[str]:
139+
"""Compare current lock files with previously saved copies and return unified diffs."""
129140
diffs = []
130141
for output_path in self.output_paths:
131142
new_lock_file_fp, old_lock_file_fp = self.get_source_and_dest(output_path)
@@ -142,6 +153,7 @@ def get_diffs(self) -> List[str]:
142153
return diffs
143154

144155
def diff_lock_files(self):
156+
"""Check if lock files are up to date and raise an error if not."""
145157
diffs = self.get_diffs()
146158
if len(diffs) > 0:
147159
raise RuntimeError(
@@ -151,9 +163,11 @@ def diff_lock_files(self):
151163
click.echo("Lock files are up to date.")
152164

153165
def get_source_and_dest(self, output_path: str) -> tuple[Path, Path]:
166+
"""Get the source workspace path and temporary destination path for a lock file."""
154167
return (self.get_path(output_path), (Path(self.temp_dir) / output_path))
155168

156169
def _build(self, build_all_configs: Optional[bool] = False):
170+
"""Build the dependency graph from config depsets."""
157171
for depset in self.config.depsets:
158172
if depset.operation == "compile":
159173
self.build_graph.add_node(
@@ -201,11 +215,13 @@ def _build(self, build_all_configs: Optional[bool] = False):
201215
self.subgraph_config_nodes()
202216

203217
def subgraph_dependency_nodes(self, depset_name: str):
218+
"""Reduce the build graph to only include the specified depset and its ancestors."""
204219
dependency_nodes = networkx_ancestors(self.build_graph, depset_name)
205220
nodes = dependency_nodes | {depset_name}
206221
self.build_graph = self.build_graph.subgraph(nodes).copy()
207222

208223
def subgraph_config_nodes(self):
224+
"""Reduce the build graph to nodes matching the current config and their ancestors."""
209225
# Get all nodes that have the target config name
210226
config_nodes = [
211227
node
@@ -224,6 +240,7 @@ def subgraph_config_nodes(self):
224240
self.build_graph = self.build_graph.subgraph(nodes).copy()
225241

226242
def execute(self, single_depset_name: Optional[str] = None):
243+
"""Execute all depsets in topological order, optionally limited to a single depset."""
227244
if single_depset_name:
228245
# check if the depset exists
229246
_get_depset(self.config.depsets, single_depset_name)
@@ -240,6 +257,7 @@ def execute(self, single_depset_name: Optional[str] = None):
240257
def exec_uv_cmd(
241258
self, cmd: str, args: List[str], stdin: Optional[bytes] = None
242259
) -> str:
260+
"""Execute a uv pip command with the given arguments."""
243261
cmd = [self._uv_binary, "pip", cmd, *args]
244262
click.echo(f"Executing command: {' '.join(cmd)}")
245263
status = subprocess.run(
@@ -252,6 +270,7 @@ def exec_uv_cmd(
252270
return status.stdout.decode("utf-8")
253271

254272
def execute_pre_hook(self, pre_hook: str):
273+
"""Execute a pre-hook shell command."""
255274
status = subprocess.run(
256275
shlex.split(pre_hook),
257276
cwd=self.workspace.dir,
@@ -265,6 +284,7 @@ def execute_pre_hook(self, pre_hook: str):
265284
click.echo(f"Executed pre_hook {pre_hook} successfully")
266285

267286
def execute_depset(self, depset: Depset):
287+
"""Execute a single depset based on its operation type (compile, subset, or expand)."""
268288
if depset.operation == "compile":
269289
self.compile(
270290
constraints=depset.constraints,
@@ -389,15 +409,18 @@ def expand(
389409
)
390410

391411
def read_lock_file(self, file_path: Path) -> List[str]:
412+
"""Read and return the contents of a lock file as a list of lines."""
392413
if not file_path.exists():
393414
raise RuntimeError(f"Lock file {file_path} does not exist")
394415
with open(file_path, "r") as f:
395416
return f.readlines()
396417

397418
def get_path(self, path: str) -> Path:
419+
"""Convert a relative path to an absolute path within the workspace."""
398420
return Path(self.workspace.dir) / path
399421

400422
def check_subset_exists(self, source_depset: Depset, requirements: List[str]):
423+
"""Verify that all requirements exist in the source depset."""
401424
for req in requirements:
402425
if req not in self.get_expanded_depset_requirements(source_depset.name, []):
403426
raise RuntimeError(
@@ -424,15 +447,18 @@ def get_expanded_depset_requirements(
424447
return list(set(requirements_list))
425448

426449
def cleanup(self):
450+
"""Remove the temporary directory used for lock file comparisons."""
427451
if self.temp_dir:
428452
shutil.rmtree(self.temp_dir)
429453

430454

431455
def _get_bytes(packages: List[str]) -> bytes:
456+
"""Convert a list of package names to newline-separated UTF-8 bytes."""
432457
return ("\n".join(packages) + "\n").encode("utf-8")
433458

434459

435460
def _get_depset(depsets: List[Depset], name: str) -> Depset:
461+
"""Find and return a depset by name from a list of depsets."""
436462
for depset in depsets:
437463
if depset.name == name:
438464
return depset
@@ -452,6 +478,7 @@ def _flatten_flags(flags: List[str]) -> List[str]:
452478

453479

454480
def _override_uv_flags(flags: List[str], args: List[str]) -> List[str]:
481+
"""Override existing uv flags in args with new values from flags."""
455482
flag_names = {f.split()[0] for f in flags if f.startswith("--")}
456483
new_args = []
457484
skip_next = False
@@ -468,6 +495,7 @@ def _override_uv_flags(flags: List[str], args: List[str]) -> List[str]:
468495

469496

470497
def _uv_binary():
498+
"""Get the path to the uv binary for the current platform."""
471499
r = runfiles.Create()
472500
system = platform.system()
473501
processor = platform.processor()

doc/source/data/api/aggregate.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ compute aggregations.
2525
AbsMax
2626
Quantile
2727
Unique
28+
CountDistinct
2829
ValueCounter
2930
MissingValuePercentage
3031
ZeroPercentage

doc/source/serve/advanced-guides/multi-app-container.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,3 +167,5 @@ If raylet is running inside a container, then that container needs the necessary
167167
* This error should only occur when you're running the Ray cluster inside a container. If you see this error when starting the replica actor, try volume mounting `/var/lib/containers` in the container that runs raylet. That is, add `-v /var/lib/containers:/var/lib/containers` to the command that starts the Docker container.
168168
* **cannot clone: Operation not permitted; Error: cannot re-exec process**
169169
* This error should only occur when you're running the Ray cluster inside a container. This error implies that you don't have the permissions to use Podman to start a container. You need to start the container that runs raylet with privileged permissions by adding `--privileged` to the command that starts it.
170+
* **Very slow or hanging container startup**
171+
* This is typically caused by using the default Podman storage driver (`vfs`) with large container images. Podman runs in rootless mode, so its startup sequence involves modifying the permissions of files in the container, and the default storage driver performs this step very slowly. Try configuring Podman to use the `overlay` storage driver instead. You may also need to configure the `mount_program` option to point to `/usr/bin/fuse-overlayfs` (or the appropriate local path).

python/ray/data/aggregate.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1032,6 +1032,64 @@ def _normalize_nans(x: Collection) -> Set:
10321032
return {v if not (isinstance(v, float) and np.isnan(v)) else np.nan for v in x}
10331033

10341034

1035+
@PublicAPI
1036+
class CountDistinct(Unique):
1037+
"""Defines distinct count aggregation.
1038+
1039+
This aggregation computes the count of distinct values in a column.
1040+
It is similar to SQL's COUNT(DISTINCT column_name) operation.
1041+
1042+
Example:
1043+
1044+
.. testcode::
1045+
1046+
import ray
1047+
from ray.data.aggregate import CountDistinct
1048+
1049+
# Create a dataset with repeated values
1050+
ds = ray.data.from_items([
1051+
{"category": "A"}, {"category": "B"}, {"category": "A"},
1052+
{"category": "C"}, {"category": "A"}, {"category": "B"}
1053+
])
1054+
1055+
# Count distinct categories
1056+
result = ds.aggregate(CountDistinct(on="category"))
1057+
# result: {'count_distinct(category)': 3}
1058+
1059+
# Using with groupby
1060+
ds = ray.data.from_items([
1061+
{"group": "X", "category": "A"}, {"group": "X", "category": "B"},
1062+
{"group": "Y", "category": "A"}, {"group": "Y", "category": "A"}
1063+
])
1064+
result = ds.groupby("group").aggregate(CountDistinct(on="category")).take_all()
1065+
# result: [{'group': 'X', 'count_distinct(category)': 2},
1066+
# {'group': 'Y', 'count_distinct(category)': 1}]
1067+
1068+
Args:
1069+
on: The name of the column to count distinct values on.
1070+
ignore_nulls: Whether to ignore null values when counting distinct items.
1071+
Default is True (nulls are excluded from the count).
1072+
alias_name: Optional name for the resulting column. If not provided,
1073+
defaults to "count_distinct({on})".
1074+
"""
1075+
1076+
def __init__(
1077+
self,
1078+
on: str,
1079+
ignore_nulls: bool = True,
1080+
alias_name: Optional[str] = None,
1081+
):
1082+
super().__init__(
1083+
on=on,
1084+
ignore_nulls=ignore_nulls,
1085+
alias_name=alias_name if alias_name else f"count_distinct({str(on)})",
1086+
)
1087+
1088+
def finalize(self, accumulator: Set[Any]) -> int:
1089+
"""Return the count of distinct values."""
1090+
return len(accumulator)
1091+
1092+
10351093
@PublicAPI
10361094
class ValueCounter(AggregateFnV2):
10371095
"""Counts the number of times each value appears in a column.

python/ray/data/read_api.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4089,7 +4089,7 @@ def read_unity_catalog(
40894089
40904090
This function works by leveraging Unity Catalog's credential vending feature, which grants temporary, least-privilege
40914091
credentials for the cloud storage location backing the requested table or data files. It authenticates via the Unity Catalog
4092-
REST API (Unity Catalog credential vending for external system access, `Databricks Docs <https://docs.databricks.com/en/data-governance/unity-catalog/credential-vending.html>`_),
4092+
REST API (Unity Catalog credential vending for external system access, `Databricks Docs <https://docs.databricks.com/en/external-access/credential-vending.html>`_),
40934093
ensuring that permissions are enforced at the Databricks principal (user, group, or service principal) making the request.
40944094
The function supports reading data directly from AWS S3, Azure Data Lake, or GCP GCS in standard formats including Delta and Parquet.
40954095

python/ray/data/tests/test_groupby_e2e.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
AbsMax,
2222
AggregateFn,
2323
Count,
24+
CountDistinct,
2425
Max,
2526
Mean,
2627
Min,
@@ -508,6 +509,7 @@ def test_groupby_arrow_multi_agg(
508509
.aggregate(
509510
Count(),
510511
Count("B"),
512+
CountDistinct("B"),
511513
Sum("B"),
512514
Min("B"),
513515
Max("B"),
@@ -526,6 +528,7 @@ def test_groupby_arrow_multi_agg(
526528
"B": [
527529
"count",
528530
"count",
531+
"nunique",
529532
"sum",
530533
"min",
531534
"max",
@@ -542,6 +545,7 @@ def test_groupby_arrow_multi_agg(
542545
"A",
543546
"count()",
544547
"count(B)",
548+
"count_distinct(B)",
545549
"sum(B)",
546550
"min(B)",
547551
"max(B)",
@@ -637,6 +641,9 @@ def test_groupby_multi_agg_with_nans(
637641
.groupby("A")
638642
.aggregate(
639643
Count("B", alias_name="count_b", ignore_nulls=ignore_nulls),
644+
CountDistinct(
645+
"B", alias_name="count_distinct_b", ignore_nulls=ignore_nulls
646+
),
640647
Sum("B", alias_name="sum_b", ignore_nulls=ignore_nulls),
641648
Min("B", alias_name="min_b", ignore_nulls=ignore_nulls),
642649
Max("B", alias_name="max_b", ignore_nulls=ignore_nulls),
@@ -654,6 +661,7 @@ def test_groupby_multi_agg_with_nans(
654661
{
655662
"B": [
656663
("count_b", lambda s: s.count() if ignore_nulls else len(s)),
664+
("count_distinct_b", lambda s: s.nunique(dropna=ignore_nulls)),
657665
("sum_b", lambda s: s.sum(skipna=ignore_nulls)),
658666
("min_b", lambda s: s.min(skipna=ignore_nulls)),
659667
("max_b", lambda s: s.max(skipna=ignore_nulls)),
@@ -674,6 +682,7 @@ def test_groupby_multi_agg_with_nans(
674682
grouped_df.columns = [
675683
"A",
676684
"count_b",
685+
"count_distinct_b",
677686
"sum_b",
678687
"min_b",
679688
"max_b",
@@ -744,6 +753,7 @@ def test_groupby_aggregations_are_associative(
744753

745754
aggs = [
746755
Count("B", alias_name="count_b", ignore_nulls=ignore_nulls),
756+
CountDistinct("B", alias_name="count_distinct_b", ignore_nulls=ignore_nulls),
747757
Sum("B", alias_name="sum_b", ignore_nulls=ignore_nulls),
748758
Min("B", alias_name="min_b", ignore_nulls=ignore_nulls),
749759
Max("B", alias_name="max_b", ignore_nulls=ignore_nulls),
@@ -759,6 +769,7 @@ def test_groupby_aggregations_are_associative(
759769
{
760770
"B": [
761771
("count", lambda s: s.count() if ignore_nulls else len(s)),
772+
("count_distinct", lambda s: s.nunique(dropna=ignore_nulls)),
762773
("sum", lambda s: s.sum(skipna=ignore_nulls, min_count=1)),
763774
("min", lambda s: s.min(skipna=ignore_nulls)),
764775
("max", lambda s: s.max(skipna=ignore_nulls)),
@@ -779,6 +790,7 @@ def test_groupby_aggregations_are_associative(
779790
grouped_df.columns = [
780791
"A",
781792
"count_b",
793+
"count_distinct_b",
782794
"sum_b",
783795
"min_b",
784796
"max_b",

0 commit comments

Comments
 (0)