Skip to content

Commit 76ab0cd

Browse files
Merge pull request #28872 from taosdata/fix/TS-5669
fix double compress when retry
2 parents bf7c642 + 37d79af commit 76ab0cd

5 files changed

Lines changed: 472 additions & 5 deletions

File tree

source/libs/transport/inc/transComm.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,7 @@ void transPrintEpSet(SEpSet* pEpSet);
452452
void transFreeMsg(void* msg);
453453
int32_t transCompressMsg(char* msg, int32_t len);
454454
int32_t transDecompressMsg(char** msg, int32_t* len);
455+
int32_t transDecompressMsgExt(char const* msg, int32_t len, char** out, int32_t* outLen);
455456

456457
int32_t transOpenRefMgt(int size, void (*func)(void*));
457458
void transCloseRefMgt(int32_t refMgt);

source/libs/transport/src/transCli.c

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1334,28 +1334,49 @@ static void cliBatchSendCb(uv_write_t* req, int status) {
13341334
}
13351335
}
13361336
bool cliConnMayAddUserInfo(SCliConn* pConn, STransMsgHead** ppHead, int32_t* msgLen) {
1337+
int32_t code = 0;
13371338
SCliThrd* pThrd = pConn->hostThrd;
13381339
STrans* pInst = pThrd->pInst;
13391340
if (pConn->userInited == 1) {
13401341
return false;
13411342
}
13421343
STransMsgHead* pHead = *ppHead;
1343-
STransMsgHead* tHead = taosMemoryCalloc(1, *msgLen + sizeof(pInst->user));
1344+
int32_t len = *msgLen;
1345+
char* oriMsg = NULL;
1346+
int32_t oriLen = 0;
1347+
1348+
if (pHead->comp == 1) {
1349+
int32_t msgLen = htonl(pHead->msgLen);
1350+
code = transDecompressMsgExt((char*)(pHead), msgLen, &oriMsg, &oriLen);
1351+
if (code < 0) {
1352+
tError("failed to decompress since %s", tstrerror(code));
1353+
return false;
1354+
} else {
1355+
tDebug("decompress msg and resent, compress size %d, raw size %d", msgLen, oriLen);
1356+
}
1357+
1358+
pHead = (STransMsgHead*)oriMsg;
1359+
len = oriLen;
1360+
}
1361+
STransMsgHead* tHead = taosMemoryCalloc(1, len + sizeof(pInst->user));
13441362
if (tHead == NULL) {
13451363
return false;
13461364
}
13471365
memcpy((char*)tHead, (char*)pHead, TRANS_MSG_OVERHEAD);
13481366
memcpy((char*)tHead + TRANS_MSG_OVERHEAD, pInst->user, sizeof(pInst->user));
13491367

13501368
memcpy((char*)tHead + TRANS_MSG_OVERHEAD + sizeof(pInst->user), (char*)pHead + TRANS_MSG_OVERHEAD,
1351-
*msgLen - TRANS_MSG_OVERHEAD);
1369+
len - TRANS_MSG_OVERHEAD);
13521370

13531371
tHead->withUserInfo = 1;
13541372
*ppHead = tHead;
1355-
*msgLen += sizeof(pInst->user);
1373+
*msgLen = len + sizeof(pInst->user);
13561374

13571375
pConn->pInitUserReq = tHead;
13581376
pConn->userInited = 1;
1377+
if (oriMsg != NULL) {
1378+
taosMemoryFree(oriMsg);
1379+
}
13591380
return true;
13601381
}
13611382
int32_t cliBatchSend(SCliConn* pConn, int8_t direct) {
@@ -1421,9 +1442,8 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) {
14211442
pReq->contLen = 0;
14221443
}
14231444

1424-
int32_t msgLen = transMsgLenFromCont(pReq->contLen);
1425-
14261445
STransMsgHead* pHead = transHeadFromCont(pReq->pCont);
1446+
int32_t msgLen = transMsgLenFromCont(pReq->contLen);
14271447

14281448
char* content = pReq->pCont;
14291449
int32_t contLen = pReq->contLen;

source/libs/transport/src/transComm.c

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,16 +77,48 @@ int32_t transDecompressMsg(char** msg, int32_t* len) {
7777
STransMsgHead* pNewHead = (STransMsgHead*)buf;
7878
int32_t decompLen = LZ4_decompress_safe(pCont + sizeof(STransCompMsg), (char*)pNewHead->content,
7979
tlen - sizeof(STransMsgHead) - sizeof(STransCompMsg), oriLen);
80+
81+
if (decompLen != oriLen) {
82+
taosMemoryFree(buf);
83+
return TSDB_CODE_INVALID_MSG;
84+
}
8085
memcpy((char*)pNewHead, (char*)pHead, sizeof(STransMsgHead));
8186

8287
*len = oriLen + sizeof(STransMsgHead);
8388
pNewHead->msgLen = htonl(oriLen + sizeof(STransMsgHead));
8489

8590
taosMemoryFree(pHead);
8691
*msg = buf;
92+
return 0;
93+
}
94+
int32_t transDecompressMsgExt(char const* msg, int32_t len, char** out, int32_t* outLen) {
95+
STransMsgHead* pHead = (STransMsgHead*)msg;
96+
char* pCont = transContFromHead(pHead);
97+
98+
STransCompMsg* pComp = (STransCompMsg*)pCont;
99+
int32_t oriLen = htonl(pComp->contLen);
100+
101+
int32_t tlen = len;
102+
char* buf = taosMemoryCalloc(1, oriLen + sizeof(STransMsgHead));
103+
if (buf == NULL) {
104+
return terrno;
105+
}
106+
107+
STransMsgHead* pNewHead = (STransMsgHead*)buf;
108+
int32_t decompLen = LZ4_decompress_safe(pCont + sizeof(STransCompMsg), (char*)pNewHead->content,
109+
tlen - sizeof(STransMsgHead) - sizeof(STransCompMsg), oriLen);
87110
if (decompLen != oriLen) {
111+
tError("msgLen:%d, originLen:%d, decompLen:%d", len, oriLen, decompLen);
112+
taosMemoryFree(buf);
88113
return TSDB_CODE_INVALID_MSG;
89114
}
115+
memcpy((char*)pNewHead, (char*)pHead, sizeof(STransMsgHead));
116+
117+
*out = buf;
118+
*outLen = oriLen + sizeof(STransMsgHead);
119+
pNewHead->msgLen = *outLen;
120+
pNewHead->comp = 0;
121+
90122
return 0;
91123
}
92124

tests/parallel_test/cases.task

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1300,6 +1300,7 @@
13001300
#,,y,script,./test.sh -f tsim/mnode/basic3.sim
13011301
,,y,script,./test.sh -f tsim/mnode/basic4.sim
13021302
,,y,script,./test.sh -f tsim/mnode/basic5.sim
1303+
,,y,script,./test.sh -f tsim/mnode/basic6.sim
13031304
,,y,script,./test.sh -f tsim/show/basic.sim
13041305
,,y,script,./test.sh -f tsim/table/autocreate.sim
13051306
,,y,script,./test.sh -f tsim/table/basic1.sim

0 commit comments

Comments
 (0)