Skip to content

Commit 2f1a0f6

Browse files
Merge pull request #29618 from taosdata/fix/main/TS-5937
fix(rpc):use tsApplyMemoryAllowed to control memory alloc while apply msg.
2 parents c90b349 + 6f976d6 commit 2f1a0f6

8 files changed

Lines changed: 59 additions & 15 deletions

File tree

include/common/tglobal.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ extern "C" {
3434
#define GLOBAL_CONFIG_FILE_VERSION 1
3535
#define LOCAL_CONFIG_FILE_VERSION 1
3636

37+
#define RPC_MEMORY_USAGE_RATIO 0.1
38+
#define QUEUE_MEMORY_USAGE_RATIO 0.6
39+
3740
typedef enum {
3841
DND_CA_SM4 = 1,
3942
} EEncryptAlgor;
@@ -110,6 +113,7 @@ extern int32_t tsNumOfQnodeFetchThreads;
110113
extern int32_t tsNumOfSnodeStreamThreads;
111114
extern int32_t tsNumOfSnodeWriteThreads;
112115
extern int64_t tsQueueMemoryAllowed;
116+
extern int64_t tsApplyMemoryAllowed;
113117
extern int32_t tsRetentionSpeedLimitMB;
114118

115119
extern int32_t tsNumOfCompactThreads;

include/util/tqueue.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ typedef struct {
5555
typedef enum {
5656
DEF_QITEM = 0,
5757
RPC_QITEM = 1,
58+
APPLY_QITEM = 2,
5859
} EQItype;
5960

6061
typedef void (*FItem)(SQueueInfo *pInfo, void *pItem);

source/common/src/tglobal.c

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@
1414
*/
1515

1616
#define _DEFAULT_SOURCE
17-
#include "tglobal.h"
1817
#include "cJSON.h"
1918
#include "defines.h"
2019
#include "os.h"
2120
#include "osString.h"
2221
#include "tconfig.h"
22+
#include "tglobal.h"
2323
#include "tgrant.h"
2424
#include "tjson.h"
2525
#include "tlog.h"
@@ -500,7 +500,9 @@ int32_t taosSetS3Cfg(SConfig *pCfg) {
500500
TAOS_RETURN(TSDB_CODE_SUCCESS);
501501
}
502502

503-
struct SConfig *taosGetCfg() { return tsCfg; }
503+
struct SConfig *taosGetCfg() {
504+
return tsCfg;
505+
}
504506

505507
static int32_t taosLoadCfg(SConfig *pCfg, const char **envCmd, const char *inputCfgDir, const char *envFile,
506508
char *apolloUrl) {
@@ -818,8 +820,13 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
818820
tsNumOfSnodeWriteThreads = tsNumOfCores / 4;
819821
tsNumOfSnodeWriteThreads = TRANGE(tsNumOfSnodeWriteThreads, 2, 4);
820822

821-
tsQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1;
822-
tsQueueMemoryAllowed = TRANGE(tsQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL);
823+
tsQueueMemoryAllowed = tsTotalMemoryKB * 1024 * RPC_MEMORY_USAGE_RATIO * QUEUE_MEMORY_USAGE_RATIO;
824+
tsQueueMemoryAllowed = TRANGE(tsQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * QUEUE_MEMORY_USAGE_RATIO * 10LL,
825+
TSDB_MAX_MSG_SIZE * QUEUE_MEMORY_USAGE_RATIO * 10000LL);
826+
827+
tsApplyMemoryAllowed = tsTotalMemoryKB * 1024 * RPC_MEMORY_USAGE_RATIO * (1 - QUEUE_MEMORY_USAGE_RATIO);
828+
tsApplyMemoryAllowed = TRANGE(tsApplyMemoryAllowed, TSDB_MAX_MSG_SIZE * (1 - QUEUE_MEMORY_USAGE_RATIO) * 10LL,
829+
TSDB_MAX_MSG_SIZE * (1 - QUEUE_MEMORY_USAGE_RATIO) * 10000LL);
823830

824831
tsLogBufferMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1;
825832
tsLogBufferMemoryAllowed = TRANGE(tsLogBufferMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL);
@@ -857,7 +864,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
857864

858865
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfSnodeSharedThreads", tsNumOfSnodeStreamThreads, 2, 1024, CFG_SCOPE_SERVER, CFG_DYN_SERVER_LAZY,CFG_CATEGORY_LOCAL));
859866
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfSnodeUniqueThreads", tsNumOfSnodeWriteThreads, 2, 1024, CFG_SCOPE_SERVER, CFG_DYN_SERVER_LAZY,CFG_CATEGORY_LOCAL));
860-
TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
867+
TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * RPC_MEMORY_USAGE_RATIO * 10L, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
861868
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncElectInterval", tsElectInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE,CFG_CATEGORY_GLOBAL));
862869
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncHeartbeatInterval", tsHeartbeatInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE,CFG_CATEGORY_GLOBAL));
863870
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncHeartbeatTimeout", tsHeartbeatTimeout, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE,CFG_CATEGORY_GLOBAL));
@@ -1572,7 +1579,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
15721579
tsNumOfSnodeWriteThreads = pItem->i32;
15731580

15741581
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "rpcQueueMemoryAllowed");
1575-
tsQueueMemoryAllowed = pItem->i64;
1582+
tsQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64 * QUEUE_MEMORY_USAGE_RATIO;
1583+
tsApplyMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64 * (1 - QUEUE_MEMORY_USAGE_RATIO);
15761584

15771585
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "simdEnable");
15781586
tsSIMDEnable = (bool)pItem->bval;
@@ -2395,6 +2403,12 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
23952403
code = TSDB_CODE_SUCCESS;
23962404
goto _exit;
23972405
}
2406+
if (strcasecmp("rpcQueueMemoryAllowed", name) == 0) {
2407+
tsQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64 * QUEUE_MEMORY_USAGE_RATIO;
2408+
tsApplyMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64 * (1 - QUEUE_MEMORY_USAGE_RATIO);
2409+
code = TSDB_CODE_SUCCESS;
2410+
goto _exit;
2411+
}
23982412

