[Serve] add support for custom batch size function #59059
Conversation
Signed-off-by: abrar <abrar@anyscale.com>
python/ray/serve/batching.py
Outdated
```python
# Put deferred item back in queue for next batch
if deferred_item is not None:
    self.queue.put_nowait(deferred_item)
```
this would put the request at the back of the queue, correct?
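For reference, assuming `self.queue` is an `asyncio.Queue` (or anything with the same FIFO semantics), `put_nowait` appends to the tail, so the re-enqueued request lands behind anything already waiting. A minimal sketch:

```python
import asyncio

# FIFO demonstration: put_nowait appends to the tail, so a re-enqueued
# item ends up behind requests that are already waiting in the queue.
queue: asyncio.Queue = asyncio.Queue()
queue.put_nowait("already_waiting")  # request that was already queued
queue.put_nowait("deferred")         # item put back for the next batch

assert queue.get_nowait() == "already_waiting"
assert queue.get_nowait() == "deferred"  # comes out last, i.e. from the back
```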
Signed-off-by: abrar <abrar@anyscale.com>
python/ray/serve/batching.py
Outdated
```python
batch.append(self.queue.get_nowait())

# Put deferred item back in queue for next batch
if deferred_item is not None:
```
nit: we can put this block after the loop below (see the sketch after this snippet):

```python
while not self.queue.empty():
    next_item = self.queue.get_nowait()
    # Temporarily add to check size
    batch.append(next_item)
    new_size = self._compute_batch_size(batch)
    if new_size > max_batch_size:
        # Would exceed limit, remove it and save for later
        batch.pop()
        deferred_item = next_item
        break
    # Size is OK, keep it in the batch (already added above)
```
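A self-contained sketch of that arrangement, using a plain `asyncio.Queue` and a stand-in `compute_batch_size`; the `drain_batch` helper is illustrative, not the actual method in `batching.py`:

```python
import asyncio
from typing import Any, Callable, List, Optional


def drain_batch(
    queue: "asyncio.Queue[Any]",
    max_batch_size: int,
    compute_batch_size: Callable[[List[Any]], int],
) -> List[Any]:
    """Drain items until adding one more would exceed max_batch_size."""
    batch: List[Any] = []
    deferred_item: Optional[Any] = None

    while not queue.empty():
        next_item = queue.get_nowait()
        batch.append(next_item)  # temporarily add to check the size
        if compute_batch_size(batch) > max_batch_size:
            batch.pop()  # would exceed the limit; remove it and save for later
            deferred_item = next_item
            break

    # Per the nit above: the re-enqueue happens once, after the loop exits.
    if deferred_item is not None:
        queue.put_nowait(deferred_item)

    return batch
```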
Signed-off-by: abrar <abrar@anyscale.com>
```python
args, kwargs = recover_args(request.flattened_args)
# The batch function expects a single positional argument (the item)
# after 'self' has been extracted (if it was a method)
items.append(args[0])
```
Bug: Potential `IndexError` when using keyword arguments with `batch_size_fn`

The `_compute_batch_size` method assumes that requests are always passed as positional arguments, accessing `args[0]` without checking if `args` is empty. If a user defines a batch method with keyword-only parameters (e.g., `async def handle_batch(self, *, request)`) and calls it with keyword arguments, `recover_args` will return an empty `args` list, causing an `IndexError: list index out of range`. This would result in a confusing error message rather than a clear explanation that `batch_size_fn` requires the batched argument to be passed positionally.
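A minimal sketch of the failure mode and one possible guard; the `positional_call`/`keyword_call` stand-ins and the `extract_item` helper are illustrative, not taken from the PR:

```python
from typing import Any, Dict, List

# Illustrative stand-ins for the (args, kwargs) that recover_args returns for
# each call style; the real request objects in batching.py are assumed, not copied.
positional_call = ([{"value": 5}], {})          # handle_batch(item)
keyword_call = ([], {"request": {"value": 5}})  # handle_batch(request=item)


def extract_item(args: List[Any], kwargs: Dict[str, Any]) -> Any:
    # One possible guard: raise a descriptive error instead of a bare IndexError.
    if not args:
        raise ValueError(
            "batch_size_fn requires the batched argument to be passed "
            "positionally, not as a keyword argument."
        )
    return args[0]


print(extract_item(*positional_call))
# extract_item(*keyword_call)  # would raise the descriptive ValueError above
```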
fixes ray-project#58956

- [x] update documentation
- [x] add tests
- [x] add code examples and link to docs
- [x] compare performance with `len(batch)` baseline

### script

```python
from ray import serve
from typing import List


@serve.deployment(max_ongoing_requests=200)
class BatchSizeFnExample:
    @serve.batch(
        max_batch_size=50,
        batch_wait_timeout_s=0.5,
        # batch_size_fn=lambda items: len(items),
    )
    async def handle_batch(self, requests: List):
        return [req["value"] * 2 for req in requests]

    async def __call__(self, request):
        body = await request.json()
        return await self.handle_batch(body)


app = BatchSizeFnExample.bind()
```

### load test

```bash
# First, create a file with the JSON payload
echo '{"value": 5}' > /tmp/post_data.json

# Run Apache Bench: 1000 requests, 100 concurrent connections
ab -n 1000 -c 100 -p /tmp/post_data.json -T "application/json" http://localhost:8000/
```

### results

```
batch size 10

master
Requests per second:    317.68 [#/sec] (mean)
Time per request:       314.780 [ms] (mean)
Time per request:       3.148 [ms] (mean, across all concurrent requests)

PR
Requests per second:    307.80 [#/sec] (mean)
Time per request:       324.891 [ms] (mean)
Time per request:       3.249 [ms] (mean, across all concurrent requests)

batch size 50

master
Requests per second:    328.21 [#/sec] (mean)
Time per request:       304.684 [ms] (mean)
Time per request:       3.047 [ms] (mean, across all concurrent requests)

pr
Requests per second:    329.03 [#/sec] (mean)
Time per request:       303.922 [ms] (mean)
Time per request:       3.039 [ms] (mean, across all concurrent requests)
```

---------

Signed-off-by: abrar <abrar@anyscale.com>
Signed-off-by: peterxcli <peterxcli@gmail.com>
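Since the script above only shows the `len(items)` baseline (commented out), here is a sketch of a non-trivial custom size function; the token-counting metric, the 512 budget, and the `TokenBudgetedBatcher` name are illustrative, assuming the same `serve.batch` signature used in the script above:

```python
from typing import Dict, List

from ray import serve


def total_tokens(requests: List[Dict]) -> int:
    # Illustrative size metric: whitespace-separated tokens across the batch.
    return sum(len(req.get("text", "").split()) for req in requests)


@serve.deployment(max_ongoing_requests=200)
class TokenBudgetedBatcher:
    @serve.batch(
        max_batch_size=512,          # treated as a token budget by batch_size_fn
        batch_wait_timeout_s=0.5,
        batch_size_fn=total_tokens,
    )
    async def handle_batch(self, requests: List[Dict]) -> List[int]:
        return [len(req.get("text", "").split()) for req in requests]

    async def __call__(self, request):
        body = await request.json()
        return await self.handle_batch(body)


app = TokenBudgetedBatcher.bind()
```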