Skip to content
41 changes: 36 additions & 5 deletions python/ray/data/_internal/logical/optimizers.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from typing import List
from typing import Callable, List

from .ruleset import Ruleset
from ray.data._internal.logical.interfaces import (
LogicalPlan,
Optimizer,
PhysicalPlan,
Plan,
Rule,
)
from ray.data._internal.logical.rules.configure_map_task_memory import (
Expand Down Expand Up @@ -65,6 +66,27 @@ def rules(self) -> List[Rule]:
return [rule_cls() for rule_cls in get_physical_ruleset()]


def get_plan_conversion_fns() -> List[Callable[[Plan], Plan]]:
    """Build the ordered pipeline that turns a logical plan into an
    optimized physical plan.

    The stages, in the order they are meant to be applied:
        1. Logical optimization
        2. Planning (logical -> physical operators)
        3. Physical optimization

    Returns:
        A three-element list of callables, each accepting a Plan and
        returning a Plan: the logical optimizer's ``optimize``, the
        planner's ``plan``, and the physical optimizer's ``optimize``.
    """
    # Imported at call time, inside the function body, rather than at
    # module import time.
    from ray.data._internal.planner import create_planner

    # Fresh optimizer / planner instances are created on every call.
    logical_optimizer = LogicalOptimizer()
    planner = create_planner()
    physical_optimizer = PhysicalOptimizer()

    return [
        logical_optimizer.optimize,
        planner.plan,
        physical_optimizer.optimize,
    ]


def get_execution_plan(logical_plan: LogicalPlan) -> PhysicalPlan:
"""Get the physical execution plan for the provided logical plan.

Expand All @@ -73,9 +95,18 @@ def get_execution_plan(logical_plan: LogicalPlan) -> PhysicalPlan:
(2) planning: convert logical to physical operators.
(3) physical optimization: optimize physical operators.
"""
from ray.data._internal.planner import create_planner

optimized_logical_plan = LogicalOptimizer().optimize(logical_plan)
# 1. Get planning functions
optimize_logical, plan, optimize_physical = get_plan_conversion_fns()

# 2. Logical -> Logical (Optimized)
optimized_logical_plan = optimize_logical(logical_plan)

# 3. Rewire Logical -> Logical (Optimized)
logical_plan._dag = optimized_logical_plan.dag
physical_plan = create_planner().plan(optimized_logical_plan)
return PhysicalOptimizer().optimize(physical_plan)

# 4. Logical (Optimized) -> Physical
physical_plan = plan(optimized_logical_plan)

# 5. Physical -> Physical (Optimized)
return optimize_physical(physical_plan)
34 changes: 24 additions & 10 deletions python/ray/data/_internal/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from ray.data._internal.logical.interfaces.logical_plan import LogicalPlan
from ray.data._internal.logical.interfaces.operator import Operator
from ray.data._internal.logical.operators.read_operator import Read
from ray.data._internal.logical.optimizers import get_plan_conversion_fns
from ray.data._internal.stats import DatasetStats
from ray.data.block import BlockMetadataWithSchema, _take_first_non_empty_schema
from ray.data.context import DataContext
Expand Down Expand Up @@ -113,19 +114,32 @@ def __repr__(self) -> str:

def explain(self) -> str:
"""Return a string representation of the logical and physical plan."""
from ray.data._internal.logical.optimizers import get_execution_plan

logical_plan = self._logical_plan
logical_plan_str, _ = self.generate_plan_string(logical_plan.dag)
logical_plan_str = "-------- Logical Plan --------\n" + logical_plan_str
convert_fns = [lambda x: x] + get_plan_conversion_fns()
titles: List[str] = [
"Logical Plan",
"Logical Plan (Optimized)",
"Physical Plan",
"Physical Plan (Optimized)",
]
Comment on lines +118 to +124
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
convert_fns = [lambda x: x] + get_plan_conversion_fns()
titles: List[str] = [
"Logical Plan",
"Logical Plan (Optimized)",
"Physical Plan",
"Physical Plan (Optimized)",
]
titles, plan_transform_fn = zip(*[
("Logical Plan", None),
("Logical Plan (Optimized)", optimize_logical),
("Physical Plan", plan),
("Physical Plan (Optimized)", optimize_physical),
])


physical_plan = get_execution_plan(self._logical_plan)
physical_plan_str, _ = self.generate_plan_string(
physical_plan.dag, show_op_repr=True
)
physical_plan_str = "-------- Physical Plan --------\n" + physical_plan_str
# 1. Set initial plan
plan = self._logical_plan

sections = []
for title, convert_fn in zip(titles, convert_fns):

# 2. Convert plan to new plan
plan = convert_fn(plan)

# 3. Generate plan str from new plan.
plan_str, _ = self.generate_plan_string(plan.dag, show_op_repr=True)

banner = f"\n-------- {title} --------\n"
section = f"{banner}{plan_str}"
sections.append(section)

return logical_plan_str + physical_plan_str
return "".join(sections)

@staticmethod
def generate_plan_string(
Expand Down
15 changes: 13 additions & 2 deletions python/ray/data/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -6092,10 +6092,21 @@ def explain(self):

.. testoutput::

<BLANKLINE>
-------- Logical Plan --------
Map(<lambda>)
+- ReadRange
MapRows[Map(<lambda>)]
+- Read[ReadRange]
<BLANKLINE>
-------- Logical Plan (Optimized) --------
MapRows[Map(<lambda>)]
+- Read[ReadRange]
<BLANKLINE>
-------- Physical Plan --------
TaskPoolMapOperator[Map(<lambda>)]
+- TaskPoolMapOperator[ReadRange]
+- InputDataBuffer[Input]
<BLANKLINE>
-------- Physical Plan (Optimized) --------
TaskPoolMapOperator[ReadRange->Map(<lambda>)]
+- InputDataBuffer[Input]
<BLANKLINE>
Expand Down
61 changes: 45 additions & 16 deletions python/ray/data/tests/test_consumption.py
Original file line number Diff line number Diff line change
Expand Up @@ -448,38 +448,67 @@ def test_dataset_explain(ray_start_regular_shared, capsys):

ds.explain()
captured = capsys.readouterr()
assert captured.out.rstrip() == (
assert captured.out.strip() == (
"-------- Logical Plan --------\n"
"Map(<lambda>)\n"
"+- ReadRange\n"
"-------- Physical Plan --------\n"
"MapRows[Map(<lambda>)]\n"
"+- Read[ReadRange]\n"
"\n-------- Logical Plan (Optimized) --------\n"
"MapRows[Map(<lambda>)]\n"
"+- Read[ReadRange]\n"
"\n-------- Physical Plan --------\n"
"TaskPoolMapOperator[Map(<lambda>)]\n"
"+- TaskPoolMapOperator[ReadRange]\n"
" +- InputDataBuffer[Input]\n"
"\n-------- Physical Plan (Optimized) --------\n"
"TaskPoolMapOperator[ReadRange->Map(<lambda>)]\n"
"+- InputDataBuffer[Input]"
)

ds = ds.filter(lambda x: x["id"] > 0)
ds.explain()
captured = capsys.readouterr()
assert captured.out.rstrip() == (
assert captured.out.strip() == (
"-------- Logical Plan --------\n"
"Filter(<lambda>)\n"
"+- Map(<lambda>)\n"
" +- ReadRange\n"
"-------- Physical Plan --------\n"
"Filter[Filter(<lambda>)]\n"
Copy link
Contributor Author

@iamjustinhsu iamjustinhsu Oct 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@richardliaw should this be verbose mode?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @iamjustinhsu, maybe we can pick up #57798 here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean to combine the PRs? I think we should keep them separate because they serve different purposes, even though the merge conflicts will be a bit messy.

"+- MapRows[Map(<lambda>)]\n"
" +- Read[ReadRange]\n"
"\n-------- Logical Plan (Optimized) --------\n"
"Filter[Filter(<lambda>)]\n"
"+- MapRows[Map(<lambda>)]\n"
" +- Read[ReadRange]\n"
"\n-------- Physical Plan --------\n"
"TaskPoolMapOperator[Filter(<lambda>)]\n"
"+- TaskPoolMapOperator[Map(<lambda>)]\n"
" +- TaskPoolMapOperator[ReadRange]\n"
" +- InputDataBuffer[Input]\n"
"\n-------- Physical Plan (Optimized) --------\n"
"TaskPoolMapOperator[ReadRange->Map(<lambda>)->Filter(<lambda>)]\n"
"+- InputDataBuffer[Input]"
)
ds = ds.random_shuffle().map(lambda x: x)
ds.explain()
captured = capsys.readouterr()
assert captured.out.rstrip() == (
assert captured.out.strip() == (
"-------- Logical Plan --------\n"
"Map(<lambda>)\n"
"+- RandomShuffle\n"
" +- Filter(<lambda>)\n"
" +- Map(<lambda>)\n"
" +- ReadRange\n"
"-------- Physical Plan --------\n"
"MapRows[Map(<lambda>)]\n"
"+- RandomShuffle[RandomShuffle]\n"
" +- Filter[Filter(<lambda>)]\n"
" +- MapRows[Map(<lambda>)]\n"
" +- Read[ReadRange]\n"
"\n-------- Logical Plan (Optimized) --------\n"
"MapRows[Map(<lambda>)]\n"
"+- RandomShuffle[RandomShuffle]\n"
" +- Filter[Filter(<lambda>)]\n"
" +- MapRows[Map(<lambda>)]\n"
" +- Read[ReadRange]\n"
"\n-------- Physical Plan --------\n"
"TaskPoolMapOperator[Map(<lambda>)]\n"
"+- AllToAllOperator[RandomShuffle]\n"
" +- TaskPoolMapOperator[Filter(<lambda>)]\n"
" +- TaskPoolMapOperator[Map(<lambda>)]\n"
" +- TaskPoolMapOperator[ReadRange]\n"
" +- InputDataBuffer[Input]\n"
"\n-------- Physical Plan (Optimized) --------\n"
"TaskPoolMapOperator[Map(<lambda>)]\n"
"+- AllToAllOperator[ReadRange->Map(<lambda>)->Filter(<lambda>)->RandomShuffle]\n"
" +- InputDataBuffer[Input]"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import sys
from typing import Any, Dict, List

import pandas as pd
import pytest

import ray
from ray.data import Dataset
from ray.data._internal.logical.interfaces import Plan
from ray.data.block import BlockMetadata
from ray.data.datasource import Datasource
from ray.data.datasource.datasource import ReadTask
Expand All @@ -12,9 +15,9 @@


def _check_valid_plan_and_result(
ds,
expected_plan,
expected_result,
ds: Dataset,
expected_plan: Plan,
expected_result: List[Dict[str, Any]],
expected_physical_plan_ops=None,
):
assert ds.take_all() == expected_result
Expand Down
11 changes: 8 additions & 3 deletions python/ray/data/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -617,9 +617,14 @@ def test_projection_pushdown_non_partitioned(ray_start_regular_shared, temp_dir)

assert ds._plan.explain().strip() == (
"-------- Logical Plan --------\n"
"Project\n"
"+- ReadParquet\n"
"-------- Physical Plan --------\n"
"Project[Project]\n"
"+- Read[ReadParquet]\n"
"\n-------- Logical Plan (Optimized) --------\n"
"Read[ReadParquet]\n"
"\n-------- Physical Plan --------\n"
"TaskPoolMapOperator[ReadParquet]\n"
"+- InputDataBuffer[Input]\n"
"\n-------- Physical Plan (Optimized) --------\n"
"TaskPoolMapOperator[ReadParquet]\n"
"+- InputDataBuffer[Input]"
)
Expand Down