Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/pr-test-npu.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ jobs:
env:
DEEPEP_NORMAL_LONG_SEQ_ROUND: 5
DEEPEP_NORMAL_LONG_SEQ_PER_ROUND_TOKENS: 512
DEEPEP_NORMAL_COMBINE_ENABLE_LONG_SEQ: 1
HCCL_BUFFSIZE: 1000
run: |
python3 $GITHUB_WORKSPACE/tests/python/deepep/test_intranode.py --num-tokens=2122
Expand Down
5 changes: 3 additions & 2 deletions csrc/deepep/ops/op_host/cam_moe_combine_normal_tiling.cc
Original file line number Diff line number Diff line change
Expand Up @@ -533,16 +533,18 @@ static ge::graphStatus CamMoeCombineNormalA3TilingFuncImpl(gert::TilingContext *
uint64_t perRoundTokens = tilingData->camMoeCombineNormalInfo.perRoundTokens;
uint64_t realMaxBs = tilingData->camMoeCombineNormalInfo.realMaxBs;
uint64_t realBs = std::min(perRoundTokens, realMaxBs);
uint32_t maxRound = tilingData->camMoeCombineNormalInfo.maxRound;
// combine数据区 token首地址对齐512
uint64_t tokenNeedSizeCombine = ((h * MAX_OUT_DTYPE_SIZE + WIN_ADDR_ALIGN - 1UL) / WIN_ADDR_ALIGN) * WIN_ADDR_ALIGN;
tokenNeedSizeCombine = maxRound > 1 ? maxRound * 2 : maxRound;
uint64_t actualSize = (realBs * k * tokenNeedSizeCombine + COMBINE_STATE_WIN_OFFSET + NOTIFY_DISPATCH_WIN_OFFSET) *
DOUBLE_DATA_BUFFER;
OP_TILING_CHECK(
(actualSize > maxWindowSize),
OP_LOGE(nodeName,
"HCCL_BUFFSIZE is too SMALL, realBs = %lu, h = %lu, epWorldSize = %lu, localMoeExpertNum = %u,"
" tokenNeedSizeCombine = %lu, k = %lu, NEEDED_HCCL_BUFFSIZE("
"((realBs * k * tokenNeedSizeCombine)) + 4MB + 204MB) * 2) = %luMB, "
"((realBs * k * tokenNeedSizeCombine * 2)) + 4MB + 204MB) * 2) = %luMB, "
"HCCL_BUFFSIZE=%luMB.",
realBs, h, epWorldSize, localMoeExpertNum, tokenNeedSizeCombine, k, actualSize / MB_SIZE + 1UL,
maxWindowSize / MB_SIZE),
Expand Down Expand Up @@ -571,7 +573,6 @@ static ge::graphStatus CamMoeCombineNormalA3TilingFuncImpl(gert::TilingContext *
PrintTilingDataInfo(nodeName, *tilingData);

uint64_t tilingKey = INIT_TILINGKEY;
uint32_t maxRound = tilingData->camMoeCombineNormalInfo.maxRound;
if (maxRound > 1) {
tilingKey += 1;
}
Expand Down
4 changes: 3 additions & 1 deletion csrc/deepep/ops/op_host/cam_moe_dispatch_normal_tiling.cc
Original file line number Diff line number Diff line change
Expand Up @@ -563,12 +563,14 @@ static ge::graphStatus CamMoeDispatchNormalA3TilingFuncImpl(gert::TilingContext
uint64_t k = static_cast<uint64_t>(tilingData->camMoeDispatchNormalInfo.k);
uint64_t epWorldSize = static_cast<uint64_t>(tilingData->camMoeDispatchNormalInfo.epWorldSize);
uint64_t maxBs = static_cast<uint64_t>(tilingData->camMoeDispatchNormalInfo.globalBs) / epWorldSize;

uint32_t round = tilingData->camMoeDispatchNormalInfo.round;
// dispatch数据区 token首对齐512,有效token长度h_align_32b + scale(32b) + 三元组(3*4b)
uint64_t tokenActualLen =
((h * MAX_OUT_DTYPE_SIZE + UB_ALIGN - 1UL) / UB_ALIGN) * UB_ALIGN + SCALE_EXPAND_IDX_BUFFER;
uint64_t tokenNeedSizeDispatch = ((tokenActualLen + WIN_ADDR_ALIGN - 1UL) / WIN_ADDR_ALIGN) * WIN_ADDR_ALIGN;
uint64_t tokenNeedSizeCombine = ((h * MAX_OUT_DTYPE_SIZE + WIN_ADDR_ALIGN - 1UL) / WIN_ADDR_ALIGN) * WIN_ADDR_ALIGN;
tokenNeedSizeCombine =
round > 1 ? tokenNeedSizeCombine * 2 : tokenNeedSizeCombine; // round > 1 combine要使用double buffer
// 未考虑双流时大小
uint64_t actualSize = (maxBs * k * (tokenNeedSizeCombine + tokenNeedSizeDispatch) + COMBINE_STATE_WIN_OFFSET +
NOTIFY_DISPATCH_WIN_OFFSET) *
Expand Down
2 changes: 2 additions & 0 deletions csrc/deepep/ops/op_host/notify_dispatch_tiling.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ static void PrintTilingDataInfo(const char *nodeName, NotifyDispatchTilingData &
OP_LOGD(nodeName, "perRoundTokens is %u.", tilingData.notifyDispatchInfo.perRoundTokens);
OP_LOGD(nodeName, "aivNum is %u.", tilingData.notifyDispatchInfo.aivNum);
OP_LOGD(nodeName, "totalUbSize is %lu.", tilingData.notifyDispatchInfo.totalUbSize);
OP_LOGD(nodeName, "totalWinSize is %lu.", tilingData.notifyDispatchInfo.totalWinSize);
}

static ge::graphStatus GetAttrAndSetTilingData(gert::TilingContext *context, const char *nodeName,
Expand Down Expand Up @@ -281,6 +282,7 @@ static bool CheckTensorDataType(gert::TilingContext *context, const char *nodeNa
OP_LOGE(nodeName, "HCCL_BUFFSIZE is too SMALL, should larger than %luMB.", actualSize / MB_SIZE);
return false;
}
tilingData->notifyDispatchInfo.totalWinSize = maxWindowSize;
return true;
}

Expand Down
2 changes: 1 addition & 1 deletion csrc/deepep/ops/op_host/tiling_args.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@

namespace Moe {
constexpr uint64_t COMBINE_STATE_WIN_OFFSET = 4U * 1024UL * 1024UL;
constexpr uint64_t NOTIFY_DISPATCH_WIN_OFFSET = 204U * 1024UL * 1024UL;
constexpr uint64_t NOTIFY_DISPATCH_WIN_OFFSET = 102U * 1024UL * 1024UL;
} // namespace Moe
#endif // TILING_ARGS_H
39 changes: 19 additions & 20 deletions csrc/deepep/ops/op_kernel/cam_moe_combine_normal_multi_round.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ namespace CamMoeCombineNormalMultiRoundImpl {
constexpr uint32_t RANK_ID_OFFSET_IN_SRC_INFO = 0U;
constexpr uint32_t TOKEN_IDX_OFFSET_IN_SRC_INFO = 1U;
constexpr uint32_t TOPK_IDX_OFFSET_IN_SRC_INFO = 2U;
constexpr uint64_t COMBINE_STATE_WIN_OFFSET = 4UL * 1024UL * 1024UL;
constexpr uint64_t STATE_WIN_SIZE = 4UL * 1024UL * 1024UL;
constexpr uint64_t STATE_WIN_SIZE_HALF = STATE_WIN_SIZE / 2;
constexpr uint64_t MAGIC_WIN_OFFSET = 975UL * 1024UL;
constexpr uint64_t ROUND_STATE_OFFSET = Moe::BASE_ROUND_STATE_OFFSET + Moe::ROUND_STATE_MAX_SIZE * 2UL; // 458*1024
constexpr uint32_t TOKEN_SRC_INFO_LEN = 3U;
Expand All @@ -20,6 +21,7 @@ constexpr uint32_t MUL_256_ALIGN = 256U;
constexpr uint64_t WIN_512_ALIGN = 512UL;
constexpr uint32_t FLOAT_NUM_PER_ALIGN = 8U;
constexpr uint8_t DOUBLE_BUFFER = 2;
constexpr uint32_t WAIT_ROUND_INDEX = 2U;
constexpr int64_t CYCLE_TO_TIME = 50; // cycle num is converted into a fixed base unit of time, set at 50
constexpr uint32_t STATE_OFFSET = 32U;
constexpr uint32_t BATCH_SRC_INFO_CNT = 128U;
Expand Down Expand Up @@ -77,7 +79,7 @@ class CamMoeCombineNormalMultiRound

__aicore__ GM_ADDR GetBufferAddrByRankId(const int32_t rankId)
{
return GetStateAddrByRankId(rankId) + COMBINE_STATE_WIN_OFFSET;
return GetStateAddrByRankId(rankId) + STATE_WIN_SIZE + roundMagic_ * combineDataBuffSize_;
}

__aicore__ inline GM_ADDR GetRoundStateAddrByRankId(const int32_t rankId)
Expand Down Expand Up @@ -145,6 +147,7 @@ class CamMoeCombineNormalMultiRound
uint32_t xOutTokenOffset_{
0}; // 这一轮接收的token需要存放在xOut的偏移,即前面几轮接收的token数,每一轮每个核从topkWeightsGM_拷贝权重也需要
uint32_t stateOffset_{0};
uint32_t combineDataBuffSize_{0};

bool isEnableDiagnose_{false};

Expand Down Expand Up @@ -181,7 +184,6 @@ class CamMoeCombineNormalMultiRound
GlobalTensor<XType> xOutGlobal_;
GlobalTensor<int32_t> sendCostStatsGT_;
GlobalTensor<float> dstRoundStatusGT_;
GM_ADDR localRankGM_;
GM_ADDR workspaceGM_;
};

Expand Down Expand Up @@ -293,7 +295,6 @@ __aicore__ inline void CamMoeCombineNormalMultiRound<TemplateMC2TypeFunc>::InitR

// 创建localCopyQueue_, 用于存放从GM拷贝到UB的token
tpipe_->InitBuffer(localCopyQueue_, DOUBLE_BUFFER, h32AlignRecvXLen_); // 28KB
PipeBarrier<PIPE_ALL>();
}

template <TemplateMC2TypeClass>
Expand Down Expand Up @@ -333,13 +334,11 @@ __aicore__ inline void CamMoeCombineNormalMultiRound<TemplateMC2TypeFunc>::Init(
InitTilingData(tilingData);
InitGlobalBuffer(recvX, tokenSrcInfo, epRecvCount, topkWeights, XOut, sendCostStatsOut);
InitBuffLen();

combineDataBuffSize_ = perRoundTokens_ * axisK_ * h512AlignRecvXLen_;
PipeBarrier<PIPE_ALL>();
winDataSizeOffset_ = static_cast<uint64_t>(magic_) * (tilingData->camMoeCombineNormalInfo.totalWinSize / 2UL);
localRankGM_ = GetBufferAddrByRankId(epRankId_);
DataCacheCleanAndInvalid<SrcInfoType, CacheLine::SINGLE_CACHE_LINE, DcciDst::CACHELINE_OUT>(
epRecvCountGM_[moeExpertNum_ - 1]);
PipeBarrier<PIPE_ALL>();

InitRoundSendData();
InitRoundRecvData();
Expand Down Expand Up @@ -449,18 +448,20 @@ __aicore__ inline void CamMoeCombineNormalMultiRound<TemplateMC2TypeFunc>::SetSt
uint32_t srcTokenId,
uint32_t srcTopkId)
{
GM_ADDR stateGM = GetStateAddrByRankId(srcRankId) + (srcTokenId * axisK_ + srcTopkId) * UB_32_ALIGN;
uint32_t stateOffset = roundMagic_ * STATE_WIN_SIZE_HALF;
GM_ADDR stateGM = GetStateAddrByRankId(srcRankId) + stateOffset + (srcTokenId * axisK_ + srcTopkId) * UB_32_ALIGN;
GlobalTensor<uint32_t> stateGMTensor;
stateGMTensor.SetGlobalBuffer((__gm__ uint32_t *)stateGM);
DataCopy<uint32_t>(stateGMTensor, setStateLT_, FLOAT_NUM_PER_ALIGN);
PipeBarrier<PIPE_ALL>();
}

template <TemplateMC2TypeClass>
__aicore__ inline void CamMoeCombineNormalMultiRound<TemplateMC2TypeFunc>::WaitBuffCopy(uint32_t recvXTokenIdx)
{
uint32_t calCount = axisK_ * FLOAT_NUM_PER_ALIGN;
GM_ADDR stateGM = GetStateAddrByRankId(epRankId_) + recvXTokenIdx * axisK_ * UB_32_ALIGN; // 计算地址偏移
uint32_t stateOffset = roundMagic_ * STATE_WIN_SIZE_HALF;
GM_ADDR stateGM =
GetStateAddrByRankId(epRankId_) + stateOffset + recvXTokenIdx * axisK_ * UB_32_ALIGN; // 计算地址偏移
GlobalTensor<float> stateGMTensor;
stateGMTensor.SetGlobalBuffer((__gm__ float *)stateGM);
float current = (float)0.0;
Expand Down Expand Up @@ -495,7 +496,8 @@ __aicore__ inline void CamMoeCombineNormalMultiRound<TemplateMC2TypeFunc>::ReadB

for (uint32_t topkId = 0U; topkId < axisK_; topkId++) {
float scale = topkWeightsLT_.GetValue(topkWeightTokenIdx * axisK_ + topkId);
GM_ADDR localTokenAddr = localRankGM_ + (recvXTokenIdx * axisK_ + topkId) * h512AlignRecvXLen_;
GM_ADDR localTokenAddr =
GetBufferAddrByRankId(epRankId_) + (recvXTokenIdx * axisK_ + topkId) * h512AlignRecvXLen_;
GlobalTensor<XType> localTokenTensor;
localTokenTensor.SetGlobalBuffer((__gm__ XType *)localTokenAddr);

Expand All @@ -516,7 +518,6 @@ __aicore__ inline void CamMoeCombineNormalMultiRound<TemplateMC2TypeFunc>::ReadB
Cast(xOutLocal, sumFloatBufLocal, AscendC::RoundMode::CAST_RINT, axisH_);
SyncFunc<AscendC::HardEvent::V_MTE3>();
DataCopyPad(xOutGlobal_[xOutTokenIdx * axisH_], xOutLocal, xOutCopyParams);
PipeBarrier<PIPE_ALL>();
}

template <TemplateMC2TypeClass>
Expand All @@ -535,7 +536,7 @@ __aicore__ inline void CamMoeCombineNormalMultiRound<TemplateMC2TypeFunc>::ReadB
const DataCopyPadExtParams<float> copyPadFloatParams{false, 0U, 0U, 0U};
DataCopyPad(topkWeightsLT_, topkWeightsGM_[(xOutTokenOffset_ + roundRecvStartTokenIdx_) * axisK_], bskParams,
copyPadFloatParams);
PipeBarrier<PIPE_ALL>();
SyncFunc<AscendC::HardEvent::MTE2_S>();

for (uint32_t roundTokenIdx = roundRecvStartTokenIdx_; roundTokenIdx < roundRecvEndTokenIdx_;
roundTokenIdx++) { // 每轮都从从hccl buffer起始位置读put来的数据
Expand Down Expand Up @@ -597,7 +598,6 @@ __aicore__ inline void CamMoeCombineNormalMultiRound<TemplateMC2TypeFunc>::WaitR
Duplicate<float>(tempRoundStateTensorLocal, (float)0.0, count);
SyncFunc<AscendC::HardEvent::V_MTE3>();
DataCopy<float>(roundStatusGMTensor, tempRoundStateTensorLocal, count);
PipeBarrier<PIPE_ALL>();
}

template <TemplateMC2TypeClass>
Expand All @@ -606,16 +606,15 @@ __aicore__ inline void CamMoeCombineNormalMultiRound<TemplateMC2TypeFunc>::Proce
if ASCEND_IS_AIV { // 全aiv处理
uint32_t realRound = (realMaxBs_ + perRoundTokens_ - 1) / perRoundTokens_;
while (roundIndex_ < realRound) {
CopyBufferToShareAndSetStatus();
ReadBufferFromRemote();
if (realRound > 1) {
SyncAll<true>();
SetRoundStatus();
if (roundIndex_ >= WAIT_ROUND_INDEX) {
WaitRoundStatus();
roundMagic_ = roundMagic_ == 0 ? 1 : 0;
SyncAll<true>();
}
CopyBufferToShareAndSetStatus();
ReadBufferFromRemote();
SetRoundStatus();
roundIndex_ += 1;
roundMagic_ = roundMagic_ == 0 ? 1 : 0;
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion csrc/deepep/ops/op_kernel/cam_moe_dispatch_normal.h
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,9 @@ __aicore__ inline void CamMoeDispatchNormal<CamTypeFunc>::Init(
PipeBarrier<PIPE_ALL>();

uint64_t hSizeAlignCombine = Ceil(h * sizeof(XType), WIN_ADDR_ALIGN) * WIN_ADDR_ALIGN;
hSizeAlignCombine = round > 1 ? hSizeAlignCombine * 2 : hSizeAlignCombine;
winDataSizeOffset = dataState * (tilingData->camMoeDispatchNormalInfo.totalWinSize / 2) +
min(realMaxBatchSize, perRoundTokens) * topK * hSizeAlignCombine;
min(realMaxBatchSize, perRoundTokens) * topK * hSizeAlignCombine; // *2 是因为double buffer
shareGM = GetWindAddrByRankId(COMM_EP_IDX, epRankId);

hCommuCopyOutParams = {1U, static_cast<uint32_t>(hScaleIdxSize), 0U, 0U, 0U};
Expand Down
2 changes: 1 addition & 1 deletion csrc/deepep/ops/op_kernel/comm_args.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
namespace Moe {
constexpr int CAM_MAX_RANK_SIZE = 384; // Maximum number of NPU cards supported by the communication library

constexpr uint64_t NOTIFY_DISPATCH_BUFF_OFFSET = 204UL * 1024UL * 1024UL;
constexpr uint64_t NOTIFY_DISPATCH_BUFF_OFFSET = 102UL * 1024UL * 1024UL;
constexpr int64_t IPC_BUFF_MAX_SIZE = 100 * 1024 * 1024;
constexpr int64_t IPC_DATA_OFFSET = 2 * 1024 * 1024; // First 2MB as flag, then 100MB as data storage
constexpr int64_t PING_PONG_SIZE = 2;
Expand Down
1 change: 1 addition & 0 deletions csrc/deepep/ops/op_kernel/notify_dispatch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ extern "C" __global__ __aicore__ void notify_dispatch(GM_ADDR sendData, GM_ADDR
int numTokens = tilingData.notifyDispatchInfo.numTokens;
int round = tilingData.notifyDispatchInfo.round;
int perRoundTokens = tilingData.notifyDispatchInfo.perRoundTokens;
uint64_t totalWinSize = tilingData.notifyDispatchInfo.totalWinSize;

GM_ADDR sendDataInput = sendData;
GM_ADDR tokenPerExpertDataInput = tokenPerExpertData;
Expand Down
20 changes: 9 additions & 11 deletions csrc/deepep/ops/op_kernel/notify_dispatch.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@ __aicore__ inline void SyncFunc()
GM_ADDR recvCount, GM_ADDR recvOffset, GM_ADDR expertGlobalOffset, GM_ADDR srcrankInExpertOffset, \
GM_ADDR rInSrcrankOffset, GM_ADDR totalRecvTokens, GM_ADDR maxBs, GM_ADDR recvTokensPerExpert, int64_t len, \
int32_t round, int32_t perRoundTokens, int32_t numTokens, int op, int root, int cycleCount, GM_ADDR scale, \
int32_t scaleCount, GM_ADDR offset, int localRank, int localRankSize
int32_t scaleCount, GM_ADDR offset, int localRank, int localRankSize, uint64_t totalWinSize

#define KERNELS_ARGS_CALL_ALL2ALL() \
sendDataInput, tokenPerExpertDataInput, sendDataOffsetOutput, recvDataOutput, recvCount, recvOffset, \
expertGlobalOffset, srcrankInExpertOffset, rInSrcrankOffset, totalRecvTokens, maxBs, recvTokensPerExpert, len, \
round, perRoundTokens, numTokens, op, root, cycleCount, scale, scaleCount, offset, localRank, localRankSize
round, perRoundTokens, numTokens, op, root, cycleCount, scale, scaleCount, offset, localRank, localRankSize, \
totalWinSize

template <typename T>
class NotifyDispatch
Expand Down Expand Up @@ -804,17 +805,16 @@ __aicore__ inline void NotifyDispatch<T>::InitSmallFullMesh(KERNELS_ARGS_FUN_ALL
winContext_[COMM_EP_IDX] = (__gm__ HcclOpResParam *)AscendC::GetHcclContext<HCCL_GROUP_ID_0>();
this->magic = GetMagicValue();
ctxIdx = COMM_EP_IDX;
uint64_t winDataOffset = (this->magic % PING_PONG_SIZE) * (totalWinSize / 2);

shareAddrs[rank] =
GetWindAddrByRankId(rank, ctxIdx) + (this->magic % PING_PONG_SIZE) * (IPC_BUFF_MAX_SIZE + IPC_DATA_OFFSET);
shareAddrs[rank] = GetWindAddrByRankId(rank, ctxIdx) + winDataOffset;

int32_t rankNumPerCore = (rankSize + blockNum - 1) / blockNum;
int32_t copyOffset = blockIdx * rankNumPerCore;
int32_t copyLen = rankSize - copyOffset < rankNumPerCore ? rankSize - copyOffset : rankNumPerCore;
if (copyLen > 0) {
for (int i = copyOffset; i < copyOffset + copyLen; ++i) {
shareAddrs[i] =
GetWindAddrByRankId(i, ctxIdx) + (this->magic % PING_PONG_SIZE) * (IPC_BUFF_MAX_SIZE + IPC_DATA_OFFSET);
shareAddrs[i] = GetWindAddrByRankId(i, ctxIdx) + winDataOffset;
}
}

Expand All @@ -825,11 +825,9 @@ __aicore__ inline void NotifyDispatch<T>::InitSmallFullMesh(KERNELS_ARGS_FUN_ALL
int maxCore = coreNumPerRank * rankSize; // Calculate the maximum number of cores that can be used for reading,
// cores exceeding this number will not take action
if (blockIdx < maxCore) {
int readRank =
blockIdx /
coreNumPerRank; // Calculate the rank to be read based on the block, 48 cores divided into 4 groups
shareAddrs[readRank] = GetWindAddrByRankId(readRank, ctxIdx) +
(this->magic % PING_PONG_SIZE) * (IPC_BUFF_MAX_SIZE + IPC_DATA_OFFSET);
// Calculate the rank to be read based on the block, 48 cores divided into 4 groups
int readRank = blockIdx / coreNumPerRank;
shareAddrs[readRank] = GetWindAddrByRankId(readRank, ctxIdx) + winDataOffset;
}

pipe.InitBuffer(tBuf, UB_FLAG_SIZE);
Expand Down
1 change: 1 addition & 0 deletions csrc/deepep/ops/op_kernel/notify_dispatch_tiling.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ struct NotifyDispatchInfo {
uint32_t perRoundTokens;
uint32_t aivNum;
uint64_t totalUbSize;
uint64_t totalWinSize;
};

struct NotifyDispatchTilingData {
Expand Down