[rollout] feat: global request-level load balancer single source routing #5399
Conversation
Code Review
The pull request successfully replaces the heapq-based local load balancing with a centralized GlobalRequestLoadBalancer Ray actor, which is a significant improvement for multi-node, multi-worker setups. The introduction of try/finally blocks for acquire_server and release_server ensures proper resource management and prevents counter leaks, enhancing the robustness of the system. However, there are a couple of areas that need attention to ensure correctness and maintainability.
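For context, a minimal sketch of the acquire/release pattern the review refers to, assuming illustrative names (`acquire_server`, `release_server`, and a per-server `generate` method) rather than the exact signatures in this PR:

```python
import ray


async def generate_with_routing(
    load_balancer: ray.actor.ActorHandle,
    servers: list[ray.actor.ActorHandle],
    request_id: str,
    prompt: str,
):
    """Acquire a server index from the global balancer, generate, and always release."""
    server_idx = await load_balancer.acquire_server.remote(request_id=request_id)
    try:
        # Route the request to the server the global load balancer selected.
        return await servers[server_idx].generate.remote(request_id=request_id, prompt=prompt)
    finally:
        # Decrement the in-flight counter even if generation raised.
        await load_balancer.release_server.remote(server_idx=server_idx)
```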
```python
server_idx, server = self._choose_server_with_index(request_id)
self._inflight_requests[server_idx] += 1
```
In AsyncLLMServerManager, when _load_balancer is None (local load balancing mode), the _inflight_requests list is a shared mutable state within a single AsyncLLMServerManager instance. The AgentLoopWorker.generate_sequences method uses asyncio.gather to run multiple agent loops concurrently, and each loop will access the same self.server_manager instance. This creates a race condition when _acquire_server_with_index and _release_server modify self._inflight_requests concurrently. This can lead to incorrect in-flight request counts and thus suboptimal or incorrect load balancing. To fix this, access to self._inflight_requests in local mode must be protected by an asyncio.Lock.
Suggested change:

```diff
-server_idx, server = self._choose_server_with_index(request_id)
-self._inflight_requests[server_idx] += 1
+async with self._local_lock:
+    server_idx, server = self._choose_server_with_index(request_id)
+    self._inflight_requests[server_idx] += 1
```
```python
if self._inflight_requests[server_idx] > 0:
    self._inflight_requests[server_idx] -= 1
```
Similar to the _acquire_server_with_index method, the _release_server method in local load balancing mode also modifies self._inflight_requests which is a shared mutable state. This operation needs to be protected by an asyncio.Lock to prevent race conditions and ensure the integrity of the in-flight request counts.
Suggested change:

```diff
-if self._inflight_requests[server_idx] > 0:
-    self._inflight_requests[server_idx] -= 1
+async with self._local_lock:
+    if self._inflight_requests[server_idx] > 0:
+        self._inflight_requests[server_idx] -= 1
```
```python
self._load_balancer = load_balancer_handle

# In global load-balancer mode, server index must map to the same instance on every worker.
if self._load_balancer is None:
    random.shuffle(self.server_handles)
```
To ensure that the _local_lock is always available when _load_balancer is None, it should be initialized unconditionally in the __init__ method. This prevents a potential AttributeError if _local_lock is accessed before _load_balancer is determined to be None.
Suggested change:

```diff
-self._load_balancer = load_balancer_handle
-# In global load-balancer mode, server index must map to the same instance on every worker.
-if self._load_balancer is None:
-    random.shuffle(self.server_handles)
+self.server_handles = list(server_handles)
+self._load_balancer = load_balancer_handle
+self._local_lock = asyncio.Lock()  # Initialize lock unconditionally
+# In global load-balancer mode, server index must map to the same instance on every worker.
+if self._load_balancer is None:
+    random.shuffle(self.server_handles)
```
```python
    if self._inflight_requests[candidate_idx] == min_inflight:
        self._next_server_idx = (candidate_idx + 1) % num_servers
        return candidate_idx
raise RuntimeError("Failed to select a server for routing.")
```
The RuntimeError at this line appears to be unreachable. If num_servers is positive (which is checked in __init__), the loop for offset in range(num_servers) will always find a server with the minimum in-flight requests. This makes the RuntimeError dead code. Consider replacing it with an assertion to indicate an impossible state, which is more appropriate for logic errors that should not occur.
Suggested change:

```diff
-raise RuntimeError("Failed to select a server for routing.")
+assert False, "Logic error: should always find a server if num_servers > 0"
```
```python
    self._next_server_idx = (candidate_idx + 1) % num_servers
    return candidate_idx

raise RuntimeError("Failed to select a server for routing.")
```
Similar to the GlobalRequestLoadBalancer's _select_least_loaded_server_idx method, this RuntimeError is likely unreachable. The preceding check if not self._inflight_requests: handles the empty case. If _inflight_requests is not empty, the loop is guaranteed to find a server. Replacing this with an assertion would clarify that this is an unexpected, impossible state.
Suggested change:

```diff
-raise RuntimeError("Failed to select a server for routing.")
+assert False, "Logic error: should always find a server if num_servers > 0"
```
```python
class GlobalRequestLoadBalancer:
    """Global sticky-session + in-flight load balancer shared by all AgentLoopWorkers."""

    def __init__(self, num_servers: int, max_cache_size: int = DEFAULT_ROUTING_CACHE_SIZE):
```
Better to pass in server actor ids here, since we may support elastic scale up/down of servers in the future.
Should we do A or B? I think A is better here, as the IP is shown.
A:
self.server_addresses:
AgentLoopManager: ['10.1.201.2:35057', '10.1.201.2:45275', '10.1.201.2:38121', '10.1.201.2:46807']
B:
the ray attribute _actor_id.hex():
['17e5a28bb23117714906f67323000000', '811580cb4c81c7fcb617c74623000000', 'a3ce9c328fb401163cdbb7d023000000', 'bad2866fe8059f1346b6442e23000000']
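As an illustration of the suggestion above, a minimal sketch of option A, keying the balancer's state by server address instead of a bare `num_servers` count (names and structure are assumptions, not the code in this PR):

```python
class GlobalRequestLoadBalancer:
    """Sketch: key routing state by server address so servers can be added/removed later."""

    def __init__(self, server_addresses: list[str]):
        if not server_addresses:
            raise ValueError("server_addresses must not be empty")
        # address -> number of in-flight requests currently routed to it
        self._inflight: dict[str, int] = {addr: 0 for addr in server_addresses}

    def acquire_server(self, request_id: str) -> str:
        # Least-loaded selection by address; sticky-session lookup omitted for brevity.
        address = min(self._inflight, key=self._inflight.get)
        self._inflight[address] += 1
        return address

    def release_server(self, address: str) -> None:
        if self._inflight.get(address, 0) > 0:
            self._inflight[address] -= 1
```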
/gemini review |
Code Review
This pull request introduces a global GlobalRequestLoadBalancer to centralize routing logic, which is a significant architectural improvement over the previous per-worker approach. The implementation correctly handles sticky sessions and least-loaded routing with round-robin tie-breaking, and the new unit tests are comprehensive. I have one high-severity suggestion to improve the reliability of releasing server resources, which will prevent potential state corruption in the load balancer and enhance long-term system stability.
```python
async def _release_server(self, server_idx: Optional[int]) -> None:
    if server_idx is None:
        return
    # Fire-and-forget: release is just a counter decrement, no need to await.
    # Awaiting here risks blocking the finally clause if the LB actor is unresponsive.
    self._load_balancer.release_server.remote(server_idx=server_idx)
```
The current fire-and-forget approach for releasing a server can lead to state corruption in the load balancer if the release_server call is lost (e.g., due to the worker process crashing before the message is sent). This would cause the inflight request count to "leak", eventually making the load balancer believe servers are busier than they are, which could degrade or halt the system.
While the comment correctly identifies the risk of blocking in a finally clause, an unresponsive load balancer would likely cause subsequent acquire_server calls to block anyway. Prioritizing the consistency of the load balancer's state seems more crucial for long-term system stability.
I suggest awaiting the release_server call within a try...except block to handle potential Ray errors gracefully. This ensures the release is confirmed while preventing the worker from getting stuck indefinitely if the load balancer actor is down.
Suggested change:

```diff
 async def _release_server(self, server_idx: Optional[int]) -> None:
     if server_idx is None:
         return
-    # Fire-and-forget: release is just a counter decrement, no need to await.
-    # Awaiting here risks blocking the finally clause if the LB actor is unresponsive.
-    self._load_balancer.release_server.remote(server_idx=server_idx)
+    try:
+        await self._load_balancer.release_server.remote(server_idx=server_idx)
+    except ray.exceptions.RayError as e:
+        logger.warning(f"Failed to release server {server_idx} from load balancer: {e}")
```
/gemini review |
Code Review
This pull request introduces a global request-level load balancer, GlobalRequestLoadBalancer, to manage routing for AgentLoopWorkers. It refactors the load balancing logic from AsyncLLMServerManager into this new Ray actor, aiming to provide a single source of truth for routing and improve load distribution. A security audit confirmed that no high or critical security vulnerabilities were identified, and the implementation adheres to secure coding principles. The AI model's code review comments have been reviewed and are included as they provide valuable feedback on testing practices and potential logical issues.
```python
with pytest.raises(ray.exceptions.RayTaskError, match="Invalid server_idx for release"):
    ray.get(lb.release_server.remote(server_idx=9))
```
The match argument in pytest.raises uses a very specific string. If the error message in GlobalRequestLoadBalancer.release_server changes slightly (e.g., adds punctuation or more context), this test will fail even if the underlying logic remains correct. It's generally safer to use a more general regex or check for the exception type and a key substring, or even better, the specific exception type and then assert on the message content separately if more precision is needed.
Suggested change:

```diff
-with pytest.raises(ray.exceptions.RayTaskError, match="Invalid server_idx for release"):
-    ray.get(lb.release_server.remote(server_idx=9))
+with pytest.raises(ray.exceptions.RayTaskError, match="Invalid server_idx for release") as excinfo:
+    ray.get(lb.release_server.remote(server_idx=9))
+assert "Invalid server_idx for release" in str(excinfo.value)
```
```python
with pytest.raises(ray.exceptions.RayTaskError, match="no inflight requests"):
    ray.get(lb.release_server.remote(server_idx=1))
```
Similar to the previous comment, the match argument for pytest.raises is very specific. The error message in GlobalRequestLoadBalancer.release_server is Release called with no inflight requests on server {server_idx}. A minor change to this message could cause the test to break. Consider using a more robust matching strategy.
Suggested change:

```diff
-with pytest.raises(ray.exceptions.RayTaskError, match="no inflight requests"):
-    ray.get(lb.release_server.remote(server_idx=1))
+with pytest.raises(ray.exceptions.RayTaskError, match="no inflight requests") as excinfo:
+    ray.get(lb.release_server.remote(server_idx=1))
+assert "no inflight requests" in str(excinfo.value)
```
```python
    if self._inflight_requests[candidate_idx] == min_inflight:
        self._next_server_idx = (candidate_idx + 1) % num_servers
        return candidate_idx
raise RuntimeError("Failed to select a server for routing.")
```
The RuntimeError("Failed to select a server for routing.") on line 78 seems potentially unreachable or indicates a fundamental flaw if it can be reached. Given that server_actor_ids is checked for emptiness in __init__, num_servers will always be greater than zero. The loop for offset in range(num_servers) guarantees that candidate_idx will iterate through all possible server indices. Since min_inflight is calculated from self._inflight_requests, at least one server will always have _inflight_requests[candidate_idx] == min_inflight. This suggests the RuntimeError might never be hit, or if it is, it points to an unexpected state that needs further investigation or a more specific error condition.
/gemini review |
Code Review
This pull request introduces a global request load balancer to manage server routing, which is a great step towards a more robust and scalable architecture. The implementation of GlobalRequestLoadBalancer with sticky sessions and least-loaded routing with round-robin tie-breaking is solid, and the accompanying tests are comprehensive.
My main concern, which I've flagged as critical, is that the load_balancer_handle is treated as optional in several __init__ methods. However, the new implementation completely relies on this handle, and providing None will lead to runtime errors as there's no fallback to the old load balancing logic. I've suggested making this handle a required parameter to prevent such failures.
```python
    load_balancer_handle: Optional[ray.actor.ActorHandle] = None,
):
    """Initialize the AsyncLLMServerManager.

    Args:
        config (DictConfig): whole config for main entrypoint.
        server_handles (List[ray.actor.ActorHandle]): OpenAI compatible LLM server actor handles.
        max_cache_size (int, optional): max cache size for request_id to server mapping. Defaults to 10000.
        load_balancer_handle (Optional[ray.actor.ActorHandle]): shared global load balancer actor.
```
The load_balancer_handle is marked as Optional, but if it is None, calls to _acquire_server_with_index and _release_server will fail with an AttributeError as self._load_balancer will be None. The old load balancing logic has been removed, so there is no fallback. The load_balancer_handle should be made non-optional.
Suggested change:

```diff
-    load_balancer_handle: Optional[ray.actor.ActorHandle] = None,
+    load_balancer_handle: ray.actor.ActorHandle,
 ):
     """Initialize the AsyncLLMServerManager.

     Args:
         config (DictConfig): whole config for main entrypoint.
         server_handles (List[ray.actor.ActorHandle]): OpenAI compatible LLM server actor handles.
-        max_cache_size (int, optional): max cache size for request_id to server mapping. Defaults to 10000.
-        load_balancer_handle (Optional[ray.actor.ActorHandle]): shared global load balancer actor.
+        load_balancer_handle (ray.actor.ActorHandle): shared global load balancer actor.
```
```python
    load_balancer_handle: Optional[ray.actor.ActorHandle] = None,
):
    """Initialize agent loop manager.

    Args:
        config (DictConfig): YAML config.
        server_handles (List[ray.actor.ActorHandle]): OpenAI compatible LLM server actor handles.
        reward_loop_worker_handles (List[ray.actor.ActorHandle]): Actor handles for streaming reward computation.
        load_balancer_handle (Optional[ray.actor.ActorHandle]): shared global load balancer actor.
    """
```
The load_balancer_handle is marked as Optional, but it is passed to AsyncLLMServerManager which requires it. If load_balancer_handle is None, this will lead to a runtime AttributeError since the internal load balancing logic has been removed. The handle should be made non-optional.
Suggested change:

```diff
-    load_balancer_handle: Optional[ray.actor.ActorHandle] = None,
+    load_balancer_handle: ray.actor.ActorHandle,
 ):
     """Initialize agent loop manager.

     Args:
         config (DictConfig): YAML config.
         server_handles (List[ray.actor.ActorHandle]): OpenAI compatible LLM server actor handles.
         reward_loop_worker_handles (List[ray.actor.ActorHandle]): Actor handles for streaming reward computation.
-        load_balancer_handle (Optional[ray.actor.ActorHandle]): shared global load balancer actor.
+        load_balancer_handle (ray.actor.ActorHandle): shared global load balancer actor.
     """
```
```python
    config: DictConfig,
    server_handles: list[ray.actor.ActorHandle],
    reward_loop_worker_handles: list[ray.actor.ActorHandle] = None,
    load_balancer_handle: Optional[ray.actor.ActorHandle] = None,
```
The load_balancer_handle is marked as Optional, but it is passed to FullyAsyncLLMServerManager and the parent AgentLoopWorker, both of which require it. If it is None, this will cause a runtime AttributeError. The handle should be made non-optional.
Suggested change:

```diff
-    load_balancer_handle: Optional[ray.actor.ActorHandle] = None,
+    load_balancer_handle: ray.actor.ActorHandle,
```
/gemini review |
Code Review
This pull request introduces a global request-level load balancer, GlobalRequestLoadBalancer, as a centralized Ray actor to manage routing for LLM servers. The implementation correctly handles sticky sessions for multi-turn conversations and falls back to a least-loaded strategy with round-robin tie-breaking for new requests. The new load balancer is integrated into AsyncLLMServerManager and wired through the AgentLoopManager and AgentLoopWorker components. Unit tests for the new load balancer are also included and cover various routing and session stickiness scenarios.
My main feedback concerns the implementation of _release_server in AsyncLLMServerManager. It is unnecessarily declared as an async method, which can be confusing. I've suggested changing it to a synchronous method for better code clarity, which also requires updating the call sites.
```python
async def _release_server(self, server_idx: Optional[int]) -> None:
    if server_idx is None:
        return
    # Fire-and-forget: release is just a counter decrement, no need to await.
    # Awaiting here risks blocking the finally clause if the LB actor is unresponsive.
    self._load_balancer.release_server.remote(server_idx=server_idx)
```
This method is declared as async but contains no await expressions. This is misleading because it suggests an asynchronous operation that can be awaited, but it completes synchronously. To improve clarity and align with its fire-and-forget nature, it should be a regular synchronous method. This is particularly important as it's used in a finally block.
Suggested change:

```diff
-async def _release_server(self, server_idx: Optional[int]) -> None:
+def _release_server(self, server_idx: Optional[int]) -> None:
     if server_idx is None:
         return
     # Fire-and-forget: release is just a counter decrement, no need to await.
     # Awaiting here risks blocking the finally clause if the LB actor is unresponsive.
     self._load_balancer.release_server.remote(server_idx=server_idx)
```
```python
        )
        return output
    finally:
        await self._release_server(server_idx)
```
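If `_release_server` were made synchronous as suggested, the call site above would drop the `await`; a minimal sketch of the updated call site (surrounding code elided):

```python
        # ... generation call elided ...
        return output
    finally:
        # No await: the release helper is now a synchronous fire-and-forget call.
        self._release_server(server_idx)
```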
/gemini review |
Code Review
This pull request introduces a global request-level load balancer, which is a significant and well-executed architectural improvement. The new GlobalRequestLoadBalancer correctly implements sticky sessions and least-loaded routing with round-robin tie-breaking. The integration into AsyncLLMServerManager and the worker classes is clean, using a try...finally block to ensure resources are properly released, which is critical for robustness. The accompanying unit tests for the load balancer are comprehensive and cover routing logic, sticky sessions, and error conditions effectively. The changes are consistently applied across both the base and the fully asynchronous agent loops. I have not identified any critical or high-severity issues in this implementation.
```python
        self._next_server_idx = (candidate_idx + 1) % num_servers
        return candidate_idx

    def acquire_server(self, request_id: str) -> int:
```
Why not just return server_actor_id? It's tricky to ensure GlobalRequestLoadBalancer and AsyncLLMServerManager have consistent order.
```python
def _select_least_loaded_server_idx(self) -> int:
    min_inflight = min(self._inflight_requests)
    num_servers = len(self._inflight_requests)
    for offset in range(num_servers):
```
I'm a bit confused here; why not just choose the server_actor_id with min_inflight?
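For illustration, a small self-contained snippet contrasting a plain "pick the minimum" choice with the rotating-start scan; the round-robin start is assumed to exist for tie-breaking, and counts are deliberately not updated between picks here, just to show the behavioral difference when all loads are equal:

```python
inflight = [0, 0, 0, 0]

# Plain "choose the min" (illustrative): ties always resolve to the lowest index.
picks_plain = [min(range(len(inflight)), key=inflight.__getitem__) for _ in range(4)]
print(picks_plain)  # [0, 0, 0, 0]

# Rotating-start scan (illustrative): ties are spread across servers round-robin.
picks_rr = []
next_idx = 0
for _ in range(4):
    n = len(inflight)
    min_load = min(inflight)
    for offset in range(n):
        cand = (next_idx + offset) % n
        if inflight[cand] == min_load:
            picks_rr.append(cand)
            next_idx = (cand + 1) % n
            break
print(picks_rr)  # [0, 1, 2, 3]
```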
…he affinity

Replace per-worker heapq load balancing with a shared GlobalRequestLoadBalancer Ray actor that coordinates in-flight request counts across all workers.

Key changes:
- Add GlobalRequestLoadBalancer actor with acquire/release server protocol
- Merge server_handles + server_addresses into servers: list[tuple[str, ActorHandle]]
- Add GRPO prefix cache affinity via request_id-based sticky routing
- Update AsyncLLMServerManager, AgentLoopWorker, FullyAsyncAgentLoopWorker signatures
- Update test_special_server_adapter.py for new constructor API
- Add load balancer unit tests in test_basic_agent_loop.py

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
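The commit message mentions merging `server_handles` and `server_addresses` into a single `servers` list; a minimal sketch of that shape, assuming the two input lists are index-aligned (illustrative, not the PR's exact code):

```python
import ray


def merge_servers(
    server_addresses: list[str],
    server_handles: list[ray.actor.ActorHandle],
) -> list[tuple[str, ray.actor.ActorHandle]]:
    """Pair each server address with its actor handle in a single list."""
    assert len(server_addresses) == len(server_handles)
    return list(zip(server_addresses, server_handles))
```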
Summary
This PR is Task 1 (Phase 1) for the routing roadmap (#5442):

- Introduce a global `GlobalRequestLoadBalancer` as the runtime single source of truth for routing.
- Keep request-level sticky routing (`request_id -> server`) + least-loaded routing.

Code Scope

- `verl/experimental/agent_loop/agent_loop.py` - global LB routing path (`acquire`/`release`)
- `verl/experimental/fully_async_policy/agent_loop/agent_loop.py` - keep required wiring only
- `tests/experimental/agent_loop/test_basic_agent_loop.py` - load balancer behavior tests aligned with current strategy

Experiment Configuration (Copied)
Model/Data:

- Model: Qwen2.5-VL-32B
- Data: osworld.json
- Machine: 3 * H100 Nodes
Runtime config (from run):
```bash
MAX_ASSISTANT_TURNS=50
MAX_PROMPT_LENGTH=24488
RESPONSE_LENGTH=512
MAX_TOKENS=512
NNODES=3
N_GPU_PER_NODE=8
HARDWARE=3 x H100 nodes
SP_SIZE=1
MICRO_BATCH_SIZE_PER_GPU=1
PPO_MINI_BATCH_SIZE=6
TRAIN_BATCH_SIZE=24
ROLLOUT_N=16
STRATEGY=fsdp2
ENGINE=vllm
DUMP_VALIDATION_DATA=true
DUMP_ROLLOUT_DATA=false
DUMP_TASK_SCORES=false
DUMP_ROLLOUT_TIMING=true
FILTER_GROUPS=true
DETERMINISTIC=false
TRAJECTORY_SPLITTING=true
USE_FLEX_ATTENTION_HETERO=false
DUMMY_MODE=disabled
RAY_DASHBOARD_ADDRESS=http://127.0.0.1:8265
nsys_prof=false
ENABLE_PROMETHEUS_MONITORING=true
PROMETHEUS_PORT=9090
SCRIPT_DIR=/home/ubuntu/verl-grounding
PROMETHEUS_CONFIG_FILE=/home/ubuntu/verl-grounding/gui_scripts/vllm_monitoring/prometheus.yaml
PROMETHEUS_SERVED_MODEL_NAME=qwen2_5_vl_32b
EXPERIMENT_NAME=gui-agent-fsdp2-gb24-3nodes-20260223_145012
HF_MODEL_PATH=/mnt/data2/models_and_datasets/sft-32b-bigs1-1027-s2-0103-osworld
TRAIN_DATA_PATH=data/nongoogledrive_02_04_no_infeasible_and_chart_editing_train.json
TEST_DATA_PATH=data/nongoogledrive_02_04_no_infeasible_and_chart_editing_test.json
PROJECT_NAME=verl_grounding
SAVE_PATH=/mnt/data2/saves/verl_grounding/gui-agent-fsdp2-gb24-3nodes-20260223_145012
ALL_OFFLOAD=true
ACTOR_PARAM_OFFLOAD=true
ACTOR_GRAD_OFFLOAD=true
ACTOR_OPTIMIZER_OFFLOAD=true
REF_PARAM_OFFLOAD=true
PROMETHEUS_PARAMS='actor_rollout_ref.rollout.disable_log_stats=False actor_rollout_ref.rollout.prometheus.enable=True actor_rollout_ref.rollout.prometheus.port=9090 actor_rollout_ref.rollout.prometheus.file=/home/ubuntu/verl-grounding/gui_scripts/vllm_monitoring/prometheus.yaml actor_rollout_ref.rollout.prometheus.served_model_name=qwen2_5_vl_32b'
```

Load Distribution Comparison (Placeholder)
Before (baseline):

After (optimized):

Metrics (Before PR -> After PR)
| Metric | Baseline | Optimized | Delta |
|---|---:|---:|---:|
| `agent_loop/per_step/generate_sequences/mean` | 6.3596 s | 4.5312 s | -1.8285 s (-28.75%) |
| `agent_loop/slowest/total_time` | 899.3761 s | 724.6898 s | -174.6863 s (-19.42%) |