Skip to content
1 change: 1 addition & 0 deletions include/common/streamMsg.h
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,7 @@ typedef struct {
int64_t triggerTblUid; // suid or uid
int64_t triggerTblSuid;
int8_t triggerTblType;
int8_t isTriggerTblVirt;
int8_t deleteReCalc;
int8_t deleteOutTbl;
void* partitionCols; // nodelist of SColumnNode
Expand Down
14 changes: 8 additions & 6 deletions include/libs/new-stream/streamReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ typedef struct StreamTableListInfo {
typedef struct SStreamTriggerReaderInfo {
void* pTask;
int32_t order;
// SArray* schemas;
STimeWindow twindows;
uint64_t suid;
uint64_t uid;
int8_t tableType;
int8_t isVtableStream; // whether is virtual table stream
int8_t deleteReCalc;
int8_t deleteOutTbl;
SNode* pTagCond;
Expand All @@ -73,11 +73,12 @@ typedef struct SStreamTriggerReaderInfo {
int32_t numOfExprCalcTag;
SSHashObj* uidHashTrigger; // < uid -> SHashObj < slotId -> colId > >
SSHashObj* uidHashCalc; // < uid -> SHashObj < slotId -> colId > >
bool isVtableStream; // whether is virtual table stream
void* historyTableList;
SFilterInfo* pFilterInfo;
SHashObj* pTableMetaCacheTrigger;
SHashObj* pTableMetaCacheCalc;
SHashObj* triggerTableSchemaMapVTable; // key: uid, value: STSchema*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里的 schema 包含原始表所有列,会不会有内存占用问题?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个schema不大,不会有问题。除非极限情况,引用了非常多的原始表,并且每个原始表列非常多。这种情况这个缓存占内存可能有点大。现在没必要为了异常极限情况做处理,处理比较麻烦。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

后续要给这种成员变量加上统计信息,当流计算内存占用较高时,便于分析。

STSchema* triggerTableSchema;
bool groupByTbname;
void* pVnode;
SStorageAPI storageApi;
Expand Down Expand Up @@ -136,24 +137,25 @@ void* qStreamGetReaderInfo(int64_t streamId, int64_t taskId, void** taskAddr);
void qStreamSetTaskRunning(int64_t streamId, int64_t taskId);
int32_t streamBuildFetchRsp(SArray* pResList, bool hasNext, void** data, size_t* size, int8_t precision);

int32_t qBuildVTableList(SSHashObj* uidHash, SStreamTriggerReaderInfo* sStreamReaderInfo);
int32_t qBuildVTableList(SStreamTriggerReaderInfo* sStreamReaderInfo);

int32_t createStreamTask(void* pVnode, SStreamOptions* options, SStreamReaderTaskInner** ppTask,
SSDataBlock* pResBlock, STableKeyInfo* pList, int32_t pNum, SStorageAPI* storageApi);

int32_t createStreamTaskForTs(SStreamOptions* options, SStreamReaderTaskInner** ppTask, SStorageAPI* api);


int32_t initStreamTableListInfo(StreamTableListInfo* pTableListInfo);
int32_t qStreamGetTableList(SStreamTriggerReaderInfo* sStreamReaderInfo, uint64_t gid, STableKeyInfo** pKeyInfo, int32_t* size);
void qStreamDestroyTableInfo(StreamTableListInfo* pTableListInfo);
int32_t qStreamCopyTableInfo(SStreamTriggerReaderInfo* sStreamReaderInfo, StreamTableListInfo* dst);
int32_t qTransformStreamTableList(void* pTableListInfo, SStreamTriggerReaderInfo* sStreamReaderInfo);
int32_t qStreamSetTableList(StreamTableListInfo* pTableListInfo, int64_t uid, uint64_t gid);
int32_t qStreamGetTableListGroupNum(SStreamTriggerReaderInfo* sStreamReaderInfo);
int32_t qStreamGetTableListNum(SStreamTriggerReaderInfo* sStreamReaderInfo);
SArray* qStreamGetTableArrayList(SStreamTriggerReaderInfo* sStreamReaderInfo);
int32_t qStreamIterTableList(StreamTableListInfo* sStreamReaderInfo, STableKeyInfo** pKeyInfo, int32_t* size, int64_t* suid);
uint64_t qStreamGetGroupIdFromOrigin(SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t uid);
uint64_t qStreamGetGroupIdFromSet(SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t uid);
int32_t qStreamModifyTableList(SStreamTriggerReaderInfo* sStreamReaderInfo, SArray* tableListAdd, SArray* tableListDel, SRWLatch* lock);
int32_t qStreamRemoveTableList(StreamTableListInfo* pTableListInfo, int64_t uid);

#ifdef __cplusplus
}
Expand Down
4 changes: 2 additions & 2 deletions source/client/src/clientTmq.c
Original file line number Diff line number Diff line change
Expand Up @@ -1081,8 +1081,8 @@ int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
for (int32_t j = 0; j < topicNumCur; j++) {
SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j);
if (pTopicCur && strcmp(pTopicCur->topicName, privilege->topic) == 0) {
tqInfoC("consumer:0x%" PRIx64 ", update noPrivilege:%d, topic:%s", tmq->consumerId, privilege->noPrivilege, privilege->topic);
if (pTopicCur && strcmp(pTopicCur->topicName, privilege->topic) == 0 && pTopicCur->noPrivilege != privilege->noPrivilege) {
tqInfoC("consumer:0x%" PRIx64 ", update privilege:%s, topic:%s", tmq->consumerId, privilege->noPrivilege ? "false" : "true", privilege->topic);
pTopicCur->noPrivilege = privilege->noPrivilege;
}
}
Expand Down
3 changes: 3 additions & 0 deletions source/common/src/msg/streamMsg.c
Original file line number Diff line number Diff line change
Expand Up @@ -911,6 +911,7 @@ int32_t tEncodeSStreamReaderDeployFromTrigger(SEncoder* pEncoder, const SStreamR
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->triggerTblUid));
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->triggerTblSuid));
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerTblType));
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->isTriggerTblVirt));
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->deleteReCalc));
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->deleteOutTbl));
TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->partitionCols, pMsg->partitionCols == NULL ? 0 : (int32_t)strlen(pMsg->partitionCols) + 1));
Expand Down Expand Up @@ -1460,6 +1461,7 @@ int32_t tDecodeSStreamReaderDeployFromTrigger(SDecoder* pDecoder, SStreamReaderD
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->triggerTblUid));
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->triggerTblSuid));
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerTblType));
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->isTriggerTblVirt));
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->deleteReCalc));
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->deleteOutTbl));
TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->partitionCols, NULL));
Expand Down Expand Up @@ -2949,6 +2951,7 @@ int32_t tCloneStreamCreateDeployPointers(SCMCreateStreamReq *pSrc, SCMCreateStre
pDst->triggerPrec = pSrc->triggerPrec;
pDst->deleteReCalc = pSrc->deleteReCalc;
pDst->deleteOutTbl = pSrc->deleteOutTbl;
pDst->flags = pSrc->flags;

_exit:

Expand Down
1 change: 1 addition & 0 deletions source/dnode/mnode/impl/src/mndStreamMgmt.c
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,7 @@ int32_t msmBuildReaderDeployInfo(SStmTaskDeploy* pDeploy, void* calcScanPlan, SS
pTrigger->triggerTblUid = pInfo->pCreate->triggerTblUid;
pTrigger->triggerTblSuid = pInfo->pCreate->triggerTblSuid;
pTrigger->triggerTblType = pInfo->pCreate->triggerTblType;
pTrigger->isTriggerTblVirt = STREAM_IS_VIRTUAL_TABLE(pInfo->pCreate->triggerTblType, pInfo->pCreate->flags);
pTrigger->deleteReCalc = pInfo->pCreate->deleteReCalc;
pTrigger->deleteOutTbl = pInfo->pCreate->deleteOutTbl;
pTrigger->partitionCols = pInfo->pCreate->partitionCols;
Expand Down
Loading
Loading