[tool, rollout] refactor: cache tool list initialization with MCP timeout #5426
denismegerle wants to merge 3 commits into verl-project:main
Conversation
- Updated the tool initialization process in `ToolAgentLoop` to use `ToolListCache`, improving efficiency and reducing redundancy.
- Enhanced `initialize_tools_from_config` to support timeout handling during tool initialization, improving robustness.
- Introduced a caching mechanism to store initialized tools, minimizing repeated initialization and optimizing performance.
Add a small unit test that validates ToolListCache caching semantics, deep-copy isolation, and retry/backoff behavior. Made-with: Cursor
Denis Megerle does not appear to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you already have a GitHub account, please add the email address used for this commit to your account. Have you already signed the CLA but the status is still pending? Let us recheck it.
Code Review
This pull request introduces a process-local cache for tool list initialization and adds a timeout for MCP tool initialization, which are valuable improvements for performance and robustness. The implementation is mostly solid, with good test coverage for the new caching and retry logic. However, I've identified a significant performance bottleneck in the cache implementation related to lock contention. My review includes a detailed comment and a code suggestion to address this issue by refactoring the locking strategy to avoid holding a global lock during long-running I/O operations.
```python
def get_tool_list(cls, tools_config_file: str | None) -> list:
    if not tools_config_file:
        return []

    with cls._lock:
        cached = cls._cache.get(tools_config_file)
        if cached is None:
            retries, timeout_s, backoff_base_s, backoff_max_s = cls._get_init_policy()
            logger.debug(
                "[ToolListCache] cache miss for %s, initializing tools (retries=%s, timeout_s=%.2f)",
                tools_config_file,
                retries,
                timeout_s,
            )
            last_error = None
            for attempt in range(1, retries + 1):
                try:
                    cached = initialize_tools_from_config(tools_config_file, mcp_init_timeout_s=timeout_s)
                    break
                except Exception as e:
                    last_error = e
                    if attempt >= retries:
                        raise RuntimeError(
                            f"[ToolListCache] failed to initialize tools for {tools_config_file} "
                            f"after {retries} attempts"
                        ) from e

                    backoff_s = min(backoff_max_s, backoff_base_s * (2 ** (attempt - 1)))
                    logger.warning(
                        "[ToolListCache] init attempt %s/%s failed for %s: %s. Retrying in %.2fs",
                        attempt,
                        retries,
                        tools_config_file,
                        repr(e),
                        backoff_s,
                    )
                    if backoff_s > 0:
                        time.sleep(backoff_s)

            if cached is None:
                # Defensive fallback: should be unreachable due to RuntimeError above.
                raise RuntimeError(
                    f"[ToolListCache] initialization produced no tool list for {tools_config_file}"
                ) from last_error
            cls._cache[tools_config_file] = cached
        else:
            logger.debug(f"[ToolListCache] cache hit for {tools_config_file}")

    return copy.deepcopy(cached)
```
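The capped exponential backoff used above can be sketched numerically. The base and cap values below are illustrative assumptions, not the actual values returned by `_get_init_policy()`:

```python
# Illustrative backoff schedule for min(backoff_max_s, backoff_base_s * 2**(attempt - 1)).
# base 0.5s and cap 4.0s are example assumptions, not the policy defaults.
backoff_base_s, backoff_max_s = 0.5, 4.0

delays = [
    min(backoff_max_s, backoff_base_s * (2 ** (attempt - 1)))
    for attempt in range(1, 6)
]
print(delays)  # → [0.5, 1.0, 2.0, 4.0, 4.0]: doubles each attempt until hitting the cap
```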
The current implementation of get_tool_list holds a global lock (cls._lock) for the entire duration of tool initialization. This includes potentially long-running operations like network calls in initialize_tools_from_config and time.sleep() during backoff. This creates a major contention point, as any thread trying to access the cache will be blocked, even if it's for a different and already-cached tool list. This can severely degrade performance under concurrent load.
To resolve this, the lock should only be held for brief, critical sections that access or modify the shared cache dictionary. The expensive initialization logic should be executed outside of the lock. A common and robust pattern to handle this and prevent the "thundering herd" problem (multiple threads trying to initialize the same resource) is to use a concurrent.futures.Future object as a placeholder in the cache while initialization is in progress.
I'm suggesting a refactoring of this method to implement this pattern, which will significantly improve concurrency and performance.
```python
@classmethod
def get_tool_list(cls, tools_config_file: str | None) -> list:
    if not tools_config_file:
        return []

    with cls._lock:
        cached_item = cls._cache.get(tools_config_file)
        if cached_item is None:
            # Cache miss: we become the initializer. Place a Future in the cache
            # as a placeholder so concurrent callers wait on it instead of
            # re-running the expensive initialization ("thundering herd").
            future = concurrent.futures.Future()
            cls._cache[tools_config_file] = future
        else:
            future = None

    if future is None:
        # Another thread owns (or owned) this entry. Wait outside the lock so
        # lookups for other, already-cached config files are not blocked.
        if isinstance(cached_item, concurrent.futures.Future):
            logger.debug(f"[ToolListCache] waiting for initialization of {tools_config_file}")
            return copy.deepcopy(cached_item.result())  # Re-raises initializer exceptions.
        logger.debug(f"[ToolListCache] cache hit for {tools_config_file}")
        return copy.deepcopy(cached_item)

    # Perform initialization outside the lock.
    try:
        retries, timeout_s, backoff_base_s, backoff_max_s = cls._get_init_policy()
        logger.debug(
            "[ToolListCache] cache miss for %s, initializing tools (retries=%s, timeout_s=%.2f)",
            tools_config_file,
            retries,
            timeout_s,
        )
        for attempt in range(1, retries + 1):
            try:
                initialized_tools = initialize_tools_from_config(tools_config_file, mcp_init_timeout_s=timeout_s)
                future.set_result(initialized_tools)
                with cls._lock:
                    cls._cache[tools_config_file] = initialized_tools
                return copy.deepcopy(initialized_tools)
            except Exception as e:
                if attempt >= retries:
                    raise RuntimeError(
                        f"[ToolListCache] failed to initialize tools for {tools_config_file} "
                        f"after {retries} attempts"
                    ) from e
                backoff_s = min(backoff_max_s, backoff_base_s * (2 ** (attempt - 1)))
                logger.warning(
                    "[ToolListCache] init attempt %s/%s failed for %s: %s. Retrying in %.2fs",
                    attempt,
                    retries,
                    tools_config_file,
                    repr(e),
                    backoff_s,
                )
                if backoff_s > 0:
                    time.sleep(backoff_s)
        # Defensive fallback: unreachable, since the loop above either returns or raises.
        raise RuntimeError(
            f"[ToolListCache] initialization produced no tool list for {tools_config_file}"
        )
    except Exception as e:
        # On failure, propagate the error to waiters and remove the placeholder
        # so later callers can retry.
        future.set_exception(e)
        with cls._lock:
            if cls._cache.get(tools_config_file) is future:
                del cls._cache[tools_config_file]
        raise
```

---

(From verl-project#1105) If we use SGLang as the rollout engine, we should export `SGL_DISABLE_TP_MEMORY_INBALANCE_CHECK` to avoid errors from unbalanced memory capacity; please refer to [verl-project#5426 in sglang](sgl-project/sglang#5426).

# Why should we export SGL_DISABLE_TP_MEMORY_INBALANCE_CHECK when using SGLang as the rollout engine in verl?

1. verl initializes an SGLangRollout module during rollout, which is used to evaluate/generate samples.
2. SGLangRollout initializes VerlEngine, which in turn initializes a `torch.distributed.DeviceMesh` to support TP.
3. `DeviceMesh` initialization internally checks the free GPU memory of all participating devices; if the difference is too large (more than about 10%), it raises an error directly, to prevent initialization failures or communication deadlock.

# Why might GPU memory be inconsistent?

## Ray distributed actors load the model at different times

verl uses Ray for multi-process, multi-GPU concurrent training, and each `WorkerDict` may call `self.rollout = SGLangRollout(...)` at a different time. Workers that initialize the model at different times show different memory usage.

## Delayed initialization causes memory bias

Some workers enter the model loading/inference path earlier than others, e.g. via `generate_sequences()` or `compute_log_prob()`. GPU memory on early-loading workers has already been consumed by the model, while memory on late-loading workers is still empty, so the gap is large.

## verl+SGLang's TP initialization broadcasts across all devices, with no uniform release timing

SGLangRollout only needs the GPUs used by the rollout workers, but its VerlEngine initialization calls `torch.distributed.init_process_group()` and broadcasts a set of weights. As a result, non-rollout GPUs also participate in the communication; DeviceMesh is then initialized, and the "inconsistent memory" error is reported.
## Different loading modes of FSDP/TP models also cause deviations

If the following parameters are set:

```
actor.fsdp_config.param_offload=True
ref.fsdp_config.param_offload=True
```

some workers' parameters are on the CPU, while other workers' parameters are already sharded onto the GPU. This also creates an asymmetric distribution of GPU memory.

---------

Co-authored-by: ocss884 <ocss.lin@gmail.com>
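As a minimal sketch, the workaround described above amounts to exporting the variable before launching the rollout; the variable name comes from the text, while the surrounding commands are illustrative:

```shell
# Disable SGLang's TP memory-imbalance check before starting training.
# Only the variable name is from the PR discussion; the echo is illustrative.
export SGL_DISABLE_TP_MEMORY_INBALANCE_CHECK=1
echo "SGL_DISABLE_TP_MEMORY_INBALANCE_CHECK=${SGL_DISABLE_TP_MEMORY_INBALANCE_CHECK}"
```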
What does this PR do?
Currently, in high-concurrency situations the downstream MCP servers might congest, which can leave verl hanging indefinitely in some tool agent loop instances. To avoid this, this PR adds 1) retries for tool schema retrieval and 2) assuming fixed tool schemas, a caching mechanism per config file.
Checklist Before Starting
The PR title is in the format `[{modules}] {type}: {description}` (this will be checked by the CI).

- `{modules}` include `fsdp`, `megatron`, `veomni`, `sglang`, `vllm`, `rollout`, `trainer`, `ci`, `training_utils`, `recipe`, `hardware`, `deployment`, `ray`, `worker`, `single_controller`, `misc`, `perf`, `model`, `algo`, `env`, `tool`, `ckpt`, `doc`, `data`, `cfg`, `reward`, like `[megatron, fsdp, doc]`.
- `{type}` is in `feat`, `fix`, `refactor`, `chore`, `test`.
- If the change is breaking, add `[BREAKING]` to the beginning of the title, like `[BREAKING][fsdp, megatron] feat: dynamic batching`.

Test
- `pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always`
- Added `tests/utils/test_tool_list_cache_on_cpu.py` for cache semantics (deep-copy isolation + retry/backoff).

API and Usage Example
Design & Code Changes
- Added `ToolListCache` in `verl/tools/utils/tool_registry.py`, keyed by tool config path.
- Added `mcp_init_timeout_s` to `initialize_tools_from_config(...)` and enforced it on MCP init.
- Updated `ToolAgentLoop` to use `ToolListCache.get_tool_list(...)`.

Notes / trade-offs:
- `ToolListCache` currently returns `copy.deepcopy(...)` of cached tool objects to avoid shared mutable state across loops. If any tool implementation is not deepcopy-safe (e.g. holds non-copyable resources), we can revise the cache to store schemas/configs instead of tool instances.

Checklist Before Submitting
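The deep-copy isolation the cache relies on can be shown in a few lines; `ToolListCache` itself is not reproduced here, only the copy semantics it depends on, with a hypothetical tool entry as the cached value:

```python
import copy

# Simulate a cached tool list: mutable objects shared via a process-local cache.
# The "search" entry is a hypothetical example, not a real verl tool.
cached = [{"name": "search", "params": {"top_k": 5}}]

# Returning a deepcopy (as ToolListCache does) isolates callers from each other.
caller_view = copy.deepcopy(cached)
caller_view[0]["params"]["top_k"] = 99  # caller mutates its own copy only

print(cached[0]["params"]["top_k"])  # → 5: the cached original is untouched
```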
Important
Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review.
- Run `pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always`.
- Request CI through the `ci-request` channel in the `verl` Slack workspace. (If not accessible, please try the Feishu group (飞书群).)
- If this PR changes the `recipe` submodule, please also update the reference to the submodule commit via `git submodule update --remote` or `cd recipe && git pull origin main`.