Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/backend/server_arguments.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ Please consult the documentation below to learn more about the parameters you ma
### Expert parallelism
* `enable_ep_moe`: Enables expert parallelism that distributes the experts onto multiple GPUs for MoE models.
* `ep_size`: The size of EP. Please shard the model weights with `tp_size=ep_size`, for detailed benchmarking refer to [this PR](https://github.com/sgl-project/sglang/pull/2203). If not set, `ep_size` will be automatically set to `tp_size`.
* `enable_deepep_moe`: Enables expert parallelism that distributes the experts onto multiple GPUs for DeepSeek-V3 model based on deepseek-ai/DeepEP. Currently DeepEP is bind to DP Attention. Please set `--enable-dp-attention --enable-deepep-moe`, perfer `tp_size=dp_size=ep_size`.
* `enable_deepep_moe`: Enables expert parallelism that distributes the experts onto multiple GPUs for DeepSeek-V3 model based on deepseek-ai/DeepEP.

## Memory and scheduling

Expand Down Expand Up @@ -184,7 +184,7 @@ Please consult the documentation below to learn more about the parameters you ma
*Note: Some of these options are still in experimental stage.*

* `enable_mixed_chunk`: Enables mixing prefill and decode, see [this discussion](https://github.com/sgl-project/sglang/discussions/1163).
* `enable_dp_attention`: Enable [Data Parallelism Attention](https://lmsys.org/blog/2024-12-04-sglang-v0-4/#data-parallelism-attention-for-deepseek-models) for Deepseek models. Note that you need to choose `dp_size = tp_size` for this.
* `enable_dp_attention`: Enable [Data Parallelism Attention](https://lmsys.org/blog/2024-12-04-sglang-v0-4/#data-parallelism-attention-for-deepseek-models) for Deepseek models.
* `enable_torch_compile`: Torch compile the model. Note that compiling a model takes a long time but gives a great performance boost. The compiled model can also be [cached for future use](https://docs.sglang.ai/backend/hyperparameter_tuning.html#enabling-cache-for-torch-compile).
* `torch_compile_max_bs`: The maximum batch size when using `torch_compile`.
* `cuda_graph_max_bs`: Adjust the maximum batch size when using cuda graph. By default this is chosen for you based on GPU specifics.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import os
from contextlib import contextmanager
from functools import wraps
from typing import Callable, List, Optional, TypeVar, Union
from typing import Any, Callable, List, Optional, TypeVar, Union

import torch
import torch.distributed as dist
Expand Down
23 changes: 22 additions & 1 deletion python/sglang/srt/distributed/parallel_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,15 @@ def _all_reduce_in_place(self, input_: torch.Tensor) -> None:
else:
torch.distributed.all_reduce(input_, group=self.device_group)

def reduce_scatter(
    self,
    output: torch.Tensor,
    input_list: List[torch.Tensor],
) -> torch.Tensor:
    """Reduce-scatter ``input_list`` across this group into ``output``.

    Thin wrapper over ``torch.distributed.reduce_scatter`` using this
    group's ``device_group``.

    Args:
        output: Tensor that receives this rank's reduced shard.
        input_list: One tensor per rank in the group to be reduced.

    Returns:
        The ``output`` tensor (returned for call-chaining convenience).
    """
    # TODO(ch-wan): support other backends
    torch.distributed.reduce_scatter(output, input_list, group=self.device_group)
    # NOTE: annotated -> torch.Tensor (was -> None) to match the actual
    # `return output` below.
    return output

def _all_gather_into_tensor(self, output: torch.Tensor, input: torch.Tensor):
pynccl_comm = self.pynccl_comm
if pynccl_comm is not None and not pynccl_comm.disabled:
Expand All @@ -456,11 +465,23 @@ def all_gather_into_tensor(self, output: torch.Tensor, input: torch.Tensor):
output, input, group_name=self.unique_name
)

def all_gather(self, input_: torch.Tensor, dim: int = -1) -> torch.Tensor:
def all_gather(
self,
input_: torch.Tensor,
dim: int = -1,
tensor_list: List[torch.Tensor] = None,
) -> torch.Tensor:
world_size = self.world_size
# Bypass the function if we are using only 1 GPU.
if world_size == 1:
return input_

if tensor_list is not None:
# TODO(ch-wan): support other backends
return torch.distributed.all_gather(
tensor_list, input_, group=self.device_group
)

assert (
-input_.dim() <= dim < input_.dim()
), f"Invalid dim ({dim}) for input tensor with shape {input_.size()}"
Expand Down
13 changes: 12 additions & 1 deletion python/sglang/srt/layers/dp_attention.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import functools
import logging
from contextlib import contextmanager
from typing import TYPE_CHECKING, Union
from typing import TYPE_CHECKING, List

import torch
import triton
Expand Down Expand Up @@ -249,3 +249,14 @@ def dp_scatter(
memcpy_triton(
local_tokens, global_tokens, 0, local_start_pos, local_num_tokens, True
)


def tp_reduce_scatter(
    output: torch.Tensor,
    input_list: List[torch.Tensor],
):
    """Reduce-scatter ``input_list`` into ``output`` over the attention TP group."""
    attn_tp_group = get_attention_tp_group()
    return attn_tp_group.reduce_scatter(output, input_list)


def tp_all_gather(output_list: List[torch.Tensor], input_: torch.Tensor):
    """All-gather ``input_`` into ``output_list`` over the attention TP group."""
    attn_tp_group = get_attention_tp_group()
    return attn_tp_group.all_gather(input_, tensor_list=output_list)
2 changes: 1 addition & 1 deletion python/sglang/srt/managers/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1186,7 +1186,7 @@ def get_next_batch_to_run(self) -> Optional[ScheduleBatch]:
ret = None

# Handle DP attention
if self.server_args.enable_dp_attention:
if self.server_args.enable_dp_attention or self.server_args.enable_sp_layernorm:
ret, _ = self.prepare_dp_attn_batch(ret)

return ret
Expand Down
13 changes: 7 additions & 6 deletions python/sglang/srt/model_executor/cuda_graph_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ def __init__(self, model_runner: ModelRunner):
self.disable_padding = model_runner.server_args.disable_cuda_graph_padding
self.is_encoder_decoder = model_runner.model_config.is_encoder_decoder
self.enable_dp_attention = model_runner.server_args.enable_dp_attention
self.enable_sp_layernorm = model_runner.server_args.enable_sp_layernorm
self.speculative_algorithm = model_runner.server_args.speculative_algorithm
self.tp_size = model_runner.server_args.tp_size
self.dp_size = model_runner.server_args.dp_size
Expand Down Expand Up @@ -245,8 +246,8 @@ def __init__(self, model_runner: ModelRunner):
)
else:
self.encoder_lens = None

if self.enable_dp_attention:
if self.enable_dp_attention or self.enable_sp_layernorm:
# TODO(ch-wan): SP layernorm should use a different logic to manage gathered_buffer
self.gathered_buffer = torch.zeros(
(
self.max_bs * self.dp_size * self.num_tokens_per_bs,
Expand Down Expand Up @@ -288,7 +289,7 @@ def model_capture_mode(self):
self.model_runner.token_to_kv_pool.capture_mode = False

def can_run(self, forward_batch: ForwardBatch):
if self.enable_dp_attention:
if self.enable_dp_attention or self.enable_sp_layernorm:
total_global_tokens = sum(forward_batch.global_num_tokens_cpu)

is_bs_supported = forward_batch.can_run_dp_cuda_graph and (
Expand Down Expand Up @@ -369,7 +370,7 @@ def capture_one_batch_size(self, bs: int, forward: Callable):
encoder_lens = None
mrope_positions = self.mrope_positions[:, :bs]

if self.enable_dp_attention:
if self.enable_dp_attention or self.enable_sp_layernorm:
self.global_num_tokens_gpu.copy_(
torch.tensor(
[
Expand Down Expand Up @@ -471,7 +472,7 @@ def replay_prepare(self, forward_batch: ForwardBatch):
raw_num_token = raw_bs * self.num_tokens_per_bs

# Pad
if self.enable_dp_attention:
if self.enable_dp_attention or self.enable_sp_layernorm:
index = bisect.bisect_left(
self.capture_bs, sum(forward_batch.global_num_tokens_cpu)
)
Expand All @@ -497,7 +498,7 @@ def replay_prepare(self, forward_batch: ForwardBatch):
self.encoder_lens[:raw_bs].copy_(forward_batch.encoder_lens)
if forward_batch.mrope_positions is not None:
self.mrope_positions[:, :raw_bs].copy_(forward_batch.mrope_positions)
if self.enable_dp_attention:
if self.enable_dp_attention or self.enable_sp_layernorm:
self.global_num_tokens_gpu.copy_(forward_batch.global_num_tokens_gpu)

if hasattr(forward_batch.spec_info, "hidden_states"):
Expand Down
3 changes: 0 additions & 3 deletions python/sglang/srt/model_executor/model_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,6 @@ def model_specific_adjustment(self):

if server_args.enable_deepep_moe:
logger.info("DeepEP is turned on.")
assert (
server_args.enable_dp_attention == True
), "Currently DeepEP is bind to Attention DP. Set '--enable-dp-attention --enable-deepep-moe'"

def init_torch_distributed(self):
logger.info("Init torch distributed begin.")
Expand Down
Loading
Loading