23992413
if (strcasecmp(name, "numOfCompactThreads") == 0) {
24002414
#ifdef TD_ENTERPRISE
@@ -2500,7 +2514,6 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
25002514
{"experimental", &tsExperimental},
25012515

25022516
{"numOfRpcSessions", &tsNumOfRpcSessions},
2503-
{"rpcQueueMemoryAllowed", &tsQueueMemoryAllowed},
25042517
{"shellActivityTimer", &tsShellActivityTimer},
25052518
{"readTimeout", &tsReadTimeout},
25062519
{"safetyCheckLevel", &tsSafetyCheckLevel},

source/dnode/mgmt/mgmt_dnode/src/dmHandle.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
181181
req.numOfSupportVnodes = tsNumOfSupportVnodes;
182182
req.numOfDiskCfg = tsDiskCfgNum;
183183
req.memTotal = tsTotalMemoryKB * 1024;
184-
req.memAvail = req.memTotal - tsQueueMemoryAllowed - 16 * 1024 * 1024;
184+
req.memAvail = req.memTotal - tsQueueMemoryAllowed - tsApplyMemoryAllowed - 16 * 1024 * 1024;
185185
tstrncpy(req.dnodeEp, tsLocalEp, TSDB_EP_LEN);
186186
tstrncpy(req.machineId, pMgmt->pData->machineId, TSDB_MACHINE_ID_LEN + 1);
187187

source/dnode/mgmt/mgmt_vnode/src/vmWorker.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
323323
return TSDB_CODE_INVALID_MSG;
324324
}
325325

326-
EQItype itype = APPLY_QUEUE == qtype ? DEF_QITEM : RPC_QITEM;
326+
EQItype itype = APPLY_QUEUE == qtype ? APPLY_QITEM : RPC_QITEM;
327327
SRpcMsg *pMsg;
328328
code = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen, (void **)&pMsg);
329329
if (code) {

source/dnode/mgmt/test/sut/src/sut.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ void Testbase::InitLog(const char* path) {
3636
tstrncpy(tsLogDir, path, PATH_MAX);
3737

3838
taosGetSystemInfo();
39-
tsQueueMemoryAllowed = tsTotalMemoryKB * 0.1;
39+
tsQueueMemoryAllowed = tsTotalMemoryKB * 0.06;
40+
tsApplyMemoryAllowed = tsTotalMemoryKB * 0.04;
4041
if (taosInitLog("taosdlog", 1, false) != 0) {
4142
printf("failed to init log file\n");
4243
}

source/libs/sync/src/syncPipeline.c

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -732,7 +732,11 @@ int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTe
732732
pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), code, retry);
733733
if (retry) {
734734
taosMsleep(10);
735-
sError("vgId:%d, retry on fsm commit since %s. index:%" PRId64, pNode->vgId, tstrerror(code), pEntry->index);
735+
if (code == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE) {
736+
sError("vgId:%d, failed to execute fsm since %s. index:%" PRId64, pNode->vgId, terrstr(), pEntry->index);
737+
} else {
738+
sDebug("vgId:%d, retry on fsm commit since %s. index:%" PRId64, pNode->vgId, terrstr(), pEntry->index);
739+
}
736740
}
737741
} while (retry);
738742

