Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions include/common/tglobal.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,17 +124,25 @@ extern int32_t tsNumOfRetentionThreads;
// sync raft
extern int32_t tsElectInterval;
extern int32_t tsHeartbeatInterval;
extern int32_t tsVnodeElectIntervalMs;
extern int32_t tsVnodeHeartbeatIntervalMs;
extern int32_t tsMnodeElectIntervalMs;
extern int32_t tsMnodeHeartbeatIntervalMs;
extern int32_t tsHeartbeatTimeout;
extern int32_t tsSnapReplMaxWaitN;
extern int64_t tsLogBufferMemoryAllowed; // maximum allowed log buffer size in bytes for each dnode
extern int64_t tsSyncApplyQueueSize;
extern int32_t tsRoutineReportInterval;
extern bool tsSyncLogHeartbeat;
extern int32_t tsSyncTimeout;

// arbitrator
extern int32_t tsArbHeartBeatIntervalSec;
extern int32_t tsArbCheckSyncIntervalSec;
extern int32_t tsArbSetAssignedTimeoutSec;
extern int32_t tsArbHeartBeatIntervalMs;
extern int32_t tsArbCheckSyncIntervalMs;
extern int32_t tsArbSetAssignedTimeoutMs;

// vnode
extern int64_t tsVndCommitMaxIntervalMs;
Expand Down
2 changes: 1 addition & 1 deletion include/dnode/mnode/mnode.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
* @return int32_t 0 for success, -1 for failure.
*/
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);

int32_t mndResetTimer(SMnode *pMnode);
int64_t mndGetRoleTimeMs(SMnode *pMnode);

