From b9c99195ceb3eedb02c8642e2f55cee3aaeead50 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Fri, 22 Aug 2025 11:05:05 +0800 Subject: [PATCH 1/6] feat(agg/var): sampled funcs for stddev & var, and others --- include/libs/function/functionMgt.h | 43 ++-- source/libs/function/inc/builtinsimpl.h | 4 +- source/libs/function/src/builtins.c | 277 ++++++++++++++++++++++-- source/libs/function/src/builtinsimpl.c | 50 +++++ 4 files changed, 335 insertions(+), 39 deletions(-) diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index 7eed7a43b66e..6c1982d29013 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -48,6 +48,9 @@ typedef enum EFunctionType { FUNCTION_TYPE_HISTOGRAM, FUNCTION_TYPE_HYPERLOGLOG, FUNCTION_TYPE_STDVAR, + FUNCTION_TYPE_STDDEV_SAMP, + FUNCTION_TYPE_STDVAR_SAMP, + FUNCTION_TYPE_GROUP_CONCAT, // nonstandard SQL function FUNCTION_TYPE_BOTTOM = 500, @@ -164,19 +167,19 @@ typedef enum EFunctionType { FUNCTION_TYPE_IROWTS_ORIGIN, FUNCTION_TYPE_GROUP_ID, FUNCTION_TYPE_IS_WINDOW_FILLED, - FUNCTION_TYPE_TPREV_TS, // _tprev_ts - FUNCTION_TYPE_TCURRENT_TS, // _tcurrent_ts - FUNCTION_TYPE_TNEXT_TS, // _tnext_ts - FUNCTION_TYPE_TWSTART, // _twstart - FUNCTION_TYPE_TWEND, // _twend - FUNCTION_TYPE_TWDURATION, // _twduration - FUNCTION_TYPE_TWROWNUM, // _twrownum - FUNCTION_TYPE_TPREV_LOCALTIME, // _tprev_localtime - FUNCTION_TYPE_TNEXT_LOCALTIME, // _tnext_localtime - FUNCTION_TYPE_TLOCALTIME, // _tlocaltime - FUNCTION_TYPE_TGRPID, // _tgrpid - FUNCTION_TYPE_PLACEHOLDER_COLUMN, // %%n - FUNCTION_TYPE_PLACEHOLDER_TBNAME, // %%tbname + FUNCTION_TYPE_TPREV_TS, // _tprev_ts + FUNCTION_TYPE_TCURRENT_TS, // _tcurrent_ts + FUNCTION_TYPE_TNEXT_TS, // _tnext_ts + FUNCTION_TYPE_TWSTART, // _twstart + FUNCTION_TYPE_TWEND, // _twend + FUNCTION_TYPE_TWDURATION, // _twduration + FUNCTION_TYPE_TWROWNUM, // _twrownum + FUNCTION_TYPE_TPREV_LOCALTIME, // _tprev_localtime + FUNCTION_TYPE_TNEXT_LOCALTIME, // _tnext_localtime + FUNCTION_TYPE_TLOCALTIME, // _tlocaltime + FUNCTION_TYPE_TGRPID, // _tgrpid + FUNCTION_TYPE_PLACEHOLDER_COLUMN, // %%n + FUNCTION_TYPE_PLACEHOLDER_TBNAME, // %%tbname // internal function FUNCTION_TYPE_SELECT_VALUE = 3750, @@ -214,6 +217,8 @@ typedef enum EFunctionType { FUNCTION_TYPE_STD_PARTIAL, FUNCTION_TYPE_STDDEV_MERGE, FUNCTION_TYPE_STDVAR_MERGE, + FUNCTION_TYPE_STDDEV_SAMP_MERGE, + FUNCTION_TYPE_STDVAR_SAMP_MERGE, FUNCTION_TYPE_IRATE_PARTIAL, FUNCTION_TYPE_IRATE_MERGE, FUNCTION_TYPE_AVG_STATE, @@ -229,7 +234,6 @@ typedef enum EFunctionType { FUNCTION_TYPE_HYPERLOGLOG_STATE, FUNCTION_TYPE_HYPERLOGLOG_STATE_MERGE, - // geometry functions FUNCTION_TYPE_GEOM_FROM_TEXT = 4250, FUNCTION_TYPE_AS_TEXT, @@ -325,7 +329,8 @@ bool fmIsPlaceHolderFunc(int32_t funcId); void getLastCacheDataType(SDataType* pType, int32_t pkBytes); int32_t createFunction(const char* pName, SNodeList* pParameterList, SFunctionNode** pFunc); -int32_t createFunctionWithSrcFunc(const char* pName, const SFunctionNode* pSrcFunc, SNodeList* pParameterList, SFunctionNode** pFunc); +int32_t createFunctionWithSrcFunc(const char* pName, const SFunctionNode* pSrcFunc, SNodeList* pParameterList, + SFunctionNode** pFunc); int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMidFunc, SFunctionNode** pMergeFunc); @@ -373,13 +378,15 @@ bool fmIsCountLikeFunc(int32_t funcId); // } SStreamPseudoFuncType; -int32_t fmGetStreamPesudoFuncEnv(int32_t funcId, SNodeList* pParamNodes, SFuncExecEnv *pEnv); +int32_t fmGetStreamPesudoFuncEnv(int32_t funcId, SNodeList* pParamNodes, SFuncExecEnv* pEnv); -int32_t fmSetStreamPseudoFuncParamVal(int32_t funcId, SNodeList* pParamNodes, const SStreamRuntimeFuncInfo* pStreamRuntimeInfo); +int32_t fmSetStreamPseudoFuncParamVal(int32_t funcId, SNodeList* pParamNodes, + const SStreamRuntimeFuncInfo* pStreamRuntimeInfo); const void* fmGetStreamPesudoFuncVal(int32_t funcId, const SStreamRuntimeFuncInfo* pStreamRuntimeFuncInfo); -void fmGetStreamPesudoFuncValTbname(int32_t funcId, const SStreamRuntimeFuncInfo* pStreamRuntimeFuncInfo, void** data, int32_t* dataLen); +void fmGetStreamPesudoFuncValTbname(int32_t funcId, const SStreamRuntimeFuncInfo* pStreamRuntimeFuncInfo, void** data, + int32_t* dataLen); #ifdef __cplusplus } #endif diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index a8a5a96a22f2..de11f48950e1 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -24,7 +24,6 @@ extern "C" { #include "functionMgt.h" #include "functionResInfoInt.h" - int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems); int32_t i8VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFunc, bool signVal, int64_t* res); int32_t i16VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFunc, bool signVal, int64_t* res); @@ -87,6 +86,8 @@ int32_t stdFunction(SqlFunctionCtx* pCtx); int32_t stdFunctionMerge(SqlFunctionCtx* pCtx); int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t stdvarFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +int32_t stddevsampFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +int32_t stdvarsampFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t stdPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); #ifdef BUILD_NO_CALL @@ -252,7 +253,6 @@ int32_t blockDBUsageSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo int32_t blockDBUsageFunction(SqlFunctionCtx* pCtx); int32_t blockDBUsageFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); - #ifdef __cplusplus } #endif diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index f3aabf4fb375..5878486628f2 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -16,14 +16,14 @@ #include "builtins.h" #include "builtinsimpl.h" #include "cJSON.h" +#include "functionMgt.h" #include "geomFunc.h" #include "querynodes.h" #include "scalar.h" #include "tanalytics.h" #include "taoserror.h" -#include "functionMgt.h" -#include "ttypes.h" #include "tglobal.h" +#include "ttypes.h" static int32_t buildFuncErrMsg(char* pErrBuf, int32_t len, int32_t errCode, const char* pFormat, ...) { va_list vArgList; @@ -189,8 +189,8 @@ static int32_t countTrailingSpaces(const SValueNode* pVal, bool isLtrim) { } static int32_t addTimezoneParam(SNodeList* pList, timezone_t tz) { - char buf[TD_TIME_STR_LEN] = {0}; - time_t t; + char buf[TD_TIME_STR_LEN] = {0}; + time_t t; int32_t code = taosTime(&t); if (code != 0) { return code; @@ -517,7 +517,7 @@ static bool paramSupportDataType(SDataType* pDataType, uint64_t typeFlag) { case TSDB_DATA_TYPE_BLOB: return paramSupportBlob(typeFlag); default: - + return false; } } @@ -990,7 +990,7 @@ static int32_t translateAvg(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { uint8_t dt = TSDB_DATA_TYPE_DOUBLE, prec = 0, scale = 0; - bool isMergeFunc = pFunc->funcType == FUNCTION_TYPE_AVG_MERGE || pFunc->funcType == FUNCTION_TYPE_AVG_STATE_MERGE; + bool isMergeFunc = pFunc->funcType == FUNCTION_TYPE_AVG_MERGE || pFunc->funcType == FUNCTION_TYPE_AVG_STATE_MERGE; SDataType* pInputDt = getSDataTypeFromNode( nodesListGetNode(isMergeFunc ? pFunc->pSrcFuncRef->pParameterList : pFunc->pParameterList, 0)); pFunc->srcFuncInputType = *pInputDt; @@ -1023,7 +1023,7 @@ static int32_t translateBase64(SFunctionNode* pFunc, char* pErrBuf, int32_t len) FUNC_ERR_RET(validateParam(pFunc, pErrBuf, len)); SDataType* pRestType1 = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 0)); - int32_t outputLength = base64BufSize(pRestType1->bytes); + int32_t outputLength = base64BufSize(pRestType1->bytes); pFunc->node.resType = (SDataType){.bytes = outputLength, .type = TSDB_DATA_TYPE_VARCHAR}; return TSDB_CODE_SUCCESS; @@ -1088,7 +1088,8 @@ static int32_t translateSum(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { scale = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.scale; } - pFunc->node.resType = (SDataType){.bytes = tDataTypes[resType].bytes, .type = resType, .precision = prec, .scale = scale}; + pFunc->node.resType = + (SDataType){.bytes = tDataTypes[resType].bytes, .type = resType, .precision = prec, .scale = scale}; return TSDB_CODE_SUCCESS; } @@ -1166,8 +1167,8 @@ static int32_t translatePlaceHolderPseudoColumn(SFunctionNode* pFunc, char* pErr break; } case FUNCTION_TYPE_PLACEHOLDER_TBNAME: { - pFunc->node.resType = (SDataType){.bytes = TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE, - .type = TSDB_DATA_TYPE_BINARY}; + pFunc->node.resType = + (SDataType){.bytes = TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}; break; } case FUNCTION_TYPE_PLACEHOLDER_COLUMN: { @@ -1273,9 +1274,10 @@ static int32_t translateSampleTail(SFunctionNode* pFunc, char* pErrBuf, int32_t uint8_t colType = pSDataType->type; // set result type - pFunc->node.resType = - (SDataType){.bytes = IS_STR_DATA_TYPE(colType) ? pSDataType->bytes : tDataTypes[colType].bytes, .type = colType, - .precision = pSDataType->precision, .scale = pSDataType->scale}; + pFunc->node.resType = (SDataType){.bytes = IS_STR_DATA_TYPE(colType) ? pSDataType->bytes : tDataTypes[colType].bytes, + .type = colType, + .precision = pSDataType->precision, + .scale = pSDataType->scale}; return TSDB_CODE_SUCCESS; } @@ -1793,12 +1795,12 @@ static int32_t translateOutVarchar(SFunctionNode* pFunc, char* pErrBuf, int32_t case FUNCTION_TYPE_TBNAME: bytes = TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE; break; - case FUNCTION_TYPE_TIMEZONE:{ + case FUNCTION_TYPE_TIMEZONE: { if (pFunc->tz == NULL) { bytes = VARSTR_HEADER_SIZE + strlen(tsTimezoneStr); - }else{ - char *tzName = (char*)taosHashGet(pTimezoneNameMap, &pFunc->tz, sizeof(timezone_t)); - if (tzName == NULL){ + } else { + char* tzName = (char*)taosHashGet(pTimezoneNameMap, &pFunc->tz, sizeof(timezone_t)); + if (tzName == NULL) { tzName = TZ_UNKNOWN; } bytes = strlen(tzName) + VARSTR_HEADER_SIZE; @@ -1882,7 +1884,7 @@ static int32_t translateGreatestleast(SFunctionNode* pFunc, char* pErrBuf, int32 bool mixTypeToStrings = tsCompareAsStrInGreatest; SDataType res = {.type = 0}; - bool resInit = false; + bool resInit = false; for (int32_t i = 0; i < LIST_LENGTH(pFunc->pParameterList); i++) { SDataType* para = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, i)); @@ -2063,6 +2065,45 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .pStateFunc = "max", .pMergeFunc = "max", }, + { + .name = "group_concat", + .type = FUNCTION_TYPE_GROUP_CONCAT, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TSMA_FUNC, + .parameters = {.minParamNum = 3, + .maxParamNum = 9, + .paramInfoPattern = 1, + .inputParaInfo[0][0] = {.isLastParam = false, + .startParam = 1, + .endParam = 1, + .validDataType = FUNC_PARAM_SUPPORT_VARCHAR_TYPE | FUNC_PARAM_SUPPORT_NCHAR_TYPE | FUNC_PARAM_SUPPORT_NULL_TYPE, + .validNodeType = FUNC_PARAM_SUPPORT_VALUE_NODE, + .paramAttribute = FUNC_PARAM_NO_SPECIFIC_ATTRIBUTE, + .valueRangeFlag = FUNC_PARAM_NO_SPECIFIC_VALUE,}, + .inputParaInfo[0][1] = {.isLastParam = true, + .startParam = 2, + .endParam = 9, + .validDataType = FUNC_PARAM_SUPPORT_VARCHAR_TYPE | FUNC_PARAM_SUPPORT_NCHAR_TYPE | FUNC_PARAM_SUPPORT_NULL_TYPE, + .validNodeType = FUNC_PARAM_SUPPORT_EXPR_NODE, + .paramAttribute = FUNC_PARAM_NO_SPECIFIC_ATTRIBUTE, + .valueRangeFlag = FUNC_PARAM_NO_SPECIFIC_VALUE,}, + .outputParaInfo = {.validDataType = FUNC_PARAM_SUPPORT_VARCHAR_TYPE | FUNC_PARAM_SUPPORT_NCHAR_TYPE}}, + .translateFunc = translateConcatWs, + .sprocessFunc = concatWsFunction, + /* + .getEnvFunc = getGroupConcatFuncEnv, + .initFunc = functionSetup, + .processFunc = groupConcatFunction, + .sprocessFunc = groupConcatScalarFunction, + .finalizeFunc = groupConcatFinalize, +#ifdef BUILD_NO_CALL + .invertFunc = groupConcatInvertFunction, +#endif + .combineFunc = groupConcatCombine, + .pPartialFunc = "groupConcat", + .pStateFunc = "groupConcat", + .pMergeFunc = "groupConcat" + */ + }, { .name = "stddev", .type = FUNCTION_TYPE_STDDEV, @@ -2144,6 +2185,33 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .pPartialFunc = "_std_state_merge", .pMergeFunc = "_stddev_merge", }, + { + .name = "_stddev_samp_merge", + .type = FUNCTION_TYPE_STDDEV_SAMP_MERGE, + .classification = FUNC_MGT_AGG_FUNC, + .parameters = {.minParamNum = 1, + .maxParamNum = 1, + .paramInfoPattern = 1, + .inputParaInfo[0][0] = {.isLastParam = true, + .startParam = 1, + .endParam = 1, + .validDataType = FUNC_PARAM_SUPPORT_VARCHAR_TYPE, + .validNodeType = FUNC_PARAM_SUPPORT_EXPR_NODE, + .paramAttribute = FUNC_PARAM_NO_SPECIFIC_ATTRIBUTE, + .valueRangeFlag = FUNC_PARAM_NO_SPECIFIC_VALUE,}, + .outputParaInfo = {.validDataType = FUNC_PARAM_SUPPORT_DOUBLE_TYPE}}, + .translateFunc = translateOutDouble, + .getEnvFunc = getStdFuncEnv, + .initFunc = stdFunctionSetup, + .processFunc = stdFunctionMerge, + .finalizeFunc = stddevsampFinalize, +#ifdef BUILD_NO_CALL + .invertFunc = stdInvertFunction, +#endif + .combineFunc = stdCombine, + .pPartialFunc = "_std_state_merge", + .pMergeFunc = "_stddev_samp_merge", + }, { .name = "leastsquares", .type = FUNCTION_TYPE_LEASTSQUARES, @@ -5171,6 +5239,35 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = groupConstValueFunction, .finalizeFunc = groupConstValueFinalize, }, + { + .name = "stddev_samp", + .type = FUNCTION_TYPE_STDDEV_SAMP, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TSMA_FUNC, + .parameters = {.minParamNum = 1, + .maxParamNum = 1, + .paramInfoPattern = 1, + .inputParaInfo[0][0] = {.isLastParam = true, + .startParam = 1, + .endParam = 1, + .validDataType = FUNC_PARAM_SUPPORT_NUMERIC_TYPE | FUNC_PARAM_SUPPORT_NULL_TYPE, + .validNodeType = FUNC_PARAM_SUPPORT_EXPR_NODE, + .paramAttribute = FUNC_PARAM_NO_SPECIFIC_ATTRIBUTE, + .valueRangeFlag = FUNC_PARAM_NO_SPECIFIC_VALUE,}, + .outputParaInfo = {.validDataType = FUNC_PARAM_SUPPORT_DOUBLE_TYPE}}, + .translateFunc = translateOutDouble, + .getEnvFunc = getStdFuncEnv, + .initFunc = stdFunctionSetup, + .processFunc = stdFunction, + .sprocessFunc = stdScalarFunction, + .finalizeFunc = stddevsampFinalize, + #ifdef BUILD_NO_CALL + .invertFunc = stdInvertFunction, + #endif + .combineFunc = stdCombine, + .pPartialFunc = "_std_partial", + .pStateFunc = "_std_state", + .pMergeFunc = "_stddev_samp_merge" + }, { .name = "stddev_pop", .type = FUNCTION_TYPE_STDDEV, @@ -5200,6 +5297,93 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .pStateFunc = "_std_state", .pMergeFunc = "_stddev_merge" }, + { + .name = "std", + .type = FUNCTION_TYPE_STDDEV, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TSMA_FUNC, + .parameters = {.minParamNum = 1, + .maxParamNum = 1, + .paramInfoPattern = 1, + .inputParaInfo[0][0] = {.isLastParam = true, + .startParam = 1, + .endParam = 1, + .validDataType = FUNC_PARAM_SUPPORT_NUMERIC_TYPE | FUNC_PARAM_SUPPORT_NULL_TYPE, + .validNodeType = FUNC_PARAM_SUPPORT_EXPR_NODE, + .paramAttribute = FUNC_PARAM_NO_SPECIFIC_ATTRIBUTE, + .valueRangeFlag = FUNC_PARAM_NO_SPECIFIC_VALUE,}, + .outputParaInfo = {.validDataType = FUNC_PARAM_SUPPORT_DOUBLE_TYPE}}, + .translateFunc = translateOutDouble, + .getEnvFunc = getStdFuncEnv, + .initFunc = stdFunctionSetup, + .processFunc = stdFunction, + .sprocessFunc = stdScalarFunction, + .finalizeFunc = stddevFinalize, + #ifdef BUILD_NO_CALL + .invertFunc = stdInvertFunction, + #endif + .combineFunc = stdCombine, + .pPartialFunc = "_std_partial", + .pStateFunc = "_std_state", + .pMergeFunc = "_stddev_merge" + }, + { + .name = "std", + .type = FUNCTION_TYPE_STDDEV, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TSMA_FUNC, + .parameters = {.minParamNum = 1, + .maxParamNum = 1, + .paramInfoPattern = 1, + .inputParaInfo[0][0] = {.isLastParam = true, + .startParam = 1, + .endParam = 1, + .validDataType = FUNC_PARAM_SUPPORT_NUMERIC_TYPE | FUNC_PARAM_SUPPORT_NULL_TYPE, + .validNodeType = FUNC_PARAM_SUPPORT_EXPR_NODE, + .paramAttribute = FUNC_PARAM_NO_SPECIFIC_ATTRIBUTE, + .valueRangeFlag = FUNC_PARAM_NO_SPECIFIC_VALUE,}, + .outputParaInfo = {.validDataType = FUNC_PARAM_SUPPORT_DOUBLE_TYPE}}, + .translateFunc = translateOutDouble, + .getEnvFunc = getStdFuncEnv, + .initFunc = stdFunctionSetup, + .processFunc = stdFunction, + .sprocessFunc = stdScalarFunction, + .finalizeFunc = stddevFinalize, + #ifdef BUILD_NO_CALL + .invertFunc = stdInvertFunction, + #endif + .combineFunc = stdCombine, + .pPartialFunc = "_std_partial", + .pStateFunc = "_std_state", + .pMergeFunc = "_stddev_merge" + }, + { + .name = "var_samp", + .type = FUNCTION_TYPE_STDVAR_SAMP, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TSMA_FUNC, + .parameters = {.minParamNum = 1, + .maxParamNum = 1, + .paramInfoPattern = 1, + .inputParaInfo[0][0] = {.isLastParam = true, + .startParam = 1, + .endParam = 1, + .validDataType = FUNC_PARAM_SUPPORT_NUMERIC_TYPE | FUNC_PARAM_SUPPORT_NULL_TYPE, + .validNodeType = FUNC_PARAM_SUPPORT_EXPR_NODE, + .paramAttribute = FUNC_PARAM_NO_SPECIFIC_ATTRIBUTE, + .valueRangeFlag = FUNC_PARAM_NO_SPECIFIC_VALUE,}, + .outputParaInfo = {.validDataType = FUNC_PARAM_SUPPORT_DOUBLE_TYPE}}, + .translateFunc = translateOutDouble, + .getEnvFunc = getStdFuncEnv, + .initFunc = stdFunctionSetup, + .processFunc = stdFunction, + .sprocessFunc = stdScalarFunction, + .finalizeFunc = stdvarsampFinalize, + #ifdef BUILD_NO_CALL + .invertFunc = stdInvertFunction, + #endif + .combineFunc = stdCombine, + .pPartialFunc = "_std_partial", + .pStateFunc = "_std_state", + .pMergeFunc = "_stdvar_samp_merge" + }, { .name = "var_pop", .type = FUNCTION_TYPE_STDVAR, @@ -5229,6 +5413,35 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .pStateFunc = "_std_state", .pMergeFunc = "_stdvar_merge" }, + { + .name = "variance", + .type = FUNCTION_TYPE_STDVAR, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TSMA_FUNC, + .parameters = {.minParamNum = 1, + .maxParamNum = 1, + .paramInfoPattern = 1, + .inputParaInfo[0][0] = {.isLastParam = true, + .startParam = 1, + .endParam = 1, + .validDataType = FUNC_PARAM_SUPPORT_NUMERIC_TYPE | FUNC_PARAM_SUPPORT_NULL_TYPE, + .validNodeType = FUNC_PARAM_SUPPORT_EXPR_NODE, + .paramAttribute = FUNC_PARAM_NO_SPECIFIC_ATTRIBUTE, + .valueRangeFlag = FUNC_PARAM_NO_SPECIFIC_VALUE,}, + .outputParaInfo = {.validDataType = FUNC_PARAM_SUPPORT_DOUBLE_TYPE}}, + .translateFunc = translateOutDouble, + .getEnvFunc = getStdFuncEnv, + .initFunc = stdFunctionSetup, + .processFunc = stdFunction, + .sprocessFunc = stdScalarFunction, + .finalizeFunc = stdvarFinalize, + #ifdef BUILD_NO_CALL + .invertFunc = stdInvertFunction, + #endif + .combineFunc = stdCombine, + .pPartialFunc = "_std_partial", + .pStateFunc = "_std_state", + .pMergeFunc = "_stdvar_merge" + }, { .name = "crc32", .type = FUNCTION_TYPE_CRC32, @@ -5277,6 +5490,33 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .pPartialFunc = "_std_state_merge", .pMergeFunc = "_stdvar_merge", }, + { + .name = "_stdvar_samp_merge", + .type = FUNCTION_TYPE_STDVAR_SAMP_MERGE, + .classification = FUNC_MGT_AGG_FUNC, + .parameters = {.minParamNum = 1, + .maxParamNum = 1, + .paramInfoPattern = 1, + .inputParaInfo[0][0] = {.isLastParam = true, + .startParam = 1, + .endParam = 1, + .validDataType = FUNC_PARAM_SUPPORT_VARCHAR_TYPE, + .validNodeType = FUNC_PARAM_SUPPORT_EXPR_NODE, + .paramAttribute = FUNC_PARAM_NO_SPECIFIC_ATTRIBUTE, + .valueRangeFlag = FUNC_PARAM_NO_SPECIFIC_VALUE,}, + .outputParaInfo = {.validDataType = FUNC_PARAM_SUPPORT_DOUBLE_TYPE}}, + .translateFunc = translateOutDouble, + .getEnvFunc = getStdFuncEnv, + .initFunc = stdFunctionSetup, + .processFunc = stdFunctionMerge, + .finalizeFunc = stdvarsampFinalize, + #ifdef BUILD_NO_CALL + .invertFunc = stdInvertFunction, + #endif + .combineFunc = stdCombine, + .pPartialFunc = "_std_state_merge", + .pMergeFunc = "_stdvar_merge", + }, { .name = "pi", .type = FUNCTION_TYPE_PI, @@ -6190,4 +6430,3 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { // clang-format on const int32_t funcMgtBuiltinsNum = (sizeof(funcMgtBuiltins) / sizeof(SBuiltinFuncDefinition)); - diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index e1d7bcbfd028..7fbca6b3559d 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -1582,6 +1582,56 @@ int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return functionFinalize(pCtx, pBlock); } +int32_t stddevsampFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + SInputColumnInfoData* pInput = &pCtx->input; + SStdRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + int32_t type = pStddevRes->type; + double avg; + + if (pStddevRes->count == 0) { + GET_RES_INFO(pCtx)->numOfRes = 0; + return functionFinalize(pCtx, pBlock); + } + + if (pStddevRes->count == 1) { + pStddevRes->result = 0.0; + } else { + pStddevRes->result = sqrt(pStddevRes->quadraticDSum / (pStddevRes->count - 1)); + } + + // check for overflow + if (isinf(pStddevRes->result) || isnan(pStddevRes->result)) { + GET_RES_INFO(pCtx)->numOfRes = 0; + } + + return functionFinalize(pCtx, pBlock); +} + +int32_t stdvarsampFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + SInputColumnInfoData* pInput = &pCtx->input; + SStdRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + int32_t type = pStddevRes->type; + double avg; + + if (pStddevRes->count == 0) { + GET_RES_INFO(pCtx)->numOfRes = 0; + return functionFinalize(pCtx, pBlock); + } + + if (pStddevRes->count == 1) { + pStddevRes->result = 0.0; + } else { + pStddevRes->result = pStddevRes->quadraticDSum / (pStddevRes->count - 1); + } + + // check for overflow + if (isinf(pStddevRes->result) || isnan(pStddevRes->result)) { + GET_RES_INFO(pCtx)->numOfRes = 0; + } + + return functionFinalize(pCtx, pBlock); +} + int32_t stdvarFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SInputColumnInfoData* pInput = &pCtx->input; SStdRes* pStdvarRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); From 6708e804fd0a91f57b306cea5c119a334d551f9f Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Fri, 22 Aug 2025 11:09:59 +0800 Subject: [PATCH 2/6] fix enum --- include/libs/function/functionMgt.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index 6c1982d29013..032e010faebb 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -217,8 +217,6 @@ typedef enum EFunctionType { FUNCTION_TYPE_STD_PARTIAL, FUNCTION_TYPE_STDDEV_MERGE, FUNCTION_TYPE_STDVAR_MERGE, - FUNCTION_TYPE_STDDEV_SAMP_MERGE, - FUNCTION_TYPE_STDVAR_SAMP_MERGE, FUNCTION_TYPE_IRATE_PARTIAL, FUNCTION_TYPE_IRATE_MERGE, FUNCTION_TYPE_AVG_STATE, @@ -233,6 +231,8 @@ typedef enum EFunctionType { FUNCTION_TYPE_STD_STATE_MERGE, FUNCTION_TYPE_HYPERLOGLOG_STATE, FUNCTION_TYPE_HYPERLOGLOG_STATE_MERGE, + FUNCTION_TYPE_STDDEV_SAMP_MERGE, + FUNCTION_TYPE_STDVAR_SAMP_MERGE, // geometry functions FUNCTION_TYPE_GEOM_FROM_TEXT = 4250, From 9249fe51ad17a86f48d1b1388064d27810dae032 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Fri, 22 Aug 2025 17:28:26 +0800 Subject: [PATCH 3/6] move new funcs to the end of the array --- source/libs/function/src/builtins.c | 447 +++++++++++++--------------- 1 file changed, 209 insertions(+), 238 deletions(-) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 5878486628f2..48625cacc919 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -2065,45 +2065,6 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .pStateFunc = "max", .pMergeFunc = "max", }, - { - .name = "group_concat", - .type = FUNCTION_TYPE_GROUP_CONCAT, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TSMA_FUNC, - .parameters = {.minParamNum = 3, - .maxParamNum = 9, - .paramInfoPattern = 1, - .inputParaInfo[0][0] = {.isLastParam = false, - .startParam = 1, - .endParam = 1, - .validDataType = FUNC_PARAM_SUPPORT_VARCHAR_TYPE | FUNC_PARAM_SUPPORT_NCHAR_TYPE | FUNC_PARAM_SUPPORT_NULL_TYPE, - .validNodeType = FUNC_PARAM_SUPPORT_VALUE_NODE, - .paramAttribute = FUNC_PARAM_NO_SPECIFIC_ATTRIBUTE, - .valueRangeFlag = FUNC_PARAM_NO_SPECIFIC_VALUE,}, - .inputParaInfo[0][1] = {.isLastParam = true, - .startParam = 2, - .endParam = 9, - .validDataType = FUNC_PARAM_SUPPORT_VARCHAR_TYPE | FUNC_PARAM_SUPPORT_NCHAR_TYPE | FUNC_PARAM_SUPPORT_NULL_TYPE, - .validNodeType = FUNC_PARAM_SUPPORT_EXPR_NODE, - .paramAttribute = FUNC_PARAM_NO_SPECIFIC_ATTRIBUTE, - .valueRangeFlag = FUNC_PARAM_NO_SPECIFIC_VALUE,}, - .outputParaInfo = {.validDataType = FUNC_PARAM_SUPPORT_VARCHAR_TYPE | FUNC_PARAM_SUPPORT_NCHAR_TYPE}}, - .translateFunc = translateConcatWs, - .sprocessFunc = concatWsFunction, - /* - .getEnvFunc = getGroupConcatFuncEnv, - .initFunc = functionSetup, - .processFunc = groupConcatFunction, - .sprocessFunc = groupConcatScalarFunction, - .finalizeFunc = groupConcatFinalize, -#ifdef BUILD_NO_CALL - .invertFunc = groupConcatInvertFunction, -#endif - .combineFunc = groupConcatCombine, - .pPartialFunc = "groupConcat", - .pStateFunc = "groupConcat", - .pMergeFunc = "groupConcat" - */ - }, { .name = "stddev", .type = FUNCTION_TYPE_STDDEV, @@ -2185,33 +2146,6 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .pPartialFunc = "_std_state_merge", .pMergeFunc = "_stddev_merge", }, - { - .name = "_stddev_samp_merge", - .type = FUNCTION_TYPE_STDDEV_SAMP_MERGE, - .classification = FUNC_MGT_AGG_FUNC, - .parameters = {.minParamNum = 1, - .maxParamNum = 1, - .paramInfoPattern = 1, - .inputParaInfo[0][0] = {.isLastParam = true, - .startParam = 1, - .endParam = 1, - .validDataType = FUNC_PARAM_SUPPORT_VARCHAR_TYPE, - .validNodeType = FUNC_PARAM_SUPPORT_EXPR_NODE, - .paramAttribute = FUNC_PARAM_NO_SPECIFIC_ATTRIBUTE, - .valueRangeFlag = FUNC_PARAM_NO_SPECIFIC_VALUE,}, - .outputParaInfo = {.validDataType = FUNC_PARAM_SUPPORT_DOUBLE_TYPE}}, - .translateFunc = translateOutDouble, - .getEnvFunc = getStdFuncEnv, - .initFunc = stdFunctionSetup, - .processFunc = stdFunctionMerge, - .finalizeFunc = stddevsampFinalize, -#ifdef BUILD_NO_CALL - .invertFunc = stdInvertFunction, -#endif - .combineFunc = stdCombine, - .pPartialFunc = "_std_state_merge", - .pMergeFunc = "_stddev_samp_merge", - }, { .name = "leastsquares", .type = FUNCTION_TYPE_LEASTSQUARES, @@ -5239,35 +5173,6 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = groupConstValueFunction, .finalizeFunc = groupConstValueFinalize, }, - { - .name = "stddev_samp", - .type = FUNCTION_TYPE_STDDEV_SAMP, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TSMA_FUNC, - .parameters = {.minParamNum = 1, - .maxParamNum = 1, - .paramInfoPattern = 1, - .inputParaInfo[0][0] = {.isLastParam = true, - .startParam = 1, - .endParam = 1, - .validDataType = FUNC_PARAM_SUPPORT_NUMERIC_TYPE | FUNC_PARAM_SUPPORT_NULL_TYPE, - .validNodeType = FUNC_PARAM_SUPPORT_EXPR_NODE, - .paramAttribute = FUNC_PARAM_NO_SPECIFIC_ATTRIBUTE, - .valueRangeFlag = FUNC_PARAM_NO_SPECIFIC_VALUE,}, - .outputParaInfo = {.validDataType = FUNC_PARAM_SUPPORT_DOUBLE_TYPE}}, - .translateFunc = translateOutDouble, - .getEnvFunc = getStdFuncEnv, - .initFunc = stdFunctionSetup, - .processFunc = stdFunction, - .sprocessFunc = stdScalarFunction, - .finalizeFunc = stddevsampFinalize, - #ifdef BUILD_NO_CALL - .invertFunc = stdInvertFunction, - #endif - .combineFunc = stdCombine, - .pPartialFunc = "_std_partial", - .pStateFunc = "_std_state", - .pMergeFunc = "_stddev_samp_merge" - }, { .name = "stddev_pop", .type = FUNCTION_TYPE_STDDEV, @@ -5297,93 +5202,6 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .pStateFunc = "_std_state", .pMergeFunc = "_stddev_merge" }, - { - .name = "std", - .type = FUNCTION_TYPE_STDDEV, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TSMA_FUNC, - .parameters = {.minParamNum = 1, - .maxParamNum = 1, - .paramInfoPattern = 1, - .inputParaInfo[0][0] = {.isLastParam = true, - .startParam = 1, - .endParam = 1, - .validDataType = FUNC_PARAM_SUPPORT_NUMERIC_TYPE | FUNC_PARAM_SUPPORT_NULL_TYPE, - .validNodeType = FUNC_PARAM_SUPPORT_EXPR_NODE, - .paramAttribute = FUNC_PARAM_NO_SPECIFIC_ATTRIBUTE, - .valueRangeFlag = FUNC_PARAM_NO_SPECIFIC_VALUE,}, - .outputParaInfo = {.validDataType = FUNC_PARAM_SUPPORT_DOUBLE_TYPE}}, - .translateFunc = translateOutDouble, - .getEnvFunc = getStdFuncEnv, - .initFunc = stdFunctionSetup, - .processFunc = stdFunction, - .sprocessFunc = stdScalarFunction, - .finalizeFunc = stddevFinalize, - #ifdef BUILD_NO_CALL - .invertFunc = stdInvertFunction, - #endif - .combineFunc = stdCombine, - .pPartialFunc = "_std_partial", - .pStateFunc = "_std_state", - .pMergeFunc = "_stddev_merge" - }, - { - .name = "std", - .type = FUNCTION_TYPE_STDDEV, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TSMA_FUNC, - .parameters = {.minParamNum = 1, - .maxParamNum = 1, - .paramInfoPattern = 1, - .inputParaInfo[0][0] = {.isLastParam = true, - .startParam = 1, - .endParam = 1, - .validDataType = FUNC_PARAM_SUPPORT_NUMERIC_TYPE | FUNC_PARAM_SUPPORT_NULL_TYPE, - .validNodeType = FUNC_PARAM_SUPPORT_EXPR_NODE, - .paramAttribute = FUNC_PARAM_NO_SPECIFIC_ATTRIBUTE, - .valueRangeFlag = FUNC_PARAM_NO_SPECIFIC_VALUE,}, - .outputParaInfo = {.validDataType = FUNC_PARAM_SUPPORT_DOUBLE_TYPE}}, - .translateFunc = translateOutDouble, - .getEnvFunc = getStdFuncEnv, - .initFunc = stdFunctionSetup, - .processFunc = stdFunction, - .sprocessFunc = stdScalarFunction, - .finalizeFunc = stddevFinalize, - #ifdef BUILD_NO_CALL - .invertFunc = stdInvertFunction, - #endif - .combineFunc = stdCombine, - .pPartialFunc = "_std_partial", - .pStateFunc = "_std_state", - .pMergeFunc = "_stddev_merge" - }, - { - .name = "var_samp", - .type = FUNCTION_TYPE_STDVAR_SAMP, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TSMA_FUNC, - .parameters = {.minParamNum = 1, - .maxParamNum = 1, - .paramInfoPattern = 1, - .inputParaInfo[0][0] = {.isLastParam = true, - .startParam = 1, - .endParam = 1, - .validDataType = FUNC_PARAM_SUPPORT_NUMERIC_TYPE | FUNC_PARAM_SUPPORT_NULL_TYPE, - .validNodeType = FUNC_PARAM_SUPPORT_EXPR_NODE, - .paramAttribute = FUNC_PARAM_NO_SPECIFIC_ATTRIBUTE, - .valueRangeFlag = FUNC_PARAM_NO_SPECIFIC_VALUE,}, - .outputParaInfo = {.validDataType = FUNC_PARAM_SUPPORT_DOUBLE_TYPE}}, - .translateFunc = translateOutDouble, - .getEnvFunc = getStdFuncEnv, - .initFunc = stdFunctionSetup, - .processFunc = stdFunction, - .sprocessFunc = stdScalarFunction, - .finalizeFunc = stdvarsampFinalize, - #ifdef BUILD_NO_CALL - .invertFunc = stdInvertFunction, - #endif - .combineFunc = stdCombine, - .pPartialFunc = "_std_partial", - .pStateFunc = "_std_state", - .pMergeFunc = "_stdvar_samp_merge" - }, { .name = "var_pop", .type = FUNCTION_TYPE_STDVAR, @@ -5413,35 +5231,6 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .pStateFunc = "_std_state", .pMergeFunc = "_stdvar_merge" }, - { - .name = "variance", - .type = FUNCTION_TYPE_STDVAR, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TSMA_FUNC, - .parameters = {.minParamNum = 1, - .maxParamNum = 1, - .paramInfoPattern = 1, - .inputParaInfo[0][0] = {.isLastParam = true, - .startParam = 1, - .endParam = 1, - .validDataType = FUNC_PARAM_SUPPORT_NUMERIC_TYPE | FUNC_PARAM_SUPPORT_NULL_TYPE, - .validNodeType = FUNC_PARAM_SUPPORT_EXPR_NODE, - .paramAttribute = FUNC_PARAM_NO_SPECIFIC_ATTRIBUTE, - .valueRangeFlag = FUNC_PARAM_NO_SPECIFIC_VALUE,}, - .outputParaInfo = {.validDataType = FUNC_PARAM_SUPPORT_DOUBLE_TYPE}}, - .translateFunc = translateOutDouble, - .getEnvFunc = getStdFuncEnv, - .initFunc = stdFunctionSetup, - .processFunc = stdFunction, - .sprocessFunc = stdScalarFunction, - .finalizeFunc = stdvarFinalize, - #ifdef BUILD_NO_CALL - .invertFunc = stdInvertFunction, - #endif - .combineFunc = stdCombine, - .pPartialFunc = "_std_partial", - .pStateFunc = "_std_state", - .pMergeFunc = "_stdvar_merge" - }, { .name = "crc32", .type = FUNCTION_TYPE_CRC32, @@ -5490,33 +5279,6 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .pPartialFunc = "_std_state_merge", .pMergeFunc = "_stdvar_merge", }, - { - .name = "_stdvar_samp_merge", - .type = FUNCTION_TYPE_STDVAR_SAMP_MERGE, - .classification = FUNC_MGT_AGG_FUNC, - .parameters = {.minParamNum = 1, - .maxParamNum = 1, - .paramInfoPattern = 1, - .inputParaInfo[0][0] = {.isLastParam = true, - .startParam = 1, - .endParam = 1, - .validDataType = FUNC_PARAM_SUPPORT_VARCHAR_TYPE, - .validNodeType = FUNC_PARAM_SUPPORT_EXPR_NODE, - .paramAttribute = FUNC_PARAM_NO_SPECIFIC_ATTRIBUTE, - .valueRangeFlag = FUNC_PARAM_NO_SPECIFIC_VALUE,}, - .outputParaInfo = {.validDataType = FUNC_PARAM_SUPPORT_DOUBLE_TYPE}}, - .translateFunc = translateOutDouble, - .getEnvFunc = getStdFuncEnv, - .initFunc = stdFunctionSetup, - .processFunc = stdFunctionMerge, - .finalizeFunc = stdvarsampFinalize, - #ifdef BUILD_NO_CALL - .invertFunc = stdInvertFunction, - #endif - .combineFunc = stdCombine, - .pPartialFunc = "_std_state_merge", - .pMergeFunc = "_stdvar_merge", - }, { .name = "pi", .type = FUNCTION_TYPE_PI, @@ -6426,6 +6188,215 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .sprocessFunc = dateFunction, .finalizeFunc = NULL }, + { + .name = "std", + .type = FUNCTION_TYPE_STDDEV, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TSMA_FUNC, + .parameters = {.minParamNum = 1, + .maxParamNum = 1, + .paramInfoPattern = 1, + .inputParaInfo[0][0] = {.isLastParam = true, + .startParam = 1, + .endParam = 1, + .validDataType = FUNC_PARAM_SUPPORT_NUMERIC_TYPE | FUNC_PARAM_SUPPORT_NULL_TYPE, + .validNodeType = FUNC_PARAM_SUPPORT_EXPR_NODE, + .paramAttribute = FUNC_PARAM_NO_SPECIFIC_ATTRIBUTE, + .valueRangeFlag = FUNC_PARAM_NO_SPECIFIC_VALUE,}, + .outputParaInfo = {.validDataType = FUNC_PARAM_SUPPORT_DOUBLE_TYPE}}, + .translateFunc = translateOutDouble, + .getEnvFunc = getStdFuncEnv, + .initFunc = stdFunctionSetup, + .processFunc = stdFunction, + .sprocessFunc = stdScalarFunction, + .finalizeFunc = stddevFinalize, + #ifdef BUILD_NO_CALL + .invertFunc = stdInvertFunction, + #endif + .combineFunc = stdCombine, + .pPartialFunc = "_std_partial", + .pStateFunc = "_std_state", + .pMergeFunc = "_stddev_merge" + }, + { + .name = "variance", + .type = FUNCTION_TYPE_STDVAR, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TSMA_FUNC, + .parameters = {.minParamNum = 1, + .maxParamNum = 1, + .paramInfoPattern = 1, + .inputParaInfo[0][0] = {.isLastParam = true, + .startParam = 1, + .endParam = 1, + .validDataType = FUNC_PARAM_SUPPORT_NUMERIC_TYPE | FUNC_PARAM_SUPPORT_NULL_TYPE, + .validNodeType = FUNC_PARAM_SUPPORT_EXPR_NODE, + .paramAttribute = FUNC_PARAM_NO_SPECIFIC_ATTRIBUTE, + .valueRangeFlag = FUNC_PARAM_NO_SPECIFIC_VALUE,}, + .outputParaInfo = {.validDataType = FUNC_PARAM_SUPPORT_DOUBLE_TYPE}}, + .translateFunc = translateOutDouble, + .getEnvFunc = getStdFuncEnv, + .initFunc = stdFunctionSetup, + .processFunc = stdFunction, + .sprocessFunc = stdScalarFunction, + .finalizeFunc = stdvarFinalize, + #ifdef BUILD_NO_CALL + .invertFunc = stdInvertFunction, + #endif + .combineFunc = stdCombine, + .pPartialFunc = "_std_partial", + .pStateFunc = "_std_state", + .pMergeFunc = "_stdvar_merge" + }, + { + .name = "var_samp", + .type = FUNCTION_TYPE_STDVAR_SAMP, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TSMA_FUNC, + .parameters = {.minParamNum = 1, + .maxParamNum = 1, + .paramInfoPattern = 1, + .inputParaInfo[0][0] = {.isLastParam = true, + .startParam = 1, + .endParam = 1, + .validDataType = FUNC_PARAM_SUPPORT_NUMERIC_TYPE | FUNC_PARAM_SUPPORT_NULL_TYPE, + .validNodeType = FUNC_PARAM_SUPPORT_EXPR_NODE, + .paramAttribute = FUNC_PARAM_NO_SPECIFIC_ATTRIBUTE, + .valueRangeFlag = FUNC_PARAM_NO_SPECIFIC_VALUE,}, + .outputParaInfo = {.validDataType = FUNC_PARAM_SUPPORT_DOUBLE_TYPE}}, + .translateFunc = translateOutDouble, + .getEnvFunc = getStdFuncEnv, + .initFunc = stdFunctionSetup, + .processFunc = stdFunction, + .sprocessFunc = stdScalarFunction, + .finalizeFunc = stdvarsampFinalize, + #ifdef BUILD_NO_CALL + .invertFunc = stdInvertFunction, + #endif + .combineFunc = stdCombine, + .pPartialFunc = "_std_partial", + .pStateFunc = "_std_state", + .pMergeFunc = "_stdvar_samp_merge" + }, + { + .name = "stddev_samp", + .type = FUNCTION_TYPE_STDDEV_SAMP, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TSMA_FUNC, + .parameters = {.minParamNum = 1, + .maxParamNum = 1, + .paramInfoPattern = 1, + .inputParaInfo[0][0] = {.isLastParam = true, + .startParam = 1, + .endParam = 1, + .validDataType = FUNC_PARAM_SUPPORT_NUMERIC_TYPE | FUNC_PARAM_SUPPORT_NULL_TYPE, + .validNodeType = FUNC_PARAM_SUPPORT_EXPR_NODE, + .paramAttribute = FUNC_PARAM_NO_SPECIFIC_ATTRIBUTE, + .valueRangeFlag = FUNC_PARAM_NO_SPECIFIC_VALUE,}, + .outputParaInfo = {.validDataType = FUNC_PARAM_SUPPORT_DOUBLE_TYPE}}, + .translateFunc = translateOutDouble, + .getEnvFunc = getStdFuncEnv, + .initFunc = stdFunctionSetup, + .processFunc = stdFunction, + .sprocessFunc = stdScalarFunction, + .finalizeFunc = stddevsampFinalize, + #ifdef BUILD_NO_CALL + .invertFunc = stdInvertFunction, + #endif + .combineFunc = stdCombine, + .pPartialFunc = "_std_partial", + .pStateFunc = "_std_state", + .pMergeFunc = "_stddev_samp_merge" + }, + { + .name = "_stddev_samp_merge", + .type = FUNCTION_TYPE_STDDEV_SAMP_MERGE, + .classification = FUNC_MGT_AGG_FUNC, + .parameters = {.minParamNum = 1, + .maxParamNum = 1, + .paramInfoPattern = 1, + .inputParaInfo[0][0] = {.isLastParam = true, + .startParam = 1, + .endParam = 1, + .validDataType = FUNC_PARAM_SUPPORT_VARCHAR_TYPE, + .validNodeType = FUNC_PARAM_SUPPORT_EXPR_NODE, + .paramAttribute = FUNC_PARAM_NO_SPECIFIC_ATTRIBUTE, + .valueRangeFlag = FUNC_PARAM_NO_SPECIFIC_VALUE,}, + .outputParaInfo = {.validDataType = FUNC_PARAM_SUPPORT_DOUBLE_TYPE}}, + .translateFunc = translateOutDouble, + .getEnvFunc = getStdFuncEnv, + .initFunc = stdFunctionSetup, + .processFunc = stdFunctionMerge, + .finalizeFunc = stddevsampFinalize, +#ifdef BUILD_NO_CALL + .invertFunc = stdInvertFunction, +#endif + .combineFunc = stdCombine, + .pPartialFunc = "_std_state_merge", + .pMergeFunc = "_stddev_samp_merge", + }, + { + .name = "_stdvar_samp_merge", + .type = FUNCTION_TYPE_STDVAR_SAMP_MERGE, + .classification = FUNC_MGT_AGG_FUNC, + .parameters = {.minParamNum = 1, + .maxParamNum = 1, + .paramInfoPattern = 1, + .inputParaInfo[0][0] = {.isLastParam = true, + .startParam = 1, + .endParam = 1, + .validDataType = FUNC_PARAM_SUPPORT_VARCHAR_TYPE, + .validNodeType = FUNC_PARAM_SUPPORT_EXPR_NODE, + .paramAttribute = FUNC_PARAM_NO_SPECIFIC_ATTRIBUTE, + .valueRangeFlag = FUNC_PARAM_NO_SPECIFIC_VALUE,}, + .outputParaInfo = {.validDataType = FUNC_PARAM_SUPPORT_DOUBLE_TYPE}}, + .translateFunc = translateOutDouble, + .getEnvFunc = getStdFuncEnv, + .initFunc = stdFunctionSetup, + .processFunc = stdFunctionMerge, + .finalizeFunc = stdvarsampFinalize, + #ifdef BUILD_NO_CALL + .invertFunc = stdInvertFunction, + #endif + .combineFunc = stdCombine, + .pPartialFunc = "_std_state_merge", + .pMergeFunc = "_stdvar_samp_merge", + }, + { + .name = "group_concat", + .type = FUNCTION_TYPE_GROUP_CONCAT, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TSMA_FUNC, + .parameters = {.minParamNum = 3, + .maxParamNum = 9, + .paramInfoPattern = 1, + .inputParaInfo[0][0] = {.isLastParam = false, + .startParam = 1, + .endParam = 1, + .validDataType = FUNC_PARAM_SUPPORT_VARCHAR_TYPE | FUNC_PARAM_SUPPORT_NCHAR_TYPE | FUNC_PARAM_SUPPORT_NULL_TYPE, + .validNodeType = FUNC_PARAM_SUPPORT_VALUE_NODE, + .paramAttribute = FUNC_PARAM_NO_SPECIFIC_ATTRIBUTE, + .valueRangeFlag = FUNC_PARAM_NO_SPECIFIC_VALUE,}, + .inputParaInfo[0][1] = {.isLastParam = true, + .startParam = 2, + .endParam = 9, + .validDataType = FUNC_PARAM_SUPPORT_VARCHAR_TYPE | FUNC_PARAM_SUPPORT_NCHAR_TYPE | FUNC_PARAM_SUPPORT_NULL_TYPE, + .validNodeType = FUNC_PARAM_SUPPORT_EXPR_NODE, + .paramAttribute = FUNC_PARAM_NO_SPECIFIC_ATTRIBUTE, + .valueRangeFlag = FUNC_PARAM_NO_SPECIFIC_VALUE,}, + .outputParaInfo = {.validDataType = FUNC_PARAM_SUPPORT_VARCHAR_TYPE | FUNC_PARAM_SUPPORT_NCHAR_TYPE}}, + .translateFunc = translateConcatWs, + .sprocessFunc = concatWsFunction, + /* + .getEnvFunc = getGroupConcatFuncEnv, + .initFunc = functionSetup, + .processFunc = groupConcatFunction, + .sprocessFunc = groupConcatScalarFunction, + .finalizeFunc = groupConcatFinalize, +#ifdef BUILD_NO_CALL + .invertFunc = groupConcatInvertFunction, +#endif + .combineFunc = groupConcatCombine, + .pPartialFunc = "groupConcat", + .pStateFunc = "groupConcat", + .pMergeFunc = "groupConcat" + */ + }, }; // clang-format on From 684819f38bd492fde2077dda0ca0b5bcb30f753a Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Fri, 29 Aug 2025 14:44:05 +0800 Subject: [PATCH 4/6] first implementation of group_concat with parameter separator --- source/libs/executor/src/aggregateoperator.c | 72 +++--- source/libs/function/inc/builtinsimpl.h | 5 + source/libs/function/inc/functionResInfoInt.h | 85 +++--- source/libs/function/src/builtins.c | 70 +++-- source/libs/function/src/builtinsimpl.c | 242 ++++++++++++++---- 5 files changed, 341 insertions(+), 133 deletions(-) diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index 93c4b0a308df..5e928b34c2ff 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -53,7 +53,7 @@ typedef struct SAggOperatorInfo { bool cleanGroupResInfo; } SAggOperatorInfo; -static void destroyAggOperatorInfo(void* param); +static void destroyAggOperatorInfo(void* param); static int32_t setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId); static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock); @@ -100,12 +100,11 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(&pOperator->resultInfo, 4096); - code = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &pExprInfo, &num); TSDB_CHECK_CODE(code, lino, _error); code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, - pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); + pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); TSDB_CHECK_CODE(code, lino, _error); if (pAggNode->pExprs != NULL) { @@ -192,7 +191,7 @@ static bool nextGroupedResult(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SAggOperatorInfo* pAggInfo = pOperator->info; - if(!pAggInfo) { + if (!pAggInfo) { qError("function:%s, pAggInfo is NULL", __func__); return false; } @@ -239,7 +238,8 @@ static bool nextGroupedResult(SOperatorInfo* pOperator) { // there is an scalar expression that needs to be calculated before apply the group aggregation. if (pAggInfo->scalarExprSup.pExprInfo != NULL && !blockAllocated) { SExprSupp* pSup1 = &pAggInfo->scalarExprSup; - code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL, GET_STM_RTINFO(pOperator->pTaskInfo)); + code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL, + GET_STM_RTINFO(pOperator->pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { destroyDataBlockForEmptyInput(blockAllocated, &pBlock); T_LONG_JMP(pTaskInfo->env, code); @@ -350,7 +350,12 @@ int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) { continue; } - if ((&pCtx[k])->input.pData[0] == NULL) { + int32_t firstData = 0; + if (pCtx[k].numOfParams > 1) { + firstData = 1; + } + // if ((&pCtx[k])->input.pData[0] == NULL) { + if ((&pCtx[k])->input.pData[firstData] == NULL) { code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; qError("%s aggregate function error happens, input data is NULL.", GET_TASKID(pOperator->pTaskInfo)); } else { @@ -371,8 +376,8 @@ int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) { } static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock) { - int32_t code = TSDB_CODE_SUCCESS; - int32_t lino = 0; + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SSDataBlock* pBlock = NULL; if (!tsCountAlwaysReturnValue) { return TSDB_CODE_SUCCESS; @@ -583,7 +588,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n } uint32_t defaultPgsz = 0; - int64_t defaultBufsz = 0; + int64_t defaultBufsz = 0; code = getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz); if (code) { qError("failed to get buff page size, rowSize:%d", pAggSup->resultRowSize); @@ -620,7 +625,7 @@ void cleanupResultInfoInStream(SExecTaskInfo* pTaskInfo, void* pState, SExprSupp if (!needCleanup) { return; } - + for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) { SResultWindowInfo* pWinInfo = taosArrayGet(pGroupResInfo->pRows, i); SRowBuffPos* pPos = pWinInfo->pStatePos; @@ -642,7 +647,7 @@ void cleanupResultInfoInStream(SExecTaskInfo* pTaskInfo, void* pState, SExprSupp } void cleanupResultInfoInGroupResInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf, - SGroupResInfo* pGroupResInfo) { + SGroupResInfo* pGroupResInfo) { int32_t numOfExprs = pSup->numOfExprs; int32_t* rowEntryOffset = pSup->rowEntryInfoOffset; SqlFunctionCtx* pCtx = pSup->pCtx; @@ -657,16 +662,15 @@ void cleanupResultInfoInGroupResInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, } for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) { - SResultRow* pRow = NULL; - SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i); - SFilePage* page = getBufPage(pBuf, pPos->pos.pageId); + SResultRow* pRow = NULL; + SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i); + SFilePage* page = getBufPage(pBuf, pPos->pos.pageId); if (page == NULL) { qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo)); continue; } pRow = (SResultRow*)((char*)page + pPos->pos.offset); - for (int32_t j = 0; j < numOfExprs; ++j) { pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset); if (pCtx[j].fpSet.cleanup) { @@ -678,7 +682,7 @@ void cleanupResultInfoInGroupResInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, } void cleanupResultInfoInHashMap(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf, - SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap) { + SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap) { int32_t numOfExprs = pSup->numOfExprs; int32_t* rowEntryOffset = pSup->rowEntryInfoOffset; SqlFunctionCtx* pCtx = pSup->pCtx; @@ -715,8 +719,8 @@ void cleanupResultInfoInHashMap(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDisk } } -void cleanupResultInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SGroupResInfo* pGroupResInfo, - SAggSupporter *pAggSup, bool cleanGroupResInfo) { +void cleanupResultInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SGroupResInfo* pGroupResInfo, SAggSupporter* pAggSup, + bool cleanGroupResInfo) { if (cleanGroupResInfo) { cleanupResultInfoInGroupResInfo(pTaskInfo, pSup, pAggSup->pResultBuf, pGroupResInfo); } else { @@ -775,21 +779,22 @@ int32_t applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* if (fmIsPlaceHolderFunc(pCtx[k].functionId)) { SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[k]); - char* p = GET_ROWCELL_INTERBUF(pEntryInfo); + char* p = GET_ROWCELL_INTERBUF(pEntryInfo); - TAOS_CHECK_EXIT(fmSetStreamPseudoFuncParamVal(pCtx[k].functionId, pCtx[k].pExpr->base.pParamList, &taskInfo->pStreamRuntimeInfo->funcInfo)); + TAOS_CHECK_EXIT(fmSetStreamPseudoFuncParamVal(pCtx[k].functionId, pCtx[k].pExpr->base.pParamList, + &taskInfo->pStreamRuntimeInfo->funcInfo)); - SValueNode *valueNode = (SValueNode *)nodesListGetNode(pCtx[k].pExpr->base.pParamList, 0); + SValueNode* valueNode = (SValueNode*)nodesListGetNode(pCtx[k].pExpr->base.pParamList, 0); pEntryInfo->isNullRes = 0; if (TSDB_DATA_TYPE_NULL == valueNode->node.resType.type || valueNode->isNull) { pEntryInfo->isNullRes = 1; - } else if (IS_VAR_DATA_TYPE(pCtx[k].pExpr->base.resSchema.type)){ + } else if (IS_VAR_DATA_TYPE(pCtx[k].pExpr->base.resSchema.type)) { void* v = nodesGetValueFromNode(valueNode); memcpy(p, v, varDataTLen(v)); } else { memcpy(p, nodesGetValueFromNode(valueNode), pCtx[k].pExpr->base.resSchema.bytes); } - + pEntryInfo->numOfRes = 1; } else if (pCtx[k].isPseudoFunc) { SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[k]); @@ -852,22 +857,21 @@ void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) { static int32_t resetAggregateOperatorState(SOperatorInfo* pOper) { SAggOperatorInfo* pAgg = pOper->info; - SAggPhysiNode* pAggNode = (SAggPhysiNode*)pOper->pPhyNode; - + SAggPhysiNode* pAggNode = (SAggPhysiNode*)pOper->pPhyNode; + pOper->status = OP_NOT_OPENED; - size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - SExecTaskInfo* pTaskInfo = pOper->pTaskInfo; - cleanupResultInfo(pTaskInfo, &pOper->exprSupp, &pAgg->groupResInfo, &pAgg->aggSup, - pAgg->cleanGroupResInfo); + size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; + SExecTaskInfo* pTaskInfo = pOper->pTaskInfo; + cleanupResultInfo(pTaskInfo, &pOper->exprSupp, &pAgg->groupResInfo, &pAgg->aggSup, pAgg->cleanGroupResInfo); cleanupGroupResInfo(&pAgg->groupResInfo); resetBasicOperatorState(&pAgg->binfo); - - int32_t code = resetAggSup(&pOper->exprSupp, &pAgg->aggSup, pTaskInfo, pAggNode->pAggFuncs, pAggNode->pGroupKeys, - keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); + + int32_t code = + resetAggSup(&pOper->exprSupp, &pAgg->aggSup, pTaskInfo, pAggNode->pAggFuncs, pAggNode->pGroupKeys, keyBufSize, + pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); if (code == 0) { - code = resetExprSupp(&pAgg->scalarExprSup, pTaskInfo, pAggNode->pExprs, NULL, - &pTaskInfo->storageAPI.functionStore); + code = resetExprSupp(&pAgg->scalarExprSup, pTaskInfo, pAggNode->pExprs, NULL, &pTaskInfo->storageAPI.functionStore); } pAgg->groupId = UINT64_MAX; diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index de11f48950e1..f33737805c70 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -97,6 +97,11 @@ int32_t stdInvertFunction(SqlFunctionCtx* pCtx); int32_t stdCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t getStdInfoSize(); +bool gconcatGetFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); +int32_t gconcatFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); +int32_t gconcatFunction(SqlFunctionCtx* pCtx); +int32_t gconcatFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); + bool getLeastSQRFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t leastSQRFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t leastSQRFunction(SqlFunctionCtx* pCtx); diff --git a/source/libs/function/inc/functionResInfoInt.h b/source/libs/function/inc/functionResInfoInt.h index 763bd4331d44..3a1950b0b50f 100644 --- a/source/libs/function/inc/functionResInfoInt.h +++ b/source/libs/function/inc/functionResInfoInt.h @@ -1,17 +1,17 @@ /* -* Copyright (c) 2019 TAOS Data, Inc. -* -* This program is free software: you can use, redistribute, and/or modify -* it under the terms of the GNU Affero General Public License, version 3 -* or later ("AGPL"), as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, but WITHOUT -* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or -* FITNESS FOR A PARTICULAR PURPOSE. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see . -*/ + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ #ifndef TDENGINE_FUNCTIONRESINFOINT_H #define TDENGINE_FUNCTIONRESINFOINT_H @@ -20,12 +20,12 @@ extern "C" { #endif +#include "decimal.h" +#include "functionResInfo.h" #include "os.h" -#include "thistogram.h" #include "tdigest.h" -#include "functionResInfo.h" +#include "thistogram.h" #include "tpercentile.h" -#include "decimal.h" #define USE_ARRAYLIST @@ -53,10 +53,10 @@ typedef struct SDecimalSumRes { int64_t prevTs; bool isPrevTsSet; bool overflow; - uint32_t flag; // currently not used + uint32_t flag; // currently not used } SDecimalSumRes; -#define SUM_RES_GET_RES(pSumRes) ((SSumRes*)pSumRes) +#define SUM_RES_GET_RES(pSumRes) ((SSumRes*)pSumRes) #define SUM_RES_GET_DECIMAL_RES(pSumRes) ((SDecimalSumRes*)pSumRes) #define SUM_RES_GET_SIZE(type) IS_DECIMAL_TYPE(type) ? sizeof(SDecimalSumRes) : sizeof(SSumRes) @@ -77,9 +77,9 @@ typedef struct SDecimalSumRes { (checkInputType && IS_DECIMAL_TYPE(inputType) ? SUM_RES_GET_DECIMAL_RES(pSumRes)->overflow \ : SUM_RES_GET_RES(pSumRes)->overflow) -#define SUM_RES_GET_ISUM(pSumRes) (((SSumRes*)(pSumRes))->isum) -#define SUM_RES_GET_USUM(pSumRes) (((SSumRes*)(pSumRes))->usum) -#define SUM_RES_GET_DSUM(pSumRes) (((SSumRes*)(pSumRes))->dsum) +#define SUM_RES_GET_ISUM(pSumRes) (((SSumRes*)(pSumRes))->isum) +#define SUM_RES_GET_USUM(pSumRes) (((SSumRes*)(pSumRes))->usum) +#define SUM_RES_GET_DSUM(pSumRes) (((SSumRes*)(pSumRes))->dsum) #define SUM_RES_INC_ISUM(pSumRes, val) ((SSumRes*)(pSumRes))->isum += val #define SUM_RES_INC_USUM(pSumRes, val) ((SSumRes*)(pSumRes))->usum += val #define SUM_RES_INC_DSUM(pSumRes, val) ((SSumRes*)(pSumRes))->dsum += val @@ -90,10 +90,10 @@ typedef struct SDecimalSumRes { const SDecimalOps* pOps = getDecimalOps(TSDB_DATA_TYPE_DECIMAL); \ int32_t wordNum = 0; \ if (type == TSDB_DATA_TYPE_DECIMAL64) { \ - wordNum = DECIMAL_WORD_NUM(Decimal64); \ + wordNum = DECIMAL_WORD_NUM(Decimal64); \ overflow = decimal128AddCheckOverflow(&SUM_RES_GET_DECIMAL_SUM(pSumRes), pVal, wordNum); \ } else { \ - wordNum = DECIMAL_WORD_NUM(Decimal); \ + wordNum = DECIMAL_WORD_NUM(Decimal); \ overflow = decimal128AddCheckOverflow(&SUM_RES_GET_DECIMAL_SUM(pSumRes), pVal, wordNum); \ } \ pOps->add(&SUM_RES_GET_DECIMAL_SUM(pSumRes), pVal, wordNum); \ @@ -101,13 +101,13 @@ typedef struct SDecimalSumRes { } while (0) typedef struct SMinmaxResInfo { - bool assign; // assign the first value or not + bool assign; // assign the first value or not union { struct { int64_t v; char* str; }; - int64_t dec[2]; // for decimal types + int64_t dec[2]; // for decimal types }; STuplePos tuplePos; @@ -143,6 +143,13 @@ typedef struct SStdRes { int16_t type; } SStdRes; +typedef struct SGconcatRes { + char* result; + char* separator; + uint32_t type; + bool nchar; +} SGconcatRes; + typedef struct SHistBin { double val; int64_t num; @@ -207,18 +214,18 @@ typedef struct SDecimalAvgRes { Decimal128 avg; SDecimalSumRes sum; int64_t count; - int16_t type; // store the original input type and scale, used in merge function + int16_t type; // store the original input type and scale, used in merge function uint8_t scale; } SDecimalAvgRes; -#define AVG_RES_GET_RES(pAvgRes) ((SAvgRes*)pAvgRes) +#define AVG_RES_GET_RES(pAvgRes) ((SAvgRes*)pAvgRes) #define AVG_RES_GET_DECIMAL_RES(pAvgRes) ((SDecimalAvgRes*)pAvgRes) #define AVG_RES_SET_TYPE(pAvgRes, inputType, _type) \ do { \ if (IS_DECIMAL_TYPE(inputType)) \ AVG_RES_GET_DECIMAL_RES(pAvgRes)->type = _type; \ else \ - AVG_RES_GET_RES(pAvgRes)->type = _type; \ + AVG_RES_GET_RES(pAvgRes)->type = _type; \ } while (0) #define AVG_RES_SET_INPUT_SCALE(pAvgRes, _scale) \ @@ -233,8 +240,8 @@ typedef struct SDecimalAvgRes { #define AVG_RES_GET_SIZE(inputType) (IS_DECIMAL_TYPE(inputType) ? sizeof(SDecimalAvgRes) : sizeof(SAvgRes)) #define AVG_RES_GET_AVG(pAvgRes) (AVG_RES_GET_RES(pAvgRes)->result) -#define AVG_RES_GET_SUM(pAvgRes) (AVG_RES_GET_RES(pAvgRes)->sum) -#define AVG_RES_GET_COUNT(pAvgRes, checkInputType, inputType) \ +#define AVG_RES_GET_SUM(pAvgRes) (AVG_RES_GET_RES(pAvgRes)->sum) +#define AVG_RES_GET_COUNT(pAvgRes, checkInputType, inputType) \ (checkInputType && IS_DECIMAL_TYPE(inputType) ? AVG_RES_GET_DECIMAL_RES(pAvgRes)->count \ : AVG_RES_GET_RES(pAvgRes)->count) #define AVG_RES_INC_COUNT(pAvgRes, inputType, val) \ @@ -299,7 +306,7 @@ typedef struct MinMaxEntry { typedef struct { int32_t size; int32_t pageId; - SFilePage *data; + SFilePage* data; } SSlotInfo; typedef struct tMemBucketSlot { @@ -308,7 +315,7 @@ typedef struct tMemBucketSlot { } tMemBucketSlot; struct tMemBucket; -typedef int32_t (*__perc_hash_func_t)(struct tMemBucket *pBucket, const void *value, int32_t *index); +typedef int32_t (*__perc_hash_func_t)(struct tMemBucket* pBucket, const void* value, int32_t* index); typedef struct tMemBucket { int16_t numOfSlots; @@ -322,10 +329,10 @@ typedef struct tMemBucket { MinMaxEntry range; // value range int32_t times; // count that has been checked for deciding the correct data value buckets. __compar_fn_t comparFn; - tMemBucketSlot *pSlots; - SDiskbasedBuf *pBuffer; + tMemBucketSlot* pSlots; + SDiskbasedBuf* pBuffer; __perc_hash_func_t hashFunc; - SHashObj *groupPagesMap; // disk page map for different groups; + SHashObj* groupPagesMap; // disk page map for different groups; } tMemBucket; typedef struct SPercentileInfo { @@ -473,11 +480,11 @@ typedef struct SRateInfo { TSKEY lastKey; int8_t hasResult; // flag to denote has value - char* firstPk; - char* lastPk; - int8_t pkType; + char* firstPk; + char* lastPk; + int8_t pkType; int32_t pkBytes; - char pkData[]; + char pkData[]; } SRateInfo; #ifdef __cplusplus diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 48625cacc919..ba4156490ace 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1244,6 +1244,22 @@ int32_t apercentileCreateMergeParam(SNodeList* pRawParameters, SNode* pPartialRe return code; } +static int32_t gconcatCreateMergeParam(SNodeList* pRawParameters, SNode* pPartialRes, SNodeList** pParameters) { + int32_t code = 0; + SNode* pNew = NULL; + + code = nodesCloneNode(nodesListGetNode(pRawParameters, 0), &pNew); + if (TSDB_CODE_SUCCESS == code) { + code = nodesListMakeAppend(pParameters, pNew); + + if (TSDB_CODE_SUCCESS == code) { + code = nodesListStrictAppend(*pParameters, pPartialRes); + } + } + + return code; +} + static int32_t translateElapsedPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { return 0; } static int32_t translateElapsedMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { return 0; } @@ -1447,6 +1463,26 @@ static int32_t translateConcatWs(SFunctionNode* pFunc, char* pErrBuf, int32_t le return translateConcatImpl(pFunc, pErrBuf, len, 3, 9, true); } +static int32_t translateGroupConcat(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + FUNC_ERR_RET(validateParam(pFunc, pErrBuf, len)); + + int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList); + int32_t resultBytes = TSDB_MAX_FIELD_LEN - VARSTR_HEADER_SIZE; + uint8_t resultType = TSDB_DATA_TYPE_BINARY; + + /* if params have NCHAR type, promote the final result to NCHAR */ + for (int32_t i = 0; i < numOfParams; ++i) { + SNode* pPara = nodesListGetNode(pFunc->pParameterList, i); + uint8_t paraType = getSDataTypeFromNode(pPara)->type; + if (TSDB_DATA_TYPE_NCHAR == paraType) { + resultType = paraType; + } + } + + pFunc->node.resType = (SDataType){.bytes = resultBytes, .type = resultType}; + return TSDB_CODE_SUCCESS; +} + static int32_t translateChar(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { FUNC_ERR_RET(validateParam(pFunc, pErrBuf, len)); int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList); @@ -6361,8 +6397,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "group_concat", .type = FUNCTION_TYPE_GROUP_CONCAT, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TSMA_FUNC, - .parameters = {.minParamNum = 3, + .classification = FUNC_MGT_AGG_FUNC, // | FUNC_MGT_TSMA_FUNC, + .parameters = {.minParamNum = 2, .maxParamNum = 9, .paramInfoPattern = 1, .inputParaInfo[0][0] = {.isLastParam = false, @@ -6380,22 +6416,26 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .paramAttribute = FUNC_PARAM_NO_SPECIFIC_ATTRIBUTE, .valueRangeFlag = FUNC_PARAM_NO_SPECIFIC_VALUE,}, .outputParaInfo = {.validDataType = FUNC_PARAM_SUPPORT_VARCHAR_TYPE | FUNC_PARAM_SUPPORT_NCHAR_TYPE}}, - .translateFunc = translateConcatWs, - .sprocessFunc = concatWsFunction, - /* - .getEnvFunc = getGroupConcatFuncEnv, - .initFunc = functionSetup, - .processFunc = groupConcatFunction, - .sprocessFunc = groupConcatScalarFunction, - .finalizeFunc = groupConcatFinalize, + .translateFunc = translateGroupConcat, + .sprocessFunc = NULL, + .getEnvFunc = gconcatGetFuncEnv, + .initFunc = gconcatFunctionSetup, + .processFunc = gconcatFunction, + .finalizeFunc = gconcatFinalize, #ifdef BUILD_NO_CALL - .invertFunc = groupConcatInvertFunction, + .invertFunc = gconcatInvertFunction, #endif - .combineFunc = groupConcatCombine, - .pPartialFunc = "groupConcat", - .pStateFunc = "groupConcat", - .pMergeFunc = "groupConcat" + /* + .combineFunc = gconcatCombine, // stream + .pStateFunc = "group_concat", // tsma + .pMiddleFunc = "group_concat", + + .pPartialFunc = "_gconcat_partial", + .pMergeFunc = "_gconcat_merge", */ + .pPartialFunc = "group_concat", + .pMergeFunc = "group_concat", + .createMergeParaFuc = gconcatCreateMergeParam }, }; // clang-format on diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 7fbca6b3559d..dcbdd7933854 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -1582,35 +1582,74 @@ int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return functionFinalize(pCtx, pBlock); } -int32_t stddevsampFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { +int32_t stdvarFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SInputColumnInfoData* pInput = &pCtx->input; - SStdRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - int32_t type = pStddevRes->type; + SStdRes* pStdvarRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + int32_t type = pStdvarRes->type; double avg; - if (pStddevRes->count == 0) { + if (pStdvarRes->count == 0) { GET_RES_INFO(pCtx)->numOfRes = 0; return functionFinalize(pCtx, pBlock); } - if (pStddevRes->count == 1) { - pStddevRes->result = 0.0; + if (pStdvarRes->count == 1) { + pStdvarRes->result = 0.0; } else { - pStddevRes->result = sqrt(pStddevRes->quadraticDSum / (pStddevRes->count - 1)); + pStdvarRes->result = pStdvarRes->quadraticDSum / pStdvarRes->count; } // check for overflow - if (isinf(pStddevRes->result) || isnan(pStddevRes->result)) { + if (isinf(pStdvarRes->result) || isnan(pStdvarRes->result)) { GET_RES_INFO(pCtx)->numOfRes = 0; } return functionFinalize(pCtx, pBlock); } -int32_t stdvarsampFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { +int32_t stdPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SStdRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + int32_t resultBytes = getStdInfoSize(); + char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); + + if (NULL == res) { + return terrno; + } + (void)memcpy(varDataVal(res), pInfo, resultBytes); + varDataSetLen(res, resultBytes); + + int32_t slotId = pCtx->pExpr->base.resSchema.slotId; + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); + if (NULL == pCol) { + taosMemoryFree(res); + return TSDB_CODE_OUT_OF_RANGE; + } + + int32_t code = colDataSetVal(pCol, pBlock->info.rows, res, false); + + taosMemoryFree(res); + return code; +} + +int32_t stdCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { + SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx); + SStdRes* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo); + + SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx); + SStdRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo); + int16_t type = pDBuf->type == TSDB_DATA_TYPE_NULL ? pSBuf->type : pDBuf->type; + + stdTransferInfo(pSBuf, pDBuf); + + pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes); + pDResInfo->isNullRes &= pSResInfo->isNullRes; + return TSDB_CODE_SUCCESS; +} + +int32_t stddevsampFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SInputColumnInfoData* pInput = &pCtx->input; SStdRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - int32_t type = pStddevRes->type; double avg; if (pStddevRes->count == 0) { @@ -1621,7 +1660,7 @@ int32_t stdvarsampFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { if (pStddevRes->count == 1) { pStddevRes->result = 0.0; } else { - pStddevRes->result = pStddevRes->quadraticDSum / (pStddevRes->count - 1); + pStddevRes->result = sqrt(pStddevRes->quadraticDSum / (pStddevRes->count - 1)); } // check for overflow @@ -1632,69 +1671,182 @@ int32_t stdvarsampFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return functionFinalize(pCtx, pBlock); } -int32_t stdvarFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { +int32_t stdvarsampFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SInputColumnInfoData* pInput = &pCtx->input; - SStdRes* pStdvarRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - int32_t type = pStdvarRes->type; + SStdRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); double avg; - if (pStdvarRes->count == 0) { + if (pStddevRes->count == 0) { GET_RES_INFO(pCtx)->numOfRes = 0; return functionFinalize(pCtx, pBlock); } - if (pStdvarRes->count == 1) { - pStdvarRes->result = 0.0; + if (pStddevRes->count == 1) { + pStddevRes->result = 0.0; } else { - pStdvarRes->result = pStdvarRes->quadraticDSum / pStdvarRes->count; + pStddevRes->result = pStddevRes->quadraticDSum / (pStddevRes->count - 1); } // check for overflow - if (isinf(pStdvarRes->result) || isnan(pStdvarRes->result)) { + if (isinf(pStddevRes->result) || isnan(pStddevRes->result)) { GET_RES_INFO(pCtx)->numOfRes = 0; } return functionFinalize(pCtx, pBlock); } -int32_t stdPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { - SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); - SStdRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - int32_t resultBytes = getStdInfoSize(); - char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); +bool gconcatGetFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { + pEnv->calcMemSize = sizeof(SGconcatRes); + return true; +} - if (NULL == res) { - return terrno; +int32_t gconcatFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { + if (pResultInfo->initialized) { + return TSDB_CODE_SUCCESS; + } + if (TSDB_CODE_SUCCESS != functionSetup(pCtx, pResultInfo)) { + return TSDB_CODE_FUNC_SETUP_ERROR; } - (void)memcpy(varDataVal(res), pInfo, resultBytes); - varDataSetLen(res, resultBytes); - int32_t slotId = pCtx->pExpr->base.resSchema.slotId; - SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); - if (NULL == pCol) { - taosMemoryFree(res); - return TSDB_CODE_OUT_OF_RANGE; + SGconcatRes* pRes = GET_ROWCELL_INTERBUF(pResultInfo); + (void)memset(pRes, 0, sizeof(SStdRes)); + + // pRes->separator = varDataVal(pCtx->param[0].param.pz); + pRes->separator = pCtx->param[0].param.pz; + pRes->type = pCtx->param[0].param.nType; + + /* + SInputColumnInfoData* pInput = &pCtx->input; + int32_t type = pInput->pData[0]->info.type; + + pRes->nchar = (type == TSDB_DATA_TYPE_NCHAR); + */ + + return TSDB_CODE_SUCCESS; +} + +static int32_t gconcatHelper(const char* input, char* output, bool hasNchar, int32_t type, VarDataLenT* dataLen, + void* charsetCxt) { + if (hasNchar && type == TSDB_DATA_TYPE_VARCHAR) { + TdUcs4* newBuf = taosMemoryCalloc((varDataLen(input) + 1) * TSDB_NCHAR_SIZE, 1); + if (NULL == newBuf) { + return terrno; + } + int32_t len = varDataLen(input); + bool ret = + taosMbsToUcs4(varDataVal(input), len, newBuf, (varDataLen(input) + 1) * TSDB_NCHAR_SIZE, &len, charsetCxt); + if (!ret) { + taosMemoryFree(newBuf); + return TSDB_CODE_SCALAR_CONVERT_ERROR; + } + (void)memcpy(varDataVal(output) + *dataLen, newBuf, len); + *dataLen += len; + taosMemoryFree(newBuf); + } else { + (void)memcpy(varDataVal(output) + *dataLen, varDataVal(input), varDataLen(input)); + *dataLen += varDataLen(input); } - int32_t code = colDataSetVal(pCol, pBlock->info.rows, res, false); + return TSDB_CODE_SUCCESS; +} - taosMemoryFree(res); +int32_t gconcatFunction(SqlFunctionCtx* pCtx) { + int32_t code = 0, numOfElem = 0; + SInputColumnInfoData* pInput = &pCtx->input; + int32_t rowStart = pInput->startRowIndex; + int32_t numOfRows = pInput->numOfRows; + int32_t numOfCols = pInput->numOfInputCols; + SGconcatRes* pRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + char* sep = pRes->separator; + bool hasNchar = pRes->nchar; + VarDataLenT dataLen = 0; + bool prefixSep = false; + + if (!pRes->result) { + pRes->result = taosMemoryCalloc(1, TSDB_MAX_FIELD_LEN); + if (!pRes->result) { + return terrno; + } + + varDataSetLen(pRes->result, 0); + + for (int c = 1; c < numOfCols; ++c) { + SColumnInfoData* pCol = pInput->pData[c]; + int32_t type = pCol->info.type; + + if (TSDB_DATA_TYPE_NCHAR == type) { + pRes->nchar = true; + } + } + } else { + dataLen = varDataLen(pRes->result); + + prefixSep = true; + /* + code = gconcatHelper(sep, pRes->result, hasNchar, pRes->type, &dataLen, NULL); + if (code) { + goto _over; + } + */ + } + + // computing based on the true data block + char* buf = pRes->result; + for (int r = rowStart; r < rowStart + numOfRows; ++r) { + if (prefixSep) { + // concat the separator + // setup sepatator's charset instead of the default: pRes->charsetCxt + + code = gconcatHelper(sep, buf, hasNchar, pRes->type, &dataLen, NULL); + if (code) { + goto _over; + } + } + + for (int c = 1; c < numOfCols; ++c) { + SColumnInfoData* pCol = pInput->pData[c]; + int32_t type = pCol->info.type; + + if (IS_NULL_TYPE(type) || (pCol->hasNull && colDataIsNull_f(pCol, r))) { + continue; + } + + // concat this row's all columns + code = gconcatHelper(colDataGetData(pCol, r), buf, hasNchar, pInput->pData[c]->info.type, &dataLen, NULL); + if (code) { + goto _over; + } + } + + prefixSep = true; + } + + varDataSetLen(buf, dataLen); + numOfElem += 1; + +_over: + // data in the check operation are all null, not output + SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1); return code; } -int32_t stdCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { - SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx); - SStdRes* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo); +int32_t gconcatFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + int32_t code = 0; + SInputColumnInfoData* pInput = &pCtx->input; + SGconcatRes* pRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + int32_t slotId = pCtx->pExpr->base.resSchema.slotId; + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); - SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx); - SStdRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo); - int16_t type = pDBuf->type == TSDB_DATA_TYPE_NULL ? pSBuf->type : pDBuf->type; + if (NULL == pCol) { + taosMemoryFree(pRes->result); + return TSDB_CODE_OUT_OF_RANGE; + } - stdTransferInfo(pSBuf, pDBuf); + code = colDataSetVal(pCol, pBlock->info.rows, pRes->result, NULL == pRes->result); - pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes); - pDResInfo->isNullRes &= pSResInfo->isNullRes; - return TSDB_CODE_SUCCESS; + taosMemoryFree(pRes->result); + + return code; } bool getLeastSQRFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { From a65bee9a20d0450528aefd0c18a964e91471eebf Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Fri, 29 Aug 2025 18:31:31 +0800 Subject: [PATCH 5/6] use last param as separator instead of the first one --- source/libs/executor/src/aggregateoperator.c | 7 +++-- source/libs/function/src/builtins.c | 31 ++++++++++---------- source/libs/function/src/builtinsimpl.c | 16 ++++++---- 3 files changed, 30 insertions(+), 24 deletions(-) diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index 5e928b34c2ff..c48878f59200 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -349,13 +349,14 @@ int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) { if (pCtx[k].fpSet.process == NULL) { continue; } - + /* int32_t firstData = 0; if (pCtx[k].numOfParams > 1) { firstData = 1; } - // if ((&pCtx[k])->input.pData[0] == NULL) { - if ((&pCtx[k])->input.pData[firstData] == NULL) { + */ + if ((&pCtx[k])->input.pData[0] == NULL) { + // if ((&pCtx[k])->input.pData[firstData] == NULL) { code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; qError("%s aggregate function error happens, input data is NULL.", GET_TASKID(pOperator->pTaskInfo)); } else { diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index ba4156490ace..c054743e9790 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1245,9 +1245,15 @@ int32_t apercentileCreateMergeParam(SNodeList* pRawParameters, SNode* pPartialRe } static int32_t gconcatCreateMergeParam(SNodeList* pRawParameters, SNode* pPartialRes, SNodeList** pParameters) { - int32_t code = 0; - SNode* pNew = NULL; - + int32_t code = nodesListMakeAppend(pParameters, pPartialRes); + if (TSDB_CODE_SUCCESS == code) { + SNode* pNew = NULL; + code = nodesCloneNode(nodesListGetNode(pRawParameters, pRawParameters->length - 1), &pNew); + if (TSDB_CODE_SUCCESS == code) { + code = nodesListStrictAppend(*pParameters, pNew); + } + } + /* code = nodesCloneNode(nodesListGetNode(pRawParameters, 0), &pNew); if (TSDB_CODE_SUCCESS == code) { code = nodesListMakeAppend(pParameters, pNew); @@ -1256,7 +1262,7 @@ static int32_t gconcatCreateMergeParam(SNodeList* pRawParameters, SNode* pPartia code = nodesListStrictAppend(*pParameters, pPartialRes); } } - + */ return code; } @@ -6398,19 +6404,12 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "group_concat", .type = FUNCTION_TYPE_GROUP_CONCAT, .classification = FUNC_MGT_AGG_FUNC, // | FUNC_MGT_TSMA_FUNC, - .parameters = {.minParamNum = 2, - .maxParamNum = 9, + .parameters = {.minParamNum = 1, + .maxParamNum = -1, .paramInfoPattern = 1, - .inputParaInfo[0][0] = {.isLastParam = false, + .inputParaInfo[0][0] = {.isLastParam = true, .startParam = 1, - .endParam = 1, - .validDataType = FUNC_PARAM_SUPPORT_VARCHAR_TYPE | FUNC_PARAM_SUPPORT_NCHAR_TYPE | FUNC_PARAM_SUPPORT_NULL_TYPE, - .validNodeType = FUNC_PARAM_SUPPORT_VALUE_NODE, - .paramAttribute = FUNC_PARAM_NO_SPECIFIC_ATTRIBUTE, - .valueRangeFlag = FUNC_PARAM_NO_SPECIFIC_VALUE,}, - .inputParaInfo[0][1] = {.isLastParam = true, - .startParam = 2, - .endParam = 9, + .endParam = -1, .validDataType = FUNC_PARAM_SUPPORT_VARCHAR_TYPE | FUNC_PARAM_SUPPORT_NCHAR_TYPE | FUNC_PARAM_SUPPORT_NULL_TYPE, .validNodeType = FUNC_PARAM_SUPPORT_EXPR_NODE, .paramAttribute = FUNC_PARAM_NO_SPECIFIC_ATTRIBUTE, @@ -6428,8 +6427,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { /* .combineFunc = gconcatCombine, // stream .pStateFunc = "group_concat", // tsma - .pMiddleFunc = "group_concat", + .pMiddleFunc = "group_concat", .pPartialFunc = "_gconcat_partial", .pMergeFunc = "_gconcat_merge", */ diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index dcbdd7933854..0a6eecaf2c5c 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -1712,8 +1712,10 @@ int32_t gconcatFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultI (void)memset(pRes, 0, sizeof(SStdRes)); // pRes->separator = varDataVal(pCtx->param[0].param.pz); - pRes->separator = pCtx->param[0].param.pz; - pRes->type = pCtx->param[0].param.nType; + + int32_t sepParamIdx = pCtx->numOfParams - 1; + pRes->separator = pCtx->param[sepParamIdx].param.pz; + pRes->type = pCtx->param[sepParamIdx].param.nType; /* SInputColumnInfoData* pInput = &pCtx->input; @@ -1770,7 +1772,7 @@ int32_t gconcatFunction(SqlFunctionCtx* pCtx) { varDataSetLen(pRes->result, 0); - for (int c = 1; c < numOfCols; ++c) { + for (int c = 0; c < numOfCols - 1; ++c) { SColumnInfoData* pCol = pInput->pData[c]; int32_t type = pCol->info.type; @@ -1791,7 +1793,11 @@ int32_t gconcatFunction(SqlFunctionCtx* pCtx) { } // computing based on the true data block - char* buf = pRes->result; + char* buf = pRes->result; + SColumnInfoData* pCol = pInput->pData[numOfCols - 1]; + + sep = colDataGetData(pCol, 0); + pRes->type = pCol->info.type; for (int r = rowStart; r < rowStart + numOfRows; ++r) { if (prefixSep) { // concat the separator @@ -1803,7 +1809,7 @@ int32_t gconcatFunction(SqlFunctionCtx* pCtx) { } } - for (int c = 1; c < numOfCols; ++c) { + for (int c = 0; c < numOfCols - 1; ++c) { SColumnInfoData* pCol = pInput->pData[c]; int32_t type = pCol->info.type; From 3758561b7a0fddc22f9dedf72fa1896b1c6783db Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Mon, 1 Sep 2025 14:25:08 +0800 Subject: [PATCH 6/6] revert agg operator --- source/libs/executor/src/aggregateoperator.c | 73 +++++++++----------- 1 file changed, 34 insertions(+), 39 deletions(-) diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index c48878f59200..93c4b0a308df 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -53,7 +53,7 @@ typedef struct SAggOperatorInfo { bool cleanGroupResInfo; } SAggOperatorInfo; -static void destroyAggOperatorInfo(void* param); +static void destroyAggOperatorInfo(void* param); static int32_t setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId); static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock); @@ -100,11 +100,12 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(&pOperator->resultInfo, 4096); + code = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &pExprInfo, &num); TSDB_CHECK_CODE(code, lino, _error); code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, - pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); + pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); TSDB_CHECK_CODE(code, lino, _error); if (pAggNode->pExprs != NULL) { @@ -191,7 +192,7 @@ static bool nextGroupedResult(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SAggOperatorInfo* pAggInfo = pOperator->info; - if (!pAggInfo) { + if(!pAggInfo) { qError("function:%s, pAggInfo is NULL", __func__); return false; } @@ -238,8 +239,7 @@ static bool nextGroupedResult(SOperatorInfo* pOperator) { // there is an scalar expression that needs to be calculated before apply the group aggregation. if (pAggInfo->scalarExprSup.pExprInfo != NULL && !blockAllocated) { SExprSupp* pSup1 = &pAggInfo->scalarExprSup; - code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL, - GET_STM_RTINFO(pOperator->pTaskInfo)); + code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL, GET_STM_RTINFO(pOperator->pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { destroyDataBlockForEmptyInput(blockAllocated, &pBlock); T_LONG_JMP(pTaskInfo->env, code); @@ -349,14 +349,8 @@ int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) { if (pCtx[k].fpSet.process == NULL) { continue; } - /* - int32_t firstData = 0; - if (pCtx[k].numOfParams > 1) { - firstData = 1; - } - */ + if ((&pCtx[k])->input.pData[0] == NULL) { - // if ((&pCtx[k])->input.pData[firstData] == NULL) { code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; qError("%s aggregate function error happens, input data is NULL.", GET_TASKID(pOperator->pTaskInfo)); } else { @@ -377,8 +371,8 @@ int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) { } static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock) { - int32_t code = TSDB_CODE_SUCCESS; - int32_t lino = 0; + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SSDataBlock* pBlock = NULL; if (!tsCountAlwaysReturnValue) { return TSDB_CODE_SUCCESS; @@ -589,7 +583,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n } uint32_t defaultPgsz = 0; - int64_t defaultBufsz = 0; + int64_t defaultBufsz = 0; code = getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz); if (code) { qError("failed to get buff page size, rowSize:%d", pAggSup->resultRowSize); @@ -626,7 +620,7 @@ void cleanupResultInfoInStream(SExecTaskInfo* pTaskInfo, void* pState, SExprSupp if (!needCleanup) { return; } - + for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) { SResultWindowInfo* pWinInfo = taosArrayGet(pGroupResInfo->pRows, i); SRowBuffPos* pPos = pWinInfo->pStatePos; @@ -648,7 +642,7 @@ void cleanupResultInfoInStream(SExecTaskInfo* pTaskInfo, void* pState, SExprSupp } void cleanupResultInfoInGroupResInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf, - SGroupResInfo* pGroupResInfo) { + SGroupResInfo* pGroupResInfo) { int32_t numOfExprs = pSup->numOfExprs; int32_t* rowEntryOffset = pSup->rowEntryInfoOffset; SqlFunctionCtx* pCtx = pSup->pCtx; @@ -663,15 +657,16 @@ void cleanupResultInfoInGroupResInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, } for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) { - SResultRow* pRow = NULL; - SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i); - SFilePage* page = getBufPage(pBuf, pPos->pos.pageId); + SResultRow* pRow = NULL; + SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i); + SFilePage* page = getBufPage(pBuf, pPos->pos.pageId); if (page == NULL) { qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo)); continue; } pRow = (SResultRow*)((char*)page + pPos->pos.offset); + for (int32_t j = 0; j < numOfExprs; ++j) { pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset); if (pCtx[j].fpSet.cleanup) { @@ -683,7 +678,7 @@ void cleanupResultInfoInGroupResInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, } void cleanupResultInfoInHashMap(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf, - SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap) { + SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap) { int32_t numOfExprs = pSup->numOfExprs; int32_t* rowEntryOffset = pSup->rowEntryInfoOffset; SqlFunctionCtx* pCtx = pSup->pCtx; @@ -720,8 +715,8 @@ void cleanupResultInfoInHashMap(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDisk } } -void cleanupResultInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SGroupResInfo* pGroupResInfo, SAggSupporter* pAggSup, - bool cleanGroupResInfo) { +void cleanupResultInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SGroupResInfo* pGroupResInfo, + SAggSupporter *pAggSup, bool cleanGroupResInfo) { if (cleanGroupResInfo) { cleanupResultInfoInGroupResInfo(pTaskInfo, pSup, pAggSup->pResultBuf, pGroupResInfo); } else { @@ -780,22 +775,21 @@ int32_t applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* if (fmIsPlaceHolderFunc(pCtx[k].functionId)) { SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[k]); - char* p = GET_ROWCELL_INTERBUF(pEntryInfo); + char* p = GET_ROWCELL_INTERBUF(pEntryInfo); - TAOS_CHECK_EXIT(fmSetStreamPseudoFuncParamVal(pCtx[k].functionId, pCtx[k].pExpr->base.pParamList, - &taskInfo->pStreamRuntimeInfo->funcInfo)); + TAOS_CHECK_EXIT(fmSetStreamPseudoFuncParamVal(pCtx[k].functionId, pCtx[k].pExpr->base.pParamList, &taskInfo->pStreamRuntimeInfo->funcInfo)); - SValueNode* valueNode = (SValueNode*)nodesListGetNode(pCtx[k].pExpr->base.pParamList, 0); + SValueNode *valueNode = (SValueNode *)nodesListGetNode(pCtx[k].pExpr->base.pParamList, 0); pEntryInfo->isNullRes = 0; if (TSDB_DATA_TYPE_NULL == valueNode->node.resType.type || valueNode->isNull) { pEntryInfo->isNullRes = 1; - } else if (IS_VAR_DATA_TYPE(pCtx[k].pExpr->base.resSchema.type)) { + } else if (IS_VAR_DATA_TYPE(pCtx[k].pExpr->base.resSchema.type)){ void* v = nodesGetValueFromNode(valueNode); memcpy(p, v, varDataTLen(v)); } else { memcpy(p, nodesGetValueFromNode(valueNode), pCtx[k].pExpr->base.resSchema.bytes); } - + pEntryInfo->numOfRes = 1; } else if (pCtx[k].isPseudoFunc) { SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[k]); @@ -858,21 +852,22 @@ void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) { static int32_t resetAggregateOperatorState(SOperatorInfo* pOper) { SAggOperatorInfo* pAgg = pOper->info; - SAggPhysiNode* pAggNode = (SAggPhysiNode*)pOper->pPhyNode; - + SAggPhysiNode* pAggNode = (SAggPhysiNode*)pOper->pPhyNode; + pOper->status = OP_NOT_OPENED; - size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - SExecTaskInfo* pTaskInfo = pOper->pTaskInfo; - cleanupResultInfo(pTaskInfo, &pOper->exprSupp, &pAgg->groupResInfo, &pAgg->aggSup, pAgg->cleanGroupResInfo); + size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; + SExecTaskInfo* pTaskInfo = pOper->pTaskInfo; + cleanupResultInfo(pTaskInfo, &pOper->exprSupp, &pAgg->groupResInfo, &pAgg->aggSup, + pAgg->cleanGroupResInfo); cleanupGroupResInfo(&pAgg->groupResInfo); resetBasicOperatorState(&pAgg->binfo); - - int32_t code = - resetAggSup(&pOper->exprSupp, &pAgg->aggSup, pTaskInfo, pAggNode->pAggFuncs, pAggNode->pGroupKeys, keyBufSize, - pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); + + int32_t code = resetAggSup(&pOper->exprSupp, &pAgg->aggSup, pTaskInfo, pAggNode->pAggFuncs, pAggNode->pGroupKeys, + keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); if (code == 0) { - code = resetExprSupp(&pAgg->scalarExprSup, pTaskInfo, pAggNode->pExprs, NULL, &pTaskInfo->storageAPI.functionStore); + code = resetExprSupp(&pAgg->scalarExprSup, pTaskInfo, pAggNode->pExprs, NULL, + &pTaskInfo->storageAPI.functionStore); } pAgg->groupId = UINT64_MAX;