Skip to content

Commit e30a3d3

Browse files
[Improvement-18039][Metrics] Add missing metrics for workflow and task state transitions
This PR adds the missing task and workflow execution metrics to DolphinScheduler. It also adopts an event-driven mechanism for task metrics: they are now recorded whenever the workflow event bus fires a task lifecycle event.
1 parent d30bc4f commit e30a3d3

8 files changed

Lines changed: 336 additions & 70 deletions

File tree

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEventBusFireWorker.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,40 @@ private void doFireSingleEvent(final IWorkflowExecutionRunnable workflowExecutio
156156
throw new RuntimeException("No EventHandler found for event: " + event.getEventType());
157157
}
158158
lifecycleEventHandler.handle(workflowExecutionRunnable, event);
159+
160+
recordTaskInstanceMetrics(event);
161+
}
162+
163+
private void recordTaskInstanceMetrics(AbstractLifecycleEvent event) {
164+
if (!(event
165+
.getEventType() instanceof org.apache.dolphinscheduler.server.master.engine.task.lifecycle.TaskLifecycleEventType)) {
166+
return;
167+
}
168+
169+
switch ((org.apache.dolphinscheduler.server.master.engine.task.lifecycle.TaskLifecycleEventType) event
170+
.getEventType()) {
171+
case DISPATCHED:
172+
org.apache.dolphinscheduler.server.master.metrics.TaskMetrics.incTaskInstanceByState("dispatch");
173+
break;
174+
case SUCCEEDED:
175+
org.apache.dolphinscheduler.server.master.metrics.TaskMetrics.incTaskInstanceByState("success");
176+
break;
177+
case FAILED:
178+
case FATAL:
179+
org.apache.dolphinscheduler.server.master.metrics.TaskMetrics.incTaskInstanceByState("fail");
180+
break;
181+
case KILLED:
182+
org.apache.dolphinscheduler.server.master.metrics.TaskMetrics.incTaskInstanceByState("kill");
183+
break;
184+
case RETRY:
185+
org.apache.dolphinscheduler.server.master.metrics.TaskMetrics.incTaskInstanceByState("retry");
186+
break;
187+
case TIMEOUT:
188+
org.apache.dolphinscheduler.server.master.metrics.TaskMetrics.incTaskInstanceByState("timeout");
189+
break;
190+
default:
191+
break;
192+
}
159193
}
160194

161195
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,6 @@ private void persistentTaskInstanceKilledEventToDB(final ITaskExecutionRunnable
196196
taskInstance.setState(TaskExecutionStatus.KILL);
197197
taskInstance.setEndTime(taskKilledEvent.getEndTime());
198198
taskInstanceDao.updateById(taskInstance);
199-
200199
}
201200

202201
@Override

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/runnable/WorkflowExecutionRunnableFactory.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
2424
import org.apache.dolphinscheduler.server.master.engine.command.ICommandHandler;
2525
import org.apache.dolphinscheduler.server.master.engine.exceptions.CommandDuplicateHandleException;
26+
import org.apache.dolphinscheduler.server.master.metrics.WorkflowInstanceMetrics;
2627

2728
import java.util.List;
2829

