|
| 1 | +from __future__ import annotations |
| 2 | + |
| 3 | +import inspect |
| 4 | +from typing import Any, Generic, Optional, Protocol, Type, ParamSpec |
| 5 | + |
| 6 | +import pandas as pd |
| 7 | + |
| 8 | +from dataflow.core.operator import OperatorABC |
| 9 | +from dataflow.utils.storage import DataFlowStorage |
| 10 | + |
| 11 | +from .memory_storage import InMemoryStorage |
| 12 | + |
| 13 | + |
# ParamSpecs inferred from the wrapped operator class (see _OperatorProto):
# _INITP captures ``op_cls.__init__``'s parameter list, _RUNP captures the
# parameters of ``op_cls.run`` after ``storage``. They let the wrapper
# expose the inner operator's exact signatures to type checkers / IDEs.
_INITP = ParamSpec("_INITP")
_RUNP = ParamSpec("_RUNP")
| 16 | + |
| 17 | + |
class _OperatorProto(Protocol[_INITP, _RUNP]):
    """Structural type that captures both ``__init__`` and ``run`` signatures.

    Pyright / Pylance infers ``_INITP`` and ``_RUNP`` from the concrete
    operator so that :meth:`RayAcceleratedOperator.op_cls_init` and
    :meth:`RayAcceleratedOperator.run` expose the original parameter lists
    for IDE auto-complete. This is a typing-only construct; it is never
    instantiated at runtime.
    """

    # Mirrors the operator's constructor signature.
    def __init__(self, *args: _INITP.args, **kwargs: _INITP.kwargs) -> None: ...

    # Mirrors the operator's run(storage, ...) signature; the return value
    # is deliberately untyped (Any) since DataFlow operators vary here.
    def run(
        self,
        storage: DataFlowStorage,
        *args: _RUNP.args,
        **kwargs: _RUNP.kwargs,
    ) -> Any: ...
| 34 | + |
| 35 | + |
class _OpRunner:
    """Actor-side worker: every replica owns an independent operator instance.

    A chunk of records (``list[dict]``) is wrapped in an
    :class:`InMemoryStorage`, handed to the wrapped DataFlow operator's
    ``run``, and the processed frame is returned as ``list[dict]``.
    """

    def __init__(self, op_cls: type, op_init_args: tuple, op_init_kwargs: dict):
        # Instantiate inside the actor process so heavyweight state (e.g. a
        # loaded model) lives on the replica rather than on the driver.
        self.op = op_cls(*op_init_args, **op_init_kwargs)

    def run(self, records: list[dict], run_params: dict) -> list[dict]:
        # Empty shard — nothing to process for this replica.
        if not records:
            return []
        extra_args = run_params.get("args", ())
        extra_kwargs = run_params.get("kwargs", {})
        storage = InMemoryStorage(pd.DataFrame(records))
        self.op.run(storage, *extra_args, **extra_kwargs)
        return storage.result.to_dict("records")
