Skip to content

Commit 18ec741

Browse files
committed
framed-time-tracker
1 parent 80faf64 commit 18ec741

File tree

2 files changed

+140
-0
lines changed

2 files changed

+140
-0
lines changed

server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.concurrent.atomic.LongAccumulator;
3131
import java.util.concurrent.atomic.LongAdder;
3232
import java.util.function.Function;
33+
import java.util.function.Supplier;
3334

3435
import static org.elasticsearch.threadpool.ThreadPool.THREAD_POOL_METRIC_NAME_QUEUE_TIME;
3536
import static org.elasticsearch.threadpool.ThreadPool.THREAD_POOL_METRIC_NAME_UTILIZATION;
@@ -279,4 +280,67 @@ public synchronized double pollUtilization() {
279280
return utilizationSinceLastPoll;
280281
}
281282
}
283+
284+
static class FramedExecutionTime {
285+
private final Supplier<Long> timeNow;
286+
long ongoingTasks;
287+
long interval;
288+
long currentFrame;
289+
long previousFrame;
290+
long currentTime;
291+
long previousTime;
292+
293+
// for testing
294+
FramedExecutionTime(long interval, Supplier<Long> timeNow) {
295+
assert interval > 0;
296+
this.interval = interval;
297+
this.timeNow = timeNow;
298+
}
299+
300+
FramedExecutionTime(long interval) {
301+
assert interval > 0;
302+
this.interval = interval;
303+
this.timeNow = System::nanoTime;
304+
}
305+
306+
/**
307+
* update current and previous frames to current time
308+
*/
309+
synchronized void updateFrame() {
310+
updateFrame0(timeNow.get());
311+
}
312+
313+
private void updateFrame0(long nowTime) {
314+
var now = nowTime / interval;
315+
if (currentFrame < now) {
316+
if (currentFrame == now - 1) {
317+
previousTime = currentTime;
318+
} else {
319+
previousTime = ongoingTasks * interval;
320+
}
321+
currentTime = ongoingTasks * interval;
322+
currentFrame = now;
323+
previousFrame = now - 1;
324+
}
325+
}
326+
327+
synchronized void startTask(long startTime) {
328+
updateFrame0(startTime);
329+
// assume task will run indefinitely, in this case at least till end of interval
330+
currentTime += (currentFrame + 1) * interval - startTime;
331+
++ongoingTasks;
332+
}
333+
334+
synchronized void endTask(long endTime) {
335+
updateFrame0(endTime);
336+
// we already assumed that task will run till end of interval, here we subtract whats left
337+
currentTime -= (currentFrame + 1) * interval - endTime;
338+
--ongoingTasks;
339+
}
340+
341+
synchronized long getPreviousFrameExecutionTime() {
342+
updateFrame0(timeNow.get());
343+
return previousTime;
344+
}
345+
}
282346
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.common.util.concurrent;
11+
12+
import org.elasticsearch.test.ESTestCase;
13+
14+
import java.util.function.Supplier;
15+
16+
import static org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor.FramedExecutionTime;
17+
18+
public class FramedExecutionTimeTests extends ESTestCase {
19+
20+
private final FramedExecutionTime framedExecutionTime;
21+
private final FakeTime fakeTime;
22+
23+
public FramedExecutionTimeTests() {
24+
fakeTime = new FakeTime();
25+
framedExecutionTime = new FramedExecutionTime(1L, fakeTime);
26+
}
27+
28+
public void testNoTasks() {
29+
framedExecutionTime.updateFrame();
30+
assertEquals(0, framedExecutionTime.getPreviousFrameExecutionTime());
31+
fakeTime.time += between(1, 100);
32+
assertEquals(0, framedExecutionTime.getPreviousFrameExecutionTime());
33+
}
34+
35+
public void testSingleFrameTask() {
36+
framedExecutionTime.interval = 100;
37+
framedExecutionTime.startTask(10);
38+
framedExecutionTime.endTask(20);
39+
fakeTime.time += framedExecutionTime.interval;
40+
assertEquals(10, framedExecutionTime.getPreviousFrameExecutionTime());
41+
}
42+
43+
public void testTwoFrameTask() {
44+
framedExecutionTime.interval = 100;
45+
var startTime = between(0, 100);
46+
var endTime = startTime + framedExecutionTime.interval;
47+
framedExecutionTime.startTask(startTime);
48+
framedExecutionTime.endTask(endTime);
49+
assertEquals(framedExecutionTime.interval - startTime, framedExecutionTime.getPreviousFrameExecutionTime());
50+
}
51+
52+
public void testMultiFrameTask() {
53+
framedExecutionTime.interval = 10;
54+
framedExecutionTime.startTask(1);
55+
framedExecutionTime.endTask(between(3, 100) * 10L);
56+
assertEquals(framedExecutionTime.interval, framedExecutionTime.getPreviousFrameExecutionTime());
57+
}
58+
59+
public void testOngoingTask() {
60+
framedExecutionTime.interval = 10;
61+
framedExecutionTime.startTask(0);
62+
for (int i = 0; i < between(10, 100); i++) {
63+
fakeTime.time += framedExecutionTime.interval;
64+
assertEquals(framedExecutionTime.interval, framedExecutionTime.getPreviousFrameExecutionTime());
65+
}
66+
}
67+
68+
static class FakeTime implements Supplier<Long> {
69+
long time = 0;
70+
71+
@Override
72+
public Long get() {
73+
return time;
74+
}
75+
}
76+
}

0 commit comments

Comments
 (0)