Skip to content

[fsdp2] feat: add per-layer GPU optimizer step with async prefetch pipeline#5364

Open
aoshen524 wants to merge 4 commits intoverl-project:mainfrom
aoshen524:feat/per-layer-optimizer-step
Open

[fsdp2] feat: add per-layer GPU optimizer step with async prefetch pipeline#5364
aoshen524 wants to merge 4 commits intoverl-project:mainfrom
aoshen524:feat/per-layer-optimizer-step

Conversation

@aoshen524
Copy link
Contributor

@aoshen524 aoshen524 commented Feb 21, 2026

Summary

For very large models, optimizer states can become a major memory bottleneck.

In the default path, Adam initializes states lazily at the first optimizer.step(), which can trigger OOM.

Also, optimizer states are not needed during forward/backward. Keeping full optimizer states resident on GPU outside optimizer step only increases peak memory pressure.

This PR introduces per_layer_optimizer_step to stream optimizer states layer-by-layer only during optimizer step, instead of materializing full optimizer states on GPU at once.

Basic Usage

Enable the following flags in actor FSDP config:

actor:
  fsdp_config:
    per_layer_optimizer_step: true
    optimizer_step_prefetch_layers: 1

Experiments

Comparison setup

  • Baseline: default path without per_layer_optimizer_step
  • Variant: enable per_layer_optimizer_step
  • Goal: measure optimizer-step memory behavior and step-time impact

Results

  • Baseline can OOM at the first optimizer step due to Adam lazy state init + temporary tensors:
    • optimizer states: ~31.16 GB
    • temporary tensor pressure (e.g. foreach sqrt path): ~15.58 GB
  • With per_layer_optimizer_step, optimizer-step memory increase is small and controlled:
    • optimizer-step peak increment: ~+1.68 GB (alloc), ~+1.70 GB (device)
  • Runtime impact is limited to a small optimizer-step overhead from H2D/D2H streaming:
    • optimizer-step time observed: ~2.86s to ~5.05s

Precision alignment (grad dump comparison)

To verify that the per-layer optimizer step does not affect training numerics, we compared per-parameter gradient tensors dumped before the optimizer step between the baseline and optimized paths.

Setup:

  • Model: Qwen2.5-VL (7B), FSDP2 with SP=2, 4 GPUs, NNODES=1
  • Deterministic mode enabled (full_determinism=True, seed=42)
  • Hardcoded advantages (VERL_HARDCODE_ADVANTAGE=1) to eliminate reward randomness
  • Grad dump via VERL_PRECISION_DUMP_GRADS (saves per-param grad tensors as .pt files)
  • Comparison tool: tools/compare_grads.py (per-param max_diff, mean_diff, cosine_sim)

Result (step 0, rank 0, 729 params):

Scope Params max_diff mean_diff cosine_sim
language 338 0.00e+00 0.00e+00 ~1.0
vision 390 0.00e+00 0.00e+00 ~1.0
other 1 0.00e+00 0.00e+00 ~1.0

All 729 parameters have bitwise identical gradients (max_diff = 0) between baseline and optimized paths. This confirms that the per-layer optimizer step does not alter forward/backward computation — it only changes how optimizer states are streamed during the step itself.

Reviewer takeaway

  • This change primarily addresses optimizer-step memory risk (especially first-step OOM) by avoiding full-state GPU materialization.
  • The tradeoff is a mild optimizer-step time increase, while keeping memory behavior significantly safer.
  • Gradient-level precision is verified to be bitwise identical with the baseline.

…peline

When FSDP2 + optimizer offload trains large models (e.g., 32B), CPU Adam
takes ~324s processing ~67GB optimizer states. Loading all states to GPU
at once causes OOM.

Per-layer GPU optimizer step streams 1-2 layers at a time (~1.5GB each)
to GPU using 3-stream async pipeline (H2D/Compute/D2H), achieving
~50-80x speedup (324s -> 4.4s). Supports both CPUOffloadPolicy=True
(full offload) and False (optimizer-only offload) modes.

Changes:
- Add PerLayerGPUOptimizerStep class in fsdp_utils.py
- Add per_layer_optimizer_step, optimizer_step_prefetch_layers config
- Wire into fsdp_workers.py update_actor() with stepper lifecycle
- Add optimizer step dispatch + perf metrics in dp_actor.py
- Add 5 unit tests (layer grouping, correctness, multi-step, prefetch)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