/**
Expand Down
1 change: 1 addition & 0 deletions include/libs/sync/sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ int32_t syncBecomeAssignedLeader(SSyncNode* ths, SRpcMsg* pRpcMsg);
int32_t syncUpdateArbTerm(int64_t rid, SyncTerm arbTerm);

SSyncState syncGetState(int64_t rid);
int32_t syncResetTimer(int64_t rid, int32_t electInterval, int32_t heartbeatInterval);
void syncGetCommitIndex(int64_t rid, int64_t* syncCommitIndex);
int32_t syncGetArbToken(int64_t rid, char* outToken);
int32_t syncCheckSynced(int64_t rid);
Expand Down
70 changes: 65 additions & 5 deletions source/common/src/tglobal.c
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,19 @@ int32_t tsNumOfCompactThreads = 2;
int32_t tsNumOfRetentionThreads = 1;

// sync raft
int32_t tsElectInterval = 25 * 1000;
int32_t tsElectInterval = 4000;
int32_t tsHeartbeatInterval = 1000;
int32_t tsVnodeElectIntervalMs = 4000;
int32_t tsVnodeHeartbeatIntervalMs = 1000;
int32_t tsMnodeElectIntervalMs = 3000;
int32_t tsMnodeHeartbeatIntervalMs = 500;
int32_t tsHeartbeatTimeout = 20 * 1000;
int32_t tsSnapReplMaxWaitN = 128;
int64_t tsLogBufferMemoryAllowed = 0; // bytes
int64_t tsSyncApplyQueueSize = 512;
int32_t tsRoutineReportInterval = 300;
bool tsSyncLogHeartbeat = false;
int32_t tsSyncTimeout = 0;

// mnode
int64_t tsMndSdbWriteDelta = 200;
Expand All @@ -133,6 +138,9 @@ bool tsForceKillTrans = false;
int32_t tsArbHeartBeatIntervalSec = 2;
int32_t tsArbCheckSyncIntervalSec = 3;
int32_t tsArbSetAssignedTimeoutSec = 14;
int32_t tsArbHeartBeatIntervalMs = 2000;
int32_t tsArbCheckSyncIntervalMs = 3000;
int32_t tsArbSetAssignedTimeoutMs = 14000;

// dnode
int64_t tsDndStart = 0;
Expand Down Expand Up @@ -924,16 +932,25 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * RPC_MEMORY_USAGE_RATIO * 10L, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_LOCAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncElectInterval", tsElectInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncHeartbeatInterval", tsHeartbeatInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncVnodeElectIntervalMs", tsVnodeElectIntervalMs, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncVnodeHeartbeatIntervalMs", tsVnodeHeartbeatIntervalMs, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncMnodeElectIntervalMs", tsMnodeElectIntervalMs, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncMnodeHeartbeatIntervalMs", tsMnodeHeartbeatIntervalMs, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncHeartbeatTimeout", tsHeartbeatTimeout, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncSnapReplMaxWaitN", tsSnapReplMaxWaitN, 16, (TSDB_SYNC_SNAP_BUFFER_SIZE >> 2), CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "syncLogBufferMemoryAllowed", tsLogBufferMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER,CFG_CATEGORY_LOCAL));
TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "syncApplyQueueSize", tsSyncApplyQueueSize, 32, 2048, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncRoutineReportInterval", tsRoutineReportInterval, 5, 600, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_LOCAL));
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "syncLogHeartbeat", tsSyncLogHeartbeat, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_LOCAL));

TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncTimeout", tsSyncTimeout, 0, 60 * 24 * 2 * 1000, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_LOCAL));

TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "arbHeartBeatIntervalSec", tsArbHeartBeatIntervalSec, 1, 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "arbCheckSyncIntervalSec", tsArbCheckSyncIntervalSec, 1, 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "arbSetAssignedTimeoutSec", tsArbSetAssignedTimeoutSec, 1, 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "arbHeartBeatIntervalMs", tsArbHeartBeatIntervalMs, 100, 60 * 24 * 2 * 1000, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "arbCheckSyncIntervalMs", tsArbCheckSyncIntervalMs, 100, 60 * 24 * 2 * 1000, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "arbSetAssignedTimeoutMs", tsArbSetAssignedTimeoutMs, 100, 60 * 24 * 2 * 1000, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));

TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "mndSdbWriteDelta", tsMndSdbWriteDelta, 20, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "mndLogRetention", tsMndLogRetention, 500, 10000, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
Expand Down Expand Up @@ -1850,6 +1867,18 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "syncHeartbeatInterval");
tsHeartbeatInterval = pItem->i32;

TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "syncVnodeElectIntervalMs");
tsVnodeElectIntervalMs = pItem->i32;

TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "syncVnodeHeartbeatIntervalMs");
tsVnodeHeartbeatIntervalMs = pItem->i32;

TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "syncMnodeElectIntervalMs");
tsMnodeElectIntervalMs = pItem->i32;

TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "syncMnodeHeartbeatIntervalMs");
tsMnodeHeartbeatIntervalMs = pItem->i32;

TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "syncHeartbeatTimeout");
tsHeartbeatTimeout = pItem->i32;

Expand All @@ -1869,13 +1898,35 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsSyncLogHeartbeat = pItem->bval;

TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "arbHeartBeatIntervalSec");
tsArbHeartBeatIntervalSec = pItem->i32;
tsArbHeartBeatIntervalMs = pItem->i32 * 1000;

TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "arbCheckSyncIntervalSec");
tsArbCheckSyncIntervalSec = pItem->i32;
tsArbCheckSyncIntervalMs = pItem->i32 * 1000;

TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "arbSetAssignedTimeoutSec");
tsArbSetAssignedTimeoutSec = pItem->i32;
tsArbSetAssignedTimeoutMs = pItem->i32 * 1000;

TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "arbHeartBeatIntervalMs");
tsArbHeartBeatIntervalMs = pItem->i32;

TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "arbCheckSyncIntervalMs");
tsArbCheckSyncIntervalMs = pItem->i32;

TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "arbSetAssignedTimeoutMs");
tsArbSetAssignedTimeoutMs = pItem->i32;

TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "syncTimeout");
tsSyncTimeout = pItem->i32;

/*
if(tsSyncTimeout > 0){
tsElectInterval = tsSyncTimeout;
tsArbSetAssignedTimeoutMs = tsSyncTimeout;
tsHeartbeatInterval = tsSyncTimeout/4;
tsArbHeartBeatIntervalMs = tsSyncTimeout/4;
tsArbCheckSyncIntervalMs = tsSyncTimeout/4;
}
*/

TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "mndSdbWriteDelta");
tsMndSdbWriteDelta = pItem->i64;
Expand Down Expand Up @@ -2736,10 +2787,16 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
{"syncLogBufferMemoryAllowed", &tsLogBufferMemoryAllowed},
{"syncApplyQueueSize", &tsSyncApplyQueueSize},
{"syncHeartbeatInterval", &tsHeartbeatInterval},
{"syncElectInterval", &tsElectInterval},
{"syncVnodeHeartbeatIntervalMs", &tsVnodeHeartbeatIntervalMs},
{"syncVnodeElectIntervalMs", &tsVnodeElectIntervalMs},
{"syncMnodeHeartbeatIntervalMs", &tsMnodeHeartbeatIntervalMs},
{"syncMnodeElectIntervalMs", &tsMnodeElectIntervalMs},
{"syncHeartbeatTimeout", &tsHeartbeatTimeout},
{"syncSnapReplMaxWaitN", &tsSnapReplMaxWaitN},
{"syncRoutineReportInterval", &tsRoutineReportInterval},
{"syncLogHeartbeat", &tsSyncLogHeartbeat},
{"syncTimeout", &tsSyncTimeout},
{"walFsyncDataSizeLimit", &tsWalFsyncDataSizeLimit},

{"numOfCores", &tsNumOfCores},
Expand Down Expand Up @@ -2788,9 +2845,12 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
{"arbHeartBeatIntervalSec", &tsArbHeartBeatIntervalSec},
{"arbCheckSyncIntervalSec", &tsArbCheckSyncIntervalSec},
{"arbSetAssignedTimeoutSec", &tsArbSetAssignedTimeoutSec},
{"arbHeartBeatIntervalMs", &tsArbHeartBeatIntervalMs},
{"arbCheckSyncIntervalMs", &tsArbCheckSyncIntervalMs},
{"arbSetAssignedTimeoutMs", &tsArbSetAssignedTimeoutMs},
{"queryNoFetchTimeoutSec", &tsQueryNoFetchTimeoutSec},
{"enableStrongPassword", &tsEnableStrongPassword},
{"forceKillTrans", &tsForceKillTrans}};
{"forceKillTrans", &tsForceKillTrans}};

