Conversation
Pull request overview
This PR adds a training engine implementation for diffusion models, specifically supporting Qwen Image models with FSDP-based distributed training and Flow Matching with Stochastic Differential Equations (SDE) schedulers.
Key Changes:
- Implements `DiffusersFSDPEngine` with support for LoRA, gradient checkpointing, and mixed precision training
- Adds a custom SDE scheduler (`FlowMatchSDEDiscreteScheduler`) with log probability computation for reinforcement learning
- Introduces a modified Qwen Image pipeline with log probability tracking for training workflows
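For context on the log-probability piece: RL-style fine-tuning of diffusion models (as in Flow-GRPO-style training) treats each SDE denoising step as a stochastic policy action, so the scheduler must report the log-probability of the transition it sampled. A minimal sketch of that computation, assuming the transition is modeled as a Gaussian around the deterministic update; the function name and the mean-reduction over non-batch dimensions are assumptions, not this PR's exact code:

```python
import math

import torch


def sde_step_log_prob(
    prev_sample: torch.Tensor, prev_sample_mean: torch.Tensor, std_dev: torch.Tensor
) -> torch.Tensor:
    """Log-probability of `prev_sample` under N(prev_sample_mean, std_dev**2),
    reduced to one scalar per batch element (inputs assumed [B, C, H, W] latents)."""
    log_prob = (
        -((prev_sample.detach() - prev_sample_mean) ** 2) / (2 * std_dev**2)
        - torch.log(std_dev)
        - 0.5 * math.log(2 * math.pi)
    )
    # One value per batch element, as PPO-style losses expect.
    return log_prob.mean(dim=tuple(range(1, log_prob.ndim)))
```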
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| `verl/workers/engine/fsdp/diffuser_impl.py` | Main FSDP engine implementation for diffusion models with 748 lines of model initialization, training, and checkpoint management |
| `verl/workers/engine/diffusers/utils.py` | Utility functions for preparing and loading diffusion pipelines to devices |
| `verl/workers/engine/diffusers/schedulers/scheduling_flow_match_sde_discrete.py` | Custom scheduler extending `FlowMatchEulerDiscreteScheduler` with SDE support and log probability calculation |
| `verl/workers/engine/diffusers/schedulers/__init__.py` | Module initialization for custom schedulers |
| `verl/workers/engine/diffusers/pipelines/pipeline_qwenimage.py` | Extended `QwenImagePipeline` with log probability tracking for RL training |
| `verl/workers/engine/diffusers/pipelines/__init__.py` | Module initialization for custom pipelines |
| `verl/workers/engine/diffusers/patch.py` | Utility to inject the SDE scheduler into existing pipelines |
| `verl/workers/engine/diffusers/__init__.py` | Main module initialization for diffusers engine components |
| `verl/trainer/ppo/ray_diffusion_trainer.py` | Updates copyright header and removes an unused parameter |
| `verl/trainer/main_flowgrpo.py` | Updates copyright header |
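Taken together, `patch.py` and the custom scheduler follow the standard diffusers scheduler-swap idiom. A sketch of what the injection presumably amounts to (the pipeline import path and checkpoint name are assumptions; the PR's `patch.py` may wrap this differently):

```python
# Assumes a recent diffusers release that ships QwenImagePipeline.
from diffusers import QwenImagePipeline

from verl.workers.engine.diffusers.schedulers import FlowMatchSDEDiscreteScheduler

# Load the stock pipeline, then rebuild its scheduler from the existing config
# so all schedule hyperparameters carry over.
pipe = QwenImagePipeline.from_pretrained("Qwen/Qwen-Image")
pipe.scheduler = FlowMatchSDEDiscreteScheduler.from_config(pipe.scheduler.config)
```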
```python
        sde_window_range (`tuple[int, int]`, *optional*, defaults to (0, 5)):
            The range of the SDE window start index. Only used if `sde_window_size` is provided.
        sde_type (`str`, *optional*, defaults to "sde"):
            The type of SDE to use. Choose between "sde"
```
Incomplete documentation in docstring. The description for the "sde_type" parameter ends abruptly with "Choose between 'sde'" without completing the sentence or listing the available options.
```diff
-            The type of SDE to use. Choose between "sde"
+            The type of SDE to use, e.g. `"sde"`.
```
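Since a later hunk in this review shows the two accepted values are "sde" and "cps", a fuller version of this entry might read (a sketch of possible wording, not the author's):

```python
        sde_type (`str`, *optional*, defaults to "sde"):
            The type of SDE to use. Choose between "sde" and "cps".
```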
```python
                        attention_kwargs=self.attention_kwargs,
                        return_dict=False,
                    )[0]

                if do_true_cfg:
                    with self.transformer.cache_context("uncond"):
                        neg_noise_pred = self.transformer(
                            hidden_states=latents,
                            timestep=timestep / 1000,
                            guidance=guidance,
                            encoder_hidden_states_mask=negative_prompt_embeds_mask,
                            encoder_hidden_states=negative_prompt_embeds,
                            img_shapes=img_shapes,
                            txt_seq_lens=negative_txt_seq_lens,
                            attention_kwargs=self.attention_kwargs,
```
Inconsistent attribute reference. Lines 358 and 372 reference 'self.attention_kwargs' but the attribute is set as 'self._attention_kwargs' on line 211. This mismatch will cause an AttributeError.
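One way to make both spellings valid is the read-only property idiom that diffusers pipelines commonly use; whether this PR should adopt it or simply rename the references is a judgment call:

```python
class QwenImagePipelineSketch:
    """Minimal sketch of the diffusers property idiom; the real pipeline
    subclasses DiffusionPipeline and takes many more arguments."""

    @property
    def attention_kwargs(self):
        # Read-only view over the private attribute assigned inside __call__,
        # so internal code can use `self.attention_kwargs` consistently.
        return self._attention_kwargs

    def __call__(self, attention_kwargs=None, **kwargs):
        self._attention_kwargs = attention_kwargs
        # ... denoising loop would go here ...
```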
```python
@EngineRegistry.register(model_type="diffusion_model", backend=["fsdp", "fsdp2"], device=["cuda", "npu"])
class DiffusersFSDPEngine(BaseEngine):
    """
    Concrete Diffussers Engine implementation using PyTorch FullyShardedDataParallel (FSDP).
```
Typo in the docstring: "Diffussers" should be "Diffusers".
```diff
-    Concrete Diffussers Engine implementation using PyTorch FullyShardedDataParallel (FSDP).
+    Concrete Diffusers Engine implementation using PyTorch FullyShardedDataParallel (FSDP).
```
```python
            prev_sample (`torch.FloatTensor`, *optional*):
                The sample from the previous timestep. If not provided, it will be sampled inside the function.
            sde_type (`str`, *optional*, defaults to "sde"):
                The type of SDE to use. Choose between "sde" and "cps
```
Incomplete documentation in docstring. The description for the "sde_type" parameter is cut off at "Choose between 'sde' and 'cps" - it should end with a closing quote and potentially additional information about which option to choose or their differences.
```diff
-                The type of SDE to use. Choose between "sde" and "cps
+                The type of SDE to use. Choose between "sde" and "cps".
```
```python
else:
    XLA_AVAILABLE = False


logger = logging.get_logger(__name__)
```
The code calls `logging.get_logger(__name__)`, but the standard-library `logging` module exposes `getLogger`, not `get_logger`.
```diff
-logger = logging.get_logger(__name__)
+logger = logging.getLogger(__name__)
```
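One caveat worth checking before applying this: `getLogger` is correct for Python's standard `logging` module, but diffusers pipeline files usually import the library's own logging shim, which does expose `get_logger`. Which fix applies depends on the import at the top of the file; a sketch of both patterns:

```python
import logging as std_logging

from diffusers.utils import logging as hf_logging

std_logger = std_logging.getLogger(__name__)  # standard-library spelling
hf_logger = hf_logging.get_logger(__name__)   # diffusers' logging utility
```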
```python
        elif not self.transformer.config.guidance_embeds and guidance_scale is None:
            guidance = None

        if self.attention_kwargs is None:
```
Inconsistent attribute usage. The code sets 'self._attention_kwargs' on line 211 but checks 'self.attention_kwargs' (without underscore) on line 306. This will cause an AttributeError since the attribute name doesn't match.
```diff
-        if self.attention_kwargs is None:
+        if self._attention_kwargs is None:
```
```python
        # Load base model with specified configuration and dtype
        module = self._build_module()
        scheduler = self._build_scheduer()
```
Typo in method name: "_build_scheduer" should be "_build_scheduler". This will cause an AttributeError when called on line 401.
```diff
-        scheduler = self._build_scheduer()
+        scheduler = self._build_scheduler()
```
Commit history:

- add entrypoint (#1)
- add training engine (#2): add training engine; fix init; fix typos
- move folders & make for two-forward pass in training loop (#4)
- Add diffusion reward loop (#3): init reward; add OCR reward; update disrm input; add unit test; pass UT; fix typos/bugs; update copyright
- [fix] update customized reward func in UT (#5): update customized reward_fn
- Update 20260109 (#8): update; fix CI
- [data] feat: Add dataset for Qwen-Image (#6): init dataset for Qwen-Image; update return; align with rl_dataset; update filter for long prompts; pass UTs; clean code
- add new config; debug actor; add reward config; add adv, policy loss; debug reward loop; init diffusers engine UT; debug actor forward; add UT for adv and loss; pass adv & loss UTs; pass engine backward UT; clean debug code
- update to align verl data format

Co-authored-by: Cheung Ka Wai <zhtmike@gmail.com>
What does this PR do?
Checklist Before Starting
- Format the PR title as `[{modules}] {type}: {description}` (This will be checked by the CI)
- `{modules}` include `fsdp`, `megatron`, `sglang`, `vllm`, `rollout`, `trainer`, `ci`, `training_utils`, `recipe`, `hardware`, `deployment`, `ray`, `worker`, `single_controller`, `misc`, `perf`, `model`, `algo`, `env`, `tool`, `ckpt`, `doc`, `data`, `cfg`, `reward`, like `[megatron, fsdp, doc]`
- `{type}` is in `feat`, `fix`, `refactor`, `chore`, `test`
- Add `[BREAKING]` to the beginning of the title if the PR introduces a breaking change, e.g. `[BREAKING][fsdp, megatron] feat: dynamic batching`

Test
API and Usage Example
```python
# Add code snippet or script demonstrating how to use this
```

Design & Code Changes
Checklist Before Submitting
Important
Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review.
- Apply pre-commit checks: `pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always`
- Send a message in the `ci-request` channel in the `verl` Slack workspace when the PR is ready for CI. (If not accessible, please try the Feishu group (飞书群).)
- If your PR changes the `recipe` submodule, please also update the reference to the submodule commit via `git submodule update --remote` or `cd recipe && git pull origin main`.