Skip to content

Commit be232a9

Browse files
nrghosh authored and peterxcli committed
[Data][LLM] Remove DataContext overrides in Ray Data LLM Processor (ray-project#60142)
Signed-off-by: Nikhil Ghosh <nikhil@anyscale.com>
Signed-off-by: peterxcli <peterxcli@gmail.com>
1 parent e8f9ded commit be232a9

File tree

2 files changed

+98
-9
lines changed

2 files changed

+98
-9
lines changed

python/ray/llm/_internal/batch/processor/base.py

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
from pydantic import Field, field_validator, model_validator
66

7-
import ray
87
from ray.data import Dataset
98
from ray.data.block import UserDefinedFunction
109
from ray.llm._internal.batch.stages import (
@@ -331,14 +330,6 @@ def __init__(
331330
self.postprocess_map_kwargs = postprocess_map_kwargs or {}
332331
self.stages: OrderedDict[str, StatefulStage] = OrderedDict()
333332

334-
# FIXES: https://github.com/ray-project/ray/issues/53124
335-
# TODO (Kourosh): Remove this once the issue is fixed
336-
data_context = ray.data.DataContext.get_current()
337-
data_context.wait_for_min_actors_s = 600
338-
# TODO: Remove this when https://github.com/ray-project/ray/issues/53169
339-
# is fixed.
340-
data_context._enable_actor_pool_on_exit_hook = True
341-
342333
# NOTE (Kourosh): If pre/postprocess is not provided, use the identity function.
343334
# Wrapping is required even if they are identity functions, b/c data_column
344335
# gets inserted/removed via wrap_preprocess/wrap_postprocess.
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
"""
2+
Test that Ray Data LLM does not override wait_for_min_actors_s.
3+
4+
With default settings (wait_for_min_actors_s <= 0), processing starts
5+
as soon as any actor is ready, regardless of concurrency config.
6+
"""
7+
import sys
8+
9+
import pytest
10+
11+
from ray.data import DataContext
12+
from ray.llm._internal.batch.processor import ProcessorBuilder
13+
from ray.llm._internal.batch.processor.vllm_engine_proc import vLLMEngineProcessorConfig
14+
15+
16+
@pytest.fixture(autouse=True)
def reset_data_context():
    """Pin ``wait_for_min_actors_s`` to -1 for each test, then restore it.

    The fixture is autouse, so it wraps every test in this module: the
    value found on the current DataContext is remembered, replaced with
    -1 for the duration of the test, and written back afterwards.
    """
    data_context = DataContext.get_current()
    saved_wait_s = data_context.wait_for_min_actors_s
    data_context.wait_for_min_actors_s = -1
    yield
    # Teardown: put back whatever value was present before the test ran.
    data_context.wait_for_min_actors_s = saved_wait_s
24+
25+
26+
class TestWaitForMinActorsNotOverridden:
    """Building a Processor must leave wait_for_min_actors_s untouched."""

    def test_processor_does_not_override_default(self):
        """A value of -1 set before the build is still -1 after it."""
        data_context = DataContext.get_current()
        data_context.wait_for_min_actors_s = -1

        ProcessorBuilder.build(
            vLLMEngineProcessorConfig(
                model_source="facebook/opt-125m",
                concurrency=4,
            )
        )

        assert data_context.wait_for_min_actors_s == -1

    @pytest.mark.parametrize("user_value", [60, 600, 1800])
    def test_processor_preserves_user_setting(self, user_value):
        """A value chosen by the user before the build survives the build."""
        data_context = DataContext.get_current()
        data_context.wait_for_min_actors_s = user_value

        ProcessorBuilder.build(
            vLLMEngineProcessorConfig(
                model_source="facebook/opt-125m",
                concurrency=4,
            )
        )

        assert data_context.wait_for_min_actors_s == user_value
55+
56+
57+
class TestConcurrencyConfigPassthrough:
    """
    Check that the concurrency config ends up on the vLLM stage's
    ActorPoolStrategy.

    The pool's min_size determines blocking behavior when
    wait_for_min_actors_s > 0:
    - concurrency=N      -> min_size=N -> blocks for N actors
    - concurrency=(1, N) -> min_size=1 -> blocks for 1 actor
    """

    @pytest.mark.parametrize(
        "concurrency,expected_min_size,expected_max_size",
        [
            (4, 4, 4),  # int: fixed pool
            ((1, 4), 1, 4),  # tuple: autoscaling pool
            ((2, 8), 2, 8),  # tuple: custom min
        ],
        ids=["int_concurrency", "tuple_1_to_n", "tuple_custom_min"],
    )
    def test_concurrency_to_actor_pool_strategy(
        self, concurrency, expected_min_size, expected_max_size
    ):
        """The built stage's compute strategy has the expected pool bounds."""
        processor = ProcessorBuilder.build(
            vLLMEngineProcessorConfig(
                model_source="facebook/opt-125m",
                concurrency=concurrency,
            )
        )

        # The compute strategy is stored in the vLLM stage's map_batches kwargs.
        vllm_stage = processor.get_stage_by_name("vLLMEngineStage")
        pool_strategy = vllm_stage.map_batches_kwargs.get("compute")

        assert (
            pool_strategy.min_size == expected_min_size
        ), f"Expected min_size={expected_min_size}, got {pool_strategy.min_size}"
        assert (
            pool_strategy.max_size == expected_max_size
        ), f"Expected max_size={expected_max_size}, got {pool_strategy.max_size}"
95+
96+
97+
if __name__ == "__main__":
    # Run this file's tests verbosely and use pytest's result as the exit status.
    exit_status = pytest.main(["-v", __file__])
    sys.exit(exit_status)

0 commit comments

Comments
 (0)