if ((code = taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true)) != TSDB_CODE_SUCCESS) {
code = taosCfgSetOption(options, tListLen(options), pItem, false);
Expand Down
2 changes: 2 additions & 0 deletions source/dnode/mgmt/mgmt_dnode/inc/dmInt.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ typedef struct SDnodeMgmt {
SendAuditRecordsFp sendAuditRecordsFp;
GetVnodeLoadsFp getVnodeLoadsFp;
GetVnodeLoadsFp getVnodeLoadsLiteFp;
SetVnodeSyncTimeoutFp setVnodeSyncTimeoutFp;
GetMnodeLoadsFp getMnodeLoadsFp;
SetMnodeSyncTimeoutFp setMnodeSyncTimeoutFp;
GetQnodeLoadsFp getQnodeLoadsFp;
int32_t statusSeq;
} SDnodeMgmt;
Expand Down
76 changes: 76 additions & 0 deletions source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
Original file line number Diff line number Diff line change
Expand Up @@ -515,8 +515,67 @@ int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
if (pItem == NULL) {
return TSDB_CODE_CFG_NOT_FOUND;
}

if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
char value[10] = {0};
sscanf(cfgReq.value, "%d", &tsSyncTimeout);

if (tsSyncTimeout > 0) {
char tmp[10] = {0};

sprintf(tmp, "%d", tsSyncTimeout);
TAOS_CHECK_RETURN(
cfgGetAndSetItem(pCfg, &pItem, "arbSetAssignedTimeoutMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
sprintf(tmp, "%d", tsSyncTimeout / 4);
TAOS_CHECK_RETURN(
cfgGetAndSetItem(pCfg, &pItem, "arbHeartBeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
TAOS_CHECK_RETURN(
cfgGetAndSetItem(pCfg, &pItem, "arbCheckSyncIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));

sprintf(tmp, "%d", (tsSyncTimeout - tsSyncTimeout/4)/2);
TAOS_CHECK_RETURN(
cfgGetAndSetItem(pCfg, &pItem, "syncVnodeElectIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
TAOS_CHECK_RETURN(
cfgGetAndSetItem(pCfg, &pItem, "syncMnodeElectIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));

sprintf(tmp, "%d", (tsSyncTimeout - tsSyncTimeout/4)/8);
TAOS_CHECK_RETURN(
cfgGetAndSetItem(pCfg, &pItem, "syncVnodeHeartbeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));
TAOS_CHECK_RETURN(
cfgGetAndSetItem(pCfg, &pItem, "syncMnodeHeartbeatIntervalMs", tmp, CFG_STYPE_ALTER_SERVER_CMD, true));

dInfo("change syncTimeout, option:%s, value:%s, tsSyncTimeout:%d", cfgReq.config, cfgReq.value, tsSyncTimeout);
}
}

if (!isConifgItemLazyMode(pItem)) {
TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, cfgReq.config, true));

if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbHeartBeatIntervalMs", true));
TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbCheckSyncIntervalMs", true));
TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncVnodeHeartbeatIntervalMs", true));
TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncMnodeHeartbeatIntervalMs", true));
TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "arbSetAssignedTimeoutMs", true));
TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncVnodeElectIntervalMs", true));
TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncMnodeElectIntervalMs", true));
}

if (taosStrncasecmp(cfgReq.config, "syncVnodeElectIntervalMs", 128) == 0) {
TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncVnodeElectIntervalMs", true));
}

if (taosStrncasecmp(cfgReq.config, "syncVnodeHeartbeatIntervalMs", 128) == 0) {
TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncVnodeHeartbeatIntervalMs", true));
}

if (taosStrncasecmp(cfgReq.config, "syncMnodeElectIntervalMs", 128) == 0) {
TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncMnodeElectIntervalMs", true));
}

if (taosStrncasecmp(cfgReq.config, "syncMnodeHeartbeatIntervalMs", 128) == 0) {
TAOS_CHECK_RETURN(taosCfgDynamicOptions(pCfg, "syncMnodeHeartbeatIntervalMs", true));
}
}

if (pItem->category == CFG_CATEGORY_GLOBAL) {
Expand All @@ -530,6 +589,23 @@ int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
dError("failed to persist local config since %s", tstrerror(code));
}
}

if (taosStrncasecmp(cfgReq.config, "syncTimeout", 128) == 0) {
dInfo("finished change syncTimeout, option:%s, value:%s, tsArbHeartBeatIntervalMs:%d", cfgReq.config, cfgReq.value,
tsArbHeartBeatIntervalMs);

(*pMgmt->setMnodeSyncTimeoutFp)();
(*pMgmt->setVnodeSyncTimeoutFp)();
}

if (taosStrncasecmp(cfgReq.config, "syncVnodeElectIntervalMs", 128) == 0 || taosStrncasecmp(cfgReq.config, "syncVnodeHeartbeatIntervalMs", 128) == 0) {
(*pMgmt->setVnodeSyncTimeoutFp)();
}

if (taosStrncasecmp(cfgReq.config, "syncMnodeElectIntervalMs", 128) == 0 || taosStrncasecmp(cfgReq.config, "syncMnodeHeartbeatIntervalMs", 128) == 0) {
(*pMgmt->setMnodeSyncTimeoutFp)();
}

if (cfgReq.version > 0) {
tsdmConfigVersion = cfgReq.version;
}
Expand Down
2 changes: 2 additions & 0 deletions source/dnode/mgmt/mgmt_dnode/src/dmInt.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->getVnodeLoadsLiteFp = pInput->getVnodeLoadsLiteFp;
pMgmt->getMnodeLoadsFp = pInput->getMnodeLoadsFp;
pMgmt->getQnodeLoadsFp = pInput->getQnodeLoadsFp;
pMgmt->setMnodeSyncTimeoutFp = pInput->setMnodeSyncTimeoutFp;
pMgmt->setVnodeSyncTimeoutFp = pInput->setVnodeSyncTimeoutFp;

if ((code = dmStartWorker(pMgmt)) != 0) {
return code;
Expand Down
6 changes: 6 additions & 0 deletions source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ void mmGetMnodeLoads(SMnodeMgmt *pMgmt, SMonMloadInfo *pInfo) {
(void)mndGetLoad(pMgmt->pMnode, &pInfo->load);
}

void mmSetMnodeSyncTimeout(SMnodeMgmt *pMgmt) {
int32_t code = 0;
code = mndResetTimer(pMgmt->pMnode);
if (code != 0) dError("failed to mmSetMnodeSyncTimeout since %s", tstrerror(code));
}

int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
int32_t code = 0;
const STraceId *trace = &pMsg->info.traceId;
Expand Down
19 changes: 19 additions & 0 deletions source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,25 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
}

void vmSetVnodeSyncTimeout(SVnodeMgmt *pMgmt) {
(void)taosThreadRwlockRdlock(&pMgmt->hashLock);

void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
while (pIter) {
SVnodeObj **ppVnode = pIter;
if (ppVnode == NULL || *ppVnode == NULL) continue;

SVnodeObj *pVnode = *ppVnode;

if (vnodeSetSyncTimeout(pVnode->pImpl, tsVnodeElectIntervalMs) != 0) {
dError("vgId:%d, failed to vnodeSetSyncTimeout", pVnode->vgId);
}
pIter = taosHashIterate(pMgmt->runngingHash, pIter);
}

(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
}

void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoadLite));
if (!pInfo->pVloads) return;
Expand Down
2 changes: 2 additions & 0 deletions source/dnode/mgmt/node_mgmt/inc/dmMgmt.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ void dmMonitorCleanExpiredSamples();
void dmSendAuditRecords();
void dmGetVnodeLoads(SMonVloadInfo *pInfo);
void dmGetVnodeLoadsLite(SMonVloadInfo *pInfo);
void dmSetVnodeSyncTimeout();
void dmGetMnodeLoads(SMonMloadInfo *pInfo);
void dmSetMnodeSyncTimeout();
void dmGetQnodeLoads(SQnodeLoad *pInfo);

#ifdef __cplusplus
Expand Down
2 changes: 2 additions & 0 deletions source/dnode/mgmt/node_mgmt/inc/dmNodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ void vmGetVnodeLoads(void *pMgmt, SMonVloadInfo *pInfo, bool isReset);
void vmGetVnodeLoadsLite(void *pMgmt, SMonVloadInfo *pInfo);
void mmGetMnodeLoads(void *pMgmt, SMonMloadInfo *pInfo);
void qmGetQnodeLoads(void *pMgmt, SQnodeLoad *pInfo);
void mmSetMnodeSyncTimeout(void *pMgmt);
void vmSetVnodeSyncTimeout(void *pMgmt);

void vmCleanExpriedSamples(void *pMgmt);

Expand Down
2 changes: 2 additions & 0 deletions source/dnode/mgmt/node_mgmt/src/dmEnv.c
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,9 @@ SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) {
.sendAuditRecordFp = auditSendRecordsInBatch,
.getVnodeLoadsFp = dmGetVnodeLoads,
.getVnodeLoadsLiteFp = dmGetVnodeLoadsLite,
.setVnodeSyncTimeoutFp = dmSetVnodeSyncTimeout,
.getMnodeLoadsFp = dmGetMnodeLoads,
.setMnodeSyncTimeoutFp = dmSetMnodeSyncTimeout,
.getQnodeLoadsFp = dmGetQnodeLoads,
.stopDnodeFp = dmStop,
};
Expand Down
24 changes: 24 additions & 0 deletions source/dnode/mgmt/node_mgmt/src/dmMonitor.c
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,30 @@ void dmGetMnodeLoads(SMonMloadInfo *pInfo) {
}
}

void dmSetMnodeSyncTimeout() {
dInfo("dmSetMnodeSyncTimeout");
SDnode *pDnode = dmInstance();
SMgmtWrapper *pWrapper = &pDnode->wrappers[MNODE];
if (dmMarkWrapper(pWrapper) == 0) {
if (pWrapper->pMgmt != NULL) {
mmSetMnodeSyncTimeout(pWrapper->pMgmt);
}
dmReleaseWrapper(pWrapper);
}
}

void dmSetVnodeSyncTimeout() {
dInfo("dmSetVnodeSyncTimeout");
SDnode *pDnode = dmInstance();
SMgmtWrapper *pWrapper = &pDnode->wrappers[VNODE];
if (dmMarkWrapper(pWrapper) == 0) {
if (pWrapper->pMgmt != NULL) {
vmSetVnodeSyncTimeout(pWrapper->pMgmt);
}
dmReleaseWrapper(pWrapper);
}
}

void dmGetQnodeLoads(SQnodeLoad *pInfo) {
SDnode *pDnode = dmInstance();
SMgmtWrapper *pWrapper = &pDnode->wrappers[QNODE];
Expand Down
Loading
Loading