[Data] Make zip operator accept multiple input#56524
[Data] Make zip operator accept multiple input#56524richardliaw merged 8 commits intoray-project:masterfrom
Conversation
Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
|
@richardliaw @gvspraveen @alexeykudinkin PTAL, thanks! |
|
Thanks for your contribution! Overall the first change looks good. Just a few minor comments. |
Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
…rable dataset size Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
| python/ray/data/_internal/execution/operators/zip_operator.py | ||
| DOC101: Method `ZipOperator.__init__`: Docstring contains fewer arguments than in function signature. | ||
| DOC103: Method `ZipOperator.__init__`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [data_context: DataContext, left_input_op: PhysicalOperator]. Arguments in the docstring but not in the function signature: [left_input_ops: ]. | ||
| DOC103: Method `ZipOperator.__init__`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [*input_ops: PhysicalOperator, data_context: DataContext]. Arguments in the docstring but not in the function signature: [input_ops: ]. |
There was a problem hiding this comment.
typically when there are changes to the baseline, we would want to fix it. Is this possible to fix or is this a bug with pydoc linting?
There was a problem hiding this comment.
Yeah I am aware of this, but I thought this is intended? Other operator does not have data_context in its doc
e.g. UnionOperator
def __init__(
self,
data_context: DataContext,
*input_ops: PhysicalOperator,
):
"""Create a UnionOperator.
Args:
input_ops: Operators generating input data for this operator to union.
"""There was a problem hiding this comment.
ah in the future we can just add data_context into the doc, i think that's a good thing to fix
| if num_outputs is None: | ||
| num_outputs = input_num_outputs | ||
| else: | ||
| num_outputs = max(num_outputs, input_num_outputs) |
There was a problem hiding this comment.
This should be min, not max
There was a problem hiding this comment.
Let's make sure this is covered with tests also
There was a problem hiding this comment.
Thanks! On second thought, neither max nor min seems accurate, right? Since the number of input blocks for each output should be the same (to perform a zip), and we already assert:
total_left_rows = sum(left_block_rows)
total_right_rows = sum(right_block_rows)
if total_left_rows != total_right_rows:
raise ValueError(
"Cannot zip datasets of different number of rows: "
f"{total_left_rows}, {total_right_rows}"
)Maybe we don't actually need to calculate num_outputs here?
Correct me if I'm wrong, thanks!
There was a problem hiding this comment.
Even if we use min for the number of output rows right now, this logic will need to change when user-directed dropping or padding is introduced.
Padding would require using max, while dropping would use min, so the calculating number of rows here is redundant.
Discussed with @gvspraveen offline.
| if num_rows is None: | ||
| num_rows = input_num_rows | ||
| else: | ||
| num_rows = max(num_rows, input_num_rows) |
| num_outputs = input.estimated_num_outputs() | ||
| if num_outputs is None: | ||
| return None | ||
| total_num_outputs = max(total_num_outputs, num_outputs) |
…rator-accept-multiple-input
Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
## Why are these changes needed?
Before making zip operator a streaming operator, we make it accept
multiple input first.
Now Zip operator can be used with
```py
>>> import ray
>>> ds1 = ray.data.range(5)
>>> ds2 = ray.data.range(5)
>>> ds3 = ray.data.range(5)
>>> ds1.zip(ds2, ds3).take_batch()
{'id': array([0, 1, 2, 3, 4]), 'id_1': array([0, 1, 2, 3, 4]), 'id_2': array([0, 1, 2, 3, 4])}
>>> ds1.zip(ds2, ds3).take_all()
[{'id': 0, 'id_1': 0, 'id_2': 0}, {'id': 1, 'id_1': 1, 'id_2': 1}, {'id': 2, 'id_1': 2, 'id_2': 2}, {'id': 3, 'id_1': 3, 'id_2': 3}, {'id': 4, 'id_1': 4, 'id_2': 4}]
>>>
```
<!-- Please give a short summary of the change and the problem this
solves. -->
## Related issue number
#56504
<!-- For example: "Closes #1234" -->
Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
## Why are these changes needed?
Before making zip operator a streaming operator, we make it accept
multiple input first.
Now Zip operator can be used with
```py
>>> import ray
>>> ds1 = ray.data.range(5)
>>> ds2 = ray.data.range(5)
>>> ds3 = ray.data.range(5)
>>> ds1.zip(ds2, ds3).take_batch()
{'id': array([0, 1, 2, 3, 4]), 'id_1': array([0, 1, 2, 3, 4]), 'id_2': array([0, 1, 2, 3, 4])}
>>> ds1.zip(ds2, ds3).take_all()
[{'id': 0, 'id_1': 0, 'id_2': 0}, {'id': 1, 'id_1': 1, 'id_2': 1}, {'id': 2, 'id_1': 2, 'id_2': 2}, {'id': 3, 'id_1': 3, 'id_2': 3}, {'id': 4, 'id_1': 4, 'id_2': 4}]
>>>
```
<!-- Please give a short summary of the change and the problem this
solves. -->
## Related issue number
#56504
<!-- For example: "Closes #1234" -->
Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
Signed-off-by: Douglas Strodtman <douglas@anyscale.com>
## Why are these changes needed?
Before making zip operator a streaming operator, we make it accept
multiple input first.
Now Zip operator can be used with
```py
>>> import ray
>>> ds1 = ray.data.range(5)
>>> ds2 = ray.data.range(5)
>>> ds3 = ray.data.range(5)
>>> ds1.zip(ds2, ds3).take_batch()
{'id': array([0, 1, 2, 3, 4]), 'id_1': array([0, 1, 2, 3, 4]), 'id_2': array([0, 1, 2, 3, 4])}
>>> ds1.zip(ds2, ds3).take_all()
[{'id': 0, 'id_1': 0, 'id_2': 0}, {'id': 1, 'id_1': 1, 'id_2': 1}, {'id': 2, 'id_1': 2, 'id_2': 2}, {'id': 3, 'id_1': 3, 'id_2': 3}, {'id': 4, 'id_1': 4, 'id_2': 4}]
>>>
```
<!-- Please give a short summary of the change and the problem this
solves. -->
## Related issue number
ray-project#56504
<!-- For example: "Closes ray-project#1234" -->
Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
Original PR #56524 by owenowenisme Original: ray-project/ray#56524
Merged from original PR #56524 Original: ray-project/ray#56524
## Why are these changes needed?
Before making zip operator a streaming operator, we make it accept
multiple input first.
Now Zip operator can be used with
```py
>>> import ray
>>> ds1 = ray.data.range(5)
>>> ds2 = ray.data.range(5)
>>> ds3 = ray.data.range(5)
>>> ds1.zip(ds2, ds3).take_batch()
{'id': array([0, 1, 2, 3, 4]), 'id_1': array([0, 1, 2, 3, 4]), 'id_2': array([0, 1, 2, 3, 4])}
>>> ds1.zip(ds2, ds3).take_all()
[{'id': 0, 'id_1': 0, 'id_2': 0}, {'id': 1, 'id_1': 1, 'id_2': 1}, {'id': 2, 'id_1': 2, 'id_2': 2}, {'id': 3, 'id_1': 3, 'id_2': 3}, {'id': 4, 'id_1': 4, 'id_2': 4}]
>>>
```
<!-- Please give a short summary of the change and the problem this
solves. -->
## Related issue number
ray-project#56504
<!-- For example: "Closes ray-project#1234" -->
Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
## Why are these changes needed?
Before making zip operator a streaming operator, we make it accept
multiple input first.
Now Zip operator can be used with
```py
>>> import ray
>>> ds1 = ray.data.range(5)
>>> ds2 = ray.data.range(5)
>>> ds3 = ray.data.range(5)
>>> ds1.zip(ds2, ds3).take_batch()
{'id': array([0, 1, 2, 3, 4]), 'id_1': array([0, 1, 2, 3, 4]), 'id_2': array([0, 1, 2, 3, 4])}
>>> ds1.zip(ds2, ds3).take_all()
[{'id': 0, 'id_1': 0, 'id_2': 0}, {'id': 1, 'id_1': 1, 'id_2': 1}, {'id': 2, 'id_1': 2, 'id_2': 2}, {'id': 3, 'id_1': 3, 'id_2': 3}, {'id': 4, 'id_1': 4, 'id_2': 4}]
>>>
```
<!-- Please give a short summary of the change and the problem this
solves. -->
## Related issue number
ray-project#56504
<!-- For example: "Closes ray-project#1234" -->
Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
Signed-off-by: Future-Outlier <eric901201@gmail.com>
Why are these changes needed?
Before making zip operator a streaming operator, we make it accept multiple input first.
Now Zip operator can be used with
Related issue number
#56504
Checks
git commit -s) in this PR.scripts/format.shto lint the changes in this PR.method in Tune, I've added it in
doc/source/tune/api/under thecorresponding
.rstfile.