diff --git a/include/common/tglobal.h b/include/common/tglobal.h index aec6a2982ab6..9a8d2000723d 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -62,6 +62,9 @@ extern int32_t tsForceReadConfig; extern int32_t tsdmConfigVersion; extern int32_t tsConfigInited; extern int32_t tsStatusInterval; +extern int32_t tsStatusIntervalMs; +extern int32_t tsStatusSRTimeoutMs; +extern int32_t tsStatusTimeoutMs; extern int32_t tsNumOfSupportVnodes; extern char tsEncryptAlgorithm[]; extern char tsEncryptScope[]; @@ -124,17 +127,25 @@ extern int32_t tsNumOfRetentionThreads; // sync raft extern int32_t tsElectInterval; extern int32_t tsHeartbeatInterval; +extern int32_t tsVnodeElectIntervalMs; +extern int32_t tsVnodeHeartbeatIntervalMs; +extern int32_t tsMnodeElectIntervalMs; +extern int32_t tsMnodeHeartbeatIntervalMs; extern int32_t tsHeartbeatTimeout; extern int32_t tsSnapReplMaxWaitN; extern int64_t tsLogBufferMemoryAllowed; // maximum allowed log buffer size in bytes for each dnode extern int64_t tsSyncApplyQueueSize; extern int32_t tsRoutineReportInterval; extern bool tsSyncLogHeartbeat; +extern int32_t tsSyncTimeout; // arbitrator extern int32_t tsArbHeartBeatIntervalSec; extern int32_t tsArbCheckSyncIntervalSec; extern int32_t tsArbSetAssignedTimeoutSec; +extern int32_t tsArbHeartBeatIntervalMs; +extern int32_t tsArbCheckSyncIntervalMs; +extern int32_t tsArbSetAssignedTimeoutMs; // vnode extern int64_t tsVndCommitMaxIntervalMs; diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 5d7d4e59a4ce..87f91383dcad 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1972,6 +1972,7 @@ typedef struct { int8_t encryptionKeyStat; uint32_t encryptionKeyChksum; SMonitorParas monitorParas; + int32_t statusIntervalMs; } SClusterCfg; typedef struct { diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 780cdbf34ec1..185c3cd9b36e 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -100,7 +100,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr * @return int32_t 0 for success, -1 for failure. */ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); - +int32_t mndResetTimer(SMnode *pMnode); int64_t mndGetRoleTimeMs(SMnode *pMnode); /** diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 8231821112c4..7bbdb629caf0 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -294,6 +294,7 @@ int32_t syncBecomeAssignedLeader(SSyncNode* ths, SRpcMsg* pRpcMsg); int32_t syncUpdateArbTerm(int64_t rid, SyncTerm arbTerm); SSyncState syncGetState(int64_t rid); +int32_t syncResetTimer(int64_t rid, int32_t electInterval, int32_t heartbeatInterval); void syncGetCommitIndex(int64_t rid, int64_t* syncCommitIndex); int32_t syncGetArbToken(int64_t rid, char* outToken); int32_t syncCheckSynced(int64_t rid); diff --git a/source/common/src/msg/tmsg.c b/source/common/src/msg/tmsg.c index 133fee501365..3c36e2d60b64 100644 --- a/source/common/src/msg/tmsg.c +++ b/source/common/src/msg/tmsg.c @@ -1593,6 +1593,8 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { TAOS_CHECK_EXIT(tEncodeI64(&encoder, pload->bufferSegmentSize)); } + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->clusterCfg.statusIntervalMs)); + tEndEncode(&encoder); _exit: @@ -1744,6 +1746,9 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { } } + if (!tDecodeIsEnd(&decoder)) { + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->clusterCfg.statusIntervalMs)); + } tEndDecode(&decoder); _exit: diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index a4ce638a543c..42b5c85f17e9 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -46,6 +46,9 @@ int32_t tsForceReadConfig = 0; int32_t tsdmConfigVersion = -1; int32_t tsConfigInited = 0; int32_t tsStatusInterval = 1; // second +int32_t tsStatusIntervalMs = 1000; +int32_t tsStatusSRTimeoutMs = 5000; +int32_t tsStatusTimeoutMs = 5000; int32_t tsNumOfSupportVnodes = 256; char tsEncryptAlgorithm[16] = {0}; char tsEncryptScope[100] = {0}; @@ -113,14 +116,19 @@ int32_t tsNumOfCompactThreads = 2; int32_t tsNumOfRetentionThreads = 1; // sync raft -int32_t tsElectInterval = 25 * 1000; +int32_t tsElectInterval = 4000; int32_t tsHeartbeatInterval = 1000; +int32_t tsVnodeElectIntervalMs = 4000; +int32_t tsVnodeHeartbeatIntervalMs = 1000; +int32_t tsMnodeElectIntervalMs = 3000; +int32_t tsMnodeHeartbeatIntervalMs = 500; int32_t tsHeartbeatTimeout = 20 * 1000; int32_t tsSnapReplMaxWaitN = 128; int64_t tsLogBufferMemoryAllowed = 0; // bytes int64_t tsSyncApplyQueueSize = 512; int32_t tsRoutineReportInterval = 300; bool tsSyncLogHeartbeat = false; +int32_t tsSyncTimeout = 0; // mnode int64_t tsMndSdbWriteDelta = 200; @@ -133,6 +141,9 @@ bool tsForceKillTrans = false; int32_t tsArbHeartBeatIntervalSec = 2; int32_t tsArbCheckSyncIntervalSec = 3; int32_t tsArbSetAssignedTimeoutSec = 14; +int32_t tsArbHeartBeatIntervalMs = 2000; +int32_t tsArbCheckSyncIntervalMs = 3000; +int32_t tsArbSetAssignedTimeoutMs = 14000; // dnode int64_t tsDndStart = 0; @@ -898,6 +909,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "statusInterval", tsStatusInterval, 1, 30, CFG_SCOPE_SERVER, CFG_DYN_SERVER_LAZY,CFG_CATEGORY_GLOBAL)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "maxShellConns", tsMaxShellConns, 10, 50000000, CFG_SCOPE_SERVER, CFG_DYN_SERVER_LAZY, CFG_CATEGORY_LOCAL)); + TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "statusIntervalMs", tsStatusIntervalMs, 10, 30000, CFG_SCOPE_SERVER, CFG_DYN_SERVER_LAZY,CFG_CATEGORY_GLOBAL)); + TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "statusSRTimeoutMs", tsStatusSRTimeoutMs, 10, 30000, CFG_SCOPE_SERVER, CFG_DYN_SERVER_LAZY,CFG_CATEGORY_GLOBAL)); + TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "statusTimeoutMs", tsStatusTimeoutMs, 10, 30000, CFG_SCOPE_SERVER, CFG_DYN_SERVER_LAZY,CFG_CATEGORY_GLOBAL)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "queryBufferSize", tsQueryBufferSize, -1, 500000000000, CFG_SCOPE_SERVER, CFG_DYN_SERVER_LAZY, CFG_CATEGORY_LOCAL)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "queryRspPolicy", tsQueryRspPolicy, 0, 1, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL)); @@ -924,6 +938,10 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * RPC_MEMORY_USAGE_RATIO * 10L, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_LOCAL)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncElectInterval", tsElectInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncHeartbeatInterval", tsHeartbeatInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL)); + TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncVnodeElectIntervalMs", tsVnodeElectIntervalMs, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL)); + TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncVnodeHeartbeatIntervalMs", tsVnodeHeartbeatIntervalMs, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL)); + TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncMnodeElectIntervalMs", tsMnodeElectIntervalMs, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL)); + TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncMnodeHeartbeatIntervalMs", tsMnodeHeartbeatIntervalMs, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncHeartbeatTimeout", tsHeartbeatTimeout, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncSnapReplMaxWaitN", tsSnapReplMaxWaitN, 16, (TSDB_SYNC_SNAP_BUFFER_SIZE >> 2), CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL)); TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "syncLogBufferMemoryAllowed", tsLogBufferMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER,CFG_CATEGORY_LOCAL)); @@ -931,9 +949,14 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncRoutineReportInterval", tsRoutineReportInterval, 5, 600, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_LOCAL)); TAOS_CHECK_RETURN(cfgAddBool(pCfg, "syncLogHeartbeat", tsSyncLogHeartbeat, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_LOCAL)); + TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncTimeout", tsSyncTimeout, 0, 60 * 24 * 2 * 1000, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_LOCAL)); + TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "arbHeartBeatIntervalSec", tsArbHeartBeatIntervalSec, 1, 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "arbCheckSyncIntervalSec", tsArbCheckSyncIntervalSec, 1, 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "arbSetAssignedTimeoutSec", tsArbSetAssignedTimeoutSec, 1, 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL)); + TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "arbHeartBeatIntervalMs", tsArbHeartBeatIntervalMs, 100, 60 * 24 * 2 * 1000, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL)); + TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "arbCheckSyncIntervalMs", tsArbCheckSyncIntervalMs, 100, 60 * 24 * 2 * 1000, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL)); + TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "arbSetAssignedTimeoutMs", tsArbSetAssignedTimeoutMs, 100, 60 * 24 * 2 * 1000, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL)); TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "mndSdbWriteDelta", tsMndSdbWriteDelta, 20, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER,CFG_CATEGORY_GLOBAL)); TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "mndLogRetention", tsMndLogRetention, 500, 10000, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL)); @@ -1614,6 +1637,16 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "statusInterval"); tsStatusInterval = pItem->i32; + tsStatusIntervalMs = pItem->i32 * 1000; + + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "statusIntervalMs"); + tsStatusIntervalMs = pItem->i32; + + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "statusSRTimeoutMs"); + tsStatusSRTimeoutMs = pItem->i32; + + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "statusTimeoutMs"); + tsStatusTimeoutMs = pItem->i32; TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "minSlidingTime"); tsMinSlidingTime = pItem->i32; @@ -1850,6 +1883,18 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "syncHeartbeatInterval"); tsHeartbeatInterval = pItem->i32; + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "syncVnodeElectIntervalMs"); + tsVnodeElectIntervalMs = pItem->i32; + + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "syncVnodeHeartbeatIntervalMs"); + tsVnodeHeartbeatIntervalMs = pItem->i32; + + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "syncMnodeElectIntervalMs"); + tsMnodeElectIntervalMs = pItem->i32; + + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "syncMnodeHeartbeatIntervalMs"); + tsMnodeHeartbeatIntervalMs = pItem->i32; + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "syncHeartbeatTimeout"); tsHeartbeatTimeout = pItem->i32; @@ -1869,13 +1914,35 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsSyncLogHeartbeat = pItem->bval; TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "arbHeartBeatIntervalSec"); - tsArbHeartBeatIntervalSec = pItem->i32; + tsArbHeartBeatIntervalMs = pItem->i32 * 1000; TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "arbCheckSyncIntervalSec"); - tsArbCheckSyncIntervalSec = pItem->i32; + tsArbCheckSyncIntervalMs = pItem->i32 * 1000; TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "arbSetAssignedTimeoutSec"); - tsArbSetAssignedTimeoutSec = pItem->i32; + tsArbSetAssignedTimeoutMs = pItem->i32 * 1000; + + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "arbHeartBeatIntervalMs"); + tsArbHeartBeatIntervalMs = pItem->i32; + + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "arbCheckSyncIntervalMs"); + tsArbCheckSyncIntervalMs = pItem->i32; + + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "arbSetAssignedTimeoutMs"); + tsArbSetAssignedTimeoutMs = pItem->i32; + + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "syncTimeout"); + tsSyncTimeout = pItem->i32; + + /* + if(tsSyncTimeout > 0){ + tsElectInterval = tsSyncTimeout; + tsArbSetAssignedTimeoutMs = tsSyncTimeout; + tsHeartbeatInterval = tsSyncTimeout/4; + tsArbHeartBeatIntervalMs = tsSyncTimeout/4; + tsArbCheckSyncIntervalMs = tsSyncTimeout/4; + } + */ TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "mndSdbWriteDelta"); tsMndSdbWriteDelta = pItem->i64; @@ -2705,6 +2772,9 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) { {"disableStream", &tsDisableStream}, {"enableWhiteList", &tsEnableWhiteList}, {"statusInterval", &tsStatusInterval}, + {"statusIntervalMs", &tsStatusIntervalMs}, + {"statusSRTimeoutMs", &tsStatusSRTimeoutMs}, + {"statusTimeoutMs", &tsStatusTimeoutMs}, {"telemetryReporting", &tsEnableTelem}, {"monitor", &tsEnableMonitor}, {"monitorInterval", &tsMonitorInterval}, @@ -2736,10 +2806,16 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) { {"syncLogBufferMemoryAllowed", &tsLogBufferMemoryAllowed}, {"syncApplyQueueSize", &tsSyncApplyQueueSize}, {"syncHeartbeatInterval", &tsHeartbeatInterval}, + {"syncElectInterval", &tsElectInterval}, + {"syncVnodeHeartbeatIntervalMs", &tsVnodeHeartbeatIntervalMs}, + {"syncVnodeElectIntervalMs", &tsVnodeElectIntervalMs}, + {"syncMnodeHeartbeatIntervalMs", &tsMnodeHeartbeatIntervalMs}, + {"syncMnodeElectIntervalMs", &tsMnodeElectIntervalMs}, {"syncHeartbeatTimeout", &tsHeartbeatTimeout}, {"syncSnapReplMaxWaitN", &tsSnapReplMaxWaitN}, {"syncRoutineReportInterval", &tsRoutineReportInterval}, {"syncLogHeartbeat", &tsSyncLogHeartbeat}, + {"syncTimeout", &tsSyncTimeout}, {"walFsyncDataSizeLimit", &tsWalFsyncDataSizeLimit}, {"numOfCores", &tsNumOfCores}, @@ -2788,9 +2864,12 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) { {"arbHeartBeatIntervalSec", &tsArbHeartBeatIntervalSec}, {"arbCheckSyncIntervalSec", &tsArbCheckSyncIntervalSec}, {"arbSetAssignedTimeoutSec", &tsArbSetAssignedTimeoutSec}, + {"arbHeartBeatIntervalMs", &tsArbHeartBeatIntervalMs}, + {"arbCheckSyncIntervalMs", &tsArbCheckSyncIntervalMs}, + {"arbSetAssignedTimeoutMs", &tsArbSetAssignedTimeoutMs}, {"queryNoFetchTimeoutSec", &tsQueryNoFetchTimeoutSec}, {"enableStrongPassword", &tsEnableStrongPassword}, - {"forceKillTrans", &tsForceKillTrans}}; + {"forceKillTrans", &tsForceKillTrans}}; if ((code = taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true)) != TSDB_CODE_SUCCESS) { code = taosCfgSetOption(options, tListLen(options), pItem, false); diff --git a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h index 45a597ec90ed..edca528086d1 100644 --- a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h +++ b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h @@ -44,7 +44,9 @@ typedef struct SDnodeMgmt { SendAuditRecordsFp sendAuditRecordsFp; GetVnodeLoadsFp getVnodeLoadsFp; GetVnodeLoadsFp getVnodeLoadsLiteFp; + SetVnodeSyncTimeoutFp setVnodeSyncTimeoutFp; GetMnodeLoadsFp getMnodeLoadsFp; + SetMnodeSyncTimeoutFp setMnodeSyncTimeoutFp; GetQnodeLoadsFp getQnodeLoadsFp; int32_t statusSeq; } SDnodeMgmt; diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index f19a65a4cd2e..bfe45b6ca9d0 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -194,7 +194,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { tstrncpy(req.dnodeEp, tsLocalEp, TSDB_EP_LEN); tstrncpy(req.machineId, tsDnodeData.machineId, TSDB_MACHINE_ID_LEN + 1); - req.clusterCfg.statusInterval = tsStatusInterval; + req.clusterCfg.statusIntervalMs = tsStatusIntervalMs; req.clusterCfg.checkTime = 0; req.clusterCfg.ttlChangeOnWrite = tsTtlChangeOnWrite; req.clusterCfg.enableWhiteList = tsEnableWhiteList ? 1 : 0; @@ -267,10 +267,9 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { (void)dmGetMnodeEpSet(pMgmt->pData, &epSet); dDebug("send status req to mnode, statusSeq:%d, begin to send rpc msg", pMgmt->statusSeq); - code = - rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusInterval * 5 * 1000); + code = rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusSRTimeoutMs); if (code != 0) { - dError("failed to SendRecv status req with timeout %d since %s", tsStatusInterval * 5 * 1000, tstrerror(code)); + dError("failed to SendRecv status req with timeout %d since %s", tsStatusSRTimeoutMs, tstrerror(code)); return; } @@ -387,10 +386,9 @@ void dmSendConfigReq(SDnodeMgmt *pMgmt) { (void)dmGetMnodeEpSet(pMgmt->pData, &epSet); dDebug("send status req to mnode, statusSeq:%d, begin to send rpc msg", pMgmt->statusSeq); - code = - rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusInterval * 5 * 1000); + code = rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusSRTimeoutMs); if (code != 0) { - dError("failed to SendRecv config req with timeout %d since %s", tsStatusInterval * 5 * 1000, tstrerror(code)); + dError("failed to SendRecv config req with timeout %d since %s", tsStatusSRTimeoutMs, tstrerror(code)); return; } if (rpcRsp.code != 0) { @@ -515,8 +513,59 @@ int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) { if (pItem == NULL) { return TSDB_CODE_CFG_NOT_FOUND; } + + if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) { + char value[10] = {0}; + sscanf(cfgReq.value, "%d", &tsSyncTimeout); + + if (tsSyncTimeout > 0) { + char tmp[10] = {0}; + + sprintf(tmp, "%d", tsSyncTimeout); + TAOS_CHECK_RETURN( + cfgGetAndSetItem(pCfg, &pItem, "arbSetAssignedTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true)); + sprintf(tmp, "%d", tsSyncTimeout / 4); + TAOS_CHECK_RETURN( + cfgGetAndSetItem(pCfg, &pItem, "arbHeartBeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true)); + TAOS_CHECK_RETURN( + cfgGetAndSetItem(pCfg, &pItem, "arbCheckSyncIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true)); + + sprintf(tmp, "%d", (tsSyncTimeout - tsSyncTimeout/4)/2); + TAOS_CHECK_RETURN( + cfgGetAndSetItem(pCfg, &pItem, "syncVnodeElectIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true)); + TAOS_CHECK_RETURN( + cfgGetAndSetItem(pCfg, &pItem, "syncMnodeElectIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true)); + TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItem, "statusTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true)); + + sprintf(tmp, "%d", (tsSyncTimeout - tsSyncTimeout / 4) / 4); + TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItem, "statusSRTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true)); + + sprintf(tmp, "%d", (tsSyncTimeout - tsSyncTimeout/4)/8); + TAOS_CHECK_RETURN( + cfgGetAndSetItem(pCfg, &pItem, "syncVnodeHeartbeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true)); + TAOS_CHECK_RETURN( + cfgGetAndSetItem(pCfg, &pItem, "syncMnodeHeartbeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true)); + TAOS_CHECK_RETURN(cfgGetAndSetItem(pCfg, &pItem, "statusIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true)); + + dInfo("change syncTimeout, option:%s, value:%s, tsSyncTimeout:%d", cfgReq.config, cfgReq.value, tsSyncTimeout); + } + } + if (!isConifgItemLazyMode(pItem)) { TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, cfgReq.config, true)); + + if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) { + TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbHeartBeatIntervalMs", true)); + TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbCheckSyncIntervalMs", true)); + TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncVnodeHeartbeatIntervalMs", true)); + TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncMnodeHeartbeatIntervalMs", true)); + TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbSetAssignedTimeoutMs", true)); + TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncVnodeElectIntervalMs", true)); + TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncMnodeElectIntervalMs", true)); + TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusTimeoutMs", true)); + TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusSRTimeoutMs", true)); + TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "statusIntervalMs", true)); + } } if (pItem->category == CFG_CATEGORY_GLOBAL) { @@ -530,6 +579,23 @@ int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) { dError("failed to persist local config since %s", tstrerror(code)); } } + + if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) { + dInfo("finished change syncTimeout, option:%s, value:%s, tsArbHeartBeatIntervalMs:%d", cfgReq.config, cfgReq.value, + tsArbHeartBeatIntervalMs); + + (*pMgmt->setMnodeSyncTimeoutFp)(); + (*pMgmt->setVnodeSyncTimeoutFp)(); + } + + if (taosStrncasecmp(cfgReq.config, "syncVnodeElectIntervalMs", 128) == 0 || taosStrncasecmp(cfgReq.config, "syncVnodeHeartbeatIntervalMs", 128) == 0) { + (*pMgmt->setVnodeSyncTimeoutFp)(); + } + + if (taosStrncasecmp(cfgReq.config, "syncMnodeElectIntervalMs", 128) == 0 || taosStrncasecmp(cfgReq.config, "syncMnodeHeartbeatIntervalMs", 128) == 0) { + (*pMgmt->setMnodeSyncTimeoutFp)(); + } + if (cfgReq.version > 0) { tsdmConfigVersion = cfgReq.version; } diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c index 8797440f5428..7da6cc751b1c 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c @@ -82,6 +82,8 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { pMgmt->getVnodeLoadsLiteFp = pInput->getVnodeLoadsLiteFp; pMgmt->getMnodeLoadsFp = pInput->getMnodeLoadsFp; pMgmt->getQnodeLoadsFp = pInput->getQnodeLoadsFp; + pMgmt->setMnodeSyncTimeoutFp = pInput->setMnodeSyncTimeoutFp; + pMgmt->setVnodeSyncTimeoutFp = pInput->setVnodeSyncTimeoutFp; if ((code = dmStartWorker(pMgmt)) != 0) { return code; diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index 0376b6e9ae58..b9535186a1c2 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -29,8 +29,8 @@ static void *dmStatusThreadFp(void *param) { int64_t curTime = taosGetTimestampMs(); if (curTime < lastTime) lastTime = curTime; - float interval = (curTime - lastTime) / 1000.0f; - if (interval >= tsStatusInterval) { + float interval = curTime - lastTime; + if (interval >= tsStatusIntervalMs) { dmSendStatusReq(pMgmt); lastTime = curTime; } @@ -49,8 +49,8 @@ static void *dmConfigThreadFp(void *param) { int64_t curTime = taosGetTimestampMs(); if (curTime < lastTime) lastTime = curTime; - float interval = (curTime - lastTime) / 1000.0f; - if (interval >= tsStatusInterval) { + float interval = curTime - lastTime; + if (interval >= tsStatusIntervalMs) { dmSendConfigReq(pMgmt); lastTime = curTime; } @@ -72,8 +72,8 @@ static void *dmStatusInfoThreadFp(void *param) { int64_t curTime = taosGetTimestampMs(); if (curTime < lastTime) lastTime = curTime; - float interval = (curTime - lastTime) / 1000.0f; - if (interval >= tsStatusInterval) { + float interval = curTime - lastTime; + if (interval >= tsStatusIntervalMs) { dmUpdateStatusInfo(pMgmt); lastTime = curTime; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 57b930fe2807..130f30dfda76 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -27,6 +27,12 @@ void mmGetMnodeLoads(SMnodeMgmt *pMgmt, SMonMloadInfo *pInfo) { (void)mndGetLoad(pMgmt->pMnode, &pInfo->load); } +void mmSetMnodeSyncTimeout(SMnodeMgmt *pMgmt) { + int32_t code = 0; + code = mndResetTimer(pMgmt->pMnode); + if (code != 0) dError("failed to mmSetMnodeSyncTimeout since %s", tstrerror(code)); +} + int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { int32_t code = 0; const STraceId *trace = &pMsg->info.traceId; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index bebcc3da5d4e..0280f55a54a2 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -49,6 +49,25 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) { (void)taosThreadRwlockUnlock(&pMgmt->hashLock); } +void vmSetVnodeSyncTimeout(SVnodeMgmt *pMgmt) { + (void)taosThreadRwlockRdlock(&pMgmt->hashLock); + + void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); + while (pIter) { + SVnodeObj **ppVnode = pIter; + if (ppVnode == NULL || *ppVnode == NULL) continue; + + SVnodeObj *pVnode = *ppVnode; + + if (vnodeSetSyncTimeout(pVnode->pImpl, tsVnodeElectIntervalMs) != 0) { + dError("vgId:%d, failed to vnodeSetSyncTimeout", pVnode->vgId); + } + pIter = taosHashIterate(pMgmt->runngingHash, pIter); + } + + (void)taosThreadRwlockUnlock(&pMgmt->hashLock); +} + void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) { pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoadLite)); if (!pInfo->pVloads) return; diff --git a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h index bcedcdefa677..56fe41ea91b8 100644 --- a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h +++ b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h @@ -138,7 +138,9 @@ void dmMonitorCleanExpiredSamples(); void dmSendAuditRecords(); void dmGetVnodeLoads(SMonVloadInfo *pInfo); void dmGetVnodeLoadsLite(SMonVloadInfo *pInfo); +void dmSetVnodeSyncTimeout(); void dmGetMnodeLoads(SMonMloadInfo *pInfo); +void dmSetMnodeSyncTimeout(); void dmGetQnodeLoads(SQnodeLoad *pInfo); #ifdef __cplusplus diff --git a/source/dnode/mgmt/node_mgmt/inc/dmNodes.h b/source/dnode/mgmt/node_mgmt/inc/dmNodes.h index 2561a13b92fc..45004668453c 100644 --- a/source/dnode/mgmt/node_mgmt/inc/dmNodes.h +++ b/source/dnode/mgmt/node_mgmt/inc/dmNodes.h @@ -38,6 +38,8 @@ void vmGetVnodeLoads(void *pMgmt, SMonVloadInfo *pInfo, bool isReset); void vmGetVnodeLoadsLite(void *pMgmt, SMonVloadInfo *pInfo); void mmGetMnodeLoads(void *pMgmt, SMonMloadInfo *pInfo); void qmGetQnodeLoads(void *pMgmt, SQnodeLoad *pInfo); +void mmSetMnodeSyncTimeout(void *pMgmt); +void vmSetVnodeSyncTimeout(void *pMgmt); void vmCleanExpriedSamples(void *pMgmt); diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index 8e1b50a78911..33a998d76757 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -422,7 +422,9 @@ SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) { .sendAuditRecordFp = auditSendRecordsInBatch, .getVnodeLoadsFp = dmGetVnodeLoads, .getVnodeLoadsLiteFp = dmGetVnodeLoadsLite, + .setVnodeSyncTimeoutFp = dmSetVnodeSyncTimeout, .getMnodeLoadsFp = dmGetMnodeLoads, + .setMnodeSyncTimeoutFp = dmSetMnodeSyncTimeout, .getQnodeLoadsFp = dmGetQnodeLoads, .stopDnodeFp = dmStop, }; diff --git a/source/dnode/mgmt/node_mgmt/src/dmMonitor.c b/source/dnode/mgmt/node_mgmt/src/dmMonitor.c index ce0b2b59e001..6767e9dbe994 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMonitor.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMonitor.c @@ -178,6 +178,30 @@ void dmGetMnodeLoads(SMonMloadInfo *pInfo) { } } +void dmSetMnodeSyncTimeout() { + dInfo("dmSetMnodeSyncTimeout"); + SDnode *pDnode = dmInstance(); + SMgmtWrapper *pWrapper = &pDnode->wrappers[MNODE]; + if (dmMarkWrapper(pWrapper) == 0) { + if (pWrapper->pMgmt != NULL) { + mmSetMnodeSyncTimeout(pWrapper->pMgmt); + } + dmReleaseWrapper(pWrapper); + } +} + +void dmSetVnodeSyncTimeout() { + dInfo("dmSetVnodeSyncTimeout"); + SDnode *pDnode = dmInstance(); + SMgmtWrapper *pWrapper = &pDnode->wrappers[VNODE]; + if (dmMarkWrapper(pWrapper) == 0) { + if (pWrapper->pMgmt != NULL) { + vmSetVnodeSyncTimeout(pWrapper->pMgmt); + } + dmReleaseWrapper(pWrapper); + } +} + void dmGetQnodeLoads(SQnodeLoad *pInfo) { SDnode *pDnode = dmInstance(); SMgmtWrapper *pWrapper = &pDnode->wrappers[QNODE]; diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index fc880ec05515..ba2f2a40510a 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -120,6 +120,8 @@ typedef void (*MonitorCleanExpiredSamplesFp)(); typedef void (*SendAuditRecordsFp)(); typedef void (*GetVnodeLoadsFp)(SMonVloadInfo *pInfo); typedef void (*GetMnodeLoadsFp)(SMonMloadInfo *pInfo); +typedef void (*SetMnodeSyncTimeoutFp)(); +typedef void (*SetVnodeSyncTimeoutFp)(); typedef void (*GetQnodeLoadsFp)(SQnodeLoad *pInfo); typedef int32_t (*ProcessAlterNodeTypeFp)(EDndNodeType ntype, SRpcMsg *pMsg); typedef void (*StopDnodeFp)(); @@ -161,7 +163,9 @@ typedef struct { SendAuditRecordsFp sendAuditRecordFp; GetVnodeLoadsFp getVnodeLoadsFp; GetVnodeLoadsFp getVnodeLoadsLiteFp; + SetVnodeSyncTimeoutFp setVnodeSyncTimeoutFp; GetMnodeLoadsFp getMnodeLoadsFp; + SetMnodeSyncTimeoutFp setMnodeSyncTimeoutFp; GetQnodeLoadsFp getQnodeLoadsFp; StopDnodeFp stopDnodeFp; } SMgmtInputOpt; diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 8c8553627622..b185eb16e8a9 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -118,6 +118,7 @@ typedef struct SMnode { int32_t selfDnodeId; int64_t clusterId; TdThread thread; + TdThread arbThread; TdThreadRwlock lock; int32_t rpcRef; int32_t syncRef; diff --git a/source/dnode/mnode/impl/src/mndArbGroup.c b/source/dnode/mnode/impl/src/mndArbGroup.c index 0a2e20ae208b..e5962b30d613 100644 --- a/source/dnode/mnode/impl/src/mndArbGroup.c +++ b/source/dnode/mnode/impl/src/mndArbGroup.c @@ -390,7 +390,11 @@ static int32_t mndSendArbHeartBeatReq(SDnodeObj *pDnode, char *arbToken, int64_t if (code != 0) { mError("arbgroup:0, dnodeId:%d, failed to send arb-hb request to dnode since 0x%x", pDnode->id, code); } else { - mTrace("arbgroup:0, dnodeId:%d, send arb-hb request to dnode", pDnode->id); + if (tsSyncLogHeartbeat) { + mInfo("arbgroup:0, dnodeId:%d, send arb-hb request to dnode", pDnode->id); + } else { + mTrace("arbgroup:0, dnodeId:%d, send arb-hb request to dnode", pDnode->id); + } } return code; } @@ -540,7 +544,7 @@ static int32_t mndSendArbCheckSyncReq(SMnode *pMnode, int32_t vgId, char *arbTok static bool mndCheckArbMemberHbTimeout(SArbGroup *pArbGroup, int32_t index, int64_t nowMs) { SArbGroupMember *pArbMember = &pArbGroup->members[index]; - return pArbMember->state.lastHbMs < (nowMs - tsArbSetAssignedTimeoutSec * 1000); + return pArbMember->state.lastHbMs < (nowMs - tsArbSetAssignedTimeoutMs); } static void *mndBuildArbSetAssignedLeaderReq(int32_t *pContLen, int32_t vgId, char *arbToken, int64_t arbTerm, @@ -747,7 +751,7 @@ static int32_t mndArbProcessTimer(SRpcMsg *pReq) { int64_t roleTimeMs = mndGetRoleTimeMs(pMnode); int64_t nowMs = taosGetTimestampMs(); - if (nowMs - roleTimeMs < tsArbHeartBeatIntervalSec * 1000 * 2) { + if (nowMs - roleTimeMs < tsArbHeartBeatIntervalMs * 2) { mInfo("arbgroup:0, arb skip to check sync since mnd had just switch over, roleTime:%" PRId64 " now:%" PRId64, roleTimeMs, nowMs); return 0; @@ -901,7 +905,7 @@ static int32_t mndArbPutBatchUpdateIntoWQ(SMnode *pMnode, SArray *newGroupArray) mndInitArbUpdateGroup(pNewGroup, &newGroup); if (taosArrayPush(pArray, &newGroup) == NULL) goto _OVER; - mInfo("arbgroup:%d, put into arb update hash", pNewGroup->vgId); + mInfo("arbgroup:%d, put into arb update hash in array", pNewGroup->vgId); if ((ret = taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0)) != 0) { mError("arbgroup:%d, failed to put into arb update hash since %s", pNewGroup->vgId, tstrerror(ret)); goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index e053092ea3e9..1b249970c4bc 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -392,7 +392,7 @@ int32_t mndGetDbSize(SMnode *pMnode) { bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs) { int64_t interval = TABS(pDnode->lastAccessTime - curMs); - if (interval > 5000 * (int64_t)tsStatusInterval) { + if (interval > (int64_t)tsStatusTimeoutMs) { if (pDnode->rebootTime > 0 && pDnode->offlineReason == DND_REASON_ONLINE) { pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT; } @@ -482,9 +482,9 @@ static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const S return DND_REASON_STATUS_MONITOR_NOT_MATCH; } - if (pCfg->statusInterval != tsStatusInterval) { - mError("dnode:%d, statusInterval:%d inconsistent with cluster:%d", pDnode->id, pCfg->statusInterval, - tsStatusInterval); + if (pCfg->statusIntervalMs != tsStatusIntervalMs) { + mError("dnode:%d, statusInterval:%d inconsistent with cluster:%d", pDnode->id, pCfg->statusIntervalMs, + tsStatusIntervalMs); terrno = TSDB_CODE_DNODE_INVALID_STATUS_INTERVAL; return DND_REASON_STATUS_INTERVAL_NOT_MATCH; } @@ -1524,8 +1524,8 @@ static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p int32_t code = 0; int32_t lino = 0; - cfgOpts[totalRows] = "statusInterval"; - (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsStatusInterval); + cfgOpts[totalRows] = "statusIntervalMs"; + (void)snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsStatusIntervalMs); totalRows++; cfgOpts[totalRows] = "timezone"; diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 9939f74ca6a9..c1b6026223d5 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -290,7 +290,8 @@ static void mndSetVgroupOffline(SMnode *pMnode, int32_t dnodeId, int64_t curMs) if (pGid->dnodeId == dnodeId) { if (pGid->syncState != TAOS_SYNC_STATE_OFFLINE) { mInfo( - "vgId:%d, state changed by offline check, old state:%s restored:%d canRead:%d new state:error restored:0 " + "vgId:%d, state changed by offline check, old state:%s restored:%d canRead:%d new state:offline " + "restored:0 " "canRead:0", pVgroup->vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead); pGid->syncState = TAOS_SYNC_STATE_OFFLINE; @@ -375,8 +376,6 @@ static int32_t minCronTime() { min = TMIN(min, tsMqRebalanceInterval); min = TMIN(min, tsStreamCheckpointInterval); min = TMIN(min, tsStreamNodeCheckInterval); - min = TMIN(min, tsArbHeartBeatIntervalSec); - min = TMIN(min, tsArbCheckSyncIntervalSec); int64_t telemInt = TMIN(60, (tsTelemInterval - 1)); min = TMIN(min, telemInt); @@ -440,30 +439,38 @@ void mndDoTimerPullupTask(SMnode *pMnode, int64_t sec) { if (sec % tsUptimeInterval == 0) { mndIncreaseUpTime(pMnode); } +} + +void mndDoArbTimerPullupTask(SMnode *pMnode, int64_t ms) { + int32_t code = 0; #ifndef TD_ASTRA - if (sec % (tsArbHeartBeatIntervalSec) == 0) { + if (ms % (tsArbHeartBeatIntervalMs) == 0) { if ((code = mndPullupArbHeartbeat(pMnode)) != 0) { mError("failed to pullup arb heartbeat, since:%s", tstrerror(code)); } } - if (sec % (tsArbCheckSyncIntervalSec) == 0) { + if (ms % (tsArbCheckSyncIntervalMs) == 0) { if ((code = mndPullupArbCheckSync(pMnode)) != 0) { mError("failed to pullup arb check sync, since:%s", tstrerror(code)); } } #endif } -void mndDoTimerCheckTask(SMnode *pMnode, int64_t sec) { - if (sec % (tsStatusInterval * 5) == 0) { + +void mndDoTimerCheckStatus(SMnode *pMnode, int64_t ms) { + if (ms % (tsStatusTimeoutMs) == 0) { mndCheckDnodeOffline(pMnode); } +} + +void mndDoTimerCheckSync(SMnode *pMnode, int64_t sec) { if (sec % (MNODE_TIMEOUT_SEC / 2) == 0) { mndSyncCheckTimeout(pMnode); } } -static void *mndThreadFp(void *param) { +static void *mndThreadSecFp(void *param) { SMnode *pMnode = param; int64_t lastTime = 0; setThreadName("mnode-timer"); @@ -481,7 +488,7 @@ static void *mndThreadFp(void *param) { } int64_t sec = lastTime / 10; - mndDoTimerCheckTask(pMnode, sec); + mndDoTimerCheckSync(pMnode, sec); mndDoTimerPullupTask(pMnode, sec); } @@ -489,6 +496,31 @@ static void *mndThreadFp(void *param) { return NULL; } +static void *mndThreadMsFp(void *param) { + SMnode *pMnode = param; + int64_t lastTime = 0; + setThreadName("mnode-arb-timer"); + + while (1) { + lastTime += 100; + taosMsleep(100); + + if (mndGetStop(pMnode)) break; + if (lastTime % 10 != 0) continue; + + if (mnodeIsNotLeader(pMnode)) { + mTrace("timer not process since mnode is not leader"); + continue; + } + + mndDoTimerCheckStatus(pMnode, lastTime); + + mndDoArbTimerPullupTask(pMnode, lastTime); + } + + return NULL; +} + static int32_t mndInitTimer(SMnode *pMnode) { int32_t code = 0; TdThreadAttr thAttr; @@ -497,13 +529,27 @@ static int32_t mndInitTimer(SMnode *pMnode) { #ifdef TD_COMPACT_OS (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL); #endif - if ((code = taosThreadCreate(&pMnode->thread, &thAttr, mndThreadFp, pMnode)) != 0) { + if ((code = taosThreadCreate(&pMnode->thread, &thAttr, mndThreadSecFp, pMnode)) != 0) { mError("failed to create timer thread since %s", tstrerror(code)); TAOS_RETURN(code); } (void)taosThreadAttrDestroy(&thAttr); tmsgReportStartup("mnode-timer", "initialized"); + + TdThreadAttr arbAttr; + (void)taosThreadAttrInit(&arbAttr); + (void)taosThreadAttrSetDetachState(&arbAttr, PTHREAD_CREATE_JOINABLE); +#ifdef TD_COMPACT_OS + (void)taosThreadAttrSetStackSize(&arbAttr, STACK_SIZE_SMALL); +#endif + if ((code = taosThreadCreate(&pMnode->arbThread, &arbAttr, mndThreadMsFp, pMnode)) != 0) { + mError("failed to create arb timer thread since %s", tstrerror(code)); + TAOS_RETURN(code); + } + + (void)taosThreadAttrDestroy(&arbAttr); + tmsgReportStartup("mnode-timer", "initialized"); TAOS_RETURN(code); } @@ -512,6 +558,10 @@ static void mndCleanupTimer(SMnode *pMnode) { (void)taosThreadJoin(pMnode->thread, NULL); taosThreadClear(&pMnode->thread); } + if (taosCheckPthreadValid(pMnode->arbThread)) { + (void)taosThreadJoin(pMnode->arbThread, NULL); + taosThreadClear(&pMnode->arbThread); + } } static int32_t mndCreateDir(SMnode *pMnode, const char *path) { @@ -1194,6 +1244,10 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr TAOS_RETURN(code); } +int32_t mndResetTimer(SMnode *pMnode){ + return syncResetTimer(pMnode->syncMgmt.sync, tsMnodeElectIntervalMs, tsMnodeHeartbeatIntervalMs); +} + int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) { mTrace("mnode get load"); SSyncState state = syncGetState(pMnode->syncMgmt.sync); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index c61760c06e4d..5224fdbb9f7e 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -492,15 +492,16 @@ int32_t mndInitSync(SMnode *pMnode) { .syncEqMsg = mndSyncEqMsg, .syncEqCtrlMsg = mndSyncEqCtrlMsg, .pingMs = 5000, - .electMs = 3000, - .heartbeatMs = 500, + .electMs = tsMnodeElectIntervalMs, + .heartbeatMs = tsMnodeHeartbeatIntervalMs, }; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP); syncInfo.pFsm = mndSyncMakeFsm(pMnode); - mInfo("vgId:1, start to open mnode sync, replica:%d selfIndex:%d", pMgmt->numOfReplicas, pMgmt->selfIndex); SSyncCfg *pCfg = &syncInfo.syncCfg; + mInfo("vgId:1, start to open mnode sync, replica:%d selfIndex:%d, electMs:%d, heartbeatMs:%d", pMgmt->numOfReplicas, + pMgmt->selfIndex, syncInfo.electMs, syncInfo.heartbeatMs); pCfg->totalReplicaNum = pMgmt->numOfTotalReplicas; pCfg->replicaNum = pMgmt->numOfReplicas; pCfg->myIndex = pMgmt->selfIndex; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index ae2f996e15b6..5fec6ec46586 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -101,6 +101,7 @@ int32_t vnodeGetAllCtbNum(SVnode *pVnode, int64_t *num); int32_t vnodeGetTableSchema(void *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid, SSchemaWrapper **pTagSchema); void vnodeResetLoad(SVnode *pVnode, SVnodeLoad *pLoad); +int32_t vnodeSetSyncTimeout(SVnode *pVnode, int32_t ms); int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); int32_t vnodeGetLoadLite(SVnode *pVnode, SVnodeLoadLite *pLoad); int32_t vnodeValidateTableHash(SVnode *pVnode, char *tableFName); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 347f6a92cd9d..f377510fd03c 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -770,15 +770,16 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path, int32_t vnodeVersion) { .syncEqMsg = vnodeSyncEqMsg, .syncEqCtrlMsg = vnodeSyncEqCtrlMsg, .pingMs = 5000, - .electMs = 4000, - .heartbeatMs = 700, + .electMs = tsVnodeElectIntervalMs, + .heartbeatMs = tsVnodeHeartbeatIntervalMs, }; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP); syncInfo.pFsm = vnodeSyncMakeFsm(pVnode); SSyncCfg *pCfg = &syncInfo.syncCfg; - vInfo("vgId:%d, start to open sync, replica:%d selfIndex:%d", pVnode->config.vgId, pCfg->replicaNum, pCfg->myIndex); + vInfo("vgId:%d, start to open sync, replica:%d selfIndex:%d, electMs:%d, heartbeatMs:%d", pVnode->config.vgId, + pCfg->replicaNum, pCfg->myIndex, syncInfo.electMs, syncInfo.heartbeatMs); for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) { SNodeInfo *pNode = &pCfg->nodeInfo[i]; vInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pVnode->config.vgId, i, pNode->nodeFqdn, @@ -804,6 +805,15 @@ int32_t vnodeSyncStart(SVnode *pVnode) { return 0; } +int32_t vnodeSetSyncTimeout(SVnode *pVnode, int32_t ms) { + int32_t code = syncResetTimer(pVnode->sync, tsVnodeElectIntervalMs, tsVnodeHeartbeatIntervalMs); + if (code) { + vError("vgId:%d, failed to vnode Set SyncTimeout since %s", pVnode->config.vgId, tstrerror(code)); + return code; + } + return 0; +} + void vnodeSyncPreClose(SVnode *pVnode) { vInfo("vgId:%d, sync pre close", pVnode->config.vgId); int32_t code = syncLeaderTransfer(pVnode->sync); diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 3ed47f3885f2..77accabcac21 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -244,7 +244,7 @@ struct SSyncNode { }; // open/close -------------- -SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion); +SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion, int32_t electInterval, int32_t heartbeatInterval); int32_t syncNodeStart(SSyncNode* pSyncNode); int32_t syncNodeStartStandBy(SSyncNode* pSyncNode); void syncNodeClose(SSyncNode* pSyncNode); @@ -279,7 +279,7 @@ int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms); void syncNodeResetElectTimer(SSyncNode* pSyncNode); int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode); -int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode); +int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode, int32_t heartbeatInterval); // utils -------------- int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 815ed7fa874b..0a2c254648b8 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -62,7 +62,8 @@ static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode); int64_t syncOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { sInfo("vgId:%d, start to open sync", pSyncInfo->vgId); - SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo, vnodeVersion); + + SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo, vnodeVersion, pSyncInfo->electMs, pSyncInfo->heartbeatMs); if (pSyncNode == NULL) { sError("vgId:%d, failed to open sync node", pSyncInfo->vgId); return -1; @@ -80,7 +81,8 @@ int64_t syncOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { pSyncNode->hbBaseLine = pSyncInfo->heartbeatMs; pSyncNode->heartbeatTimerMS = pSyncInfo->heartbeatMs; pSyncNode->msgcb = pSyncInfo->msgcb; - sInfo("vgId:%d, sync opened", pSyncInfo->vgId); + sInfo("vgId:%d, sync opened, electBaseLine:%d, hbBaseLine:%d", pSyncInfo->vgId, pSyncNode->electBaseLine, + pSyncNode->hbBaseLine); return pSyncNode->rid; } @@ -650,6 +652,24 @@ int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) { return ret; } +int32_t syncResetTimer(int64_t rid, int32_t electInterval, int32_t heartbeatInterval) { + int32_t code = 0; + sInfo("sync Reset Timer, rid:%" PRId64, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); + if (pSyncNode == NULL) { + code = TSDB_CODE_SYN_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid); + TAOS_RETURN(code); + } + pSyncNode->electBaseLine = electInterval; + syncNodeResetElectTimer(pSyncNode); + + sInfo("vgId:%d, sync Reset Timer, rid:%" PRId64, pSyncNode->vgId, rid); + TAOS_CHECK_RETURN(syncNodeRestartHeartbeatTimer(pSyncNode, heartbeatInterval)); + return code; +} + SSyncState syncGetState(int64_t rid) { SSyncState state = {.state = TAOS_SYNC_STATE_ERROR}; @@ -1025,9 +1045,12 @@ static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRa pSyncTimer->destId = destId; pSyncTimer->timeStamp = taosGetTimestampMs(); atomic_store_64(&pSyncTimer->logicClock, 0); + sInfo("vgId:%d, HbTimer init, timerMs:%d for addr:0x%" PRIx64, pSyncNode->vgId, pSyncTimer->timerMS, destId.addr); return 0; } +static void syncHBSetTimerMS(SSyncTimer* pSyncTimer, int32_t ms) { pSyncTimer->timerMS = ms; } + static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { int32_t code = 0; int64_t tsNow = taosGetTimestampMs(); @@ -1046,8 +1069,8 @@ static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { pData->logicClock = pSyncTimer->logicClock; pData->execTime = tsNow + pSyncTimer->timerMS; - sTrace("vgId:%d, start hb timer, rid:%" PRId64 " addr:0x%" PRIx64 " at %d", pSyncNode->vgId, pData->rid, - pData->destId.addr, pSyncTimer->timerMS); + sInfo("vgId:%d, start hb timer, rid:%" PRId64 " addr:0x%" PRIx64 " at %d", pSyncNode->vgId, pData->rid, + pData->destId.addr, pSyncTimer->timerMS); bool stopped = taosTmrResetPriority(pSyncTimer->timerCb, pSyncTimer->timerMS, (void*)(pData->rid), syncEnv()->pTimerManager, &pSyncTimer->pTimer, 2); @@ -1106,7 +1129,7 @@ int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) { } // open/close -------------- -SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { +SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion, int32_t electInterval, int32_t heartbeatInterval) { int32_t code = 0; SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode)); if (pSyncNode == NULL) { @@ -1346,8 +1369,8 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { // timer ms init pSyncNode->pingBaseLine = PING_TIMER_MS; - pSyncNode->electBaseLine = tsElectInterval; - pSyncNode->hbBaseLine = tsHeartbeatInterval; + pSyncNode->electBaseLine = electInterval; + pSyncNode->hbBaseLine = heartbeatInterval; // init ping timer pSyncNode->pPingTimer = NULL; @@ -1458,8 +1481,8 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { pSyncNode->hbrSlowNum = 0; pSyncNode->tmrRoutineNum = 0; - sNInfo(pSyncNode, "sync node opened, node:%p electInterval:%d heartbeatInterval:%d heartbeatTimeout:%d", pSyncNode, - tsElectInterval, tsHeartbeatInterval, tsHeartbeatTimeout); + sNInfo(pSyncNode, "sync node opened, node:%p electBaseLine:%d hbBaseLine:%d heartbeatTimeout:%d", pSyncNode, + pSyncNode->electBaseLine, pSyncNode->hbBaseLine, tsHeartbeatTimeout); return pSyncNode; _error: @@ -1813,6 +1836,19 @@ int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) { return ret; } +int32_t syncNodeSetHeartbeatTimerMs(SSyncNode* pSyncNode, int32_t ms) { + int32_t code = 0; + + for (int32_t i = 0; i < pSyncNode->peersNum; ++i) { + SSyncTimer* pSyncTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[i])); + if (pSyncTimer != NULL) { + syncHBSetTimerMS(pSyncTimer, ms); + } + } + + return code; +} + int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) { int32_t code = 0; @@ -1833,15 +1869,17 @@ int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) { return code; } -#ifdef BUILD_NO_CALL -int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) { - // TODO check return value +int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode, int32_t heartbeatInterval) { int32_t code = 0; - TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode)); - TAOS_CHECK_RETURN(syncNodeStartHeartbeatTimer(pSyncNode)); + sInfo("vgId:%d, sync Node Restart HeartbeatTimer, state=%d", pSyncNode->vgId, pSyncNode->state); + TAOS_CHECK_RETURN(syncNodeSetHeartbeatTimerMs(pSyncNode, heartbeatInterval)); + if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { + TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode)); + TAOS_CHECK_RETURN(syncNodeStartHeartbeatTimer(pSyncNode)); + } + return 0; } -#endif int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pNode, SRpcMsg* pMsg) { SEpSet* epSet = NULL; diff --git a/test/cases/22-Show/test_show_basic.py b/test/cases/22-Show/test_show_basic.py index bd9531c1cf38..c574582b1bef 100644 --- a/test/cases/22-Show/test_show_basic.py +++ b/test/cases/22-Show/test_show_basic.py @@ -179,7 +179,7 @@ def test_show_basic(self): tdSql.error(f"show create stable t0;") tdSql.query(f"show variables;") - tdSql.checkRows(93) + tdSql.checkRows(103) tdSql.query(f"show dnode 1 variables;") if tdSql.getRows() <= 0: diff --git a/test/cases/50-Others/01-Valgrind/test_valgrind_checkerror1.py b/test/cases/50-Others/01-Valgrind/test_valgrind_checkerror1.py index c5a39d2affe0..5f5f6f3c648e 100644 --- a/test/cases/50-Others/01-Valgrind/test_valgrind_checkerror1.py +++ b/test/cases/50-Others/01-Valgrind/test_valgrind_checkerror1.py @@ -97,7 +97,7 @@ def test_valgrind_check_error1(self): tdSql.checkRows(3) tdSql.query(f"show variables;") - tdSql.checkRows(93) + tdSql.checkRows(103) tdSql.query(f"show dnode 1 variables;") tdSql.checkAssert(tdSql.getRows() > 0)