Skip to content

Commit 80f792f

Browse files
committed
jmh
1 parent 18ec741 commit 80f792f

File tree

5 files changed

+204
-85
lines changed

5 files changed

+204
-85
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.benchmark.common.util.concurrent;
11+
12+
import org.elasticsearch.common.settings.Settings;
13+
import org.elasticsearch.common.util.concurrent.EsExecutors;
14+
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
15+
import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor;
16+
import org.elasticsearch.common.util.concurrent.ThreadContext;
17+
import org.openjdk.jmh.annotations.Benchmark;
18+
import org.openjdk.jmh.annotations.BenchmarkMode;
19+
import org.openjdk.jmh.annotations.Fork;
20+
import org.openjdk.jmh.annotations.Measurement;
21+
import org.openjdk.jmh.annotations.Mode;
22+
import org.openjdk.jmh.annotations.OutputTimeUnit;
23+
import org.openjdk.jmh.annotations.Param;
24+
import org.openjdk.jmh.annotations.Scope;
25+
import org.openjdk.jmh.annotations.State;
26+
import org.openjdk.jmh.annotations.Warmup;
27+
import org.openjdk.jmh.infra.Blackhole;
28+
29+
import java.util.Random;
30+
import java.util.concurrent.CountDownLatch;
31+
import java.util.concurrent.Executors;
32+
import java.util.concurrent.TimeUnit;
33+
34+
@Warmup(iterations = 0)
35+
@Measurement(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS)
36+
@BenchmarkMode(Mode.AverageTime)
37+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
38+
@Fork(1)
39+
@State(Scope.Thread)
40+
public class ThreadPoolUtilizationBenchmark {
41+
private final Random random = new Random();
42+
@Param({ "8", "16", "32", "64" })
43+
private int poolSize;
44+
@Param({ "10000" })
45+
private int tasksNum;
46+
@Param({ "1" })
47+
private int workerTimeMinMs;
48+
@Param({ "5" })
49+
private int workerTimeMaxMs;
50+
private TaskExecutionTimeTrackingEsThreadPoolExecutor executor;
51+
52+
public EsThreadPoolExecutor newExecutor(boolean tracking) {
53+
var conf = EsExecutors.TaskTrackingConfig.builder();
54+
if (tracking) {
55+
conf.trackOngoingTasks().trackUtilization();
56+
}
57+
return EsExecutors.newFixed(
58+
"bench",
59+
poolSize,
60+
tasksNum,
61+
Executors.defaultThreadFactory(),
62+
new ThreadContext(Settings.EMPTY),
63+
conf.build()
64+
);
65+
}
66+
67+
private void runTasks(EsThreadPoolExecutor executor, Blackhole bh) throws InterruptedException {
68+
try {
69+
var completedTasks = new CountDownLatch(tasksNum);
70+
for (var i = 0; i < tasksNum; i++) {
71+
executor.execute(() -> {
72+
try {
73+
Thread.sleep(random.nextInt(workerTimeMinMs, workerTimeMaxMs));
74+
} catch (InterruptedException e) {
75+
throw new RuntimeException(e);
76+
} finally {
77+
completedTasks.countDown();
78+
}
79+
});
80+
}
81+
completedTasks.await();
82+
} finally {
83+
executor.shutdown();
84+
executor.awaitTermination(0, TimeUnit.MILLISECONDS);
85+
}
86+
}
87+
88+
@Benchmark
89+
public void trackingExecutor(Blackhole bh) throws InterruptedException {
90+
runTasks(newExecutor(true), bh);
91+
}
92+
93+
@Benchmark
94+
public void nonTrackingExecutor(Blackhole bh) throws InterruptedException {
95+
runTasks(newExecutor(false), bh);
96+
}
97+
}

