diff --git a/mooncake-pg/include/connection_poller.h b/mooncake-pg/include/connection_poller.h index 481ad2f31d..77cc4086be 100644 --- a/mooncake-pg/include/connection_poller.h +++ b/mooncake-pg/include/connection_poller.h @@ -90,7 +90,7 @@ class ConnectionContext { public: ConnectionContext(int backendIndex, int rank, int size, bool isDummy, - uint64_t* local2global_rank_map, std::string location, + uint64_t* local2global_rank_map, c10::intrusive_ptr<::c10d::Store> store, std::shared_ptr meta, std::shared_ptr p2p_proxy, diff --git a/mooncake-pg/include/mooncake_worker.cuh b/mooncake-pg/include/mooncake_worker.cuh index 814069e1c4..b6897e9e52 100644 --- a/mooncake-pg/include/mooncake_worker.cuh +++ b/mooncake-pg/include/mooncake_worker.cuh @@ -60,6 +60,7 @@ void launchReduceKernel(at::Tensor dst, size_t pos, size_t realSize, void* src, void launchReduceCpu(at::Tensor dst, size_t pos, size_t realSize, void* src, size_t numRanks, c10d::ReduceOp op, bool* activeRanks); +void preloadReduceKernels(); class ConnectionContext; class MooncakeWorker { diff --git a/mooncake-pg/src/connection_poller.cpp b/mooncake-pg/src/connection_poller.cpp index 5c8c94c755..93c262d079 100644 --- a/mooncake-pg/src/connection_poller.cpp +++ b/mooncake-pg/src/connection_poller.cpp @@ -13,6 +13,7 @@ #include #include #include +#include "memory_location.h" #include "mooncake_worker.cuh" namespace mooncake { @@ -42,7 +43,6 @@ static bool supportFabricMem() { ConnectionContext::ConnectionContext(int backendIndex, int rank, int size, bool isDummy, uint64_t* local2global_rank_map, - std::string location, c10::intrusive_ptr<::c10d::Store> store, std::shared_ptr meta, std::shared_ptr p2p_proxy, @@ -67,15 +67,15 @@ ConnectionContext::ConnectionContext(int backendIndex, int rank, int size, return; } - warmup_send_region_ = new int32_t[kMaxNumRanks]; + warmup_send_region_ = new int32_t[kMaxNumRanks]{}; warmup_send_region_[0] = 1; int rc = engine_->registerLocalMemory( - warmup_send_region_, kMaxNumRanks * sizeof(int32_t), location); + warmup_send_region_, kMaxNumRanks * sizeof(int32_t), kWildcardLocation); TORCH_CHECK(!rc, "Failed to register local memory for context."); warmup_recv_region_ = new int32_t[kMaxNumRanks]{}; - rc = engine_->registerLocalMemory(warmup_recv_region_, - kMaxNumRanks * sizeof(int32_t), location); + rc = engine_->registerLocalMemory( + warmup_recv_region_, kMaxNumRanks * sizeof(int32_t), kWildcardLocation); TORCH_CHECK(!rc, "Failed to register local memory for context."); } diff --git a/mooncake-pg/src/mooncake_backend.cpp b/mooncake-pg/src/mooncake_backend.cpp index 0dcea813a4..20dbb3ab99 100644 --- a/mooncake-pg/src/mooncake_backend.cpp +++ b/mooncake-pg/src/mooncake_backend.cpp @@ -10,6 +10,7 @@ #include #include #include "connection_poller.h" +#include "memory_location.h" #include "mooncake_worker.cuh" namespace mooncake { @@ -177,16 +178,18 @@ MooncakeBackend::MooncakeBackend( TORCH_CHECK(static_cast(size) <= kMaxNumRanks, "The number of ranks exceeds the limit."); for (size_t i = 0; i < 2; i++) { - cpu_sync_send_region_[i] = new int32_t[kMaxNumRanks]; - int rc = engine_->registerLocalMemory( - cpu_sync_send_region_[i], kMaxNumRanks * sizeof(int32_t), location); + cpu_sync_send_region_[i] = new int32_t[kMaxNumRanks]{}; + int rc = engine_->registerLocalMemory(cpu_sync_send_region_[i], + kMaxNumRanks * sizeof(int32_t), + kWildcardLocation); TORCH_CHECK(!rc, REGISTER_BUFFER_ERROR_MSG); } for (size_t i = 0; i < 2; i++) { - cpu_sync_recv_region_[i] = new int32_t[kMaxNumRanks]; - int rc = engine_->registerLocalMemory( - cpu_sync_recv_region_[i], kMaxNumRanks * sizeof(int32_t), location); + cpu_sync_recv_region_[i] = new int32_t[kMaxNumRanks]{}; + int rc = engine_->registerLocalMemory(cpu_sync_recv_region_[i], + kMaxNumRanks * sizeof(int32_t), + kWildcardLocation); TORCH_CHECK(!rc, REGISTER_BUFFER_ERROR_MSG); } @@ -203,6 +206,9 @@ MooncakeBackend::MooncakeBackend( worker_ = worker_mgr.GetCPUWorker(); else worker_ = worker_mgr.GetCUDAWorker(cuda_device_index); + if (!isCpu_) { + preloadReduceKernels(); + } worker_->Start(); p2p_proxy_ = std::make_shared( @@ -218,7 +224,7 @@ MooncakeBackend::MooncakeBackend( meta_ = std::make_shared(); connection_ctx_ = std::make_shared( backendIndex_, rank, size, options_ && options_->isExtension_, - local2global_rank_map_, location, store, meta_, p2p_proxy_, engine_); + local2global_rank_map_, store, meta_, p2p_proxy_, engine_); rank_info.send_buffer[0] = (uint64_t)send_buffer_[0]; rank_info.send_buffer[1] = (uint64_t)send_buffer_[1]; diff --git a/mooncake-pg/src/mooncake_worker.cu b/mooncake-pg/src/mooncake_worker.cu index b8fc223098..a33a59e427 100644 --- a/mooncake-pg/src/mooncake_worker.cu +++ b/mooncake-pg/src/mooncake_worker.cu @@ -108,6 +108,19 @@ __global__ void reduceKernel(scalar_t* dst, const scalar_t* src, } } +namespace { + +template +void preload_reduce_kernel(const char* name) { + cudaFuncAttributes attr{}; + auto err = cudaFuncGetAttributes( + &attr, reinterpret_cast(reduceKernel)); + TORCH_CHECK(err == cudaSuccess, "Failed to preload kernel ", name, ": ", + cudaGetErrorString(err)); +} + +} // namespace + void launchReduceKernel(at::Tensor dst, size_t pos, size_t realSize, void* src, size_t numRanks, c10d::ReduceOp op, bool* activeRanks, cudaStream_t stream) { @@ -244,6 +257,18 @@ void launchReduceCpu(at::Tensor dst, size_t pos, size_t realSize, void* src, } } +void preloadReduceKernels() { + preload_reduce_kernel("reduceKernel"); + preload_reduce_kernel("reduceKernel"); + preload_reduce_kernel("reduceKernel"); + preload_reduce_kernel("reduceKernel"); + preload_reduce_kernel("reduceKernel"); + preload_reduce_kernel("reduceKernel"); + preload_reduce_kernel("reduceKernel"); + preload_reduce_kernel("reduceKernel"); + preload_reduce_kernel("reduceKernel"); +} + MooncakeWorker::MooncakeWorker(int cuda_device_index) : cuda_device_index_(cuda_device_index) { int deviceCount = 0; @@ -378,4 +403,4 @@ c10::intrusive_ptr MooncakeWorker::putTaskCuda( return c10::make_intrusive(opType, event, meta); } -} // namespace mooncake \ No newline at end of file +} // namespace mooncake diff --git a/mooncake-pg/src/mooncake_worker_thread.cpp b/mooncake-pg/src/mooncake_worker_thread.cpp index 002f710a1c..59fa27d9dd 100644 --- a/mooncake-pg/src/mooncake_worker_thread.cpp +++ b/mooncake-pg/src/mooncake_worker_thread.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include namespace mooncake { @@ -12,6 +13,8 @@ enum WorkerTaskStatus { DONE = 3, }; +static constexpr size_t kInvalidTaskId = static_cast(-1); + void MooncakeWorker::Start() { bool expected = false; if (started_.compare_exchange_strong(expected, true)) { @@ -51,6 +54,9 @@ void MooncakeWorker::startWorker() { std::memory_order_release); continue; } + for (size_t j = 0; j < kMaxNumRanks; ++j) { + rankToTaskId[i][j] = kInvalidTaskId; + } std::vector entries; for (int j = 0; j < group->size; ++j) { if (!group->activeRanks[j]) { @@ -131,6 +137,9 @@ void MooncakeWorker::startWorker() { if (!group->activeRanks[j]) { continue; } + if (rankToTaskId[i][j] == kInvalidTaskId) { + continue; + } group->engine->getTransferStatus( task.batchID, rankToTaskId[i][j], status); if (status.s != TransferStatusEnum::COMPLETED) { @@ -175,6 +184,9 @@ void MooncakeWorker::startWorker() { auto source_ptr = (int32_t*)group->segmentInfos[group->rank] .send_sync[task.bufferOffset]; + for (size_t j = 0; j < kMaxNumRanks; ++j) { + rankToTaskId[i][j] = kInvalidTaskId; + } std::vector entries; for (int j = 0; j < group->size; ++j) { if (!group->activeRanks[j]) { @@ -213,6 +225,9 @@ void MooncakeWorker::startWorker() { if (!group->activeRanks[j]) { continue; } + if (rankToTaskId[i][j] == kInvalidTaskId) { + continue; + } group->engine->getTransferStatus( task.batchID, rankToTaskId[i][j], status); if (signal_ptr[j] != 1 || diff --git a/mooncake-pg/src/p2p_proxy.cpp b/mooncake-pg/src/p2p_proxy.cpp index 41e9bbd1ac..99fb36d414 100644 --- a/mooncake-pg/src/p2p_proxy.cpp +++ b/mooncake-pg/src/p2p_proxy.cpp @@ -9,6 +9,7 @@ #include #include #include +#include "memory_location.h" namespace mooncake { @@ -161,15 +162,14 @@ void P2PProxy::AllocateResources() { } } } - int rc = engine_->registerLocalMemory(resources_.ctrl_send_region_, kMaxNumRanks * sizeof(P2PControlSlot), - location_); + kWildcardLocation); TORCH_CHECK(rc == 0, "Failed to register P2P ctrl send region"); rc = engine_->registerLocalMemory(resources_.ctrl_recv_region_, kMaxNumRanks * sizeof(P2PControlSlot), - location_); + kWildcardLocation); TORCH_CHECK(rc == 0, "Failed to register P2P ctrl recv region"); } diff --git a/mooncake-transfer-engine/tent/src/rpc/rpc.cpp b/mooncake-transfer-engine/tent/src/rpc/rpc.cpp index a8db818d5d..75f7f33512 100644 --- a/mooncake-transfer-engine/tent/src/rpc/rpc.cpp +++ b/mooncake-transfer-engine/tent/src/rpc/rpc.cpp @@ -69,6 +69,16 @@ Status CoroRpcAgent::start(uint16_t& port, bool ipv6) { server_ = new coro_rpc::coro_rpc_server(kRpcThreads, port); server_->register_handler<&CoroRpcAgent::process>(this); server_->async_start(); + const auto err = server_->get_errc(); + if (err) { + LOG(WARNING) + << "Failed to start RPC server(async_start) on port " + << port << ": " << err.message(); + delete server_; + server_ = nullptr; + port = 0; + continue; + } running_ = true; return Status::OK(); } catch (const std::exception& e) {