Skip to content

Commit aec7d65

Browse files
committed
resolve comments
Signed-off-by: Yu Chen <yuchen.ecnu@gmail.com>
1 parent fff4742 commit aec7d65

File tree

4 files changed

+63
-23
lines changed

4 files changed

+63
-23
lines changed

python/ray/data/llm.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from ray.llm._internal.batch.stages.configs import (
1515
ChatTemplateStageConfig as _ChatTemplateStageConfig,
1616
DetokenizeStageConfig as _DetokenizeStageConfig,
17+
HttpRequestStageConfig as _HttpRequestStageConfig,
1718
PrepareImageStageConfig as _PrepareImageStageConfig,
1819
PrepareMultimodalStageConfig as _PrepareMultimodalStageConfig,
1920
TokenizerStageConfig as _TokenizerStageConfig,
@@ -512,6 +513,29 @@ class TokenizerStageConfig(_TokenizerStageConfig):
512513
pass
513514

514515

516+
@PublicAPI(stability="alpha")
517+
class HttpRequestStageConfig(_HttpRequestStageConfig):
518+
"""The configuration for the http request stage.
519+
520+
Args:
521+
enabled: Whether this stage is enabled. Defaults to True.
522+
batch_size: Rows per batch. If not specified, will use the processor-level
523+
batch_size.
524+
concurrency: Actor pool size or range for this stage. If not specified,
525+
will use the processor-level concurrency. If ``concurrency`` is a
526+
tuple ``(m, n)``, Ray creates an autoscaling actor pool that scales
527+
between ``m`` and ``n`` workers (``1 <= m <= n``). If ``concurrency``
528+
is an ``int`` ``n``, CPU stages use an autoscaling pool from ``(1, n)``.
529+
runtime_env: Optional runtime environment for this stage. If not specified,
530+
will use the processor-level runtime_env. See
531+
:ref:`this doc <handling_dependencies>` for more details.
532+
num_cpus: Number of CPUs to reserve for each map worker in this stage.
533+
memory: Heap memory in bytes to reserve for each map worker in this stage.
534+
"""
535+
536+
pass
537+
538+
515539
@PublicAPI(stability="alpha")
516540
class PrepareImageStageConfig(_PrepareImageStageConfig):
517541
"""The configuration for the prepare image stage.
@@ -737,6 +761,7 @@ def build_processor(
737761
"DetokenizeStageConfig",
738762
"PrepareMultimodalStageConfig",
739763
"TokenizerStageConfig",
764+
"HttpRequestStageConfig",
740765
"PrepareImageStageConfig",
741766
"build_llm_processor",
742767
"build_processor",

python/ray/llm/_internal/batch/processor/http_request_proc.py

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
from pydantic import Field
66

7-
from ray.data import ActorPoolStrategy
87
from ray.data.block import UserDefinedFunction
98
from ray.llm._internal.batch.observability.usage_telemetry.usage import (
109
BatchModelTelemetry,
@@ -15,8 +14,12 @@
1514
ProcessorBuilder,
1615
ProcessorConfig,
1716
)
18-
from ray.llm._internal.batch.processor.utils import extract_resource_kwargs
17+
from ray.llm._internal.batch.processor.utils import build_cpu_stage_map_kwargs
1918
from ray.llm._internal.batch.stages import HttpRequestStage
19+
from ray.llm._internal.batch.stages.configs import (
20+
HttpRequestStageConfig,
21+
resolve_stage_config,
22+
)
2023

2124

2225
class HttpRequestProcessorConfig(ProcessorConfig):
@@ -55,14 +58,9 @@ class HttpRequestProcessorConfig(ProcessorConfig):
5558
# exclude from JSON serialization since `session_factory` is a callable
5659
exclude=True,
5760
)
58-
num_cpus: Optional[float] = Field(
59-
default=None,
60-
description="Number of CPUs per HttpRequestUDF worker. Defaults to 1 if None. "
61-
"For I/O-bound workloads, use fractional values (e.g., 0.1).",
62-
)
63-
memory: Optional[float] = Field(
64-
default=None,
65-
description="Heap memory in bytes to reserve for each HttpRequestUDF worker.",
61+
http_request_stage: Any = Field(
62+
default=True,
63+
description="HTTP request stage config (bool | dict | HttpRequestStageConfig).",
6664
)
6765

6866

@@ -90,6 +88,25 @@ def build_http_request_processor(
9088
Returns:
9189
The constructed processor.
9290
"""
91+
92+
# Prepare processor defaults for merging into stage configs
93+
processor_defaults = {
94+
"batch_size": config.batch_size,
95+
"concurrency": config.concurrency,
96+
}
97+
98+
# Resolve the HttpRequestStage config; this stage is mandatory, so disabling it is rejected below
99+
http_request_stage_cfg = resolve_stage_config(
100+
config.http_request_stage,
101+
HttpRequestStageConfig,
102+
processor_defaults,
103+
)
104+
105+
if not http_request_stage_cfg.enabled:
106+
raise ValueError(
107+
"The HTTP request stage is required and cannot be disabled in HttpRequestProcessorConfig."
108+
)
109+
93110
stages = [
94111
HttpRequestStage(
95112
fn_constructor_kwargs=dict(
@@ -100,16 +117,7 @@ def build_http_request_processor(
100117
base_retry_wait_time_in_s=config.base_retry_wait_time_in_s,
101118
session_factory=config.session_factory,
102119
),
103-
map_batches_kwargs=dict(
104-
compute=ActorPoolStrategy(
105-
**config.get_concurrency(autoscaling_enabled=False),
106-
),
107-
**extract_resource_kwargs(
108-
None,
109-
config.num_cpus,
110-
config.memory,
111-
),
112-
),
120+
map_batches_kwargs=build_cpu_stage_map_kwargs(http_request_stage_cfg),
113121
)
114122
]
115123
telemetry_agent = get_or_create_telemetry_agent()

python/ray/llm/_internal/batch/stages/configs.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ class PrepareMultimodalStageConfig(_StageConfigBase):
6868
)
6969

7070

71+
class HttpRequestStageConfig(_StageConfigBase):
72+
pass
73+
74+
7175
def resolve_stage_config(
7276
stage_cfg_value: Union[bool, Dict[str, Any], _StageConfigBase],
7377
stage_config_cls: Type[T],

python/ray/llm/tests/batch/cpu/processor/test_http_request_proc.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import pytest
44

5+
from ray.data.llm import HttpRequestStageConfig
56
from ray.llm._internal.batch.processor import ProcessorBuilder
67
from ray.llm._internal.batch.processor.http_request_proc import (
78
HttpRequestProcessorConfig,
@@ -15,15 +16,17 @@ def test_http_request_processor():
1516
qps=2,
1617
concurrency=4,
1718
batch_size=64,
18-
num_cpus=0.5,
19-
memory=100000,
19+
http_request_stage=HttpRequestStageConfig(
20+
num_cpus=0.5,
21+
memory=100000,
22+
),
2023
)
2124
processor = ProcessorBuilder.build(config)
2225
assert processor.list_stage_names() == ["HttpRequestStage"]
2326
stage = processor.get_stage_by_name("HttpRequestStage")
2427
assert stage.map_batches_kwargs["num_cpus"] == 0.5
2528
assert stage.map_batches_kwargs["memory"] == 100000
26-
assert stage.map_batches_kwargs["compute"].min_size == 4
29+
assert stage.map_batches_kwargs["compute"].min_size == 1
2730
assert stage.map_batches_kwargs["compute"].max_size == 4
2831
assert stage.fn_constructor_kwargs["url"] == "http://localhost:8000"
2932
assert stage.fn_constructor_kwargs["additional_header"] == {

0 commit comments

Comments
 (0)