[data/docs] Add more education around transformations #59415
richardliaw merged 9 commits into ray-project:master from
Conversation
Signed-off-by: Richard Liaw <rliaw@berkeley.edu>
Code Review
This pull request significantly enhances the documentation for data transformations in Ray Data. It adds new sections on expressions, resource configuration, async UDFs, and distributed UDFs with placement groups. The changes improve clarity and provide more comprehensive examples. I've identified a few areas in the new documentation that could be improved for correctness and clarity, including an incorrect statement about ActorPoolStrategy, a confusing example for TaskPoolStrategy, a minor phrasing issue, and a missing import in a code snippet. Overall, these are great additions to the documentation.
> You can specify the concurrency of the transformation by using the ``compute`` parameter.
>
> For functions, use ``compute=ray.data.TaskPoolStrategy(size=n)`` to cap the number of concurrent tasks. By default, Ray Data will automatically determine the number of concurrent tasks.
> For classes, use ``compute=ray.data.ActorPoolStrategy(size=n)`` to use a fixed size actor pool of ``n`` workers. Currently, this is required to be specified.
The statement "Currently, this is required to be specified" is incorrect. If compute is not specified for a class-based transform, Ray Data defaults to an autoscaling actor pool (ActorPoolStrategy()). It's good practice to specify it for resource control, but it's not strictly required. Please clarify this to avoid confusion.
Suggested change:
- For classes, use ``compute=ray.data.ActorPoolStrategy(size=n)`` to use a fixed size actor pool of ``n`` workers. Currently, this is required to be specified.
+ For classes, use ``compute=ray.data.ActorPoolStrategy(size=n)`` to use a fixed size actor pool of ``n`` workers. If ``compute`` is not specified, an autoscaling actor pool is used by default.
We added a default in some recent version
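For readers skimming the thread, the effect of capping concurrency with a fixed-size pool can be pictured with a standard-library analogy. This is only an illustration of the pooling concept; Ray's task and actor pools schedule work across a distributed cluster, not threads in one process:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Track how many workers run at once so the cap is observable.
lock = threading.Lock()
active = 0
peak = 0

def work(x):
    global active, peak
    with lock:
        active += 1
        peak = max(peak, active)
    result = x * 2  # stands in for the user-defined transformation
    with lock:
        active -= 1
    return result

# max_workers=2 plays the role of TaskPoolStrategy(size=2): no matter
# how many inputs arrive, at most two are processed concurrently.
with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(work, range(10)))

print(results)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

The same shape applies to ``ActorPoolStrategy(size=n)``, except the pool members are long-lived actors that can hold state (for example, a loaded model) between batches.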
> ds = ray.data.range(10).map_batches(lambda x: x * 2, compute=ray.data.TaskPoolStrategy(size=2))
> ds.take_all()
>
> .. testoutput::
>    :options: +MOCK
>
>    [{'id': 0}, {'id': 1}, {'id': 2}, {'id': 3}, {'id': 4}, {'id': 5}, {'id': 6}, {'id': 7}, {'id': 8}, {'id': 9}]
The example for ``TaskPoolStrategy`` seems incorrect. The lambda ``lambda x: x * 2`` will raise a ``TypeError`` when applied to a batch, which is a dictionary of NumPy arrays. Also, the ``testoutput`` shows the original data, not the transformed data.
To make this a useful and correct example, I suggest updating the lambda to perform a valid transformation and updating the output to reflect that transformation.
Suggested change:
- ds = ray.data.range(10).map_batches(lambda x: x * 2, compute=ray.data.TaskPoolStrategy(size=2))
+ ds = ray.data.range(10).map_batches(lambda batch: {"id": batch["id"] * 2}, compute=ray.data.TaskPoolStrategy(size=2))
  ds.take_all()

  .. testoutput::
     :options: +MOCK

- [{'id': 0}, {'id': 1}, {'id': 2}, {'id': 3}, {'id': 4}, {'id': 5}, {'id': 6}, {'id': 7}, {'id': 8}, {'id': 9}]
+ [{'id': 0}, {'id': 2}, {'id': 4}, {'id': 6}, {'id': 8}, {'id': 10}, {'id': 12}, {'id': 14}, {'id': 16}, {'id': 18}]
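The reviewer's point is easy to verify outside of Ray: a ``map_batches`` batch is a dictionary mapping column names to column arrays, so multiplying the whole batch raises a ``TypeError``, while rebuilding the dictionary column by column works. A minimal sketch, using plain Python lists in place of the NumPy arrays Ray Data actually passes:

```python
# A stand-in for a Ray Data batch: a dict mapping column names to
# column values (Ray Data passes NumPy arrays; plain lists behave
# the same way for this demonstration).
batch = {"id": [0, 1, 2, 3]}

# The original lambda fails: a dict cannot be multiplied by an int.
try:
    batch * 2
except TypeError as exc:
    print(f"TypeError: {exc}")

# The corrected UDF rebuilds the dict, transforming each column.
def double_ids(batch):
    return {"id": [v * 2 for v in batch["id"]]}

print(double_ids(batch))  # {'id': [0, 2, 4, 6]}
```

With real NumPy arrays, ``batch["id"] * 2`` is a single vectorized operation rather than a list comprehension, which is why ``map_batches`` UDFs are typically written against whole columns.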
> Advanced: Distributed UDFs with Placement Groups
> ================================================
>
> While all transformations are automatically parallelized across your Ray cluster, often times these transformations themselves can themselves be distributed. For example, if you're using
The phrasing "often times these transformations themselves can themselves be distributed" is a bit repetitive. Consider rephrasing for clarity.
Suggested change:
- While all transformations are automatically parallelized across your Ray cluster, often times these transformations themselves can themselves be distributed. For example, if you're using
+ While all transformations are automatically parallelized across your Ray cluster, these transformations can often be distributed themselves. For example, if you're using
> from ray.data.expressions import col, udf
> import pyarrow as pa
> import pyarrow.compute as pc
> import ray
The example for ``udf`` in expressions uses ``DataType`` but doesn't import it. This will cause a ``NameError`` if a user tries to run this code. Please add the necessary import.
Suggested change:
  from ray.data.expressions import col, udf
+ from ray.data.datatype import DataType
  import pyarrow as pa
  import pyarrow.compute as pc
  import ray
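For readers unfamiliar with expression APIs, the idea behind ``col`` and ``udf`` can be sketched with a toy evaluator: expressions are built lazily and only evaluated later against a batch. This is an illustration of the general pattern only, not Ray Data's actual implementation:

```python
# Toy model of a column-expression API. Expressions are plain
# callables built up front and evaluated later against a batch
# (a dict of column lists). NOT Ray Data's implementation.

def col(name):
    """Return an expression that selects a column from a batch."""
    return lambda batch: batch[name]

def udf(fn):
    """Wrap a function so it composes with other expressions."""
    def wrapper(*exprs):
        return lambda batch: fn(*(e(batch) for e in exprs))
    return wrapper

@udf
def add_one(values):
    return [v + 1 for v in values]

expr = add_one(col("id"))        # nothing is computed yet
print(expr({"id": [1, 2, 3]}))   # [2, 3, 4]
```

The laziness is the point: because the expression is just a description of the computation, an engine can inspect it, optimize it, and push it down to a vectorized backend (such as ``pyarrow.compute``) before any data is touched.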
Signed-off-by: Richard Liaw <rliaw@berkeley.edu>
doc/source/data/key-concepts.rst
Outdated
  -------------------------

- Ray Data uses a *streaming execution model* to efficiently process large datasets.
+ Ray Data can leverage a *streaming execution model* to efficiently process large datasets.
Nit: when do we not leverage a streaming execution model? Don't we always leverage it?
updated the wording; basically when you have read -> shuffle -> write, it isn't really "streaming"
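The distinction the maintainer is drawing, between a pipeline that streams data through read/map/write and one that must materialize everything at a shuffle barrier, can be pictured with a toy generator pipeline. This is an analogy only; Ray Data's streaming executor operates on distributed blocks, not Python generators:

```python
# Streaming: each stage is a generator, so items flow through the
# pipeline one at a time and memory stays bounded.
def read():
    for i in range(5):
        yield i

def transform(items):
    for x in items:
        yield x * 2

streamed = list(transform(read()))

# A shuffle is a barrier: every item must be materialized before
# any output can be produced, so execution is no longer streaming.
def shuffle_barrier(items):
    return sorted(items, reverse=True)  # needs ALL items first

barriered = shuffle_barrier(transform(read()))
print(streamed)    # [0, 2, 4, 6, 8]
print(barriered)   # [8, 6, 4, 2, 0]
```

That is why a read -> shuffle -> write job behaves differently from a pure read -> map -> write job: the shuffle stage forces the whole intermediate dataset to exist at once.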
>     {'id': 9}]
>
> Expressions (Alpha)
Should this be above the "Advanced" stuff? Feel like this is more applicable to most users
Right now since it's early, I'm putting it lower. I think in the next release once we flesh this out more it'll go much higher.
Adds documentation around
And also refines text around key concepts.