Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 25 additions & 18 deletions include/libs/function/functionMgt.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ typedef enum EFunctionType {
FUNCTION_TYPE_HISTOGRAM,
FUNCTION_TYPE_HYPERLOGLOG,
FUNCTION_TYPE_STDVAR,
FUNCTION_TYPE_STDDEV_SAMP,
FUNCTION_TYPE_STDVAR_SAMP,
FUNCTION_TYPE_GROUP_CONCAT,

// nonstandard SQL function
FUNCTION_TYPE_BOTTOM = 500,
Expand Down Expand Up @@ -164,19 +167,19 @@ typedef enum EFunctionType {
FUNCTION_TYPE_IROWTS_ORIGIN,
FUNCTION_TYPE_GROUP_ID,
FUNCTION_TYPE_IS_WINDOW_FILLED,
FUNCTION_TYPE_TPREV_TS, // _tprev_ts
FUNCTION_TYPE_TCURRENT_TS, // _tcurrent_ts
FUNCTION_TYPE_TNEXT_TS, // _tnext_ts
FUNCTION_TYPE_TWSTART, // _twstart
FUNCTION_TYPE_TWEND, // _twend
FUNCTION_TYPE_TWDURATION, // _twduration
FUNCTION_TYPE_TWROWNUM, // _twrownum
FUNCTION_TYPE_TPREV_LOCALTIME, // _tprev_localtime
FUNCTION_TYPE_TNEXT_LOCALTIME, // _tnext_localtime
FUNCTION_TYPE_TLOCALTIME, // _tlocaltime
FUNCTION_TYPE_TGRPID, // _tgrpid
FUNCTION_TYPE_PLACEHOLDER_COLUMN, // %%n
FUNCTION_TYPE_PLACEHOLDER_TBNAME, // %%tbname
FUNCTION_TYPE_TPREV_TS, // _tprev_ts
FUNCTION_TYPE_TCURRENT_TS, // _tcurrent_ts
FUNCTION_TYPE_TNEXT_TS, // _tnext_ts
FUNCTION_TYPE_TWSTART, // _twstart
FUNCTION_TYPE_TWEND, // _twend
FUNCTION_TYPE_TWDURATION, // _twduration
FUNCTION_TYPE_TWROWNUM, // _twrownum
FUNCTION_TYPE_TPREV_LOCALTIME, // _tprev_localtime
FUNCTION_TYPE_TNEXT_LOCALTIME, // _tnext_localtime
FUNCTION_TYPE_TLOCALTIME, // _tlocaltime
FUNCTION_TYPE_TGRPID, // _tgrpid
FUNCTION_TYPE_PLACEHOLDER_COLUMN, // %%n
FUNCTION_TYPE_PLACEHOLDER_TBNAME, // %%tbname

// internal function
FUNCTION_TYPE_SELECT_VALUE = 3750,
Expand Down Expand Up @@ -228,7 +231,8 @@ typedef enum EFunctionType {
FUNCTION_TYPE_STD_STATE_MERGE,
FUNCTION_TYPE_HYPERLOGLOG_STATE,
FUNCTION_TYPE_HYPERLOGLOG_STATE_MERGE,

FUNCTION_TYPE_STDDEV_SAMP_MERGE,
FUNCTION_TYPE_STDVAR_SAMP_MERGE,

// geometry functions
FUNCTION_TYPE_GEOM_FROM_TEXT = 4250,
Expand Down Expand Up @@ -325,7 +329,8 @@ bool fmIsPlaceHolderFunc(int32_t funcId);

void getLastCacheDataType(SDataType* pType, int32_t pkBytes);
int32_t createFunction(const char* pName, SNodeList* pParameterList, SFunctionNode** pFunc);
int32_t createFunctionWithSrcFunc(const char* pName, const SFunctionNode* pSrcFunc, SNodeList* pParameterList, SFunctionNode** pFunc);
int32_t createFunctionWithSrcFunc(const char* pName, const SFunctionNode* pSrcFunc, SNodeList* pParameterList,
SFunctionNode** pFunc);

int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMidFunc,
SFunctionNode** pMergeFunc);
Expand Down Expand Up @@ -373,13 +378,15 @@ bool fmIsCountLikeFunc(int32_t funcId);

// } SStreamPseudoFuncType;

int32_t fmGetStreamPesudoFuncEnv(int32_t funcId, SNodeList* pParamNodes, SFuncExecEnv *pEnv);
int32_t fmGetStreamPesudoFuncEnv(int32_t funcId, SNodeList* pParamNodes, SFuncExecEnv* pEnv);

int32_t fmSetStreamPseudoFuncParamVal(int32_t funcId, SNodeList* pParamNodes, const SStreamRuntimeFuncInfo* pStreamRuntimeInfo);
int32_t fmSetStreamPseudoFuncParamVal(int32_t funcId, SNodeList* pParamNodes,
const SStreamRuntimeFuncInfo* pStreamRuntimeInfo);

const void* fmGetStreamPesudoFuncVal(int32_t funcId, const SStreamRuntimeFuncInfo* pStreamRuntimeFuncInfo);

void fmGetStreamPesudoFuncValTbname(int32_t funcId, const SStreamRuntimeFuncInfo* pStreamRuntimeFuncInfo, void** data, int32_t* dataLen);
void fmGetStreamPesudoFuncValTbname(int32_t funcId, const SStreamRuntimeFuncInfo* pStreamRuntimeFuncInfo, void** data,
int32_t* dataLen);
#ifdef __cplusplus
}
#endif
Expand Down
73 changes: 39 additions & 34 deletions source/libs/executor/src/aggregateoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ typedef struct SAggOperatorInfo {
bool cleanGroupResInfo;
} SAggOperatorInfo;

static void destroyAggOperatorInfo(void* param);
static void destroyAggOperatorInfo(void* param);
static int32_t setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);

