Skip to content

Commit 72c3efb

Browse files
authored
enh(stream): fix table not exists in processTag (#33625)
1 parent 7394098 commit 72c3efb

File tree

10 files changed

+423
-191
lines changed

10 files changed

+423
-191
lines changed

include/common/streamMsg.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,7 @@ typedef struct {
511511
int64_t triggerTblUid; // suid or uid
512512
int64_t triggerTblSuid;
513513
int8_t triggerTblType;
514+
int8_t isTriggerTblVirt;
514515
int8_t deleteReCalc;
515516
int8_t deleteOutTbl;
516517
void* partitionCols; // nodelist of SColumnNode

include/libs/new-stream/streamReader.h

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,11 @@ typedef struct StreamTableListInfo {
4343
typedef struct SStreamTriggerReaderInfo {
4444
void* pTask;
4545
int32_t order;
46-
// SArray* schemas;
4746
STimeWindow twindows;
4847
uint64_t suid;
4948
uint64_t uid;
5049
int8_t tableType;
50+
int8_t isVtableStream; // whether is virtual table stream
5151
int8_t deleteReCalc;
5252
int8_t deleteOutTbl;
5353
SNode* pTagCond;
@@ -73,11 +73,12 @@ typedef struct SStreamTriggerReaderInfo {
7373
int32_t numOfExprCalcTag;
7474
SSHashObj* uidHashTrigger; // < uid -> SHashObj < slotId -> colId > >
7575
SSHashObj* uidHashCalc; // < uid -> SHashObj < slotId -> colId > >
76-
bool isVtableStream; // whether is virtual table stream
7776
void* historyTableList;
7877
SFilterInfo* pFilterInfo;
7978
SHashObj* pTableMetaCacheTrigger;
8079
SHashObj* pTableMetaCacheCalc;
80+
SHashObj* triggerTableSchemaMapVTable; // key: uid, value: STSchema*
81+
STSchema* triggerTableSchema;
8182
bool groupByTbname;
8283
void* pVnode;
8384
SStorageAPI storageApi;
@@ -136,24 +137,25 @@ void* qStreamGetReaderInfo(int64_t streamId, int64_t taskId, void** taskAddr);
136137
void qStreamSetTaskRunning(int64_t streamId, int64_t taskId);
137138
int32_t streamBuildFetchRsp(SArray* pResList, bool hasNext, void** data, size_t* size, int8_t precision);
138139

139-
int32_t qBuildVTableList(SSHashObj* uidHash, SStreamTriggerReaderInfo* sStreamReaderInfo);
140+
int32_t qBuildVTableList(SStreamTriggerReaderInfo* sStreamReaderInfo);
140141

141142
int32_t createStreamTask(void* pVnode, SStreamOptions* options, SStreamReaderTaskInner** ppTask,
142143
SSDataBlock* pResBlock, STableKeyInfo* pList, int32_t pNum, SStorageAPI* storageApi);
143144

144145
int32_t createStreamTaskForTs(SStreamOptions* options, SStreamReaderTaskInner** ppTask, SStorageAPI* api);
145-
146+
147+
int32_t initStreamTableListInfo(StreamTableListInfo* pTableListInfo);
146148
int32_t qStreamGetTableList(SStreamTriggerReaderInfo* sStreamReaderInfo, uint64_t gid, STableKeyInfo** pKeyInfo, int32_t* size);
147149
void qStreamDestroyTableInfo(StreamTableListInfo* pTableListInfo);
148150
int32_t qStreamCopyTableInfo(SStreamTriggerReaderInfo* sStreamReaderInfo, StreamTableListInfo* dst);
149-
int32_t qTransformStreamTableList(void* pTableListInfo, SStreamTriggerReaderInfo* sStreamReaderInfo);
151+
int32_t qStreamSetTableList(StreamTableListInfo* pTableListInfo, int64_t uid, uint64_t gid);
150152
int32_t qStreamGetTableListGroupNum(SStreamTriggerReaderInfo* sStreamReaderInfo);
151153
int32_t qStreamGetTableListNum(SStreamTriggerReaderInfo* sStreamReaderInfo);
152154
SArray* qStreamGetTableArrayList(SStreamTriggerReaderInfo* sStreamReaderInfo);
153155
int32_t qStreamIterTableList(StreamTableListInfo* sStreamReaderInfo, STableKeyInfo** pKeyInfo, int32_t* size, int64_t* suid);
154156
uint64_t qStreamGetGroupIdFromOrigin(SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t uid);
155157
uint64_t qStreamGetGroupIdFromSet(SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t uid);
156-
int32_t qStreamModifyTableList(SStreamTriggerReaderInfo* sStreamReaderInfo, SArray* tableListAdd, SArray* tableListDel, SRWLatch* lock);
158+
int32_t qStreamRemoveTableList(StreamTableListInfo* pTableListInfo, int64_t uid);
157159

158160
#ifdef __cplusplus
159161
}

source/client/src/clientTmq.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1081,8 +1081,8 @@ int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
10811081
int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
10821082
for (int32_t j = 0; j < topicNumCur; j++) {
10831083
SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j);
1084-
if (pTopicCur && strcmp(pTopicCur->topicName, privilege->topic) == 0) {
1085-
tqInfoC("consumer:0x%" PRIx64 ", update noPrivilege:%d, topic:%s", tmq->consumerId, privilege->noPrivilege, privilege->topic);
1084+
if (pTopicCur && strcmp(pTopicCur->topicName, privilege->topic) == 0 && pTopicCur->noPrivilege != privilege->noPrivilege) {
1085+
tqInfoC("consumer:0x%" PRIx64 ", update privilege:%s, topic:%s", tmq->consumerId, privilege->noPrivilege ? "false" : "true", privilege->topic);
10861086
pTopicCur->noPrivilege = privilege->noPrivilege;
10871087
}
10881088
}

source/common/src/msg/streamMsg.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -911,6 +911,7 @@ int32_t tEncodeSStreamReaderDeployFromTrigger(SEncoder* pEncoder, const SStreamR
911911
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->triggerTblUid));
912912
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->triggerTblSuid));
913913
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerTblType));
914+
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->isTriggerTblVirt));
914915
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->deleteReCalc));
915916
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->deleteOutTbl));
916917
TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->partitionCols, pMsg->partitionCols == NULL ? 0 : (int32_t)strlen(pMsg->partitionCols) + 1));
@@ -1460,6 +1461,7 @@ int32_t tDecodeSStreamReaderDeployFromTrigger(SDecoder* pDecoder, SStreamReaderD
14601461
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->triggerTblUid));
14611462
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->triggerTblSuid));
14621463
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerTblType));
1464+
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->isTriggerTblVirt));
14631465
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->deleteReCalc));
14641466
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->deleteOutTbl));
14651467
TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->partitionCols, NULL));
@@ -2949,6 +2951,7 @@ int32_t tCloneStreamCreateDeployPointers(SCMCreateStreamReq *pSrc, SCMCreateStre
29492951
pDst->triggerPrec = pSrc->triggerPrec;
29502952
pDst->deleteReCalc = pSrc->deleteReCalc;
29512953
pDst->deleteOutTbl = pSrc->deleteOutTbl;
2954+
pDst->flags = pSrc->flags;
29522955

29532956
_exit:
29542957

source/dnode/mnode/impl/src/mndStreamMgmt.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -757,6 +757,7 @@ int32_t msmBuildReaderDeployInfo(SStmTaskDeploy* pDeploy, void* calcScanPlan, SS
757757
pTrigger->triggerTblUid = pInfo->pCreate->triggerTblUid;
758758
pTrigger->triggerTblSuid = pInfo->pCreate->triggerTblSuid;
759759
pTrigger->triggerTblType = pInfo->pCreate->triggerTblType;
760+
pTrigger->isTriggerTblVirt = STREAM_IS_VIRTUAL_TABLE(pInfo->pCreate->triggerTblType, pInfo->pCreate->flags);
760761
pTrigger->deleteReCalc = pInfo->pCreate->deleteReCalc;
761762
pTrigger->deleteOutTbl = pInfo->pCreate->deleteOutTbl;
762763
pTrigger->partitionCols = pInfo->pCreate->partitionCols;

0 commit comments

Comments
 (0)