Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class RdmaEndPoint {
enum Status {
INITIALIZING,
UNCONNECTED,
CONNECTING,
CONNECTED,
};

Expand All @@ -53,6 +54,7 @@ class RdmaEndPoint {
size_t max_wr = 256, size_t max_inline = 64);

private:
int reconstruct();
int deconstruct();

public:
Expand Down Expand Up @@ -98,7 +100,28 @@ class RdmaEndPoint {
int destroyQP();

private:
void disconnectUnlocked();
int disconnectUnlocked();

// Resets the connection.
//
// The main difference between this function and `disconnectUnlocked`
// is that it will reconstruct QPs when `CONFIG_ERDMA` is defined.
// Without `CONFIG_ERDMA`, it is essentially the same as
// `disconnectUnlocked` but with additional logging.
//
// This serves as a workaround for Aliyun eRDMA devices (i.e., once a QP is
// transitioned to the RTS state, it cannot be reset to RTS again directly).
// For more details:
// https://github.com/kvcache-ai/Mooncake/pull/1733#discussion_r2992088663
//
// In practice:
// - Call `resetConnection` if the QPs' state may have transitioned to RTS.
// - Call `disconnectUnlocked` otherwise.
//
// This is mainly used in `setupConnectionsByActive` or
// `setupConnectionsByPassive`. It is NOT invoked in the normal execution
// flow, so a `reason` argument is passed for internal logging purposes.
int resetConnection(const std::string &reason);

public:
const std::string toString() const;
Expand All @@ -125,16 +148,25 @@ class RdmaEndPoint {
std::string *reply_msg = nullptr);

private:
static constexpr uint64_t kWaitExistingHandshakeTimeoutNano =
10 * 1000000000ull; // 10 seconds
static constexpr uint32_t kWaitExistingHandshakeSpinCount = 500;
static constexpr uint32_t kWaitExistingHandshakeInitialSleepUs = 50;
static constexpr uint32_t kWaitExistingHandshakeMaxSleepUs = 2000;

RdmaContext &context_;
std::atomic<Status> status_;

RWSpinlock lock_;
std::vector<ibv_qp *> qp_list_;

std::string peer_nic_path_;
std::vector<uint32_t> peer_qp_num_list_;

volatile int *wr_depth_list_;
int max_wr_depth_;
size_t max_sge_per_wr_;
size_t max_inline_bytes_;

volatile bool active_;
volatile int *cq_outstanding_;
Expand Down
Loading
Loading