Skip to content

Commit 8345da4

Browse files
committed
tests
1 parent 80f792f commit 8345da4

File tree

7 files changed

+273
-274
lines changed

7 files changed

+273
-274
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java

Lines changed: 55 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -22,37 +22,44 @@
2222
import org.openjdk.jmh.annotations.OutputTimeUnit;
2323
import org.openjdk.jmh.annotations.Param;
2424
import org.openjdk.jmh.annotations.Scope;
25+
import org.openjdk.jmh.annotations.Setup;
2526
import org.openjdk.jmh.annotations.State;
27+
import org.openjdk.jmh.annotations.TearDown;
2628
import org.openjdk.jmh.annotations.Warmup;
2729
import org.openjdk.jmh.infra.Blackhole;
2830

29-
import java.util.Random;
31+
import java.time.Duration;
3032
import java.util.concurrent.CountDownLatch;
3133
import java.util.concurrent.Executors;
3234
import java.util.concurrent.TimeUnit;
3335

34-
@Warmup(iterations = 0)
36+
@Warmup(iterations = 1, time = 1, timeUnit = TimeUnit.SECONDS)
3537
@Measurement(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS)
3638
@BenchmarkMode(Mode.AverageTime)
3739
@OutputTimeUnit(TimeUnit.MILLISECONDS)
3840
@Fork(1)
3941
@State(Scope.Thread)
4042
public class ThreadPoolUtilizationBenchmark {
41-
private final Random random = new Random();
42-
@Param({ "8", "16", "32", "64" })
43+
44+
@Param({ "false", "true" })
45+
private boolean trackUtilization;
46+
47+
@Param({ "4", "8", "16" })
4348
private int poolSize;
44-
@Param({ "10000" })
49+
50+
@Param({ "1000000" })
4551
private int tasksNum;
46-
@Param({ "1" })
47-
private int workerTimeMinMs;
48-
@Param({ "5" })
49-
private int workerTimeMaxMs;
50-
private TaskExecutionTimeTrackingEsThreadPoolExecutor executor;
5152

52-
public EsThreadPoolExecutor newExecutor(boolean tracking) {
53+
@Param({ "10" }) // 10ms is aggressive interval, it increases frame updates on FramedTimeTracker, normally we run at 30/60
54+
// seconds
55+
private int utilizationIntervalMs;
56+
57+
private EsThreadPoolExecutor executor;
58+
59+
private EsThreadPoolExecutor newExecutor(boolean tracking) {
5360
var conf = EsExecutors.TaskTrackingConfig.builder();
5461
if (tracking) {
55-
conf.trackOngoingTasks().trackUtilization();
62+
conf.trackExecutionTime(0.3).trackUtilization(Duration.ofMillis(utilizationIntervalMs));
5663
}
5764
return EsExecutors.newFixed(
5865
"bench",
@@ -64,34 +71,48 @@ public EsThreadPoolExecutor newExecutor(boolean tracking) {
6471
);
6572
}
6673

67-
private void runTasks(EsThreadPoolExecutor executor, Blackhole bh) throws InterruptedException {
68-
try {
69-
var completedTasks = new CountDownLatch(tasksNum);
70-
for (var i = 0; i < tasksNum; i++) {
71-
executor.execute(() -> {
72-
try {
73-
Thread.sleep(random.nextInt(workerTimeMinMs, workerTimeMaxMs));
74-
} catch (InterruptedException e) {
75-
throw new RuntimeException(e);
76-
} finally {
77-
completedTasks.countDown();
78-
}
79-
});
74+
@Setup
75+
public void setup() {
76+
if (trackUtilization) {
77+
var exec = newExecutor(true);
78+
if (exec instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor trackingExecutor) {
79+
if (trackingExecutor.trackingConfig().trackUtilization() == false) {
80+
throw new IllegalStateException("utilization tracking must be enabled");
81+
} else {
82+
executor = trackingExecutor;
83+
}
84+
} else {
85+
throw new IllegalStateException("must be tracking executor");
86+
}
87+
} else {
88+
var exec = newExecutor(false);
89+
if (exec instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor) {
90+
throw new IllegalStateException("must be non-tracking executor");
8091
}
81-
completedTasks.await();
82-
} finally {
83-
executor.shutdown();
84-
executor.awaitTermination(0, TimeUnit.MILLISECONDS);
92+
executor = exec;
8593
}
8694
}
8795

88-
@Benchmark
89-
public void trackingExecutor(Blackhole bh) throws InterruptedException {
90-
runTasks(newExecutor(true), bh);
96+
@TearDown
97+
public void tearDown() throws InterruptedException {
98+
executor.shutdown();
99+
executor.awaitTermination(0, TimeUnit.MILLISECONDS);
91100
}
92101

93102
@Benchmark
94-
public void nonTrackingExecutor(Blackhole bh) throws InterruptedException {
95-
runTasks(newExecutor(false), bh);
103+
public void run(Blackhole bh) throws InterruptedException {
104+
var completedTasks = new CountDownLatch(tasksNum);
105+
for (var i = 0; i < tasksNum; i++) {
106+
executor.execute(() -> {
107+
// busy cycles for cpu
108+
var r = 0;
109+
for (var j = 0; j < 1000; j++) {
110+
r += j * 2;
111+
}
112+
bh.consume(r);
113+
completedTasks.countDown();
114+
});
115+
}
116+
completedTasks.await();
96117
}
97118
}

server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java

Lines changed: 33 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.core.SuppressForbidden;
1818
import org.elasticsearch.node.Node;
1919

20+
import java.time.Duration;
2021
import java.util.List;
2122
import java.util.Optional;
2223
import java.util.concurrent.AbstractExecutorService;
@@ -576,71 +577,45 @@ public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
576577
}
577578
}
578579

579-
public static class TaskTrackingConfig {
580+
/**
581+
* @param trackExecutionTime Whether to track execution stats
582+
* @param trackUtilization enables thread-pool utilization metrics
583+
* @param utilizationInterval when utilization is enabled, specifies interval for measurement
584+
* @param trackOngoingTasks Whether to track ongoing task execution time, not just finished tasks
585+
* @param trackMaxQueueLatency Whether to track max queue latency.
586+
* @param executionTimeEwmaAlpha The alpha seed for execution time EWMA (ExponentiallyWeightedMovingAverage).
587+
*/
588+
public record TaskTrackingConfig(
589+
boolean trackExecutionTime,
590+
boolean trackUtilization,
591+
Duration utilizationInterval,
592+
boolean trackOngoingTasks,
593+
boolean trackMaxQueueLatency,
594+
double executionTimeEwmaAlpha
595+
) {
596+
580597
// This is a random starting point alpha.
581598
public static final double DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST = 0.3;
582-
583-
private final boolean trackExecutionTime;
584-
private final boolean trackUtilization;
585-
private final boolean trackOngoingTasks;
586-
private final boolean trackMaxQueueLatency;
587-
private final double executionTimeEwmaAlpha;
599+
public static final Duration DEFAULT_UTILIZATION_INTERVAL = Duration.ofSeconds(30);
588600

589601
public static final TaskTrackingConfig DO_NOT_TRACK = new TaskTrackingConfig(
590602
false,
591603
false,
604+
DEFAULT_UTILIZATION_INTERVAL,
592605
false,
593606
false,
594607
DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST
595608
);
609+
596610
public static final TaskTrackingConfig DEFAULT = new TaskTrackingConfig(
597611
true,
598612
true,
613+
DEFAULT_UTILIZATION_INTERVAL,
599614
false,
600615
false,
601616
DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST
602617
);
603618

604-
/**
605-
* @param trackExecutionTime Whether to track execution stats
606-
* @param trackOngoingTasks Whether to track ongoing task execution time, not just finished tasks
607-
* @param trackMaxQueueLatency Whether to track max queue latency.
608-
* @param executionTimeEWMAAlpha The alpha seed for execution time EWMA (ExponentiallyWeightedMovingAverage).
609-
*/
610-
private TaskTrackingConfig(
611-
boolean trackExecutionTime,
612-
boolean trackUtilization,
613-
boolean trackOngoingTasks,
614-
boolean trackMaxQueueLatency,
615-
double executionTimeEWMAAlpha
616-
) {
617-
this.trackExecutionTime = trackExecutionTime;
618-
this.trackUtilization = trackUtilization;
619-
this.trackOngoingTasks = trackOngoingTasks;
620-
this.trackMaxQueueLatency = trackMaxQueueLatency;
621-
this.executionTimeEwmaAlpha = executionTimeEWMAAlpha;
622-
}
623-
624-
public boolean trackExecutionTime() {
625-
return trackExecutionTime;
626-
}
627-
628-
public boolean trackOngoingTasks() {
629-
return trackOngoingTasks;
630-
}
631-
632-
public boolean trackMaxQueueLatency() {
633-
return trackMaxQueueLatency;
634-
}
635-
636-
public boolean TrackUtilization() {
637-
return trackUtilization;
638-
}
639-
640-
public double getExecutionTimeEwmaAlpha() {
641-
return executionTimeEwmaAlpha;
642-
}
643-
644619
public static Builder builder() {
645620
return new Builder();
646621
}
@@ -651,6 +626,7 @@ public static class Builder {
651626
private boolean trackOngoingTasks = false;
652627
private boolean trackMaxQueueLatency = false;
653628
private double ewmaAlpha = DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST;
629+
private Duration utilizationInterval = DEFAULT_UTILIZATION_INTERVAL;
654630

655631
public Builder() {}
656632

@@ -660,8 +636,9 @@ public Builder trackExecutionTime(double alpha) {
660636
return this;
661637
}
662638

663-
public Builder trackUtilization() {
639+
public Builder trackUtilization(Duration interval) {
664640
trackUtilization = true;
641+
utilizationInterval = interval;
665642
return this;
666643
}
667644

@@ -676,7 +653,14 @@ public Builder trackMaxQueueLatency() {
676653
}
677654

678655
public TaskTrackingConfig build() {
679-
return new TaskTrackingConfig(trackExecutionTime, trackUtilization, trackOngoingTasks, trackMaxQueueLatency, ewmaAlpha);
656+
return new TaskTrackingConfig(
657+
trackExecutionTime,
658+
trackUtilization,
659+
utilizationInterval,
660+
trackOngoingTasks,
661+
trackMaxQueueLatency,
662+
ewmaAlpha
663+
);
680664
}
681665
}
682666
}

0 commit comments

Comments
 (0)