| 54 | + |
| 55 | + |
class RayAcceleratedOperator(OperatorABC, Generic[_INITP, _RUNP]):
    """DataFlow operator backed by RayOrch for transparent data-parallel execution.

    From the pipeline's perspective this is a normal :class:`OperatorABC`:
    it reads from and writes to :class:`DataFlowStorage` sequentially.
    Internally it fans the DataFrame out to *replicas* Ray actors,
    each holding an independent copy of the wrapped operator (and its model).

    Actors are created **lazily** on the first ``run()`` call so that
    pipeline ``compile()`` does not trigger heavyweight model loading.

    Only suitable for **row-independent (map-style)** operators. Operators
    that need cross-row global state (e.g. semantic dedup with a full
    similarity matrix) should *not* use this wrapper.

    Both ``op_cls_init`` and ``run`` have their signatures inferred from
    ``op_cls`` via ``ParamSpec``, giving full IDE auto-complete.

    Parameters
    ----------
    op_cls:
        The DataFlow operator class to parallelize.
    replicas:
        Number of parallel actor replicas.
    num_gpus_per_replica:
        Fractional GPU allocation per replica (e.g. ``0.25`` to share one
        GPU across four replicas).
    env:
        Optional RayOrch ``EnvRegistry`` key for a custom ``runtime_env``.

    Example
    -------
    ::

        from dataflow.rayorch import RayAcceleratedOperator
        from dataflow.operators.text_pt.eval import FineWebEduSampleEvaluator

        scorer = RayAcceleratedOperator(
            FineWebEduSampleEvaluator,
            replicas=4,
            num_gpus_per_replica=0.25,
        ).op_cls_init(device="cuda")  # ← IDE shows __init__ params

        scorer.run(storage, input_key="text")  # ← IDE shows run params
    """

    def __init__(
        self,
        op_cls: Type[_OperatorProto[_INITP, _RUNP]],
        *,
        replicas: int = 1,
        num_gpus_per_replica: float = 0.0,
        env: Optional[str] = None,
    ):
        super().__init__()
        self._op_cls = op_cls
        # Constructor args for the wrapped operator; populated by op_cls_init().
        self._op_init_args: tuple = ()
        self._op_init_kwargs: dict = {}
        self._replicas = replicas
        self._num_gpus_per_replica = num_gpus_per_replica
        self._env = env
        # RayModule handle; stays None until the first run() (lazy actor start).
        self._module: Optional[Any] = None  # created lazily

        # PipelineABC.compile() compatibility:
        # compile() → AutoOP uses inspect.signature(operator.run) to bind()
        # call arguments. Our class-level run(storage, *args, **kwargs) would
        # cause bind() to dump extra params into *args, which later gets
        # serialised as an "args" key and leaks into the inner operator on
        # _compiled_forward replay. Installing the inner operator's named
        # signature on the instance avoids this entirely.
        self._install_inner_run_signature(op_cls)

    def op_cls_init(
        self,
        *args: _INITP.args,
        **kwargs: _INITP.kwargs,
    ) -> RayAcceleratedOperator[_INITP, _RUNP]:
        """Configure how the wrapped operator is constructed inside each actor.

        Parameters match ``op_cls.__init__``, so IDE auto-complete works.
        May be omitted if the operator's defaults are sufficient.
        Returns ``self`` so calls can be chained fluently (see class example).
        """
        self._op_init_args = args
        self._op_init_kwargs = kwargs
        return self

    def _ensure_initialized(self) -> None:
        """Create the RayModule (and its actors) on first use; no-op after."""
        if self._module is not None:
            return
        # Imported lazily so that merely constructing this wrapper (e.g. at
        # pipeline compile time) does not require rayorch / Ray to start.
        from rayorch import Dispatch, RayModule

        self._module = RayModule(
            _OpRunner,
            replicas=self._replicas,
            num_gpus_per_replica=self._num_gpus_per_replica,
            dispatch_mode=Dispatch.SHARD_CONTIGUOUS,
            env=self._env,
        )
        # Forward the stored constructor args to every replica's _OpRunner.
        self._module.pre_init(
            op_cls=self._op_cls,
            op_init_args=self._op_init_args,
            op_init_kwargs=self._op_init_kwargs,
        )

    # --- inner signature propagation ---

    def _install_inner_run_signature(self, op_cls: type) -> None:
        """Replace ``self.run`` with a thin proxy carrying ``op_cls.run``'s
        ``__signature__``.

        Why: ``PipelineABC.compile()`` → ``AutoOP`` uses
        ``inspect.signature(operator.run)`` to ``bind()`` the call arguments.
        If the signature is the generic ``(storage, *args, **kwargs)`` from
        this wrapper, positional-overflow values land in ``*args`` and get
        serialised as an ``"args"`` key in the kwargs dict. On replay via
        ``_compiled_forward(**kwargs)``, that ``"args"`` key leaks into the
        inner operator as an unexpected keyword argument.

        By exposing the inner operator's **named** parameters here,
        ``bind()`` resolves every argument to a keyword — no ``*args``
        residue, no downstream pollution. Only this file changes; DataFlow
        core is untouched.
        """
        # Drop "self": the proxy is a plain function stored on the instance,
        # so callers invoke it without an implicit receiver.
        inner_sig = inspect.signature(op_cls.run)
        params = [p for p in inner_sig.parameters.values() if p.name != "self"]

        # Bind the already-bound method to a local so the closure below does
        # not re-resolve attributes on every call.
        impl = self._run_impl

        def run(*args: Any, **kwargs: Any) -> None:
            return impl(*args, **kwargs)

        # __signature__ is what inspect.signature() reports to AutoOP.bind().
        run.__signature__ = inspect.Signature(params)  # type: ignore[attr-defined]
        run.__doc__ = getattr(op_cls.run, "__doc__", None)
        run.__name__ = "run"
        run.__qualname__ = f"{type(self).__qualname__}.run"
        # Instance attribute shadows the class-level run() at lookup time.
        self.run = run  # type: ignore[assignment]

    # --- DataFlow OperatorABC interface ---
    # Two-level design for compile() compatibility:
    #   1. Class-level `run` — satisfies OperatorABC's abstract method so the
    #      class can be instantiated. Delegates to `_run_impl`.
    #   2. Instance-level `run` (proxy) — installed by
    #      `_install_inner_run_signature` in __init__, carries the inner
    #      operator's __signature__ so AutoOP.bind() resolves args to keywords.
    # Python attribute lookup checks instance __dict__ before the class,
    # so the proxy always wins at runtime.

    def run(  # type: ignore[override]
        self,
        storage: DataFlowStorage,
        *args: _RUNP.args,
        **kwargs: _RUNP.kwargs,
    ) -> None:
        return self._run_impl(storage, *args, **kwargs)

    def _run_impl(
        self,
        storage: DataFlowStorage,
        *args: _RUNP.args,
        **kwargs: _RUNP.kwargs,
    ) -> None:
        """Fan the storage DataFrame out to the actor pool and write results back."""
        self._ensure_initialized()
        df = storage.read("dataframe")
        records: list[dict] = df.to_dict("records")
        run_params: dict = {"args": args, "kwargs": kwargs}
        # NOTE(review): calling the RayModule presumably shards `records`
        # contiguously across replicas (Dispatch.SHARD_CONTIGUOUS) and
        # concatenates the per-replica results — confirm against rayorch docs.
        result_records = self._module(records, run_params)
        storage.write(pd.DataFrame(result_records))

    # --- lifecycle helpers ---

    def shutdown(self) -> None:
        """Terminate all Ray actors held by this operator."""
        if self._module is None:
            return
        import ray

        for actor in self._module.actors:
            ray.kill(actor)
        # Reset so a later run() would lazily rebuild the actor pool.
        self._module = None

    def __repr__(self) -> str:
        state = "initialized" if self._module is not None else "lazy"
        return (
            f"RayAcceleratedOperator({self._op_cls.__name__}, "
            f"replicas={self._replicas}, state={state})"
        )
0 commit comments