[trainer] feat: add new trainer with TransferQueue #5401

Merged

wuxibin89 merged 53 commits into verl-project:main from wuxibin89:wuxibin/tq_trainer on Apr 10, 2026

Conversation

@wuxibin89 (Collaborator) commented Feb 25, 2026

What does this PR do?

This PR introduces main_ppo_sync.py, a new synchronous PPO trainer that decouples control flow from data flow by integrating TransferQueue as the data plane.

Key innovation: The trainer (single-controller) now only orchestrates metadata (KVBatchMeta), while large tensors are transferred directly between workers via TransferQueue using zero-copy serialization. This eliminates the controller bottleneck in large-scale RL training.
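As a rough illustration of the control/data split (all class and method names below are illustrative stand-ins, not the PR's actual API): the controller only passes around small metadata records, while workers read and write the heavy payloads through the data plane.

```python
from dataclasses import dataclass, field

@dataclass
class KVBatchMeta:
    """Lightweight handle the controller passes around instead of tensors."""
    batch_id: str
    sample_ids: list = field(default_factory=list)  # which rows live in the queue
    fields: list = field(default_factory=list)      # e.g. ["input_ids", "responses"]

class InMemoryQueue:
    """Toy stand-in for TransferQueue: workers read/write payloads by sample id."""
    def __init__(self):
        self._store = {}

    def put(self, sample_id, name, tensor):
        self._store[(sample_id, name)] = tensor

    def get(self, sample_id, name):
        return self._store[(sample_id, name)]

# Controller side: only metadata flows through the single controller.
meta = KVBatchMeta(batch_id="step0", sample_ids=[0, 1], fields=["responses"])

# Worker side: large payloads go straight to/from the data plane.
q = InMemoryQueue()
for sid in meta.sample_ids:
    q.put(sid, "responses", [101, 102, sid])  # pretend tensor
payload = [q.get(sid, "responses") for sid in meta.sample_ids]
```

In the real system the queue is a distributed service with zero-copy serialization; the point of the sketch is only that the controller never touches `payload`, just `meta`.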

See RFC #5400 for detailed design discussion.

Performance Results

Comprehensive benchmarks on both GPU (H100/H20) and NPU (A3) platforms demonstrate 7%–49.2% E2E performance gains:

Performance comparison: baseline (legacy) vs. TransferQueue

| Platform | Scale | Model | Dataset | Prompt/Response Len | GBS | Samples per Prompt | E2E Gain |
| --- | --- | --- | --- | --- | --- | --- | --- |
| GPU H100 | 2×8 | Qwen3-30B-A3B | DAPO-Math-17k | 2K/8K | 128 | 8 | +14.1% |
| GPU H100 | 4×8 | Qwen3-VL-30B-A3B | Geo3K | 1K/2K | 128 | 8 | +12.5% |
| GPU H100 | 2×8 | Qwen3-VL-30B-A3B | OneThinker | 16K/2K | 128 | 8 | +22.4% |
| GPU H100 | 16×8 | Qwen3-VL-30B-A3B | OneThinker | 16K/2K | 128 | 8 | +49.2% |
| NPU A3 | 2×16 | Qwen3-VL-30B-A3B | Geo3K | 1K/1K | 128 | 4 | +7% |
| NPU A3 | 2×16 | Qwen3-VL-30B-A3B | OneThinker | 16K/1K | 128 | 4 | +20% |

Details for reproducing the reported performance can be found in this blog post.

DAPO-Math-17k + Router Replay R3, Qwen3-30B-A3B, 2x8 H100, 14.1% E2E Improvement


Geo3k, 4x8 H100, Qwen3-VL-30B-A3B, 12.5% E2E Improvement


Geo3k, 2x16 A3, Qwen3-VL-30B-A3B, 7% E2E Improvement

OneThinker, 2x8 H100, Qwen3-VL-30B-A3B, 22.4% E2E Improvement


OneThinker, 2x16 A3, Qwen3-VL-30B-A3B, 20% E2E Improvement

OneThinker, 16x8 H100, Qwen3-VL-30B-A3B, 49.2% E2E Improvement


Accuracy Test

DAPO17k + AIME24+ Qwen3-8B-Base, 1x8 H200


DAPO17k + AIME24 + Qwen3-8B-Base, 1x8 A3


Geo3k, Qwen3-VL-30B-A3B, 4x8 H100


TransferQueue Stress Test

We provide a stress test report demonstrating more than 8192 concurrent clients writing 2 TB of data into TransferQueue across 4 nodes; the system remained stable with no crashes or data loss.

During early development, we successfully deployed TransferQueue x verl on 64x16 A3 nodes for Qwen3-235B, with seq_len=64k, GBS=256, N_Sample=16.

Scripts

set -x

log_dir="./logs"
mkdir -p $log_dir
timestamp=$(date +"%Y%m%d%H%M%S")

# ===================================== Algorithm =====================================
adv_estimator=grpo
loss_mode=vanilla

# reference policy
use_kl_in_reward=False
kl_coef=0.001
use_kl_loss=False
kl_loss_coef=0.001

clip_ratio_low=0.2
clip_ratio_high=0.28

actor_lr=1e-6
critic_lr=2e-6
gae_gamma=1.0
gae_lam=0.95
critic_warmup=0

# rollout correction
rollout_is="sequence"                     # Self-normalized sequence-level IS
rollout_is_threshold=2.0                  # Upper threshold for IS weights
rollout_is_batch_normalize="true"         # Self-normalization (mean=1.0)

# ===================================== Data/Model =====================================
MODEL_ID=${MODEL_ID:-Qwen/Qwen3-8B-Base}
MODEL_PATH=${MODEL_PATH:-${HOME}/models/${MODEL_ID}}

TASK_NAME=${TASK_NAME:-dapo}
if [[ $TASK_NAME == "gsm8k" ]]; then
    train_files=/datasets/gsm8k/train.parquet
    test_files=/datasets/gsm8k/train.parquet
    actor_model_path=/models/${MODEL_ID}
    apply_rope_fusion=True
elif [[ $TASK_NAME == "geo3k" ]]; then
    train_files=/datasets/geo3k/train.parquet
    test_files=/datasets/geo3k/test.parquet
    actor_model_path=/models/${MODEL_ID}
    apply_rope_fusion=False
elif [[ $TASK_NAME == "dapo" ]]; then
    train_files=/datasets/dapo-math-17k/dapo-math-17k.parquet
    test_files=/datasets/aime24/aime-2024.parquet
    actor_model_path=/models/${MODEL_ID}
    max_prompt_length=$((1024 * 2))
    max_response_length=$((1024 * 8))
else
    echo "TASK_NAME $TASK_NAME not supported"
    exit 1
fi

critic_model_path=$actor_model_path

max_prompt_length=${max_prompt_length:-$((1024 * 1))}
max_response_length=${max_response_length:-$((1024 * 2))}
train_batch_size=128
ppo_mini_batch_size=32
n_resp_per_prompt=8
n_resp_per_prompt_val=1

MODEL_NAME_ONLY=${MODEL_ID##*/}
log_file="${log_dir}/${MODEL_NAME_ONLY}_${TASK_NAME}_transferqueue_NEW_TRAINER_longrun_${timestamp}.log"

n_gpus_training=8
# ===================================== Training =====================================
backend=${BACKEND:-fsdp} # fsdp, fsdp2, megatron

actor_max_token_len_per_gpu=$((max_prompt_length + max_response_length))
critic_max_token_len_per_gpu=$(((max_prompt_length + max_response_length) * 4))

USP_SIZE=2
ACTOR_FSDP_CONFIG="
    actor_rollout_ref.actor.fsdp_config.strategy=$backend \
    actor_rollout_ref.actor.fsdp_config.param_offload=True \
    actor_rollout_ref.actor.fsdp_config.optimizer_offload=True \
    actor_rollout_ref.actor.fsdp_config.ulysses_sequence_parallel_size=$USP_SIZE"

TP_SIZE=2
CP_SIZE=1
PP_SIZE=2
VPP_SIZE=null
EP_SIZE=1
ETP_SIZE=1
ACTOR_MEGATRON_CONFIG="
    actor_rollout_ref.actor.megatron.tensor_model_parallel_size=$TP_SIZE \
    actor_rollout_ref.actor.megatron.context_parallel_size=$CP_SIZE \
    actor_rollout_ref.actor.megatron.pipeline_model_parallel_size=$PP_SIZE \
    actor_rollout_ref.actor.megatron.virtual_pipeline_model_parallel_size=$VPP_SIZE \
    actor_rollout_ref.actor.megatron.expert_model_parallel_size=$EP_SIZE \
    actor_rollout_ref.actor.megatron.expert_tensor_parallel_size=$ETP_SIZE \
    actor_rollout_ref.actor.megatron.param_offload=True \
    actor_rollout_ref.actor.megatron.grad_offload=True \
    actor_rollout_ref.actor.megatron.optimizer_offload=True \
    +actor_rollout_ref.actor.megatron.override_transformer_config.moe_router_dtype=fp32 \
    +actor_rollout_ref.actor.megatron.override_transformer_config.moe_permute_fusion=True \
    +actor_rollout_ref.actor.megatron.override_transformer_config.recompute_method=uniform \
    +actor_rollout_ref.actor.megatron.override_transformer_config.recompute_granularity=full \
    +actor_rollout_ref.actor.megatron.override_transformer_config.recompute_num_layers=1 \
    +actor_rollout_ref.actor.megatron.override_transformer_config.apply_rope_fusion=$apply_rope_fusion \
    +actor_rollout_ref.actor.megatron.override_transformer_config.gradient_accumulation_fusion=True \
    actor_rollout_ref.actor.megatron.use_mbridge=True"

ACTOR_CONFIG="
    actor_rollout_ref.actor.optim.lr=$actor_lr \
    actor_rollout_ref.model.path=$actor_model_path \
    actor_rollout_ref.model.use_remove_padding=True \
    actor_rollout_ref.actor.use_kl_loss=$use_kl_loss \
    actor_rollout_ref.actor.kl_loss_coef=$kl_loss_coef \
    actor_rollout_ref.actor.clip_ratio_low=$clip_ratio_low \
    actor_rollout_ref.actor.clip_ratio_high=$clip_ratio_high \
    actor_rollout_ref.actor.clip_ratio_c=10.0 \
    actor_rollout_ref.actor.policy_loss.loss_mode=${loss_mode} \
    actor_rollout_ref.actor.use_dynamic_bsz=True \
    actor_rollout_ref.actor.ppo_mini_batch_size=$ppo_mini_batch_size \
    actor_rollout_ref.actor.ppo_max_token_len_per_gpu=$actor_max_token_len_per_gpu"

CIRITC_CONFIG="
    critic.optim.lr=$critic_lr \
    critic.model.path=$critic_model_path \
    critic.model.use_remove_padding=True \
    critic.ppo_max_token_len_per_gpu=$critic_max_token_len_per_gpu"

CRITIC_FSDP_CONFIG="${ACTOR_FSDP_CONFIG//actor_rollout_ref.actor/critic.model}"
CRITIC_MEGATRON_CONFIG="${ACTOR_MEGATRON_CONFIG//actor_rollout_ref.actor/critic}"

if [[ $backend == "megatron" ]]; then
    CONFIG_NAME=ppo_megatron_trainer
    ACTOR_CONFIG="$ACTOR_CONFIG $ACTOR_MEGATRON_CONFIG"
    if [[ $adv_estimator == "gae" ]]; then
        CIRITC_CONFIG="$CIRITC_CONFIG $CRITIC_MEGATRON_CONFIG"
    else
        CIRITC_CONFIG=""
    fi
else # fsdp, fsdp2
    CONFIG_NAME=ppo_trainer
    ACTOR_CONFIG="$ACTOR_CONFIG $ACTOR_FSDP_CONFIG"
    if [[ $adv_estimator == "gae" ]]; then
        CIRITC_CONFIG="$CIRITC_CONFIG $CRITIC_FSDP_CONFIG"
    else
        CIRITC_CONFIG=""
    fi
fi

# ===================================== Inference =====================================
rollout_name=vllm
infer_tp=2
infer_dp=1
infer_ep=1

ROLLOUT_CONFIG="
    actor_rollout_ref.rollout.name=$rollout_name \
    actor_rollout_ref.rollout.mode=async \
    actor_rollout_ref.rollout.tensor_model_parallel_size=$infer_tp \
    actor_rollout_ref.rollout.data_parallel_size=$infer_dp \
    actor_rollout_ref.rollout.expert_parallel_size=$infer_ep \
    actor_rollout_ref.rollout.gpu_memory_utilization=0.7 \
    actor_rollout_ref.rollout.n=$n_resp_per_prompt \
    actor_rollout_ref.rollout.val_kwargs.top_p=0.7 \
    actor_rollout_ref.rollout.val_kwargs.temperature=1.0 \
    actor_rollout_ref.rollout.calculate_log_probs=True \
    actor_rollout_ref.rollout.enforce_eager=True \
    actor_rollout_ref.rollout.val_kwargs.n=$n_resp_per_prompt_val"

# wandb
project_name=transfer_queue_test
experiment_name=${MODEL_NAME_ONLY}-$adv_estimator-$backend-$rollout_name
default_local_dir=${HOME}/checkpoint/$project_name/$experiment_name

python3 -m verl.trainer.main_ppo_sync \
    --config-path=./config \
    --config-name=$CONFIG_NAME \
    algorithm.adv_estimator=$adv_estimator \
    algorithm.use_kl_in_reward=$use_kl_in_reward \
    algorithm.kl_ctrl.kl_coef=$kl_coef \
    algorithm.gamma=$gae_gamma \
    algorithm.lam=$gae_lam \
    algorithm.rollout_correction.rollout_is=$rollout_is \
    data.train_files="$train_files" \
    data.val_files="$test_files" \
    data.return_raw_chat=True \
    data.train_batch_size=$train_batch_size \
    data.max_prompt_length=$max_prompt_length \
    data.max_response_length=$max_response_length \
    data.filter_overlong_prompts=False \
    data.truncation='error' \
    trainer.use_legacy_worker_impl=disable \
    trainer.critic_warmup=$critic_warmup \
    trainer.logger=['console'] \
    trainer.project_name=$project_name \
    trainer.experiment_name=$experiment_name \
    trainer.default_local_dir=$default_local_dir \
    trainer.n_gpus_per_node=${n_gpus_training} \
    trainer.nnodes=1 \
    trainer.val_before_train=False \
    trainer.val_only=False \
    trainer.log_val_generations=100 \
    trainer.save_freq=-1 \
    trainer.test_freq=5 \
    trainer.total_epochs=15 \
    $ACTOR_CONFIG \
    $CIRITC_CONFIG \
    $ROLLOUT_CONFIG \
    "$@" 2>&1 | tee "$log_file"
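One construct in the script worth calling out: the critic configs are derived from the actor configs via Bash pattern substitution, where `${var//pattern/replacement}` rewrites every occurrence of the pattern. A minimal standalone sketch (variable names here are ours, trimmed from the script):

```shell
#!/usr/bin/env bash
# ${var//pattern/replacement} replaces ALL matches (a single slash would
# replace only the first), re-targeting Hydra key prefixes from the actor
# section to the critic section without maintaining two config strings.
ACTOR_CFG="actor_rollout_ref.actor.fsdp_config.param_offload=True \
    actor_rollout_ref.actor.fsdp_config.optimizer_offload=True"

CRITIC_CFG="${ACTOR_CFG//actor_rollout_ref.actor/critic.model}"
echo "$CRITIC_CFG"
```

This is why `CRITIC_FSDP_CONFIG` and `CRITIC_MEGATRON_CONFIG` stay in sync with their actor counterparts automatically.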

@wuxibin89 mentioned this pull request Feb 25, 2026 (28 tasks)

@gemini-code-assist bot (Contributor) left a comment


Code Review

This pull request introduces a new synchronous PPO trainer using TransferQueue, which is a significant architectural change. The new implementation includes a replay buffer and updated data handling utilities for nested tensors. However, the review identified several critical issues. Key methods like _save_checkpoint and _compute_reward_colocate are not implemented, which will lead to runtime crashes. There is also a critical bug in the response_to_nested utility function that will cause errors during tensor creation. Additionally, there are high-severity concerns regarding unsafe process termination and data dispatching logic that could lead to silent failures. These issues must be addressed before this PR can be merged.

Comment thread verl/trainer/main_ppo_sync.py Outdated
Comment on lines +801 to +804
def _compute_reward_colocate(self, batch: KVBatchMeta, metrics: dict) -> KVBatchMeta:
"""Compute the reward with colocate reward model."""
# TODO: add reward model
raise NotImplementedError

critical

The _compute_reward_colocate method is not implemented. This method is called in the training and validation steps when a reward model is used without a dedicated resource pool. This will cause the trainer to crash under that configuration.

Comment thread verl/workers/utils/padding.py Outdated
Comment thread verl/trainer/main_ppo_sync.py
Comment thread verl/trainer/main_ppo_sync.py
@ZhentaoFan (Contributor):

#5375 (comment): please take a look at this when you have some spare time. Thanks.

wuxibin89 pushed a commit that referenced this pull request Mar 2, 2026
### What does this PR do?

In verl 0.8, we will refactor the integration approach for
TransferQueue. For reference:
#5401.

This PR will first clean up the legacy code to facilitate the subsequent
TQ integration.

CC @zw0610 @wuxibin89 

### Checklist Before Starting

- [ ] Search for similar PRs. Paste at least one query link here: ...
- [ ] Format the PR title as `[{modules}] {type}: {description}` (This
will be checked by the CI)
- `{modules}` include `fsdp`, `megatron`, `veomni`, `sglang`, `vllm`,
`rollout`, `trainer`, `ci`, `training_utils`, `recipe`, `hardware`,
`deployment`, `ray`, `worker`, `single_controller`, `misc`, `perf`,
`model`, `algo`, `env`, `tool`, `ckpt`, `doc`, `data`, `cfg`, `reward`,
`fully_async`, `one_step_off`
- If this PR involves multiple modules, separate them with `,` like
`[megatron, fsdp, doc]`
  - `{type}` is in `feat`, `fix`, `refactor`, `chore`, `test`
- If this PR breaks any API (CLI arguments, config, function signature,
etc.), add `[BREAKING]` to the beginning of the title.
  - Example: `[BREAKING][fsdp, megatron] feat: dynamic batching`

### Test

> For changes that can not be tested by CI (e.g., algorithm
implementation, new model support), validate by experiment(s) and show
results like training curve plots, evaluation results, etc.

### API and Usage Example

> Demonstrate how the API changes if any, and provide usage example(s)
if possible.

```python
# Add code snippet or script demonstrating how to use this
```

### Design & Code Changes

> Demonstrate the high-level design if this PR is complex, and list the
specific changes.

### Checklist Before Submitting

> [!IMPORTANT]
> Please check all the following items before requesting a review,
otherwise the reviewer might deprioritize this PR for review.

- [ ] Read the [Contribute
Guide](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md).
- [ ] Apply [pre-commit
checks](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md#code-linting-and-formatting):
`pre-commit install && pre-commit run --all-files --show-diff-on-failure
--color=always`
- [ ] Add / Update [the
documentation](https://github.com/volcengine/verl/tree/main/docs).
- [ ] Add unit or end-to-end test(s) to [the CI
workflow](https://github.com/volcengine/verl/tree/main/.github/workflows)
to cover all the code. If not feasible, explain why: ...
- [ ] Once your PR is ready for CI, send a message in [the `ci-request`
channel](https://verl-project.slack.com/archives/C091TCESWB1) in [the
`verl` Slack
workspace](https://join.slack.com/t/verl-project/shared_invite/zt-3855yhg8g-CTkqXu~hKojPCmo7k_yXTQ).
(If not accessible, please try [the Feishu group
(飞书群)](https://applink.larkoffice.com/client/chat/chatter/add_by_link?link_token=772jd4f1-cd91-441e-a820-498c6614126a).)
- [ ] If your PR is related to the `recipe` submodule, please also
update the reference to the submodule commit via `git submodule update
--remote` or `cd recipe && git pull origin main`.

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
SchumiDing pushed a commit to SchumiDing/verl that referenced this pull request Mar 2, 2026
…l-project#5454)

CLAassistant commented Mar 2, 2026

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you all sign our Contributor License Agreement before we can accept your contribution.
4 out of 5 committers have signed the CLA.

✅ Begunner
✅ 0oshowero0
✅ ZhentaoFan
✅ wuxibin89
❌ xupinjie


Begunner seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account.
You have signed the CLA already but the status is still pending? Let us recheck it.

Comment thread verl/workers/engine_workers.py Outdated
@ZhentaoFan (Contributor):

A side note for a future merge: there is a bug in the metrics log: response/aborted_ratio and prompt_length/clip_ratio are near 100%. The root cause is the removal of padding in TQWorker. Inside metric_utils.py, we should find a clean way to modify compute_data_metrics(), which is currently designed for the general (padded) case.
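One hedged direction for such a fix (function and variable names below are ours, not verl's): derive length-based metrics from explicit per-sample lengths carried in the batch metadata rather than from padded-mask sums, so the same helper gives correct ratios for both padded and unpadded batches.

```python
def length_clip_ratio(lengths, max_len):
    """Fraction of samples whose true length reached the cap, i.e. were clipped.

    With padding removed, true lengths must come from metadata; summing an
    attention mask over a padded tensor would report near-100% ratios.
    """
    if not lengths:
        return 0.0
    return sum(1 for n in lengths if n >= max_len) / len(lengths)

# Hypothetical per-sample response lengths for one batch:
response_lengths = [128, 512, 512, 300]
print(length_clip_ratio(response_lengths, max_len=512))  # 0.5
```

The same pattern extends to mean/max length and aborted-sample ratios: compute everything from the explicit length list, never from tensor shapes.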

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>

fix

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>

fix sanity and annotation checks

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
sijyang pushed a commit to sijyang/verl that referenced this pull request Apr 1, 2026
…l-project#5454)
@wuxibin89 (Collaborator, Author):

/gemini review


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a synchronous PPO trainer (main_ppo_sync.py) leveraging TransferQueue and ReplayBuffer for zero-copy data transfer and flexible sampling. It includes a new tqbridge decorator to facilitate communication between the trainer and distributed workers using BatchMeta objects. Additionally, the PR adds support for multi-trajectory advantage estimation (GRPO) and optimizes metric computation by utilizing pre-calculated sequence lengths. Review feedback identifies a critical TypeError in the NonTensorStack initialization within list_of_dict_to_tensordict and a logging syntax error in the ReplayBuffer class.

else (
torch.nested.as_nested_tensor(val_list, layout=torch.jagged)
if val_list and all(isinstance(item, torch.Tensor) for item in val_list)
else NonTensorStack(*val_list)

high

The NonTensorStack constructor expects TensorDictBase objects (such as NonTensorData). Passing a list of raw objects (e.g., strings or integers) via *val_list will result in a TypeError. Use NonTensorStack.from_list with explicit NonTensorData wrapping to correctly handle non-tensor data, consistent with the implementation in get_tensordict at line 405.

Suggested change
else NonTensorStack(*val_list)
else NonTensorStack.from_list([NonTensorData(item) for item in val_list])

Comment thread verl/trainer/main_ppo_sync.py Outdated
tags.append(tag)
else:
logger.debug(f"Unknown status {tag['status']} for key {key}")
logger.info("partitions", self.partitions)

high

The logging call is missing a format specifier. In Python's logging module, logger.info(msg, *args) expects msg to be a format string if additional arguments are provided. As written, self.partitions will be ignored and only the string "partitions" will be logged. Use a format string like "partitions: %s" to ensure the data is captured.

Suggested change
logger.info("partitions", self.partitions)
logger.info("partitions: %s", self.partitions)
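For reference, a runnable illustration of the difference using only the stdlib `logging` module (logger name and data are made up):

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("replay_buffer_demo")
partitions = {"train": [0, 1], "val": [2]}

# Bug: extra positional args are treated as %-format arguments. With no
# placeholder in the message, a single dict arg is silently dropped from
# the rendered output.
logger.info("partitions", partitions)      # logs just "partitions"

# Fix: give the message a format specifier. This lazy %-style formatting
# is also preferred over f-strings: the repr of `partitions` is only
# built if the record actually gets emitted.
logger.info("partitions: %s", partitions)  # logs "partitions: {...}"
```

The first form is worse than it looks: because it raises no error, the missing data goes unnoticed until someone reads the logs.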

@guillemgt (Contributor):

When an agent loop produces multiple outputs for a given session, rewards are currently computed from the last sequence only and propagated to the other outputs.
It could be useful to also allow reward workers to take all outputs in a session and assign a different reward to each one. For example, there are cases where the intermediate states are relevant to the reward. This would also open the door to methods like self-play (giving different rewards to the outputs of each player).

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>

mock TQ to pass some UT

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
@0oshowero0 0oshowero0 force-pushed the wuxibin/tq_trainer branch from e414fde to 776af09 Compare April 7, 2026 06:26
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Comment thread verl/workers/engine_workers.py Outdated
final_output = tu.get_tensordict(tensor_dict=model_output, non_tensor_dict={"metrics": final_metrics})
return final_output

@tqbridge(dispatch_mode=make_nd_compute_dataproto_dispatch_fn(mesh_name="train"))
PR author (Collaborator) commented:

@0oshowero0 We better hide tqbridge in @register


Done

0oshowero0 and others added 2 commits April 8, 2026 14:14
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
@wuxibin89 wuxibin89 merged commit 866a1ea into verl-project:main Apr 10, 2026
79 of 85 checks passed
jianjunzhong added a commit to jianjunzhong/verl that referenced this pull request Apr 10, 2026
wuxibin89 pushed a commit that referenced this pull request Apr 16, 2026
### What does this PR do?
### Background
Inside the current tq_trainer, AgentLoopWorkerTQ already supports the
multi-trajectory feature. However, during actual training, sample
(trajectory)-level padding is still required for each batch so that the
number of samples is divisible by both `dp_size` and `mini_batch_size`;
otherwise, an error will be thrown. This PR fixes the bug and addresses
the following considerations:

#### Upsampling:
1. **LCM alignment**: the sample count must satisfy `% dp_size == 0` and `% mini_batch_size == 0` (and `% critic_mini_batch_size == 0` if the critic is trained).
2. **Notice**: we should **not** use `pad_dataproto_to_divisor()` for padding, as it duplicates the first `pad_size` existing samples, which pollutes the GRPO advantages, gradients, metrics, etc., and causes inconsistency.
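A tiny numeric illustration of why reusing existing samples as padding shifts the GRPO statistics (plain group mean/std normalization assumed; the helper name is ours):

```python
def grpo_advantages(rewards):
    """Group-relative advantages: (r - group_mean) / group_std."""
    mu = sum(rewards) / len(rewards)
    var = sum((r - mu) ** 2 for r in rewards) / len(rewards)
    std = var ** 0.5 or 1.0  # guard the all-equal-rewards case
    return [(r - mu) / std for r in rewards]

group = [1.0, 0.0, 0.0, 1.0]   # rewards for one prompt's rollouts
padded = group + [group[0]]    # naive pad: duplicate an existing sample

print(grpo_advantages(group))   # [1.0, -1.0, -1.0, 1.0]
print(grpo_advantages(padded))  # every advantage in the group changes
```

Duplicating a real sample changes the group mean and std, so the advantages of all the genuine rollouts change too, which is exactly the inconsistency the bullet above warns against; padding with independent UIDs keeps the dummy samples out of the group statistics.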

#### Padding:
1. Padded samples use **independent UIDs** to avoid interfering with
GRPO advantage computation.
3. Padded samples are constructed with the **shortest possible
sequence** — one prompt token + one response token — to minimize
redundant computation.
4. An `is_padding` flag is added to the tags of padded samples to avoid
impacting the **accuracy metrics** such as score, reward, and response
length (while performance metrics still include padded samples).
5. Maintains the router_replay shape, position_ids shape and add
multi-modal inputs placeholder in VLM case when necessary.
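The LCM alignment described under Upsampling can be sketched as a small helper (our naming, operating on counts rather than real batches; `math.lcm` requires Python 3.9+):

```python
from math import lcm

def padded_batch_size(n_samples, dp_size, mini_batch_size,
                      critic_mini_batch_size=None):
    """Round n_samples up to the least common multiple of all divisibility
    constraints; the difference is filled with dummy samples carrying
    independent UIDs and an is_padding tag."""
    unit = lcm(dp_size, mini_batch_size)
    if critic_mini_batch_size is not None:
        unit = lcm(unit, critic_mini_batch_size)
    return -(-n_samples // unit) * unit  # ceil to the next multiple of unit

# With the experiment's hyperparameters (dp=2, mini_batch=15), a batch of
# 45 prompts x 8 rollouts = 360 samples is already aligned; an uneven
# multi-trajectory batch of 362 samples would be padded up to 390.
print(padded_batch_size(360, dp_size=2, mini_batch_size=15))  # 360
print(padded_batch_size(362, dp_size=2, mini_batch_size=15))  # 390
```

The ceiling trick `-(-n // unit)` avoids floating-point division; the extra `critic_mini_batch_size` constraint only enters when the critic is trained.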

### Related PR
1. #5636
2. #5401

### Verification Experiments with Multi-Trajectory Agent:
#### The three smallest primes — 2, 3, and 5 — are chosen to form the
relevant hyperparameters:
[dp=2, batch_size=45, mini_batch_size=15, rollout.n=8].

<img width="2160" height="1224" alt="20260410_225225_critic_score_mean"
src="https://github.com/user-attachments/assets/64de98fa-4356-45bd-abf7-6d5cb2fa9640"
/>