server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -581,6 +581,7 @@ public static class TaskTrackingConfig {
581581
public static final double DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST = 0.3;
582582

583583
private final boolean trackExecutionTime;
584+
private final boolean trackUtilization;
584585
private final boolean trackOngoingTasks;
585586
private final boolean trackMaxQueueLatency;
586587
private final double executionTimeEwmaAlpha;
@@ -589,9 +590,11 @@ public static class TaskTrackingConfig {
589590
false,
590591
false,
591592
false,
593+
false,
592594
DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST
593595
);
594596
public static final TaskTrackingConfig DEFAULT = new TaskTrackingConfig(
597+
true,
595598
true,
596599
false,
597600
false,
@@ -606,11 +609,13 @@ public static class TaskTrackingConfig {
606609
*/
607610
private TaskTrackingConfig(
608611
boolean trackExecutionTime,
612+
boolean trackUtilization,
609613
boolean trackOngoingTasks,
610614
boolean trackMaxQueueLatency,
611615
double executionTimeEWMAAlpha
612616
) {
613617
this.trackExecutionTime = trackExecutionTime;
618+
this.trackUtilization = trackUtilization;
614619
this.trackOngoingTasks = trackOngoingTasks;
615620
this.trackMaxQueueLatency = trackMaxQueueLatency;
616621
this.executionTimeEwmaAlpha = executionTimeEWMAAlpha;
@@ -628,6 +633,10 @@ public boolean trackMaxQueueLatency() {
628633
return trackMaxQueueLatency;
629634
}
630635

636+
public boolean TrackUtilization() {
637+
return trackUtilization;
638+
}
639+
631640
public double getExecutionTimeEwmaAlpha() {
632641
return executionTimeEwmaAlpha;
633642
}
@@ -638,6 +647,7 @@ public static Builder builder() {
638647

639648
public static class Builder {
640649
private boolean trackExecutionTime = false;
650+
private boolean trackUtilization = false;
641651
private boolean trackOngoingTasks = false;
642652
private boolean trackMaxQueueLatency = false;
643653
private double ewmaAlpha = DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST;
@@ -650,6 +660,11 @@ public Builder trackExecutionTime(double alpha) {
650660
return this;
651661
}
652662

663+
public Builder trackUtilization() {
664+
trackUtilization = true;
665+
return this;
666+
}
667+
653668
public Builder trackOngoingTasks() {
654669
trackOngoingTasks = true;
655670
return this;
@@ -661,7 +676,7 @@ public Builder trackMaxQueueLatency() {
661676
}
662677

663678
public TaskTrackingConfig build() {
664-
return new TaskTrackingConfig(trackExecutionTime, trackOngoingTasks, trackMaxQueueLatency, ewmaAlpha);
679+
return new TaskTrackingConfig(trackExecutionTime, trackUtilization, trackOngoingTasks, trackMaxQueueLatency, ewmaAlpha);
665680
}
666681
}
667682
}

server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea
5050
private final Map<Runnable, Long> ongoingTasks = new ConcurrentHashMap<>();
5151
private final ExponentialBucketHistogram queueLatencyMillisHistogram = new ExponentialBucketHistogram(QUEUE_LATENCY_HISTOGRAM_BUCKETS);
5252
private final boolean trackMaxQueueLatency;
53+
private final boolean trackUtilization;
5354
private LongAccumulator maxQueueLatencyMillisSinceLastPoll = new LongAccumulator(Long::max, 0);
5455

5556
public enum UtilizationTrackingPurpose {
@@ -59,8 +60,9 @@ public enum UtilizationTrackingPurpose {
5960

6061
private volatile UtilizationTracker apmUtilizationTracker = new UtilizationTracker();
6162
private volatile UtilizationTracker allocationUtilizationTracker = new UtilizationTracker();
63+
private final FramedTimeTracker framedTimeTracker= new FramedTimeTracker(1_000);
6264

63-
TaskExecutionTimeTrackingEsThreadPoolExecutor(
65+
public TaskExecutionTimeTrackingEsThreadPoolExecutor(
6466
String name,
6567
int corePoolSize,
6668
int maximumPoolSize,
@@ -79,6 +81,7 @@ public enum UtilizationTrackingPurpose {
7981
this.executionEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getExecutionTimeEwmaAlpha(), 0);
8082
this.trackOngoingTasks = trackingConfig.trackOngoingTasks();
8183
this.trackMaxQueueLatency = trackingConfig.trackMaxQueueLatency();
84+
this.trackUtilization = trackingConfig.TrackUtilization();
8285
}
8386

8487
public List<Instrument> setupMetrics(MeterRegistry meterRegistry, String threadPoolName) {
@@ -190,6 +193,9 @@ protected void beforeExecute(Thread t, Runnable r) {
190193
if (trackMaxQueueLatency) {
191194
maxQueueLatencyMillisSinceLastPoll.accumulate(queueLatencyMillis);
192195
}
196+
if (trackUtilization) {
197+
framedTimeTracker.startTask(System.nanoTime());
198+
}
193199
}
194200

195201
@Override
@@ -214,6 +220,9 @@ protected void afterExecute(Runnable r, Throwable t) {
214220
executionEWMA.addValue(taskExecutionNanos);
215221
totalExecutionTime.add(taskExecutionNanos);
216222
}
223+
if (trackUtilization) {
224+
framedTimeTracker.endTask(System.nanoTime());
225+
}
217226
} finally {
218227
// if trackOngoingTasks is false -> ongoingTasks must be empty
219228
assert trackOngoingTasks || ongoingTasks.isEmpty();
@@ -281,23 +290,22 @@ public synchronized double pollUtilization() {
281290
}
282291
}
283292

284-
static class FramedExecutionTime {
293+
static class FramedTimeTracker {
285294
private final Supplier<Long> timeNow;
286295
long ongoingTasks;
287296
long interval;
288297
long currentFrame;
289-
long previousFrame;
290298
long currentTime;
291299
long previousTime;
292300

293301
// for testing
294-
FramedExecutionTime(long interval, Supplier<Long> timeNow) {
302+
FramedTimeTracker(long interval, Supplier<Long> timeNow) {
295303
assert interval > 0;
296304
this.interval = interval;
297305
this.timeNow = timeNow;
298306
}
299307

300-
FramedExecutionTime(long interval) {
308+
FramedTimeTracker(long interval) {
301309
assert interval > 0;
302310
this.interval = interval;
303311
this.timeNow = System::nanoTime;
@@ -314,13 +322,12 @@ private void updateFrame0(long nowTime) {
314322
var now = nowTime / interval;
315323
if (currentFrame < now) {
316324
if (currentFrame == now - 1) {
317-
previousTime = currentTime;
325+
previousTime = currentTime; //
318326
} else {
319327
previousTime = ongoingTasks * interval;
320328
}
321329
currentTime = ongoingTasks * interval;
322330
currentFrame = now;
323-
previousFrame = now - 1;
324331
}
325332
}
326333

@@ -338,7 +345,7 @@ synchronized void endTask(long endTime) {
338345
--ongoingTasks;
339346
}
340347

341-
synchronized long getPreviousFrameExecutionTime() {
348+
synchronized long previousFrameTime() {
342349
updateFrame0(timeNow.get());
343350
return previousTime;
344351
}

server/src/test/java/org/elasticsearch/common/util/concurrent/FramedExecutionTimeTests.java

Lines changed: 0 additions & 76 deletions
This file was deleted.

0 commit comments

Comments
 (0)