The pull request introduces a high-performance per-layer GPU optimizer step for FSDP2, which significantly reduces the time spent in Adam updates when using optimizer offload. The implementation uses a 3-stream async pipeline to overlap data transfers with computation. While the core logic is sound and provides impressive speedups, there are several critical efficiency and correctness issues that should be addressed:

  1. Performance Bottlenecks: The use of torch.cuda.empty_cache() twice per iteration and the creation of new CUDA streams on every step will degrade performance.
  2. Resource Management: The stepper is recreated every iteration, leading to redundant module scanning and parameter grouping.
  3. Correctness: The peak memory metric is not reset per step, and there is a potential crash if the optimizer's step state is a Python float.
  4. Async Pipeline Stalls: Parameters and gradients are not pinned when on CPU, which makes the 'async' transfers synchronous and stalls the pipeline.

Comment on lines +909 to +912
self.device = torch.device(f"cuda:{device_id}") if isinstance(device_id, int) else torch.device(device_id)
self.prefetch_layers = prefetch_layers
self._layer_param_groups = self._build_layer_groups(model)
self._init_states_and_pin()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Creating new CUDA streams inside the step() method on every call is inefficient. It is better to initialize the h2d_stream and d2h_stream once in the constructor and reuse them.

Suggested change
self.device = torch.device(f"cuda:{device_id}") if isinstance(device_id, int) else torch.device(device_id)
self.prefetch_layers = prefetch_layers
self._layer_param_groups = self._build_layer_groups(model)
self._init_states_and_pin()
self.device = torch.device(f"cuda:{device_id}") if isinstance(device_id, int) else torch.device(device_id)
self.prefetch_layers = prefetch_layers
self._layer_param_groups = self._build_layer_groups(model)
self._init_states_and_pin()
self.h2d_stream = torch.cuda.Stream(device=self.device)
self.d2h_stream = torch.cuda.Stream(device=self.device)

Comment on lines +981 to +990
for key in ("exp_avg", "exp_avg_sq"):
if key in state:
local = self._get_local_tensor(state[key])
if local.device.type != "cpu":
state[key] = local.to("cpu")
# Pin optimizer state tensors for async transfers
for key in ("exp_avg", "exp_avg_sq", "step"):
local = self._get_local_tensor(state[key])
if local.device.type == "cpu" and not local.is_pinned():
local.data = local.pin_memory()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This block has two issues:

  1. It doesn't handle the case where state['step'] is a Python float (common in some Adam implementations), which will cause an AttributeError when accessing .device or a crash in _prefetch_layer.
  2. To ensure the 3-stream pipeline is truly asynchronous, the parameters and gradients should also be pinned if they reside on the CPU (offload mode). Otherwise, to(non_blocking=True) will behave synchronously.
Suggested change
for key in ("exp_avg", "exp_avg_sq"):
if key in state:
local = self._get_local_tensor(state[key])
if local.device.type != "cpu":
state[key] = local.to("cpu")
# Pin optimizer state tensors for async transfers
for key in ("exp_avg", "exp_avg_sq", "step"):
local = self._get_local_tensor(state[key])
if local.device.type == "cpu" and not local.is_pinned():
local.data = local.pin_memory()
for key in ("exp_avg", "exp_avg_sq", "step"):
if key in state:
val = state[key]
if not isinstance(val, torch.Tensor):
state[key] = torch.tensor(float(val), dtype=torch.float32, device="cpu")
else:
local = self._get_local_tensor(val)
if local.device.type != "cpu":
state[key] = local.to("cpu")
# Pin optimizer state tensors and param/grad for async transfers
for key in ("exp_avg", "exp_avg_sq", "step"):
local = self._get_local_tensor(state[key])
if local.device.type == "cpu" and not local.is_pinned():
local.data = local.pin_memory()
for t in (param, param.grad):
if t is not None:
local_t = self._get_local_tensor(t.data)
if local_t.device.type == "cpu" and not local_t.is_pinned():
local_t.data = local_t.pin_memory()

Comment on lines +1103 to +1105
h2d_stream = torch.cuda.Stream(device=self.device)
d2h_stream = torch.cuda.Stream(device=self.device)
compute_stream = torch.cuda.current_stream(self.device)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Reuse the persistent streams created in the constructor to avoid the overhead of stream creation on every step. Also, reset the peak memory statistics at the start of the step to ensure the peak_memory_gb metric reflects the usage of the current step rather than the global peak.

Suggested change
h2d_stream = torch.cuda.Stream(device=self.device)
d2h_stream = torch.cuda.Stream(device=self.device)
compute_stream = torch.cuda.current_stream(self.device)
torch.cuda.reset_peak_memory_stats(self.device)
h2d_stream = self.h2d_stream
d2h_stream = self.d2h_stream
compute_stream = torch.cuda.current_stream(self.device)

# blocks to CUDA driver so forward/backward can't repurpose them.
# Without this, each optimizer step leaks ~1.7 GB of device memory
# because caching allocator blocks get "stolen" by gradient allocation.
torch.cuda.empty_cache()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