static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock);
Expand Down Expand Up @@ -100,12 +100,11 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(&pOperator->resultInfo, 4096);


code = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &pExprInfo, &num);
TSDB_CHECK_CODE(code, lino, _error);

code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
TSDB_CHECK_CODE(code, lino, _error);

if (pAggNode->pExprs != NULL) {
Expand Down Expand Up @@ -192,7 +191,7 @@ static bool nextGroupedResult(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SAggOperatorInfo* pAggInfo = pOperator->info;

if(!pAggInfo) {
if (!pAggInfo) {
qError("function:%s, pAggInfo is NULL", __func__);
return false;
}
Expand Down Expand Up @@ -239,7 +238,8 @@ static bool nextGroupedResult(SOperatorInfo* pOperator) {
// there is an scalar expression that needs to be calculated before apply the group aggregation.
if (pAggInfo->scalarExprSup.pExprInfo != NULL && !blockAllocated) {
SExprSupp* pSup1 = &pAggInfo->scalarExprSup;
code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL, GET_STM_RTINFO(pOperator->pTaskInfo));
code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL,
GET_STM_RTINFO(pOperator->pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
T_LONG_JMP(pTaskInfo->env, code);
Expand Down Expand Up @@ -349,8 +349,14 @@ int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) {
if (pCtx[k].fpSet.process == NULL) {
continue;
}

/*
int32_t firstData = 0;
if (pCtx[k].numOfParams > 1) {
firstData = 1;
}
*/
if ((&pCtx[k])->input.pData[0] == NULL) {
// if ((&pCtx[k])->input.pData[firstData] == NULL) {
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
qError("%s aggregate function error happens, input data is NULL.", GET_TASKID(pOperator->pTaskInfo));
} else {
Expand All @@ -371,8 +377,8 @@ int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) {
}

static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SSDataBlock* pBlock = NULL;
if (!tsCountAlwaysReturnValue) {
return TSDB_CODE_SUCCESS;
Expand Down Expand Up @@ -583,7 +589,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
}

uint32_t defaultPgsz = 0;
int64_t defaultBufsz = 0;
int64_t defaultBufsz = 0;
code = getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz);
if (code) {
qError("failed to get buff page size, rowSize:%d", pAggSup->resultRowSize);
Expand Down Expand Up @@ -620,7 +626,7 @@ void cleanupResultInfoInStream(SExecTaskInfo* pTaskInfo, void* pState, SExprSupp
if (!needCleanup) {
return;
}

for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
SResultWindowInfo* pWinInfo = taosArrayGet(pGroupResInfo->pRows, i);
SRowBuffPos* pPos = pWinInfo->pStatePos;
Expand All @@ -642,7 +648,7 @@ void cleanupResultInfoInStream(SExecTaskInfo* pTaskInfo, void* pState, SExprSupp
}

void cleanupResultInfoInGroupResInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
SGroupResInfo* pGroupResInfo) {
SGroupResInfo* pGroupResInfo) {
int32_t numOfExprs = pSup->numOfExprs;
int32_t* rowEntryOffset = pSup->rowEntryInfoOffset;
SqlFunctionCtx* pCtx = pSup->pCtx;
Expand All @@ -657,16 +663,15 @@ void cleanupResultInfoInGroupResInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup,
}

for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
SResultRow* pRow = NULL;
SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
SFilePage* page = getBufPage(pBuf, pPos->pos.pageId);
SResultRow* pRow = NULL;
SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
SFilePage* page = getBufPage(pBuf, pPos->pos.pageId);
if (page == NULL) {
qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
continue;
}
pRow = (SResultRow*)((char*)page + pPos->pos.offset);


for (int32_t j = 0; j < numOfExprs; ++j) {
pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
if (pCtx[j].fpSet.cleanup) {
Expand All @@ -678,7 +683,7 @@ void cleanupResultInfoInGroupResInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup,
}

void cleanupResultInfoInHashMap(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap) {
SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap) {
int32_t numOfExprs = pSup->numOfExprs;
int32_t* rowEntryOffset = pSup->rowEntryInfoOffset;
SqlFunctionCtx* pCtx = pSup->pCtx;
Expand Down Expand Up @@ -715,8 +720,8 @@ void cleanupResultInfoInHashMap(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDisk
}
}

void cleanupResultInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SGroupResInfo* pGroupResInfo,
SAggSupporter *pAggSup, bool cleanGroupResInfo) {
void cleanupResultInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SGroupResInfo* pGroupResInfo, SAggSupporter* pAggSup,
bool cleanGroupResInfo) {
if (cleanGroupResInfo) {
cleanupResultInfoInGroupResInfo(pTaskInfo, pSup, pAggSup->pResultBuf, pGroupResInfo);
} else {
Expand Down Expand Up @@ -775,21 +780,22 @@ int32_t applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx*

if (fmIsPlaceHolderFunc(pCtx[k].functionId)) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[k]);
char* p = GET_ROWCELL_INTERBUF(pEntryInfo);
char* p = GET_ROWCELL_INTERBUF(pEntryInfo);

TAOS_CHECK_EXIT(fmSetStreamPseudoFuncParamVal(pCtx[k].functionId, pCtx[k].pExpr->base.pParamList, &taskInfo->pStreamRuntimeInfo->funcInfo));
TAOS_CHECK_EXIT(fmSetStreamPseudoFuncParamVal(pCtx[k].functionId, pCtx[k].pExpr->base.pParamList,
&taskInfo->pStreamRuntimeInfo->funcInfo));

SValueNode *valueNode = (SValueNode *)nodesListGetNode(pCtx[k].pExpr->base.pParamList, 0);
SValueNode* valueNode = (SValueNode*)nodesListGetNode(pCtx[k].pExpr->base.pParamList, 0);
pEntryInfo->isNullRes = 0;
if (TSDB_DATA_TYPE_NULL == valueNode->node.resType.type || valueNode->isNull) {
pEntryInfo->isNullRes = 1;
} else if (IS_VAR_DATA_TYPE(pCtx[k].pExpr->base.resSchema.type)){
} else if (IS_VAR_DATA_TYPE(pCtx[k].pExpr->base.resSchema.type)) {
void* v = nodesGetValueFromNode(valueNode);
memcpy(p, v, varDataTLen(v));
} else {
memcpy(p, nodesGetValueFromNode(valueNode), pCtx[k].pExpr->base.resSchema.bytes);
}

pEntryInfo->numOfRes = 1;
} else if (pCtx[k].isPseudoFunc) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[k]);
Expand Down Expand Up @@ -852,22 +858,21 @@ void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {

static int32_t resetAggregateOperatorState(SOperatorInfo* pOper) {
SAggOperatorInfo* pAgg = pOper->info;
SAggPhysiNode* pAggNode = (SAggPhysiNode*)pOper->pPhyNode;
SAggPhysiNode* pAggNode = (SAggPhysiNode*)pOper->pPhyNode;

pOper->status = OP_NOT_OPENED;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
SExecTaskInfo* pTaskInfo = pOper->pTaskInfo;
cleanupResultInfo(pTaskInfo, &pOper->exprSupp, &pAgg->groupResInfo, &pAgg->aggSup,
pAgg->cleanGroupResInfo);
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
SExecTaskInfo* pTaskInfo = pOper->pTaskInfo;
cleanupResultInfo(pTaskInfo, &pOper->exprSupp, &pAgg->groupResInfo, &pAgg->aggSup, pAgg->cleanGroupResInfo);
cleanupGroupResInfo(&pAgg->groupResInfo);
resetBasicOperatorState(&pAgg->binfo);

int32_t code = resetAggSup(&pOper->exprSupp, &pAgg->aggSup, pTaskInfo, pAggNode->pAggFuncs, pAggNode->pGroupKeys,
keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);

int32_t code =
resetAggSup(&pOper->exprSupp, &pAgg->aggSup, pTaskInfo, pAggNode->pAggFuncs, pAggNode->pGroupKeys, keyBufSize,
pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);

if (code == 0) {
code = resetExprSupp(&pAgg->scalarExprSup, pTaskInfo, pAggNode->pExprs, NULL,
&pTaskInfo->storageAPI.functionStore);
code = resetExprSupp(&pAgg->scalarExprSup, pTaskInfo, pAggNode->pExprs, NULL, &pTaskInfo->storageAPI.functionStore);
}

pAgg->groupId = UINT64_MAX;
Expand Down
9 changes: 7 additions & 2 deletions source/libs/function/inc/builtinsimpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ extern "C" {
#include "functionMgt.h"
#include "functionResInfoInt.h"


int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems);
int32_t i8VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFunc, bool signVal, int64_t* res);
int32_t i16VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFunc, bool signVal, int64_t* res);
Expand Down Expand Up @@ -87,6 +86,8 @@ int32_t stdFunction(SqlFunctionCtx* pCtx);
int32_t stdFunctionMerge(SqlFunctionCtx* pCtx);
int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t stdvarFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t stddevsampFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t stdvarsampFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t stdPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);

#ifdef BUILD_NO_CALL
Expand All @@ -96,6 +97,11 @@ int32_t stdInvertFunction(SqlFunctionCtx* pCtx);
int32_t stdCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t getStdInfoSize();

bool gconcatGetFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t gconcatFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo);
int32_t gconcatFunction(SqlFunctionCtx* pCtx);
int32_t gconcatFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);

bool getLeastSQRFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t leastSQRFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo);
int32_t leastSQRFunction(SqlFunctionCtx* pCtx);
Expand Down Expand Up @@ -252,7 +258,6 @@ int32_t blockDBUsageSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo
int32_t blockDBUsageFunction(SqlFunctionCtx* pCtx);
int32_t blockDBUsageFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);


#ifdef __cplusplus
}
#endif
Expand Down
Loading
Loading