[Data] Parallelize DefaultCollateFn - arrow_batch_to_tensors#58821
Conversation
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Code Review
This pull request introduces parallelization to arrow_batch_to_tensors in DefaultCollateFn to improve performance. The changes leverage make_async_gen for parallel tensor conversion, which is a good approach for this CPU-bound task. The implementation correctly handles both combine_chunks and non-combine_chunks cases. The code is well-structured. I have a few suggestions to further improve efficiency and code clarity.
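As context for reviewers, the parallel-conversion idea can be sketched outside of Ray with a stdlib-only stand-in (the `convert_column` function below is hypothetical and merely simulates the per-column work that `convert_ndarray_to_torch_tensor` does):

```python
from concurrent.futures import ThreadPoolExecutor

def convert_column(name_and_values):
    # Stand-in for the real per-column tensor conversion in Ray
    # (convert_ndarray_to_torch_tensor); here we just square each
    # value to simulate CPU-bound work on one column.
    name, values = name_and_values
    return name, [v * v for v in values]

def batch_to_tensors(batch, num_workers=4):
    # Each column of the batch is an independent conversion task,
    # so a thread pool maps cleanly over batch.items().
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        return dict(pool.map(convert_column, batch.items()))

batch = {"a": [1, 2, 3], "b": [4, 5, 6]}
print(batch_to_tensors(batch))  # {'a': [1, 4, 9], 'b': [16, 25, 36]}
```

The real conversion benefits from threads because the heavy lifting happens in native code that releases the GIL; this pure-Python stand-in only illustrates the task structure.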
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com>
Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com>
Train release tests: https://buildkite.com/ray-project/release/builds/68576
Parallelization microbenchmark: getting around a 1.8x speedup with 4 workers.
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com>
/gemini review
Code Review
This pull request introduces parallel processing to DefaultCollateFn by using a ThreadPoolExecutor within arrow_batch_to_tensors. The changes look promising for improving performance. I've identified a few issues: a type inconsistency in the return value of arrow_batch_to_tensors, a bug in CustomArrowCollateFn when running on GPU, and a weakness in the new test case. Addressing these will improve the correctness and robustness of the implementation.
/gemini review
Code Review
This pull request introduces parallel processing to arrow_batch_to_tensors using a ThreadPoolExecutor, which should speed up the DefaultCollateFn. The changes are well-structured, with parallel paths for both combine_chunks=True and combine_chunks=False scenarios. The implementation also includes a bug fix for handling None devices and adds comprehensive tests to ensure correctness.
My feedback focuses on a few areas for improvement:
- Using `threadpool.map` for more concise and idiomatic code when submitting tasks.
- Discussing the use of `__del__` for thread pool cleanup, which can be unreliable, and suggesting alternatives, or at least adding comments to explain the choice.
Overall, this is a solid improvement to the data loading performance.
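Both review points can be illustrated with a small stdlib-only sketch (the `work` function is a hypothetical task, not Ray code): `Executor.map` replaces a submit-and-collect loop, and an explicit `shutdown` or `with` block gives deterministic cleanup, unlike `__del__`, which Python does not guarantee to run promptly.

```python
from concurrent.futures import ThreadPoolExecutor

def work(x):
    return x * 2  # hypothetical per-item task

items = [1, 2, 3, 4]

# Verbose form: submit each task and gather futures by hand.
pool = ThreadPoolExecutor(max_workers=4)
futures = [pool.submit(work, x) for x in items]
results_submit = [f.result() for f in futures]
# Explicit shutdown instead of relying on __del__, which may run
# late (or effectively never, at interpreter shutdown).
pool.shutdown(wait=True)

# Concise, idiomatic form: map preserves input order, and the
# context manager guarantees shutdown even if an exception is raised.
with ThreadPoolExecutor(max_workers=4) as pool:
    results_map = list(pool.map(work, items))

assert results_submit == results_map == [2, 4, 6, 8]
```

If a long-lived pool is genuinely needed across calls (as in a collate function reused per batch), keeping `__del__` as a best-effort fallback plus a documented explicit `shutdown` path is a common compromise.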
/gemini review
Code Review
This pull request introduces parallel processing to DefaultCollateFn by using a ThreadPoolExecutor within arrow_batch_to_tensors. This is a great enhancement for performance, especially for datasets with many columns or chunks. The implementation looks solid, and the addition of a correctness test is much appreciated.
My main feedback is regarding code duplication in the benchmark file release/train_tests/benchmark/image_classification/factory.py. The CustomArrowCollateFn duplicates a significant amount of logic from DefaultCollateFn. I've suggested a refactoring to use inheritance, which will make the code cleaner and easier to maintain. I've also included a related comment to update the imports accordingly.
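The suggested refactor is the usual template-method pattern. Sketched here with stand-in classes (not the real Ray API; method names and the device-handling details are illustrative assumptions), the subclass keeps only the behavior it changes:

```python
class DefaultCollateFn:
    """Stand-in for the library class: converts a batch to tensors,
    then moves the result to a target device."""

    def __init__(self, device=None):
        self.device = device

    def _to_tensors(self, batch):
        # Shared conversion logic lives only in the base class.
        return {k: list(v) for k, v in batch.items()}

    def _to_device(self, tensors):
        return tensors  # default: no device transfer

    def __call__(self, batch):
        return self._to_device(self._to_tensors(batch))


class CustomArrowCollateFn(DefaultCollateFn):
    """Reuses the parent's conversion logic and overrides only the
    device-transfer hook, instead of duplicating the whole pipeline."""

    def _to_device(self, tensors):
        if self.device is not None:
            # Tag each column with its target device (illustrative).
            return {k: (v, self.device) for k, v in tensors.items()}
        return tensors
```

With this shape, a fix to the conversion path in `DefaultCollateFn` automatically benefits the benchmark's custom class, which was the maintenance concern behind the suggestion.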
```python
from ray.data.collate_fn import ArrowBatchCollateFn, CollateFn
from concurrent.futures import ThreadPoolExecutor
from ray.data.dataset import TorchDeviceType
```

Following the refactoring of CustomArrowCollateFn to inherit from DefaultCollateFn, the imports should be updated. ArrowBatchCollateFn and ThreadPoolExecutor are no longer needed, and DefaultCollateFn should be imported instead.

Suggested change:

```diff
-from ray.data.collate_fn import ArrowBatchCollateFn, CollateFn
-from concurrent.futures import ThreadPoolExecutor
-from ray.data.dataset import TorchDeviceType
+from ray.data.collate_fn import CollateFn, DefaultCollateFn
+from ray.data.dataset import TorchDeviceType
```
/gemini review
Parallelization microbenchmark: even with just 2 columns, 4 threads perform better than 0 threads.
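A minimal harness for reproducing this kind of serial-versus-threaded comparison (stand-in workload, not the actual release benchmark; note that this pure-Python `convert` holds the GIL, so the threaded timing here may not improve, whereas the real conversion releases the GIL in native code):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def convert(col):
    # Stand-in for per-column tensor conversion (CPU-bound).
    return sum(i * i for i in range(20_000))

columns = list(range(8))

def run(num_threads):
    # num_threads == 0 means the serial baseline, matching the
    # "0 thread(s)" configuration in the benchmark above.
    start = time.perf_counter()
    if num_threads == 0:
        results = [convert(c) for c in columns]
    else:
        with ThreadPoolExecutor(max_workers=num_threads) as pool:
            results = list(pool.map(convert, columns))
    return time.perf_counter() - start, results

serial_time, serial_res = run(0)
threaded_time, threaded_res = run(4)
assert serial_res == threaded_res  # same output either way
print(f"serial={serial_time:.4f}s  4 threads={threaded_time:.4f}s")
```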
…ject#58821)

### [Data] Parallelize DefaultCollateFn - arrow_batch_to_tensors

In `arrow_batch_to_tensors`, use `make_async_gen` to set up multiple workers to speed up processing of columns for tensor conversion in `convert_ndarray_to_torch_tensor`, so `DefaultCollateFn` can be sped up.

Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Signed-off-by: YK <1811651+ykdojo@users.noreply.github.com>
Description
[Data] Parallelize DefaultCollateFn - arrow_batch_to_tensors
In `arrow_batch_to_tensors`, use `make_async_gen` to set up multiple workers to speed up processing of columns for tensor conversion in `convert_ndarray_to_torch_tensor`, so `DefaultCollateFn` can be sped up.

Related issues
Additional information