[Data] Handle prefetches buffering in iter_batches #58657
Conversation
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Microbenchmark for prefetches in iter_batches
Before/after results and the benchmark script were attached.
Co-authored-by: Hao Chen <chenh1024@gmail.com>
/gemini review
Code Review
This pull request addresses an issue with buffering in iter_batches to better align with prefetching, aiming to provide more predictable latency. The changes introduce buffering in _iter_batches and _format_in_threadpool by setting the queue depth in make_async_gen based on the prefetch count. Additionally, the number of workers in the formatting threadpool is now capped to a default of 4 to optimize for latency.
The implementation looks solid and correctly applies the described fixes. I've found a minor but important typo in a newly introduced constant and its corresponding environment variable, which should be corrected before merging. My review includes suggestions to fix this.
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
## Description

### [Data] Handle prefetches buffering in iter_batches

**Requirements**
- Consumers of `iter_batches` require predictable per-batch latency, while the overall Data pipeline is optimized for maximizing throughput and utilization.
- Prefetching is the mechanism the consumer uses to impedance-match its latency requirement with the throughput-optimized data pipeline.

**Issue**
- Queuing/buffering was not set up in `_iter_batches` to match prefetching.
- The worker count for `_format_in_threadpool` was set as a function of the prefetch count, which adds latency variance to `_iter_batches`.

**Fix**
- In `_iter_batches`, set the queue depth for buffering in `make_async_gen` to honor prefetching.
- In `_format_in_threadpool`, restrict to a maximum of 4 workers by default, to optimize for `iter_batches` latency.

---------

Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com>
Co-authored-by: Hao Chen <chenh1024@gmail.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
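The buffering side of the fix can be sketched with a bounded queue. `make_buffered_gen` below is a hypothetical, simplified stand-in for Ray Data's internal `make_async_gen`; `queue_depth` plays the role of the prefetch count. Bounding the queue is what gives the consumer predictable latency: batches are ready when requested, but the producer cannot run arbitrarily far ahead of the consumer.

```python
import queue
import threading

_SENTINEL = object()


def make_buffered_gen(source, queue_depth):
    """Wrap the `source` iterator so a background thread prefetches up
    to `queue_depth` items into a bounded queue (a sketch of what a
    prefetch-aware async generator does)."""
    buf = queue.Queue(maxsize=queue_depth)

    def _produce():
        for item in source:
            buf.put(item)   # blocks when the buffer is full
        buf.put(_SENTINEL)  # signal end of stream

    threading.Thread(target=_produce, daemon=True).start()

    while True:
        item = buf.get()
        if item is _SENTINEL:
            return
        yield item
```

For example, `list(make_buffered_gen(iter(range(5)), queue_depth=2))` drains the stream in order while at most two items are buffered ahead of the consumer.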
Description
[Data] Handle prefetches buffering in iter_batches
Requirements
- Consumers of `iter_batches` require predictable latency on batches, though the overall Data pipeline is optimized for maximizing throughput and utilization.

Issue
- Queuing/buffering was not set up in `_iter_batches` to match prefetching.
- Multiple workers were set up for `_format_in_threadpool` as f(prefetch_count), which adds latency variance to `_iter_batches`.

Fix
- In `_iter_batches`, set up the queue depth for buffering in `make_async_gen` to honor prefetching.
- In `_format_in_threadpool`, restrict to a maximum of 4 workers by default, to optimize for `iter_batches` latency.
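The worker-cap side of the fix can be sketched as follows. The names `DEFAULT_MAX_FORMAT_WORKERS`, `resolve_format_workers`, and `format_in_threadpool` are hypothetical illustrations of the behavior the PR describes, not Ray's actual internals: the formatting pool no longer scales unboundedly with the prefetch count but is capped at 4 workers by default.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical constant mirroring the PR's described default cap.
DEFAULT_MAX_FORMAT_WORKERS = 4


def resolve_format_workers(prefetch_batches, max_workers=DEFAULT_MAX_FORMAT_WORKERS):
    """Size the formatting pool from the prefetch count, but cap it so
    latency variance does not grow with large prefetch settings."""
    return max(1, min(prefetch_batches, max_workers))


def format_in_threadpool(batches, format_fn, prefetch_batches):
    """Apply `format_fn` to each batch using a capped thread pool."""
    num_workers = resolve_format_workers(prefetch_batches)
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        # map() preserves input order, which batch iteration requires.
        yield from pool.map(format_fn, batches)
```

With `prefetch_batches=16`, the pool still uses only 4 workers, trading a little formatting parallelism for steadier per-batch latency.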