@katec846 katec846 commented Dec 9, 2025

What does this PR do ?

Adds Ray Compiled Graph support and NCCL-optimized data transfer for SFT

TODO:

  • Rebase the code
  • Verify the results after rebasing (correctness and performance)
  • Make sure it runs on dt_policy_worker
  • Make RCG, warmup, and the NCCL optimization for input_ids transfer controllable by runtime flags instead of environment variables
  • Remove verification of NCCL optimization for input_ids transfer
  • Remove per-worker computation time tracking

This PR introduces significant performance optimizations and infrastructure improvements to the training pipeline:

  1. Ray Compiled Graph (RCG) - Sub-millisecond task orchestration overhead (<50μs vs ~1ms for standard Ray calls)
  2. NCCL-Optimized Data Transfer - Reduces Ray object store overhead by sending data to leader workers only and broadcasting via NCCL

Key Features

🚀 Ray Compiled Graph Support

Implements Ray's compiled DAG execution for distributed training with support for:

  • Pipeline Parallelism (PP), Tensor Parallelism (TP), Context Parallelism (CP), and Data Parallelism (DP)
  • Wrapper architecture to work around Ray's ActorMethod signature hiding
  • Automatic DAG compilation and teardown
  • Multi-DP shard support with independent DAG execution per shard

Enable with:
export NEMO_RL_USE_COMPILED_GRAPH=1

NCCL-Optimized Data Transfer

Reduces data transfer overhead by:

  • Sending full tensors only to leader workers (TP0, PP0, CP0 per DP shard)
  • Broadcasting tensors via NCCL to follower workers (faster than Ray object store)
  • Sending only shape metadata to follower workers (minimal overhead)
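
The leader/follower split above can be sketched as follows. This is a minimal illustration, not the PR's code: the `WorkerCoord` type, the `build_payloads` helper, and the payload dict layout are hypothetical, and plain nested lists stand in for tensors. In the real workers the leader would then broadcast the tensor to its followers over NCCL, a step that is omitted here.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass(frozen=True)
class WorkerCoord:
    """Hypothetical parallelism coordinates of one worker."""
    dp: int
    pp: int
    tp: int
    cp: int


def is_leader(worker: WorkerCoord) -> bool:
    # The leader of a DP shard is the worker at TP0, PP0, CP0.
    return worker.pp == 0 and worker.tp == 0 and worker.cp == 0


def build_payloads(
    workers: List[WorkerCoord],
    shard_inputs: Dict[int, List[List[int]]],
) -> List[Dict[str, Any]]:
    """Build one payload per worker.

    shard_inputs[dp] is a batch x sequence nested list for that DP shard.
    Leaders receive the full data; followers receive only its shape, which
    is enough to allocate a receive buffer before the broadcast.
    """
    payloads = []
    for worker in workers:
        data = shard_inputs[worker.dp]
        shape = (len(data), len(data[0]) if data else 0)
        payloads.append(
            {"shape": shape, "input_ids": data if is_leader(worker) else None}
        )
    return payloads
```

With two DP shards and TP=2, only two of the four workers would receive `input_ids` through the object store; the other two receive a shape tuple.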

Enable with:
export NEMO_RL_OPTIMIZE_DATA_TRANSFER=1
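
Both switches are plain environment variables. A minimal sketch of how such flags could be read in Python follows. The `FeatureFlags` dataclass and `env_flag` helper are hypothetical names, and treating only the value `1` as enabled is an assumption based on the `export` lines in this description, not a statement about how the PR parses them.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


def env_flag(name: str, env: Optional[Mapping[str, str]] = None) -> bool:
    """Return True only when the variable is set to "1" (assumed convention)."""
    env = os.environ if env is None else env
    return env.get(name, "0").strip() == "1"


@dataclass(frozen=True)
class FeatureFlags:
    """Hypothetical container for the two feature switches."""
    use_compiled_graph: bool
    optimize_data_transfer: bool

    @classmethod
    def from_env(cls, env: Optional[Mapping[str, str]] = None) -> "FeatureFlags":
        return cls(
            use_compiled_graph=env_flag("NEMO_RL_USE_COMPILED_GRAPH", env),
            optimize_data_transfer=env_flag("NEMO_RL_OPTIMIZE_DATA_TRANSFER", env),
        )
```

Collecting the switches in one object like this would also make it straightforward to populate them from a runtime config instead of the environment, as the TODO list suggests.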

Technical Changes

Architecture Changes

Wrapper Pattern for Ray Actors:

  • Replaced direct @ray.remote decoration on worker classes
  • New NeMoRayWorkerWrapper class (vLLM-style architecture)
  • Works around Ray bug #26283 (ActorMethod signature inspection) when using compiled graphs
  • All worker method calls now use worker.execute_method.remote(method_name, *args, **kwargs)
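
A minimal sketch of the dispatch idea, assuming the wrapper simply forwards calls by method name. `WorkerWrapperSketch` and `ToyWorker` are hypothetical stand-ins and the actual `NeMoRayWorkerWrapper` may differ. In the real code the wrapper, rather than the worker class, would be the Ray actor, so callers go through `execute_method.remote(...)`; Ray itself is left out here so the example runs on its own.

```python
from typing import Any


class ToyWorker:
    """Hypothetical worker with one method to dispatch to."""

    def train_step(self, batch: list, lr: float = 0.1) -> dict:
        return {"num_tokens": len(batch), "lr": lr}


class WorkerWrapperSketch:
    """Owns a worker instance and exposes a single generic entry point."""

    def __init__(self, worker_cls: type, *args: Any, **kwargs: Any) -> None:
        self._worker = worker_cls(*args, **kwargs)

    def execute_method(self, method_name: str, *args: Any, **kwargs: Any) -> Any:
        # Look the method up by name on the wrapped worker and call it.
        method = getattr(self._worker, method_name, None)
        if not callable(method):
            raise AttributeError(
                f"{type(self._worker).__name__} has no method {method_name!r}"
            )
        return method(*args, **kwargs)
```

Because every call enters through one method with a fixed signature, the caller never depends on the signatures of the individual worker methods.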

Compiled Graph Execution:

  • CompiledGraphExecutor: Single DP shard DAG execution
  • MultiDPCompiledGraphExecutor: Multiple independent DAGs (one per DP shard)
  • CompiledGraphWorkerGroup: Drop-in replacement wrapper for RayWorkerGroup
  • Automatic graph compilation, caching, and teardown
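
The compile-once, reuse, then tear-down lifecycle can be illustrated without Ray. The sketch below is a hypothetical cache keyed by DP shard and method name. `compile_fn` and `teardown_fn` are placeholders for whatever builds and destroys a compiled DAG (with Ray these would typically wrap `experimental_compile()` on a DAG node and `teardown()` on the compiled DAG), and none of the names come from the PR.

```python
from typing import Any, Callable, Dict, Tuple


class CompiledGraphCache:
    """Hypothetical cache holding one compiled graph per (DP shard, method)."""

    def __init__(
        self,
        compile_fn: Callable[[int, str], Any],
        teardown_fn: Callable[[Any], None],
    ) -> None:
        self._compile_fn = compile_fn
        self._teardown_fn = teardown_fn
        self._graphs: Dict[Tuple[int, str], Any] = {}

    def get(self, dp_rank: int, method_name: str) -> Any:
        # Compile lazily on first use, then reuse the cached graph.
        key = (dp_rank, method_name)
        if key not in self._graphs:
            self._graphs[key] = self._compile_fn(dp_rank, method_name)
        return self._graphs[key]

    def teardown(self) -> None:
        # Destroy every compiled graph and forget it.
        for graph in self._graphs.values():
            self._teardown_fn(graph)
        self._graphs.clear()
```

Keeping a separate entry per DP shard mirrors the one-DAG-per-shard design described above, so each shard's graph can be executed independently.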

Worker Implementations

Updated all policy workers with NCCL broadcast support:

  • MegatronPolicyWorker: Added tensor reconstruction and NCCL broadcast
  • DTensorPolicyWorker: Added NCCL broadcast via DeviceMesh groups
  • DTensorPolicyWorkerV2: Added NCCL broadcast support

Additional Improvements

  • Warmup Support: Compile graphs with max sequence length before training

    export NEMO_RL_WARMUP_COMPILED_GRAPH=1
    export NEMO_RL_WARMUP_SEQ_LEN=8192 # Optional override

  • Worker PID Logging: Logs worker PIDs so nsys profiles can be mapped to specific workers

  • Nsys Profile Naming: Windows-compatible safe filenames (replaces : with _)

  • Ray Log Sync: Enabled by default with 30s frequency

  • Improved Shutdown: Graceful compiled DAG teardown with suppressed Ray logging
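
The nsys filename sanitization mentioned above amounts to a single character substitution. A minimal sketch, where `safe_profile_name` and the example worker name are hypothetical:

```python
def safe_profile_name(worker_name: str) -> str:
    """Replace ':' with '_' so the name is a valid filename on Windows."""
    return worker_name.replace(":", "_")
```

For example, a worker name such as `policy:worker:0` would become `policy_worker_0`.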

Issues

List issues that this PR closes (syntax):

Usage

  • You can potentially add a usage example below
# Add a code snippet demonstrating how to use this

Before your PR is "Ready for review"

Pre checks:

  • Make sure you read and followed Contributor guidelines
  • Did you write any new necessary tests?
  • Did you run the unit tests and functional tests locally? Visit our Testing Guide for how to run tests
  • Did you add or update any necessary documentation? Visit our Document Development Guide for how to write, build and test the docs.

Additional Information

  • ...