source/util/src/tqueue.c

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,16 @@
1414
*/
1515

1616
#define _DEFAULT_SOURCE
17-
#include "tqueue.h"
1817
#include "taoserror.h"
1918
#include "tlog.h"
19+
#include "tqueue.h"
2020
#include "tutil.h"
2121

2222
int64_t tsQueueMemoryAllowed = 0;
2323
int64_t tsQueueMemoryUsed = 0;
2424

25+
int64_t tsApplyMemoryAllowed = 0;
26+
int64_t tsApplyMemoryUsed = 0;
2527
struct STaosQueue {
2628
STaosQnode *head;
2729
STaosQnode *tail;
@@ -148,20 +150,34 @@ int64_t taosQueueMemorySize(STaosQueue *queue) {
148150
}
149151

150152
int32_t taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize, void **item) {
151-
int64_t alloced = atomic_add_fetch_64(&tsQueueMemoryUsed, size + dataSize);
153+
int64_t alloced = -1;
154+
152155
if (alloced > tsQueueMemoryAllowed) {
156+
alloced = atomic_add_fetch_64(&tsQueueMemoryUsed, size + dataSize);
153157
if (itype == RPC_QITEM) {
154158
uError("failed to alloc qitem, size:%" PRId64 " alloc:%" PRId64 " allowed:%" PRId64, size + dataSize, alloced,
155159
tsQueueMemoryAllowed);
156160
(void)atomic_sub_fetch_64(&tsQueueMemoryUsed, size + dataSize);
157161
return (terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE);
158162
}
163+
} else if (itype == APPLY_QITEM) {
164+
alloced = atomic_add_fetch_64(&tsApplyMemoryUsed, size + dataSize);
165+
if (alloced > tsApplyMemoryAllowed) {
166+
uDebug("failed to alloc qitem, size:%" PRId64 " alloc:%" PRId64 " allowed:%" PRId64, size + dataSize, alloced,
167+
tsApplyMemoryAllowed);
168+
(void)atomic_sub_fetch_64(&tsApplyMemoryUsed, size + dataSize);
169+
return (terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE);
170+
}
159171
}
160172

161173
*item = NULL;
162174
STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size);
163175
if (pNode == NULL) {
164-
(void)atomic_sub_fetch_64(&tsQueueMemoryUsed, size + dataSize);
176+
if (itype == RPC_QITEM) {
177+
(void)atomic_sub_fetch_64(&tsQueueMemoryUsed, size + dataSize);
178+
} else if (itype == APPLY_QITEM) {
179+
(void)atomic_sub_fetch_64(&tsApplyMemoryUsed, size + dataSize);
180+
}
165181
return terrno;
166182
}
167183

@@ -178,7 +194,12 @@ void taosFreeQitem(void *pItem) {
178194
if (pItem == NULL) return;
179195

180196
STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));
181-
int64_t alloced = atomic_sub_fetch_64(&tsQueueMemoryUsed, pNode->size + pNode->dataSize);
197+
int64_t alloced = -1;
198+
if (pNode->itype == RPC_QITEM) {
199+
alloced = atomic_sub_fetch_64(&tsQueueMemoryUsed, pNode->size + pNode->dataSize);
200+
} else if (pNode->itype == APPLY_QITEM) {
201+
alloced = atomic_sub_fetch_64(&tsApplyMemoryUsed, pNode->size + pNode->dataSize);
202+
}
182203
uTrace("item:%p, node:%p is freed, alloc:%" PRId64, pItem, pNode, alloced);
183204

184205
taosMemoryFree(pNode);

0 commit comments

Comments
 (0)