Skip to content

Commit 55c601d

Browse files
committed
Expose logical rules via package exports
Signed-off-by: 400Ping <jiekai.chang326@gmail.com>
1 parent 9f2f92f commit 55c601d

13 files changed (+86 −30 lines)

python/ray/data/_internal/logical/optimizers.py

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,17 @@
88
Plan,
99
Rule,
1010
)
11-
from ray.data._internal.logical.rules.combine_shuffles import CombineShuffles
12-
from ray.data._internal.logical.rules.configure_map_task_memory import (
11+
from ray.data._internal.logical.rules import (
12+
CombineShuffles,
1313
ConfigureMapTaskMemoryUsingOutputSize,
14-
)
15-
from ray.data._internal.logical.rules.inherit_batch_format import InheritBatchFormatRule
16-
from ray.data._internal.logical.rules.inherit_target_max_block_size import (
14+
FuseOperators,
15+
InheritBatchFormatRule,
1716
InheritTargetMaxBlockSizeRule,
17+
LimitPushdownRule,
18+
PredicatePushdown,
19+
ProjectionPushdown,
20+
SetReadParallelismRule,
1821
)
19-
from ray.data._internal.logical.rules.limit_pushdown import LimitPushdownRule
20-
from ray.data._internal.logical.rules.operator_fusion import FuseOperators
21-
from ray.data._internal.logical.rules.predicate_pushdown import PredicatePushdown
22-
from ray.data._internal.logical.rules.projection_pushdown import ProjectionPushdown
23-
from ray.data._internal.logical.rules.set_read_parallelism import SetReadParallelismRule
2422
from ray.util.annotations import DeveloperAPI
2523

2624
_LOGICAL_RULESET = Ruleset(
Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,22 @@
1-
from ray.data._internal.logical.rules.operator_fusion import FuseOperators
1+
"""Expose rule classes in ray.data._internal.logical.rules.
22
3-
__all__ = ["FuseOperators"]
3+
This module dynamically imports each submodule in the package and re-exports
4+
names listed in the submodule's __all__ attribute. This allows callers to
5+
import rules directly from ray.data._internal.logical.rules.
6+
"""
7+
8+
from importlib import import_module
9+
from pkgutil import iter_modules
10+
11+
__all__ = []
12+
13+
for _loader, _module_name, _is_pkg in iter_modules(__path__):
14+
if _module_name.startswith("_"):
15+
continue
16+
_module = import_module(f"{__name__}.{_module_name}")
17+
_exported_names = getattr(_module, "__all__", [])
18+
for _name in _exported_names:
19+
globals()[_name] = getattr(_module, _name)
20+
__all__.extend(_exported_names)
21+
22+
__all__.sort()

python/ray/data/_internal/logical/rules/combine_shuffles.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,16 @@
44
Plan,
55
Rule,
66
)
7-
from ray.data._internal.logical.operators.all_to_all_operator import (
7+
from ray.data._internal.logical.operators import (
88
Aggregate,
99
Repartition,
1010
Sort,
11+
StreamingRepartition,
1112
)
12-
from ray.data._internal.logical.operators.map_operator import StreamingRepartition
13+
14+
__all__ = [
15+
"CombineShuffles",
16+
]
1317

1418

1519
class CombineShuffles(Rule):

python/ray/data/_internal/logical/rules/configure_map_task_memory.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@
77
from ray.data._internal.logical.interfaces.physical_plan import PhysicalPlan
88
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
99

10+
__all__ = [
11+
"ConfigureMapTaskMemoryRule",
12+
"ConfigureMapTaskMemoryUsingOutputSize",
13+
]
14+
1015

1116
class ConfigureMapTaskMemoryRule(Rule, abc.ABC):
1217
def apply(self, plan: PhysicalPlan) -> PhysicalPlan:

python/ray/data/_internal/logical/rules/inherit_batch_format.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@
22
from typing import Iterable
33

44
from ray.data._internal.logical.interfaces import LogicalOperator, LogicalPlan, Rule
5-
from ray.data._internal.logical.operators.all_to_all_operator import AbstractAllToAll
6-
from ray.data._internal.logical.operators.map_operator import MapBatches
5+
from ray.data._internal.logical.operators import AbstractAllToAll, MapBatches
6+
7+
__all__ = [
8+
"InheritBatchFormatRule",
9+
]
710

811

912
class InheritBatchFormatRule(Rule):

python/ray/data/_internal/logical/rules/inherit_target_max_block_size.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@
33
from ray.data._internal.execution.interfaces import PhysicalOperator
44
from ray.data._internal.logical.interfaces import PhysicalPlan, Rule
55

6+
__all__ = [
7+
"InheritTargetMaxBlockSizeRule",
8+
]
9+
610

711
class InheritTargetMaxBlockSizeRule(Rule):
812
"""For each op that has overridden the default target max block size,

python/ray/data/_internal/logical/rules/limit_pushdown.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,18 @@
33
from typing import List
44

55
from ray.data._internal.logical.interfaces import LogicalOperator, LogicalPlan, Rule
6-
from ray.data._internal.logical.operators.map_operator import AbstractMap
7-
from ray.data._internal.logical.operators.n_ary_operator import Union
8-
from ray.data._internal.logical.operators.one_to_one_operator import (
6+
from ray.data._internal.logical.operators import (
7+
AbstractMap,
98
AbstractOneToOne,
109
Limit,
10+
Union,
1111
)
1212

13+
__all__ = [
14+
"LimitPushdownRule",
15+
]
16+
17+
1318
logger = logging.getLogger(__name__)
1419

1520

python/ray/data/_internal/logical/rules/operator_fusion.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,20 +26,24 @@
2626
TaskPoolMapOperator,
2727
)
2828
from ray.data._internal.logical.interfaces import PhysicalPlan, Rule
29-
from ray.data._internal.logical.operators.all_to_all_operator import (
29+
from ray.data._internal.logical.operators import (
3030
AbstractAllToAll,
31-
RandomShuffle,
32-
Repartition,
33-
)
34-
from ray.data._internal.logical.operators.map_operator import (
3531
AbstractMap,
3632
AbstractUDFMap,
3733
MapBatches,
34+
RandomShuffle,
35+
Repartition,
3836
StreamingRepartition,
3937
)
4038
from ray.data._internal.streaming_repartition import StreamingRepartitionRefBundler
4139
from ray.util.annotations import DeveloperAPI
4240

41+
__all__ = [
42+
"FuseOperators",
43+
"are_remote_args_compatible",
44+
]
45+
46+
4347
# Scheduling strategy can be inherited from upstream operator if not specified.
4448
INHERITABLE_REMOTE_ARGS = ["scheduling_strategy"]
4549

python/ray/data/_internal/logical/rules/predicate_pushdown.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,16 @@
99
PredicatePassThroughBehavior,
1010
Rule,
1111
)
12-
from ray.data._internal.logical.operators.map_operator import Filter, Project
12+
from ray.data._internal.logical.operators import Filter, Project
1313
from ray.data._internal.planner.plan_expression.expression_visitors import (
1414
_ColumnSubstitutionVisitor,
1515
)
1616
from ray.data.expressions import Expr, col
1717

18+
__all__ = [
19+
"PredicatePushdown",
20+
]
21+
1822

1923
class PredicatePushdown(Rule):
2024
"""Pushes down predicates across the graph.

python/ray/data/_internal/logical/rules/projection_pushdown.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
LogicalPlan,
77
Rule,
88
)
9-
from ray.data._internal.logical.operators.map_operator import Project
9+
from ray.data._internal.logical.operators import Project
1010
from ray.data._internal.planner.plan_expression.expression_visitors import (
1111
_ColumnReferenceCollector,
1212
_ColumnSubstitutionVisitor,
@@ -19,6 +19,10 @@
1919
StarExpr,
2020
)
2121

22+
__all__ = [
23+
"ProjectionPushdown",
24+
]
25+
2226

2327
def _collect_referenced_columns(exprs: List[Expr]) -> Optional[List[str]]:
2428
"""

0 commit comments

Comments (0)