torch.cuda.empty_cache() is an extremely expensive operation that synchronizes the GPU and defragments memory. Calling it at the end of every optimizer step will significantly degrade performance and negate the benefits of the async pipeline. If memory pressure is the concern, consider making this optional or investigating why the caching allocator isn't reusing blocks efficiently.

Comment on lines +994 to +1001
if _use_per_layer:
stepper = PerLayerGPUOptimizerStep(
model=self.actor_module_fsdp,
optimizer=self.actor_optimizer,
device_id=get_device_id(),
prefetch_layers=self.config.actor.fsdp_config.get("optimizer_step_prefetch_layers", 1),
)
self.actor._per_layer_optimizer_stepper = stepper
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Instantiating the stepper on every update_actor call is inefficient because it re-scans the model modules and re-groups parameters every iteration. The stepper should be cached in self.actor and reused.

Suggested change
if _use_per_layer:
stepper = PerLayerGPUOptimizerStep(
model=self.actor_module_fsdp,
optimizer=self.actor_optimizer,
device_id=get_device_id(),
prefetch_layers=self.config.actor.fsdp_config.get("optimizer_step_prefetch_layers", 1),
)
self.actor._per_layer_optimizer_stepper = stepper
if _use_per_layer:
stepper = getattr(self.actor, "_per_layer_optimizer_stepper", None)
if stepper is None:
stepper = PerLayerGPUOptimizerStep(
model=self.actor_module_fsdp,
optimizer=self.actor_optimizer,
device_id=get_device_id(),
prefetch_layers=self.config.actor.fsdp_config.get("optimizer_step_prefetch_layers", 1),
)
self.actor._per_layer_optimizer_stepper = stepper

Comment on lines +1034 to +1036
if _use_per_layer:
self.actor._per_layer_optimizer_stepper = None
torch.cuda.empty_cache()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Removing the stepper and calling empty_cache() here prevents caching and adds a redundant, expensive synchronization point. If the stepper is cached as suggested, this cleanup block should be removed or moved to a proper destructor/cleanup method.

aoshen524 and others added 3 commits February 21, 2026 17:06
Breaking change: PerLayerGPUOptimizerStep now requires params/grads
on GPU (offload_policy=False). Using CPUOffloadPolicy raises ValueError.

Changes based on Gemini review feedback:
- Move CUDA stream creation to __init__ (reuse across step() calls)
- Handle state['step'] as Python float/int (defensive)
- Add reset_peak_memory_stats for accurate per-step peak metrics
- Fix decoupled_weight_decay: detect AdamW via isinstance instead of
  relying on param_group key (was silently wrong for AdamW)
- Cache stepper in fsdp_workers.py (avoid repeated module scan/pin)
- Remove per-call stepper destruction (cached for reuse)

Simplifications from GPU-only mode:
- _prefetch_layer: params/grads are references (no H2D copy needed)
- _offload_layer: only optimizer states D2H (params updated in-place)
- _validate_gpu_params: explicit check at init time
- Tests: replace CPU offload test with ValueError assertion test

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…alidation

- Assert AdamW optimizer (TypeError if not)
- Add _validate_single_hyperparam_set(): ensures all param_groups have
  identical hyperparams since per-layer step processes by layer not group
- _run_adam_for_layer now mirrors Adam.step() (adam.py:248-270):
  reads all hyperparams from group dict, computes has_complex dynamically
- Remove defensive state['step'] float/int handling
- Remove _decoupled_weight_decay instance var (use group dict directly)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…e path)

Support per_layer_optimizer_step in the FSDPEngine (disable_legacy_worker=True)
code path:
- initialize(): create PerLayerGPUOptimizerStep after _build_model_optimizer()
  while params are still on GPU, with offload_policy validation
- optimizer_step(): use stepper.step() instead of optimizer.step()
- to(): skip bulk optimizer load/offload when per-layer stepper is active
  (stepper manages its own optimizer states via async H2D/D2H)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
aoshen524 added a commit to aoshen524/AReaL that referenced this pull request Mar 5, 2026
…pipeline

Port of verl-project/verl#5364. Instead of materializing all optimizer
states on GPU at once (~46.74GB for 7B), streams 1-2 layers at a time
(~1.68GB) using a 3-stream async pipeline (H2D/compute/D2H), achieving
~50-80x speedup over CPU Adam while avoiding OOM.

Changes:
- Add PerLayerGPUOptimizerStep class in fsdp_utils/optimizer.py
- Add per_layer_optimizer_step and optimizer_step_prefetch_layers config fields
- Wire into FSDPEngine.optimizer_step() with performance metrics
- Add 5 test cases covering layer grouping, correctness, multi-step, and prefetch

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant