Skip to content

Commit 49d48c1

Browse files
authored
fix: stream mgmt request error handling issue (#33665)
* fix: stream mgmt req error handling issue * fix: deploy original trigger reader issue
1 parent d0995fb commit 49d48c1

File tree

2 files changed

+51
-33
lines changed

2 files changed

+51
-33
lines changed

source/dnode/mnode/impl/inc/mndStream.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ bool mstEventHandledChkSet(int32_t event);
2929
typedef enum {
3030
STM_ERR_TASK_NOT_EXISTS = 1,
3131
STM_ERR_STREAM_STOPPED,
32+
STM_ERR_PROCESSING_ERR,
3233
} EStmErrType;
3334

3435

source/dnode/mnode/impl/src/mndStreamMgmt.c

Lines changed: 50 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -3390,19 +3390,15 @@ void msmHandleTaskAbnormalStatus(SStmGrpCtx* pCtx, SStmTaskStatusMsg* pMsg, SStm
33903390
}
33913391
}
33923392

3393-
void msmHandleStatusUpdateErr(SStmGrpCtx* pCtx, EStmErrType err, SStmTaskStatusMsg* pStatus) {
3393+
void msmHandleStreamTaskErr(SStmGrpCtx* pCtx, EStmErrType err, SStmTaskStatusMsg* pStatus) {
33943394
int32_t code = TSDB_CODE_SUCCESS;
33953395
int32_t lino = 0;
33963396
SStreamTask* pTask = (SStreamTask*)pStatus;
33973397
int64_t streamId = pStatus->streamId;
33983398

3399-
msttInfo("start to handle task status update exception, type: %d", err);
3400-
3401-
// STREAMTODO
3399+
msttInfo("start to handle task error, type: %d", err);
34023400

3403-
if (STM_ERR_TASK_NOT_EXISTS == err || STM_ERR_STREAM_STOPPED == err) {
3404-
TAOS_CHECK_EXIT(msmGrpAddActionUndeploy(pCtx, streamId, pTask));
3405-
}
3401+
TAOS_CHECK_EXIT(msmGrpAddActionUndeploy(pCtx, streamId, pTask));
34063402

34073403
_exit:
34083404

@@ -3471,23 +3467,23 @@ int32_t msmNormalHandleStatusUpdate(SStmGrpCtx* pCtx) {
34713467
SStmTaskStatus** ppStatus = taosHashGet(mStreamMgmt.taskMap, &pTask->streamId, sizeof(pTask->streamId) + sizeof(pTask->taskId));
34723468
if (NULL == ppStatus) {
34733469
msttWarn("task no longer exists in taskMap, will try to undeploy current task, taskIdx:%d", pTask->taskIdx);
3474-
msmHandleStatusUpdateErr(pCtx, STM_ERR_TASK_NOT_EXISTS, pTask);
3470+
msmHandleStreamTaskErr(pCtx, STM_ERR_TASK_NOT_EXISTS, pTask);
34753471
continue;
34763472
}
34773473

34783474
SStmStatus* pStream = (SStmStatus*)(*ppStatus)->pStream;
34793475
int8_t stopped = atomic_load_8(&pStream->stopped);
34803476
if (stopped) {
34813477
msttWarn("stream already stopped %d, will try to undeploy current task, taskIdx:%d", stopped, pTask->taskIdx);
3482-
msmHandleStatusUpdateErr(pCtx, STM_ERR_STREAM_STOPPED, pTask);
3478+
msmHandleStreamTaskErr(pCtx, STM_ERR_STREAM_STOPPED, pTask);
34833479
continue;
34843480
}
34853481

34863482
if ((pTask->seriousId != (*ppStatus)->id.seriousId) || (pTask->nodeId != (*ppStatus)->id.nodeId)) {
34873483
msttInfo("task mismatch with it in taskMap, will try to rm it, current seriousId:%" PRId64 ", nodeId:%d",
34883484
(*ppStatus)->id.seriousId, (*ppStatus)->id.nodeId);
34893485

3490-
msmHandleStatusUpdateErr(pCtx, STM_ERR_TASK_NOT_EXISTS, pTask);
3486+
msmHandleStreamTaskErr(pCtx, STM_ERR_TASK_NOT_EXISTS, pTask);
34913487
continue;
34923488
}
34933489

@@ -4068,7 +4064,7 @@ int32_t msmCheckDeployTrigReader(SStmGrpCtx* pCtx, SStmStatus* pStatus, SStmTask
40684064
}
40694065

40704066
int32_t msmDeployTriggerOrigReader(SStmGrpCtx* pCtx, SStmTaskStatusMsg* pTask) {
4071-
int32_t code = TSDB_CODE_SUCCESS;
4067+
int32_t code = TSDB_CODE_SUCCESS, finalCode = TSDB_CODE_SUCCESS;
40724068
int32_t lino = 0;
40734069
int32_t vgId = 0;
40744070
int64_t streamId = pTask->streamId;
@@ -4084,6 +4080,11 @@ int32_t msmDeployTriggerOrigReader(SStmGrpCtx* pCtx, SStmTaskStatusMsg* pTask) {
40844080
SSHashObj* pVgs = NULL;
40854081
SStreamMgmtReq* pMgmtReq = NULL;
40864082
int8_t stopped = 0;
4083+
4084+
if (NULL == pCtx->pRsp->rsps.rspList) {
4085+
pCtx->pRsp->rsps.rspList = taosArrayInit(2, sizeof(SStreamMgmtRsp));
4086+
TSDB_CHECK_NULL(pCtx->pRsp->rsps.rspList, finalCode, lino, _final, terrno);
4087+
}
40874088

40884089
TSWAP(pTask->pMgmtReq, pMgmtReq);
40894090
rsp.task = *(SStreamTask*)pTask;
@@ -4149,28 +4150,33 @@ int32_t msmDeployTriggerOrigReader(SStmGrpCtx* pCtx, SStmTaskStatusMsg* pTask) {
41494150
mstsDebug("the %dth otrigReader src added to trigger's virtual orig readerList, TASK:%" PRIx64 " nodeId:%d", i, addr.taskId, addr.nodeId);
41504151
}
41514152

4152-
if (NULL == pCtx->pRsp->rsps.rspList) {
4153-
pCtx->pRsp->rsps.rspList = taosArrayInit(2, sizeof(SStreamMgmtRsp));
4154-
TSDB_CHECK_NULL(pCtx->pRsp->rsps.rspList, code, lino, _exit, terrno);
4155-
}
4156-
4157-
TSDB_CHECK_NULL(taosArrayPush(pCtx->pRsp->rsps.rspList, &rsp), code, lino, _exit, terrno);
4158-
41594153
_exit:
41604154

41614155
tFreeSStreamMgmtReq(pMgmtReq);
41624156
taosMemoryFree(pMgmtReq);
41634157

41644158
tSimpleHashCleanup(pVgs);
4159+
mstDestroyDbVgroupsHash(pDbVgroups);
41654160

41664161
if (code) {
4167-
tFreeSStreamMgmtRsp(&rsp);
4162+
rsp.code = code;
4163+
4164+
TSDB_CHECK_NULL(taosArrayPush(pCtx->pRsp->rsps.rspList, &rsp), finalCode, lino, _final, terrno);
4165+
41684166
mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
4167+
} else {
4168+
TSDB_CHECK_NULL(taosArrayPush(pCtx->pRsp->rsps.rspList, &rsp), finalCode, lino, _final, terrno);
41694169
}
41704170

4171-
mstDestroyDbVgroupsHash(pDbVgroups);
4171+
_final:
41724172

4173-
return code;
4173+
if (finalCode) {
4174+
tFreeSStreamMgmtRsp(&rsp);
4175+
mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
4176+
msmHandleStreamTaskErr(pCtx, STM_ERR_PROCESSING_ERR, pTask);
4177+
}
4178+
4179+
return finalCode;
41744180
}
41754181

41764182
int32_t msmGetCalcScanFromList(int64_t streamId, SArray* pList, int64_t uid, SStreamCalcScan** ppRes) {
@@ -4250,7 +4256,7 @@ int32_t msmCheckDeployCalcReader(SStmGrpCtx* pCtx, SStmStatus* pStatus, SStmTask
42504256

42514257

42524258
int32_t msmDeployRunnerOrigReader(SStmGrpCtx* pCtx, SStmTaskStatusMsg* pTask) {
4253-
int32_t code = TSDB_CODE_SUCCESS;
4259+
int32_t code = TSDB_CODE_SUCCESS, finalCode = TSDB_CODE_SUCCESS;
42544260
int32_t lino = 0;
42554261
int32_t vgId = 0;
42564262
int64_t streamId = pTask->streamId;
@@ -4265,6 +4271,11 @@ int32_t msmDeployRunnerOrigReader(SStmGrpCtx* pCtx, SStmTaskStatusMsg* pTask) {
42654271
int8_t stopped = 0;
42664272
int32_t vgNum = 0;
42674273
SStreamTaskAddr* pAddr = NULL;
4274+
4275+
if (NULL == pCtx->pRsp->rsps.rspList) {
4276+
pCtx->pRsp->rsps.rspList = taosArrayInit(2, sizeof(SStreamMgmtRsp));
4277+
TSDB_CHECK_NULL(pCtx->pRsp->rsps.rspList, finalCode, lino, _final, terrno);
4278+
}
42684279

42694280
TSWAP(pTask->pMgmtReq, pMgmtReq);
42704281
rsp.task = *(SStreamTask*)pTask;
@@ -4304,24 +4315,30 @@ int32_t msmDeployRunnerOrigReader(SStmGrpCtx* pCtx, SStmTaskStatusMsg* pTask) {
43044315
}
43054316
}
43064317

4307-
if (NULL == pCtx->pRsp->rsps.rspList) {
4308-
pCtx->pRsp->rsps.rspList = taosArrayInit(2, sizeof(SStreamMgmtRsp));
4309-
TSDB_CHECK_NULL(pCtx->pRsp->rsps.rspList, code, lino, _exit, terrno);
4310-
}
4311-
4312-
TSDB_CHECK_NULL(taosArrayPush(pCtx->pRsp->rsps.rspList, &rsp), code, lino, _exit, terrno);
4313-
43144318
_exit:
43154319

43164320
tFreeSStreamMgmtReq(pMgmtReq);
43174321
taosMemoryFree(pMgmtReq);
43184322

43194323
if (code) {
4320-
tFreeSStreamMgmtRsp(&rsp);
4324+
rsp.code = code;
4325+
4326+
TSDB_CHECK_NULL(taosArrayPush(pCtx->pRsp->rsps.rspList, &rsp), finalCode, lino, _final, terrno);
43214327
mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
4328+
} else {
4329+
TSDB_CHECK_NULL(taosArrayPush(pCtx->pRsp->rsps.rspList, &rsp), finalCode, lino, _final, terrno);
43224330
}
43234331

4324-
return code;
4332+
4333+
_final:
4334+
4335+
if (finalCode) {
4336+
tFreeSStreamMgmtRsp(&rsp);
4337+
mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
4338+
msmHandleStreamTaskErr(pCtx, STM_ERR_PROCESSING_ERR, pTask);
4339+
}
4340+
4341+
return finalCode;
43254342
}
43264343

43274344

@@ -4331,10 +4348,10 @@ int32_t msmHandleTaskMgmtReq(SStmGrpCtx* pCtx, SStmTaskStatusMsg* pTask) {
43314348

43324349
switch (pTask->pMgmtReq->type) {
43334350
case STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER:
4334-
TAOS_CHECK_EXIT(msmDeployTriggerOrigReader(pCtx, pTask));
4351+
msmDeployTriggerOrigReader(pCtx, pTask);
43354352
break;
43364353
case STREAM_MGMT_REQ_RUNNER_ORIGTBL_READER:
4337-
TAOS_CHECK_EXIT(msmDeployRunnerOrigReader(pCtx, pTask));
4354+
msmDeployRunnerOrigReader(pCtx, pTask);
43384355
break;
43394356
default:
43404357
msttError("Invalid mgmtReq type:%d", pTask->pMgmtReq->type);

0 commit comments

Comments
 (0)