@@ -52,8 +53,11 @@ public class WorkflowExecutionRunnableFactory {
5253
*/
5354
@Transactional
5455
public IWorkflowExecutionRunnable createWorkflowExecuteRunnable(Command command) {
56+
long startTime = System.currentTimeMillis();
5557
deleteCommandOrThrow(command);
56-
return doCreateWorkflowExecutionRunnable(command);
58+
IWorkflowExecutionRunnable workflowExecutionRunnable = doCreateWorkflowExecutionRunnable(command);
59+
WorkflowInstanceMetrics.recordWorkflowInstanceGenerateTime(System.currentTimeMillis() - startTime);
60+
return workflowExecutionRunnable;
5761
}
5862

5963
/**

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,9 @@ protected void transformWorkflowInstanceState(final IWorkflowExecutionRunnable w
176176
workflowInstanceDao.updateById(workflowInstance);
177177
log.info("Success set WorkflowExecuteRunnable: {} state from: {} to {}",
178178
workflowInstance.getName(), originState.name(), targetState.name());
179+
WorkflowInstanceMetrics.incWorkflowInstanceByStateAndWorkflowDefinitionCode(
180+
targetState,
181+
String.valueOf(workflowInstance.getWorkflowDefinitionCode()));
179182
} catch (Exception ex) {
180183
workflowInstance.setState(originState);
181184
throw ex;

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/TaskMetrics.java

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -50,39 +50,12 @@ public class TaskMetrics {
5050

5151
}
5252

53-
private final Counter taskDispatchCounter =
54-
Counter.builder("ds.task.dispatch.count")
55-
.description("Task dispatch count")
56-
.register(Metrics.globalRegistry);
57-
58-
private final Counter taskDispatchFailCounter =
59-
Counter.builder("ds.task.dispatch.failure.count")
60-
.description("Task dispatch failures count, retried ones included")
61-
.register(Metrics.globalRegistry);
62-
63-
private final Counter taskDispatchErrorCounter =
64-
Counter.builder("ds.task.dispatch.error.count")
65-
.description("Number of errors during task dispatch")
66-
.register(Metrics.globalRegistry);
67-
6853
/**
 * Registers a gauge named {@code ds.task.prepared} (description "Task prepared count")
 * in {@code Metrics.globalRegistry}.
 *
 * @param consumer supplies the value reported by the gauge (despite the name, it is a
 *                 {@code Supplier}, not a {@code Consumer})
 */
public synchronized void registerTaskPrepared(Supplier<Number> consumer) {
6954
Gauge.builder("ds.task.prepared", consumer)
7055
.description("Task prepared count")
7156
.register(Metrics.globalRegistry);
7257
}
7358

74-
public void incTaskDispatchFailed(int failedCount) {
75-
taskDispatchFailCounter.increment(failedCount);
76-
}
77-
78-
public void incTaskDispatchError() {
79-
taskDispatchErrorCounter.increment();
80-
}
81-
82-
public void incTaskDispatch() {
83-
taskDispatchCounter.increment();
84-
}
85-
8659
public void incTaskInstanceByState(final String state) {
8760
if (taskInstanceCounters.get(state) == null) {
8861
return;
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.server.master.metrics;
19+
20+
import static org.junit.jupiter.api.Assertions.assertEquals;
21+
import static org.junit.jupiter.api.Assertions.assertNotNull;
22+
import static org.junit.jupiter.api.Assertions.assertTrue;
23+
24+
import java.util.Arrays;
25+
import java.util.List;
26+
27+
import org.junit.jupiter.api.Test;
28+
29+
import io.micrometer.core.instrument.Counter;
30+
import io.micrometer.core.instrument.Metrics;
31+
32+
class TaskMetricsTest {
33+
34+
@Test
35+
void testIncTaskInstanceByState_validStates() {
36+
List<String> validStates = Arrays.asList(
37+
"submit", "timeout", "finish", "failover", "retry", "dispatch", "success", "kill", "fail", "stop");
38+
39+
for (String state : validStates) {
40+
Counter counter = Metrics.globalRegistry.find("ds.task.instance.count")
41+
.tag("state", state)
42+
.counter();
43+
assertNotNull(counter, "Counter should exist for state: " + state);
44+
double before = counter.count();
45+
TaskMetrics.incTaskInstanceByState(state);
46+
assertEquals(before + 1, counter.count(), 0.001,
47+
"Counter should be incremented for state: " + state);
48+
}
49+
}
50+
51+
@Test
52+
void testIncTaskInstanceByState_invalidState() {
53+
TaskMetrics.incTaskInstanceByState("nonexistent_state");
54+
Counter counter = Metrics.globalRegistry.find("ds.task.instance.count")
55+
.tag("state", "nonexistent_state")
56+
.counter();
57+
assertTrue(counter == null || counter.count() == 0,
58+
"Counter should not exist or be zero for invalid state");
59+
}
60+
61+
@Test
62+
void testIncTaskInstanceByState_multipleIncrements() {
63+
Counter counter = Metrics.globalRegistry.find("ds.task.instance.count")
64+
.tag("state", "submit")
65+
.counter();
66+
assertNotNull(counter);
67+
double before = counter.count();
68+
69+
TaskMetrics.incTaskInstanceByState("submit");
70+
TaskMetrics.incTaskInstanceByState("submit");
71+
TaskMetrics.incTaskInstanceByState("submit");
72+
73+
assertEquals(before + 3, counter.count(), 0.001,
74+
"Counter should be incremented by 3 after three calls");
75+
}
76+
77+
@Test
78+
void testRegisterTaskPrepared() {
79+
TaskMetrics.registerTaskPrepared(() -> 5);
80+
assertNotNull(Metrics.globalRegistry.find("ds.task.prepared").gauge(),
81+
"Task prepared gauge should be registered");
82+
}
83+
84+
}
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.server.master.metrics;
19+
20+
import static org.junit.jupiter.api.Assertions.assertEquals;
21+
import static org.junit.jupiter.api.Assertions.assertNotNull;
22+
import static org.junit.jupiter.api.Assertions.assertNull;
23+
24+
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
25+
26+
import org.junit.jupiter.api.Test;
27+
28+
import io.micrometer.core.instrument.Counter;
29+
import io.micrometer.core.instrument.Metrics;
30+
import io.micrometer.core.instrument.Timer;
31+
32+
class WorkflowInstanceMetricsTest {
33+
34+
@Test
35+
void testIncWorkflowInstanceByStateAndWorkflowDefinitionCode_submitState() {
36+
String defCode = "test_submit_1";
37+
WorkflowInstanceMetrics.incWorkflowInstanceByStateAndWorkflowDefinitionCode(
38+
WorkflowExecutionStatus.SUBMITTED_SUCCESS, defCode);
39+
Counter counter = Metrics.globalRegistry.find("ds.workflow.instance.count")
40+
.tag("state", "submit")
41+
.tag("workflow.definition.code", defCode)
42+
.counter();
43+
assertNotNull(counter, "Counter should be registered for submit state");
44+
assertEquals(1, counter.count(), 0.001);
45+
}
46+
47+
@Test
48+
void testIncWorkflowInstanceByStateAndWorkflowDefinitionCode_failureState() {
49+
String defCode = "test_failure_1";
50+
WorkflowInstanceMetrics.incWorkflowInstanceByStateAndWorkflowDefinitionCode(
51+
WorkflowExecutionStatus.FAILURE, defCode);
52+
Counter counter = Metrics.globalRegistry.find("ds.workflow.instance.count")
53+
.tag("state", "fail")
54+
.tag("workflow.definition.code", defCode)
55+
.counter();
56+
assertNotNull(counter, "Counter should be registered for fail state");
57+
assertEquals(1, counter.count(), 0.001);
58+
}
59+
60+
@Test
61+
void testIncWorkflowInstanceByStateAndWorkflowDefinitionCode_successState() {
62+
String defCode = "test_success_1";
63+
WorkflowInstanceMetrics.incWorkflowInstanceByStateAndWorkflowDefinitionCode(
64+
WorkflowExecutionStatus.SUCCESS, defCode);
65+
Counter counter = Metrics.globalRegistry.find("ds.workflow.instance.count")
66+
.tag("state", "success")
67+
.tag("workflow.definition.code", defCode)
68+
.counter();
69+
assertNotNull(counter, "Counter should be registered for success state");
70+
assertEquals(1, counter.count(), 0.001);
71+
}
72+
73+
@Test
74+
void testIncWorkflowInstanceByStateAndWorkflowDefinitionCode_stopState() {
75+
String defCode = "test_stop_1";
76+
WorkflowInstanceMetrics.incWorkflowInstanceByStateAndWorkflowDefinitionCode(
77+
WorkflowExecutionStatus.STOP, defCode);
78+
Counter counter = Metrics.globalRegistry.find("ds.workflow.instance.count")
79+
.tag("state", "stop")
80+
.tag("workflow.definition.code", defCode)
81+
.counter();
82+
assertNotNull(counter, "Counter should be registered for stop state");
83+
assertEquals(1, counter.count(), 0.001);
84+
}
85+
86+
@Test
87+
void testIncWorkflowInstanceByStateAndWorkflowDefinitionCode_pauseState() {
88+
String defCode = "test_pause_1";
89+
WorkflowInstanceMetrics.incWorkflowInstanceByStateAndWorkflowDefinitionCode(
90+
WorkflowExecutionStatus.PAUSE, defCode);
91+
Counter counter = Metrics.globalRegistry.find("ds.workflow.instance.count")
92+
.tag("state", "pause")
93+
.tag("workflow.definition.code", defCode)
94+
.counter();
95+
assertNotNull(counter, "Counter should be registered for pause state");
96+
assertEquals(1, counter.count(), 0.001);
97+
}
98+
99+
@Test
100+
void testIncWorkflowInstanceByStateAndWorkflowDefinitionCode_failoverState() {
101+
String defCode = "test_failover_1";
102+
WorkflowInstanceMetrics.incWorkflowInstanceByStateAndWorkflowDefinitionCode(
103+
WorkflowExecutionStatus.FAILOVER, defCode);
104+
Counter counter = Metrics.globalRegistry.find("ds.workflow.instance.count")
105+
.tag("state", "failover")
106+
.tag("workflow.definition.code", defCode)
107+
.counter();
108+
assertNotNull(counter, "Counter should be registered for failover state");
109+
assertEquals(1, counter.count(), 0.001);
110+
}
111+
112+
@Test
113+
void testIncWorkflowInstanceByStateAndWorkflowDefinitionCode_defaultMapping() {
114+
String defCode = "test_running_1";
115+
WorkflowInstanceMetrics.incWorkflowInstanceByStateAndWorkflowDefinitionCode(
116+
WorkflowExecutionStatus.RUNNING_EXECUTION, defCode);
117+
Counter counter = Metrics.globalRegistry.find("ds.workflow.instance.count")
118+
.tag("state", "running_execution")
119+
.tag("workflow.definition.code", defCode)
120+
.counter();
121+
assertNotNull(counter, "Counter should be registered for default-mapped state");
122+
assertEquals(1, counter.count(), 0.001);
123+
}
124+
125+
@Test
126+
void testRecordCommandQueryTime() {
127+
WorkflowInstanceMetrics.recordCommandQueryTime(100L);
128+
Timer timer = Metrics.globalRegistry.find("ds.workflow.command.query.duration").timer();
129+
assertNotNull(timer, "Command query timer should be registered");
130+
assertEquals(1, timer.count(), "Timer should have recorded one event");
131+
}
132+
133+
@Test
134+
void testRecordWorkflowInstanceGenerateTime() {
135+
WorkflowInstanceMetrics.recordWorkflowInstanceGenerateTime(200L);
136+
Timer timer = Metrics.globalRegistry.find("ds.workflow.instance.generate.duration").timer();
137+
assertNotNull(timer, "Workflow instance generate timer should be registered");
138+
assertEquals(1, timer.count(), "Timer should have recorded one event");
139+
}
140+
141+
@Test
142+
void testRegisterWorkflowInstanceRunningGauge() {
143+
WorkflowInstanceMetrics.registerWorkflowInstanceRunningGauge(() -> 10);
144+
assertNotNull(Metrics.globalRegistry.find("ds.workflow.instance.running").gauge(),
145+
"Running gauge should be registered");
146+
}
147+
148+
@Test
149+
void testRegisterWorkflowInstanceResubmitGauge() {
150+
WorkflowInstanceMetrics.registerWorkflowInstanceResubmitGauge(() -> 3);
151+
assertNotNull(Metrics.globalRegistry.find("ds.workflow.instance.resubmit").gauge(),
152+
"Resubmit gauge should be registered");
153+
}
154+
155+
@Test
156+
void testCleanUpWorkflowInstanceCountMetricsByDefinitionCode() {
157+
String defCode = "99999";
158+
WorkflowInstanceMetrics.incWorkflowInstanceByStateAndWorkflowDefinitionCode(
159+
WorkflowExecutionStatus.SUCCESS, defCode);
160+
Counter counterBefore = Metrics.globalRegistry.find("ds.workflow.instance.count")
161+
.tag("state", "success")
162+
.tag("workflow.definition.code", defCode)
163+
.counter();
164+
assertNotNull(counterBefore, "Counter should exist before cleanup");
165+
166+
WorkflowInstanceMetrics.cleanUpWorkflowInstanceCountMetricsByDefinitionCode(99999L);
167+
168+
Counter counterAfter = Metrics.globalRegistry.find("ds.workflow.instance.count")
169+
.tag("state", "success")
170+
.tag("workflow.definition.code", defCode)
171+
.counter();
172+
assertNull(counterAfter, "Counter should be removed after cleanup");
173+
}
174+
175+
}

0 commit comments

Comments
 (0)