From 18ec741bab67a306dcd7eda3f756bbbfa0025049 Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Thu, 24 Jul 2025 20:28:01 -0700 Subject: [PATCH 01/20] framed-time-tracker --- ...utionTimeTrackingEsThreadPoolExecutor.java | 64 ++++++++++++++++ .../concurrent/FramedExecutionTimeTests.java | 76 +++++++++++++++++++ 2 files changed, 140 insertions(+) create mode 100644 server/src/test/java/org/elasticsearch/common/util/concurrent/FramedExecutionTimeTests.java diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java index 762a8c280b7f3..251d117ad725c 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.LongAccumulator; import java.util.concurrent.atomic.LongAdder; import java.util.function.Function; +import java.util.function.Supplier; import static org.elasticsearch.threadpool.ThreadPool.THREAD_POOL_METRIC_NAME_QUEUE_TIME; import static org.elasticsearch.threadpool.ThreadPool.THREAD_POOL_METRIC_NAME_UTILIZATION; @@ -279,4 +280,67 @@ public synchronized double pollUtilization() { return utilizationSinceLastPoll; } } + + static class FramedExecutionTime { + private final Supplier timeNow; + long ongoingTasks; + long interval; + long currentFrame; + long previousFrame; + long currentTime; + long previousTime; + + // for testing + FramedExecutionTime(long interval, Supplier timeNow) { + assert interval > 0; + this.interval = interval; + this.timeNow = timeNow; + } + + FramedExecutionTime(long interval) { + assert interval > 0; + this.interval = interval; + this.timeNow = System::nanoTime; + } + + /** + * update current and previous frames to current time + */ + synchronized void updateFrame() { + updateFrame0(timeNow.get()); + } + + private void updateFrame0(long nowTime) { + var now = nowTime / interval; + if (currentFrame < now) { + if (currentFrame == now - 1) { + previousTime = currentTime; + } else { + previousTime = ongoingTasks * interval; + } + currentTime = ongoingTasks * interval; + currentFrame = now; + previousFrame = now - 1; + } + } + + synchronized void startTask(long startTime) { + updateFrame0(startTime); + // assume task will run indefinitely, in this case at least till end of interval + currentTime += (currentFrame + 1) * interval - startTime; + ++ongoingTasks; + } + + synchronized void endTask(long endTime) { + updateFrame0(endTime); + // we already assumed that task will run till end of interval, here we subtract whats left + currentTime -= (currentFrame + 1) * interval - endTime; + --ongoingTasks; + } + + synchronized long getPreviousFrameExecutionTime() { + updateFrame0(timeNow.get()); + return previousTime; + } + } } diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/FramedExecutionTimeTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/FramedExecutionTimeTests.java new file mode 100644 index 0000000000000..b2d27f99b933d --- /dev/null +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/FramedExecutionTimeTests.java @@ -0,0 +1,76 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.common.util.concurrent; + +import org.elasticsearch.test.ESTestCase; + +import java.util.function.Supplier; + +import static org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor.FramedExecutionTime; + +public class FramedExecutionTimeTests extends ESTestCase { + + private final FramedExecutionTime framedExecutionTime; + private final FakeTime fakeTime; + + public FramedExecutionTimeTests() { + fakeTime = new FakeTime(); + framedExecutionTime = new FramedExecutionTime(1L, fakeTime); + } + + public void testNoTasks() { + framedExecutionTime.updateFrame(); + assertEquals(0, framedExecutionTime.getPreviousFrameExecutionTime()); + fakeTime.time += between(1, 100); + assertEquals(0, framedExecutionTime.getPreviousFrameExecutionTime()); + } + + public void testSingleFrameTask() { + framedExecutionTime.interval = 100; + framedExecutionTime.startTask(10); + framedExecutionTime.endTask(20); + fakeTime.time += framedExecutionTime.interval; + assertEquals(10, framedExecutionTime.getPreviousFrameExecutionTime()); + } + + public void testTwoFrameTask() { + framedExecutionTime.interval = 100; + var startTime = between(0, 100); + var endTime = startTime + framedExecutionTime.interval; + framedExecutionTime.startTask(startTime); + framedExecutionTime.endTask(endTime); + assertEquals(framedExecutionTime.interval - startTime, framedExecutionTime.getPreviousFrameExecutionTime()); + } + + public void testMultiFrameTask() { + framedExecutionTime.interval = 10; + framedExecutionTime.startTask(1); + framedExecutionTime.endTask(between(3, 100) * 10L); + assertEquals(framedExecutionTime.interval, framedExecutionTime.getPreviousFrameExecutionTime()); + } + + public void testOngoingTask() { + framedExecutionTime.interval = 10; + framedExecutionTime.startTask(0); + for (int i = 0; i < between(10, 100); i++) { + fakeTime.time += framedExecutionTime.interval; + assertEquals(framedExecutionTime.interval, framedExecutionTime.getPreviousFrameExecutionTime()); + } + } + + static class FakeTime implements Supplier { + long time = 0; + + @Override + public Long get() { + return time; + } + } +} From 80f792f88a38e5679a5cf4445c2a09133eedd0e8 Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Fri, 25 Jul 2025 12:48:24 -0700 Subject: [PATCH 02/20] jmh --- .../ThreadPoolUtilizationBenchmark.java | 97 +++++++++++++++++++ .../common/util/concurrent/EsExecutors.java | 17 +++- ...utionTimeTrackingEsThreadPoolExecutor.java | 23 +++-- .../concurrent/FramedExecutionTimeTests.java | 76 --------------- .../concurrent/FramedTimeTrackerTests.java | 76 +++++++++++++++ 5 files changed, 204 insertions(+), 85 deletions(-) create mode 100644 benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java delete mode 100644 server/src/test/java/org/elasticsearch/common/util/concurrent/FramedExecutionTimeTests.java create mode 100644 server/src/test/java/org/elasticsearch/common/util/concurrent/FramedTimeTrackerTests.java diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java new file mode 100644 index 0000000000000..0e38f40149855 --- /dev/null +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java @@ -0,0 +1,97 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.benchmark.common.util.concurrent; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; +import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +@Warmup(iterations = 0) +@Measurement(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@Fork(1) +@State(Scope.Thread) +public class ThreadPoolUtilizationBenchmark { + private final Random random = new Random(); + @Param({ "8", "16", "32", "64" }) + private int poolSize; + @Param({ "10000" }) + private int tasksNum; + @Param({ "1" }) + private int workerTimeMinMs; + @Param({ "5" }) + private int workerTimeMaxMs; + private TaskExecutionTimeTrackingEsThreadPoolExecutor executor; + + public EsThreadPoolExecutor newExecutor(boolean tracking) { + var conf = EsExecutors.TaskTrackingConfig.builder(); + if (tracking) { + conf.trackOngoingTasks().trackUtilization(); + } + return EsExecutors.newFixed( + "bench", + poolSize, + tasksNum, + Executors.defaultThreadFactory(), + new ThreadContext(Settings.EMPTY), + conf.build() + ); + } + + private void runTasks(EsThreadPoolExecutor executor, Blackhole bh) throws InterruptedException { + try { + var completedTasks = new CountDownLatch(tasksNum); + for (var i = 0; i < tasksNum; i++) { + executor.execute(() -> { + try { + Thread.sleep(random.nextInt(workerTimeMinMs, workerTimeMaxMs)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + completedTasks.countDown(); + } + }); + } + completedTasks.await(); + } finally { + executor.shutdown(); + executor.awaitTermination(0, TimeUnit.MILLISECONDS); + } + } + + @Benchmark + public void trackingExecutor(Blackhole bh) throws InterruptedException { + runTasks(newExecutor(true), bh); + } + + @Benchmark + public void nonTrackingExecutor(Blackhole bh) throws InterruptedException { + runTasks(newExecutor(false), bh); + } +} diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index c39ce209bf875..e4c10e656e736 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -581,6 +581,7 @@ public static class TaskTrackingConfig { public static final double DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST = 0.3; private final boolean trackExecutionTime; + private final boolean trackUtilization; private final boolean trackOngoingTasks; private final boolean trackMaxQueueLatency; private final double executionTimeEwmaAlpha; @@ -589,9 +590,11 @@ public static class TaskTrackingConfig { false, false, false, + false, DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST ); public static final TaskTrackingConfig DEFAULT = new TaskTrackingConfig( + true, true, false, false, @@ -606,11 +609,13 @@ public static class TaskTrackingConfig { */ private TaskTrackingConfig( boolean trackExecutionTime, + boolean trackUtilization, boolean trackOngoingTasks, boolean trackMaxQueueLatency, double executionTimeEWMAAlpha ) { this.trackExecutionTime = trackExecutionTime; + this.trackUtilization = trackUtilization; this.trackOngoingTasks = trackOngoingTasks; this.trackMaxQueueLatency = trackMaxQueueLatency; this.executionTimeEwmaAlpha = executionTimeEWMAAlpha; @@ -628,6 +633,10 @@ public boolean trackMaxQueueLatency() { return trackMaxQueueLatency; } + public boolean TrackUtilization() { + return trackUtilization; + } + public double getExecutionTimeEwmaAlpha() { return executionTimeEwmaAlpha; } @@ -638,6 +647,7 @@ public static Builder builder() { public static class Builder { private boolean trackExecutionTime = false; + private boolean trackUtilization = false; private boolean trackOngoingTasks = false; private boolean trackMaxQueueLatency = false; private double ewmaAlpha = DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST; @@ -650,6 +660,11 @@ public Builder trackExecutionTime(double alpha) { return this; } + public Builder trackUtilization() { + trackUtilization = true; + return this; + } + public Builder trackOngoingTasks() { trackOngoingTasks = true; return this; @@ -661,7 +676,7 @@ public Builder trackMaxQueueLatency() { } public TaskTrackingConfig build() { - return new TaskTrackingConfig(trackExecutionTime, trackOngoingTasks, trackMaxQueueLatency, ewmaAlpha); + return new TaskTrackingConfig(trackExecutionTime, trackUtilization, trackOngoingTasks, trackMaxQueueLatency, ewmaAlpha); } } } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java index 251d117ad725c..f5f92a1edbbdc 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java @@ -50,6 +50,7 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea private final Map ongoingTasks = new ConcurrentHashMap<>(); private final ExponentialBucketHistogram queueLatencyMillisHistogram = new ExponentialBucketHistogram(QUEUE_LATENCY_HISTOGRAM_BUCKETS); private final boolean trackMaxQueueLatency; + private final boolean trackUtilization; private LongAccumulator maxQueueLatencyMillisSinceLastPoll = new LongAccumulator(Long::max, 0); public enum UtilizationTrackingPurpose { @@ -59,8 +60,9 @@ public enum UtilizationTrackingPurpose { private volatile UtilizationTracker apmUtilizationTracker = new UtilizationTracker(); private volatile UtilizationTracker allocationUtilizationTracker = new UtilizationTracker(); + private final FramedTimeTracker framedTimeTracker= new FramedTimeTracker(1_000); - TaskExecutionTimeTrackingEsThreadPoolExecutor( + public TaskExecutionTimeTrackingEsThreadPoolExecutor( String name, int corePoolSize, int maximumPoolSize, @@ -79,6 +81,7 @@ public enum UtilizationTrackingPurpose { this.executionEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getExecutionTimeEwmaAlpha(), 0); this.trackOngoingTasks = trackingConfig.trackOngoingTasks(); this.trackMaxQueueLatency = trackingConfig.trackMaxQueueLatency(); + this.trackUtilization = trackingConfig.TrackUtilization(); } public List setupMetrics(MeterRegistry meterRegistry, String threadPoolName) { @@ -190,6 +193,9 @@ protected void beforeExecute(Thread t, Runnable r) { if (trackMaxQueueLatency) { maxQueueLatencyMillisSinceLastPoll.accumulate(queueLatencyMillis); } + if (trackUtilization) { + framedTimeTracker.startTask(System.nanoTime()); + } } @Override @@ -214,6 +220,9 @@ protected void afterExecute(Runnable r, Throwable t) { executionEWMA.addValue(taskExecutionNanos); totalExecutionTime.add(taskExecutionNanos); } + if (trackUtilization) { + framedTimeTracker.endTask(System.nanoTime()); + } } finally { // if trackOngoingTasks is false -> ongoingTasks must be empty assert trackOngoingTasks || ongoingTasks.isEmpty(); @@ -281,23 +290,22 @@ public synchronized double pollUtilization() { } } - static class FramedExecutionTime { + static class FramedTimeTracker { private final Supplier timeNow; long ongoingTasks; long interval; long currentFrame; - long previousFrame; long currentTime; long previousTime; // for testing - FramedExecutionTime(long interval, Supplier timeNow) { + FramedTimeTracker(long interval, Supplier timeNow) { assert interval > 0; this.interval = interval; this.timeNow = timeNow; } - FramedExecutionTime(long interval) { + FramedTimeTracker(long interval) { assert interval > 0; this.interval = interval; this.timeNow = System::nanoTime; @@ -314,13 +322,12 @@ private void updateFrame0(long nowTime) { var now = nowTime / interval; if (currentFrame < now) { if (currentFrame == now - 1) { - previousTime = currentTime; + previousTime = currentTime; // } else { previousTime = ongoingTasks * interval; } currentTime = ongoingTasks * interval; currentFrame = now; - previousFrame = now - 1; } } @@ -338,7 +345,7 @@ synchronized void endTask(long endTime) { --ongoingTasks; } - synchronized long getPreviousFrameExecutionTime() { + synchronized long previousFrameTime() { updateFrame0(timeNow.get()); return previousTime; } diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/FramedExecutionTimeTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/FramedExecutionTimeTests.java deleted file mode 100644 index b2d27f99b933d..0000000000000 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/FramedExecutionTimeTests.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". - */ - -package org.elasticsearch.common.util.concurrent; - -import org.elasticsearch.test.ESTestCase; - -import java.util.function.Supplier; - -import static org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor.FramedExecutionTime; - -public class FramedExecutionTimeTests extends ESTestCase { - - private final FramedExecutionTime framedExecutionTime; - private final FakeTime fakeTime; - - public FramedExecutionTimeTests() { - fakeTime = new FakeTime(); - framedExecutionTime = new FramedExecutionTime(1L, fakeTime); - } - - public void testNoTasks() { - framedExecutionTime.updateFrame(); - assertEquals(0, framedExecutionTime.getPreviousFrameExecutionTime()); - fakeTime.time += between(1, 100); - assertEquals(0, framedExecutionTime.getPreviousFrameExecutionTime()); - } - - public void testSingleFrameTask() { - framedExecutionTime.interval = 100; - framedExecutionTime.startTask(10); - framedExecutionTime.endTask(20); - fakeTime.time += framedExecutionTime.interval; - assertEquals(10, framedExecutionTime.getPreviousFrameExecutionTime()); - } - - public void testTwoFrameTask() { - framedExecutionTime.interval = 100; - var startTime = between(0, 100); - var endTime = startTime + framedExecutionTime.interval; - framedExecutionTime.startTask(startTime); - framedExecutionTime.endTask(endTime); - assertEquals(framedExecutionTime.interval - startTime, framedExecutionTime.getPreviousFrameExecutionTime()); - } - - public void testMultiFrameTask() { - framedExecutionTime.interval = 10; - framedExecutionTime.startTask(1); - framedExecutionTime.endTask(between(3, 100) * 10L); - assertEquals(framedExecutionTime.interval, framedExecutionTime.getPreviousFrameExecutionTime()); - } - - public void testOngoingTask() { - framedExecutionTime.interval = 10; - framedExecutionTime.startTask(0); - for (int i = 0; i < between(10, 100); i++) { - fakeTime.time += framedExecutionTime.interval; - assertEquals(framedExecutionTime.interval, framedExecutionTime.getPreviousFrameExecutionTime()); - } - } - - static class FakeTime implements Supplier { - long time = 0; - - @Override - public Long get() { - return time; - } - } -} diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/FramedTimeTrackerTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/FramedTimeTrackerTests.java new file mode 100644 index 0000000000000..0b2075524ee1b --- /dev/null +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/FramedTimeTrackerTests.java @@ -0,0 +1,76 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.common.util.concurrent; + +import org.elasticsearch.test.ESTestCase; + +import java.util.function.Supplier; + +import static org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor.FramedTimeTracker; + +public class FramedTimeTrackerTests extends ESTestCase { + + private final FramedTimeTracker framedTimeTracker; + private final FakeTime fakeTime; + + public FramedTimeTrackerTests() { + fakeTime = new FakeTime(); + framedTimeTracker = new FramedTimeTracker(1L, fakeTime); + } + + public void testNoTasks() { + framedTimeTracker.updateFrame(); + assertEquals(0, framedTimeTracker.previousFrameTime()); + fakeTime.time += between(1, 100); + assertEquals(0, framedTimeTracker.previousFrameTime()); + } + + public void testSingleFrameTask() { + framedTimeTracker.interval = 100; + framedTimeTracker.startTask(10); + framedTimeTracker.endTask(20); + fakeTime.time += framedTimeTracker.interval; + assertEquals(10, framedTimeTracker.previousFrameTime()); + } + + public void testTwoFrameTask() { + framedTimeTracker.interval = 100; + var startTime = between(0, 100); + var endTime = startTime + framedTimeTracker.interval; + framedTimeTracker.startTask(startTime); + framedTimeTracker.endTask(endTime); + assertEquals(framedTimeTracker.interval - startTime, framedTimeTracker.previousFrameTime()); + } + + public void testMultiFrameTask() { + framedTimeTracker.interval = 10; + framedTimeTracker.startTask(1); + framedTimeTracker.endTask(between(3, 100) * 10L); + assertEquals(framedTimeTracker.interval, framedTimeTracker.previousFrameTime()); + } + + public void testOngoingTask() { + framedTimeTracker.interval = 10; + framedTimeTracker.startTask(0); + for (int i = 0; i < between(10, 100); i++) { + fakeTime.time += framedTimeTracker.interval; + assertEquals(framedTimeTracker.interval, framedTimeTracker.previousFrameTime()); + } + } + + static class FakeTime implements Supplier { + long time = 0; + + @Override + public Long get() { + return time; + } + } +} From 8345da4fe16ceb0f59f23720527d01983fad4b07 Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Fri, 25 Jul 2025 22:35:30 -0700 Subject: [PATCH 03/20] tests --- .../ThreadPoolUtilizationBenchmark.java | 89 +++++++---- .../common/util/concurrent/EsExecutors.java | 82 ++++------ ...utionTimeTrackingEsThreadPoolExecutor.java | 145 ++++++++---------- .../DefaultBuiltInExecutorBuilders.java | 2 + .../concurrent/FramedTimeTrackerTests.java | 82 +++++++--- ...TimeTrackingEsThreadPoolExecutorTests.java | 64 ++++++++ .../threadpool/ThreadPoolTests.java | 83 ---------- 7 files changed, 273 insertions(+), 274 deletions(-) diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java index 0e38f40149855..2635545a784af 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java @@ -22,37 +22,44 @@ import org.openjdk.jmh.annotations.OutputTimeUnit; import org.openjdk.jmh.annotations.Param; import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.infra.Blackhole; -import java.util.Random; +import java.time.Duration; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -@Warmup(iterations = 0) +@Warmup(iterations = 1, time = 1, timeUnit = TimeUnit.SECONDS) @Measurement(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS) @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MILLISECONDS) @Fork(1) @State(Scope.Thread) public class ThreadPoolUtilizationBenchmark { - private final Random random = new Random(); - @Param({ "8", "16", "32", "64" }) + + @Param({ "false", "true" }) + private boolean trackUtilization; + + @Param({ "4", "8", "16" }) private int poolSize; - @Param({ "10000" }) + + @Param({ "1000000" }) private int tasksNum; - @Param({ "1" }) - private int workerTimeMinMs; - @Param({ "5" }) - private int workerTimeMaxMs; - private TaskExecutionTimeTrackingEsThreadPoolExecutor executor; - public EsThreadPoolExecutor newExecutor(boolean tracking) { + @Param({ "10" }) // 10ms is aggressive interval, it increases frame updates on FramedTimeTracker, normally we run at 30/60 + // seconds + private int utilizationIntervalMs; + + private EsThreadPoolExecutor executor; + + private EsThreadPoolExecutor newExecutor(boolean tracking) { var conf = EsExecutors.TaskTrackingConfig.builder(); if (tracking) { - conf.trackOngoingTasks().trackUtilization(); + conf.trackExecutionTime(0.3).trackUtilization(Duration.ofMillis(utilizationIntervalMs)); } return EsExecutors.newFixed( "bench", @@ -64,34 +71,48 @@ public EsThreadPoolExecutor newExecutor(boolean tracking) { ); } - private void runTasks(EsThreadPoolExecutor executor, Blackhole bh) throws InterruptedException { - try { - var completedTasks = new CountDownLatch(tasksNum); - for (var i = 0; i < tasksNum; i++) { - executor.execute(() -> { - try { - Thread.sleep(random.nextInt(workerTimeMinMs, workerTimeMaxMs)); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } finally { - completedTasks.countDown(); - } - }); + @Setup + public void setup() { + if (trackUtilization) { + var exec = newExecutor(true); + if (exec instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor trackingExecutor) { + if (trackingExecutor.trackingConfig().trackUtilization() == false) { + throw new IllegalStateException("utilization tracking must be enabled"); + } else { + executor = trackingExecutor; + } + } else { + throw new IllegalStateException("must be tracking executor"); + } + } else { + var exec = newExecutor(false); + if (exec instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor) { + throw new IllegalStateException("must be non-tracking executor"); } - completedTasks.await(); - } finally { - executor.shutdown(); - executor.awaitTermination(0, TimeUnit.MILLISECONDS); + executor = exec; } } - @Benchmark - public void trackingExecutor(Blackhole bh) throws InterruptedException { - runTasks(newExecutor(true), bh); + @TearDown + public void tearDown() throws InterruptedException { + executor.shutdown(); + executor.awaitTermination(0, TimeUnit.MILLISECONDS); } @Benchmark - public void nonTrackingExecutor(Blackhole bh) throws InterruptedException { - runTasks(newExecutor(false), bh); + public void run(Blackhole bh) throws InterruptedException { + var completedTasks = new CountDownLatch(tasksNum); + for (var i = 0; i < tasksNum; i++) { + executor.execute(() -> { + // busy cycles for cpu + var r = 0; + for (var j = 0; j < 1000; j++) { + r += j * 2; + } + bh.consume(r); + completedTasks.countDown(); + }); + } + completedTasks.await(); } } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index e4c10e656e736..e65d4da8bfe5d 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -17,6 +17,7 @@ import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.node.Node; +import java.time.Duration; import java.util.List; import java.util.Optional; import java.util.concurrent.AbstractExecutorService; @@ -576,71 +577,45 @@ public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) { } } - public static class TaskTrackingConfig { + /** + * @param trackExecutionTime Whether to track execution stats + * @param trackUtilization enables thread-pool utilization metrics + * @param utilizationInterval when utilization is enabled, specifies interval for measurement + * @param trackOngoingTasks Whether to track ongoing task execution time, not just finished tasks + * @param trackMaxQueueLatency Whether to track max queue latency. + * @param executionTimeEwmaAlpha The alpha seed for execution time EWMA (ExponentiallyWeightedMovingAverage). + */ + public record TaskTrackingConfig( + boolean trackExecutionTime, + boolean trackUtilization, + Duration utilizationInterval, + boolean trackOngoingTasks, + boolean trackMaxQueueLatency, + double executionTimeEwmaAlpha + ) { + // This is a random starting point alpha. public static final double DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST = 0.3; - - private final boolean trackExecutionTime; - private final boolean trackUtilization; - private final boolean trackOngoingTasks; - private final boolean trackMaxQueueLatency; - private final double executionTimeEwmaAlpha; + public static final Duration DEFAULT_UTILIZATION_INTERVAL = Duration.ofSeconds(30); public static final TaskTrackingConfig DO_NOT_TRACK = new TaskTrackingConfig( false, false, + DEFAULT_UTILIZATION_INTERVAL, false, false, DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST ); + public static final TaskTrackingConfig DEFAULT = new TaskTrackingConfig( true, true, + DEFAULT_UTILIZATION_INTERVAL, false, false, DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST ); - /** - * @param trackExecutionTime Whether to track execution stats - * @param trackOngoingTasks Whether to track ongoing task execution time, not just finished tasks - * @param trackMaxQueueLatency Whether to track max queue latency. - * @param executionTimeEWMAAlpha The alpha seed for execution time EWMA (ExponentiallyWeightedMovingAverage). - */ - private TaskTrackingConfig( - boolean trackExecutionTime, - boolean trackUtilization, - boolean trackOngoingTasks, - boolean trackMaxQueueLatency, - double executionTimeEWMAAlpha - ) { - this.trackExecutionTime = trackExecutionTime; - this.trackUtilization = trackUtilization; - this.trackOngoingTasks = trackOngoingTasks; - this.trackMaxQueueLatency = trackMaxQueueLatency; - this.executionTimeEwmaAlpha = executionTimeEWMAAlpha; - } - - public boolean trackExecutionTime() { - return trackExecutionTime; - } - - public boolean trackOngoingTasks() { - return trackOngoingTasks; - } - - public boolean trackMaxQueueLatency() { - return trackMaxQueueLatency; - } - - public boolean TrackUtilization() { - return trackUtilization; - } - - public double getExecutionTimeEwmaAlpha() { - return executionTimeEwmaAlpha; - } - public static Builder builder() { return new Builder(); } @@ -651,6 +626,7 @@ public static class Builder { private boolean trackOngoingTasks = false; private boolean trackMaxQueueLatency = false; private double ewmaAlpha = DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST; + private Duration utilizationInterval = DEFAULT_UTILIZATION_INTERVAL; public Builder() {} @@ -660,8 +636,9 @@ public Builder trackExecutionTime(double alpha) { return this; } - public Builder trackUtilization() { + public Builder trackUtilization(Duration interval) { trackUtilization = true; + utilizationInterval = interval; return this; } @@ -676,7 +653,14 @@ public Builder trackMaxQueueLatency() { } public TaskTrackingConfig build() { - return new TaskTrackingConfig(trackExecutionTime, trackUtilization, trackOngoingTasks, trackMaxQueueLatency, ewmaAlpha); + return new TaskTrackingConfig( + trackExecutionTime, + trackUtilization, + utilizationInterval, + trackOngoingTasks, + trackMaxQueueLatency, + ewmaAlpha + ); } } } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java index f5f92a1edbbdc..5d3e363892d4d 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java @@ -45,23 +45,13 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea private final Function runnableWrapper; private final ExponentiallyWeightedMovingAverage executionEWMA; private final LongAdder totalExecutionTime = new LongAdder(); - private final boolean trackOngoingTasks; // The set of currently running tasks and the timestamp of when they started execution in the Executor. private final Map ongoingTasks = new ConcurrentHashMap<>(); private final ExponentialBucketHistogram queueLatencyMillisHistogram = new ExponentialBucketHistogram(QUEUE_LATENCY_HISTOGRAM_BUCKETS); - private final boolean trackMaxQueueLatency; - private final boolean trackUtilization; + private final TaskTrackingConfig trackingConfig; + private final FramedTimeTracker framedTimeTracker; private LongAccumulator maxQueueLatencyMillisSinceLastPoll = new LongAccumulator(Long::max, 0); - public enum UtilizationTrackingPurpose { - APM, - ALLOCATION, - } - - private volatile UtilizationTracker apmUtilizationTracker = new UtilizationTracker(); - private volatile UtilizationTracker allocationUtilizationTracker = new UtilizationTracker(); - private final FramedTimeTracker framedTimeTracker= new FramedTimeTracker(1_000); - public TaskExecutionTimeTrackingEsThreadPoolExecutor( String name, int corePoolSize, @@ -78,10 +68,9 @@ public TaskExecutionTimeTrackingEsThreadPoolExecutor( super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler, contextHolder); this.runnableWrapper = runnableWrapper; - this.executionEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getExecutionTimeEwmaAlpha(), 0); - this.trackOngoingTasks = trackingConfig.trackOngoingTasks(); - this.trackMaxQueueLatency = trackingConfig.trackMaxQueueLatency(); - this.trackUtilization = trackingConfig.TrackUtilization(); + this.executionEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.executionTimeEwmaAlpha(), 0); + this.trackingConfig = trackingConfig; + this.framedTimeTracker = new FramedTimeTracker(trackingConfig.utilizationInterval().getNano()); } public List setupMetrics(MeterRegistry meterRegistry, String threadPoolName) { @@ -109,7 +98,7 @@ public List setupMetrics(MeterRegistry meterRegistry, String threadP ThreadPool.THREAD_POOL_METRIC_PREFIX + threadPoolName + THREAD_POOL_METRIC_NAME_UTILIZATION, "fraction of maximum thread time utilized for " + threadPoolName, "fraction", - () -> new DoubleWithAttributes(pollUtilization(UtilizationTrackingPurpose.APM), Map.of()) + () -> new DoubleWithAttributes(utilization(), Map.of()) ) ); } @@ -151,34 +140,33 @@ public int getCurrentQueueSize() { } public long getMaxQueueLatencyMillisSinceLastPollAndReset() { - if (trackMaxQueueLatency == false) { + if (trackingConfig.trackMaxQueueLatency() == false) { return 0; } return maxQueueLatencyMillisSinceLastPoll.getThenReset(); } + public TaskTrackingConfig trackingConfig() { + return trackingConfig; + } + /** - * Returns the fraction of the maximum possible thread time that was actually used since the last time this method was called. - * There are two periodic pulling mechanisms that access utilization reporting: {@link UtilizationTrackingPurpose} distinguishes the - * caller. + * Returns thread-pool utilization from last completed time interval(frame) {@link TaskTrackingConfig#utilizationInterval()}. + * Utilization is measured as {@code all-threads-total-execution-time / (total-thread-count * interval)}. + * This metric is updated once on per interval, and returns last completed measurement. For example: + * if interval is 30 seconds, at clock time 00:30-01:00 it will return utilization from 00:00-00:30. + * Thou there is no synchronization with clocks and system time. * - * @return the utilization as a fraction, in the range [0, 1]. This may return >1 if a task completed in the time range but started - * earlier, contributing a larger execution time. + * If caller needs longer intervals it should poll on every tracker-interval and aggregate on it's own. Another option is to extend + * framedTimeTracker to remember multiple past frames, and return aggregated view from here. */ - public double pollUtilization(UtilizationTrackingPurpose utilizationTrackingPurpose) { - switch (utilizationTrackingPurpose) { - case APM: - return apmUtilizationTracker.pollUtilization(); - case ALLOCATION: - return allocationUtilizationTracker.pollUtilization(); - default: - throw new IllegalStateException("No operation defined for [" + utilizationTrackingPurpose + "]"); - } + public double utilization() { + return (double) framedTimeTracker.previousFrameTime() / (double) getMaximumPoolSize() / (double) framedTimeTracker.interval; } @Override protected void beforeExecute(Thread t, Runnable r) { - if (trackOngoingTasks) { + if (trackingConfig.trackOngoingTasks()) { ongoingTasks.put(r, System.nanoTime()); } @@ -190,11 +178,11 @@ protected void beforeExecute(Thread t, Runnable r) { var queueLatencyMillis = TimeUnit.NANOSECONDS.toMillis(taskQueueLatency); queueLatencyMillisHistogram.addObservation(queueLatencyMillis); - if (trackMaxQueueLatency) { + if (trackingConfig.trackMaxQueueLatency()) { maxQueueLatencyMillisSinceLastPoll.accumulate(queueLatencyMillis); } - if (trackUtilization) { - framedTimeTracker.startTask(System.nanoTime()); + if (trackingConfig.trackUtilization()) { + framedTimeTracker.startTask(); } } @@ -220,13 +208,13 @@ protected void afterExecute(Runnable r, Throwable t) { executionEWMA.addValue(taskExecutionNanos); totalExecutionTime.add(taskExecutionNanos); } - if (trackUtilization) { - framedTimeTracker.endTask(System.nanoTime()); + if (trackingConfig.trackUtilization()) { + framedTimeTracker.endTask(); } } finally { // if trackOngoingTasks is false -> ongoingTasks must be empty - assert trackOngoingTasks || ongoingTasks.isEmpty(); - if (trackOngoingTasks) { + assert trackingConfig.trackOngoingTasks() || ongoingTasks.isEmpty(); + if (trackingConfig.trackOngoingTasks()) { ongoingTasks.remove(r); } } @@ -250,7 +238,7 @@ protected void appendThreadPoolExecutorDetails(StringBuilder sb) { * task is reflected in at least one of those two values. */ public Map getOngoingTasks() { - return trackOngoingTasks ? Map.copyOf(ongoingTasks) : Map.of(); + return trackingConfig.trackOngoingTasks() ? Map.copyOf(ongoingTasks) : Map.of(); } // Used for testing @@ -260,64 +248,47 @@ public double getExecutionEwmaAlpha() { // Used for testing public boolean trackingMaxQueueLatency() { - return trackMaxQueueLatency; + return trackingConfig.trackMaxQueueLatency(); } /** - * Supports periodic polling for thread pool utilization. Tracks state since the last polling request so that the average utilization - * since the last poll can be calculated for the next polling request. + * Tracks treads execution in continuous, non-overlapping, and even time frames. Provides accurate total execution time measurement + * for past frames, specifically previous frame (now - 1 frame) to measure utilization. * - * Uses the difference of {@link #totalExecutionTime} since the last polling request to determine how much activity has occurred. + * Can be extended to remember multiple past frames. */ - private class UtilizationTracker { - long lastPollTime = System.nanoTime(); - long lastTotalExecutionTime = 0; - - public synchronized double pollUtilization() { - final long currentTotalExecutionTimeNanos = totalExecutionTime.sum(); - final long currentPollTimeNanos = System.nanoTime(); - - final long totalExecutionTimeSinceLastPollNanos = currentTotalExecutionTimeNanos - lastTotalExecutionTime; - final long timeSinceLastPoll = currentPollTimeNanos - lastPollTime; - - final long maximumExecutionTimeSinceLastPollNanos = timeSinceLastPoll * getMaximumPoolSize(); - final double utilizationSinceLastPoll = (double) totalExecutionTimeSinceLastPollNanos / maximumExecutionTimeSinceLastPollNanos; - - lastTotalExecutionTime = currentTotalExecutionTimeNanos; - lastPollTime = currentPollTimeNanos; - - return utilizationSinceLastPoll; - } - } - static class FramedTimeTracker { + final long interval; private final Supplier timeNow; long ongoingTasks; - long interval; long currentFrame; long currentTime; long previousTime; // for testing - FramedTimeTracker(long interval, Supplier timeNow) { - assert interval > 0; - this.interval = interval; + FramedTimeTracker(long intervalNano, Supplier timeNow) { + assert intervalNano > 0; + this.interval = intervalNano; this.timeNow = timeNow; } - FramedTimeTracker(long interval) { - assert interval > 0; - this.interval = interval; + FramedTimeTracker(long intervalNano) { + assert intervalNano > 0; + this.interval = intervalNano; this.timeNow = System::nanoTime; } - /** - * update current and previous frames to current time - */ synchronized void updateFrame() { updateFrame0(timeNow.get()); } + /** + * Update frames to current time. When it's called first time or after a long period (>> interval) current and previous frames + * are going to be stale. But we know all ongoing tasks and can assume they still running, unless explicitly ended. For any + * ongoing task we always assume they will run indefinitely and apply "credit" to currentTime, if task is finished in + * currentFrame we deduct remaining balance. That means currentFrame can overestimate usage, but when current decay to previous + * it is always accurate, because task can end only in currentFrame. + */ private void updateFrame0(long nowTime) { var now = nowTime / interval; if (currentFrame < now) { @@ -331,17 +302,25 @@ private void updateFrame0(long nowTime) { } } - synchronized void startTask(long startTime) { - updateFrame0(startTime); - // assume task will run indefinitely, in this case at least till end of interval - currentTime += (currentFrame + 1) * interval - startTime; + /** + * Start tracking new task, assume that task runs indefinitely, or at least till end of frame. + * If task finishes sooner than end of interval {@link FramedTimeTracker#endTask()} will deduct remaining time. + */ + synchronized void startTask() { + var now = timeNow.get(); + updateFrame0(now); + currentTime += (currentFrame + 1) * interval - now; ++ongoingTasks; } - synchronized void endTask(long endTime) { - updateFrame0(endTime); + /** + * Stop task tracking. We already assumed that task runs till end of frame, here we deduct not used time. + */ + synchronized void endTask() { + var now = timeNow.get(); + updateFrame0(now); // we already assumed that task will run till end of interval, here we subtract whats left - currentTime -= (currentFrame + 1) * interval - endTime; + currentTime -= (currentFrame + 1) * interval - now; --ongoingTasks; } diff --git a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java index 4ebcae1cc2ac0..194c3feeced36 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java +++ b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java @@ -16,6 +16,7 @@ import org.elasticsearch.index.engine.ThreadPoolMergeScheduler; import org.elasticsearch.threadpool.internal.BuiltInExecutorBuilders; +import java.time.Duration; import java.util.HashMap; import java.util.Map; @@ -60,6 +61,7 @@ public Map getBuilders(Settings settings, int allocated .trackOngoingTasks() .trackMaxQueueLatency() .trackExecutionTime(indexAutoscalingEWMA) + .trackUtilization(Duration.ofSeconds(30)) .build() ) ); diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/FramedTimeTrackerTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/FramedTimeTrackerTests.java index 0b2075524ee1b..7fbf05f7c701c 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/FramedTimeTrackerTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/FramedTimeTrackerTests.java @@ -10,61 +10,93 @@ package org.elasticsearch.common.util.concurrent; import org.elasticsearch.test.ESTestCase; +import org.junit.Before; import java.util.function.Supplier; +import static java.util.stream.IntStream.range; import static org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor.FramedTimeTracker; public class FramedTimeTrackerTests extends ESTestCase { - private final FramedTimeTracker framedTimeTracker; - private final FakeTime fakeTime; + private FakeTime fakeTime; - public FramedTimeTrackerTests() { + FramedTimeTracker newTracker(long interval) { + return new FramedTimeTracker(interval, fakeTime); + } + + @Before + public void setup() { fakeTime = new FakeTime(); - framedTimeTracker = new FramedTimeTracker(1L, fakeTime); } public void testNoTasks() { - framedTimeTracker.updateFrame(); - assertEquals(0, framedTimeTracker.previousFrameTime()); + var tracker = newTracker(1); + tracker.updateFrame(); + assertEquals(0, tracker.previousFrameTime()); fakeTime.time += between(1, 100); - assertEquals(0, framedTimeTracker.previousFrameTime()); + assertEquals(0, tracker.previousFrameTime()); } public void testSingleFrameTask() { - framedTimeTracker.interval = 100; - framedTimeTracker.startTask(10); - framedTimeTracker.endTask(20); - fakeTime.time += framedTimeTracker.interval; - assertEquals(10, framedTimeTracker.previousFrameTime()); + var tracker = newTracker(100); + fakeTime.time += 10; + tracker.startTask(); + fakeTime.time += 10; + tracker.endTask(); + fakeTime.time += tracker.interval; + assertEquals(10, tracker.previousFrameTime()); } public void testTwoFrameTask() { - framedTimeTracker.interval = 100; + var tracker = newTracker(100); var startTime = between(0, 100); - var endTime = startTime + framedTimeTracker.interval; - framedTimeTracker.startTask(startTime); - framedTimeTracker.endTask(endTime); - assertEquals(framedTimeTracker.interval - startTime, framedTimeTracker.previousFrameTime()); + var taskDuration = tracker.interval; + fakeTime.time += startTime; + tracker.startTask(); + fakeTime.time += taskDuration; + tracker.endTask(); + assertEquals(tracker.interval - startTime, tracker.previousFrameTime()); } public void testMultiFrameTask() { - framedTimeTracker.interval = 10; - framedTimeTracker.startTask(1); - framedTimeTracker.endTask(between(3, 100) * 10L); - assertEquals(framedTimeTracker.interval, framedTimeTracker.previousFrameTime()); + var interval = 10; + var tracker = newTracker(interval); + tracker.startTask(); + var taskDuration = between(3, 100) * interval; + fakeTime.time += taskDuration; + tracker.endTask(); + assertEquals(tracker.interval, tracker.previousFrameTime()); } public void testOngoingTask() { - framedTimeTracker.interval = 10; - framedTimeTracker.startTask(0); + var interval = 10; + var tracker = newTracker(interval); + tracker.startTask(); for (int i = 0; i < between(10, 100); i++) { - fakeTime.time += framedTimeTracker.interval; - assertEquals(framedTimeTracker.interval, framedTimeTracker.previousFrameTime()); + fakeTime.time += tracker.interval; + assertEquals(tracker.interval, tracker.previousFrameTime()); } } + public void testMultipleTasks() { + var interval = between(1, 100) * 2; // using integer division by 2 below + var tracker = newTracker(interval); + var halfIntervalTasks = between(1, 10); + var notEndingTasks = between(1, 10); + + range(0, halfIntervalTasks + notEndingTasks).forEach(t -> tracker.startTask()); + fakeTime.time += interval / 2; + range(0, halfIntervalTasks).forEach(t -> tracker.endTask()); + fakeTime.time += interval / 2; + var firstFrameTotalTime = interval * halfIntervalTasks / 2 + interval * notEndingTasks; + assertEquals(firstFrameTotalTime, tracker.previousFrameTime()); + + fakeTime.time += interval; + var secondFrameTotalTime = interval * notEndingTasks; + assertEquals(secondFrameTotalTime, tracker.previousFrameTime()); + } + static class FakeTime implements Supplier { long time = 0; diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java index 505c26409a702..197013cf4d988 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java @@ -17,13 +17,18 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; +import java.time.Duration; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.function.Function; +import java.util.stream.DoubleStream; +import static java.util.stream.IntStream.range; import static org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig.DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -226,6 +231,65 @@ public void testGetOngoingTasks() throws Exception { executor.awaitTermination(10, TimeUnit.SECONDS); } + public void testUtilization() throws InterruptedException { + final var interval = Duration.ofMillis(100); + + final Consumer trySleep = (d) -> { + try { + Thread.sleep(d); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }; + + final Runnable waitTillNextFrame = () -> { + var now = System.nanoTime(); + var waitTillNext = (now / interval.getNano() + 1) * interval.getNano() - now; + trySleep.accept(Duration.ofNanos(waitTillNext)); + }; + + final Function sleepTaskFn = (d) -> () -> trySleep.accept(d); + + var executor = (TaskExecutionTimeTrackingEsThreadPoolExecutor) EsExecutors.newFixed( + "utilization", + 4, + 4, + Executors.defaultThreadFactory(), + new ThreadContext(Settings.EMPTY), + EsExecutors.TaskTrackingConfig.builder().trackExecutionTime(0.3).trackUtilization(interval).build() + ); + + try { + // warm-up executor, reduces utilization metric jitter + waitTillNextFrame.run(); + range(0, 4).forEach(i -> executor.submit(sleepTaskFn.apply(Duration.ofMillis(1)))); + + // create 4 tasks that use different thread usage per interval and start at the beginning of the frame + // 1. small task with 10% + // 2. medium task with 30% + // 3. larger task with 50% + // 4 long running task that spans over multiple intervals(3), ie 100% utilization per frame + // total utilization for 4-threads-pool = (10 + 30 + 50 + 100) / (4 * 100) = 190/400 = 0.475 + // assuming overhead can be up to 10%, final range would be 0.475 - 0.575 + waitTillNextFrame.run(); + + DoubleStream.of(0.1, 0.3, 0.5, 3.).forEach(loadFactor -> { + var sleepTime = (long) (interval.getNano() * loadFactor); + executor.submit(sleepTaskFn.apply(Duration.ofNanos(sleepTime))); + }); + + waitTillNextFrame.run(); + assertEquals(0.475, executor.utilization(), 0.1); + + // the long running task should still use 100% of a single thread, so 1/4=25% utilization + waitTillNextFrame.run(); + assertEquals(0.25, executor.utilization(), 0.1); + } finally { + executor.shutdown(); + executor.awaitTermination(0, TimeUnit.SECONDS); + } + } + public void testQueueLatencyHistogramMetrics() { RecordingMeterRegistry meterRegistry = new RecordingMeterRegistry(); final var threadPoolName = randomIdentifier(); diff --git a/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java b/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java index 2cd166e002637..25bb95e9532be 100644 --- a/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java +++ b/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java @@ -20,7 +20,6 @@ import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor; -import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor.UtilizationTrackingPurpose; import org.elasticsearch.core.TimeValue; import org.elasticsearch.node.Node; import org.elasticsearch.telemetry.InstrumentType; @@ -38,7 +37,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import static org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig.DEFAULT; @@ -49,10 +47,8 @@ import static org.elasticsearch.threadpool.ThreadPool.getMaxSnapshotThreadPoolSize; import static org.elasticsearch.threadpool.ThreadPool.halfAllocatedProcessorsMaxFive; import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.lessThan; public class ThreadPoolTests extends ESTestCase { @@ -489,85 +485,6 @@ public void testScheduledFixedDelayForceExecution() { } } - public void testDetailedUtilizationMetric() throws Exception { - final RecordingMeterRegistry meterRegistry = new RecordingMeterRegistry(); - final BuiltInExecutorBuilders builtInExecutorBuilders = new DefaultBuiltInExecutorBuilders(); - - final ThreadPool threadPool = new ThreadPool( - Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test").build(), - meterRegistry, - builtInExecutorBuilders - ); - try { - // write thread pool is tracked - final String threadPoolName = ThreadPool.Names.WRITE; - final MetricAsserter metricAsserter = new MetricAsserter(meterRegistry, threadPoolName); - final ThreadPool.Info threadPoolInfo = threadPool.info(threadPoolName); - final TaskExecutionTimeTrackingEsThreadPoolExecutor executor = asInstanceOf( - TaskExecutionTimeTrackingEsThreadPoolExecutor.class, - threadPool.executor(threadPoolName) - ); - - final long beforePreviousCollectNanos = System.nanoTime(); - meterRegistry.getRecorder().collect(); - double allocationUtilization = executor.pollUtilization(UtilizationTrackingPurpose.ALLOCATION); - final long afterPreviousCollectNanos = System.nanoTime(); - - var metricValue = metricAsserter.assertLatestMetricValueMatches( - InstrumentType.DOUBLE_GAUGE, - ThreadPool.THREAD_POOL_METRIC_NAME_UTILIZATION, - Measurement::getDouble, - equalTo(0.0d) - ); - logger.info("---> Utilization metric data points, APM: " + metricValue + ", Allocation: " + allocationUtilization); - assertThat(allocationUtilization, equalTo(0.0d)); - - final AtomicLong minimumDurationNanos = new AtomicLong(Long.MAX_VALUE); - final long beforeStartNanos = System.nanoTime(); - final CyclicBarrier barrier = new CyclicBarrier(2); - Future future = executor.submit(() -> { - long innerStartTimeNanos = System.nanoTime(); - safeSleep(100); - safeAwait(barrier); - minimumDurationNanos.set(System.nanoTime() - innerStartTimeNanos); - }); - safeAwait(barrier); - safeGet(future); - final long maxDurationNanos = System.nanoTime() - beforeStartNanos; - - // Wait for TaskExecutionTimeTrackingEsThreadPoolExecutor#afterExecute to run - assertBusy(() -> assertThat(executor.getTotalTaskExecutionTime(), greaterThan(0L))); - - final long beforeMetricsCollectedNanos = System.nanoTime(); - meterRegistry.getRecorder().collect(); - allocationUtilization = executor.pollUtilization(UtilizationTrackingPurpose.ALLOCATION); - final long afterMetricsCollectedNanos = System.nanoTime(); - - // Calculate upper bound on utilisation metric - final long minimumPollIntervalNanos = beforeMetricsCollectedNanos - afterPreviousCollectNanos; - final long minimumMaxExecutionTimeNanos = minimumPollIntervalNanos * threadPoolInfo.getMax(); - final double maximumUtilization = (double) maxDurationNanos / minimumMaxExecutionTimeNanos; - - // Calculate lower bound on utilisation metric - final long maximumPollIntervalNanos = afterMetricsCollectedNanos - beforePreviousCollectNanos; - final long maximumMaxExecutionTimeNanos = maximumPollIntervalNanos * threadPoolInfo.getMax(); - final double minimumUtilization = (double) minimumDurationNanos.get() / maximumMaxExecutionTimeNanos; - - logger.info("Utilization must be in [{}, {}]", minimumUtilization, maximumUtilization); - Matcher matcher = allOf(greaterThan(minimumUtilization), lessThan(maximumUtilization)); - metricValue = metricAsserter.assertLatestMetricValueMatches( - InstrumentType.DOUBLE_GAUGE, - ThreadPool.THREAD_POOL_METRIC_NAME_UTILIZATION, - Measurement::getDouble, - matcher - ); - logger.info("---> Utilization metric data points, APM: " + metricValue + ", Allocation: " + allocationUtilization); - assertThat(allocationUtilization, matcher); - } finally { - ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); - } - } - public void testThreadCountMetrics() throws Exception { final RecordingMeterRegistry meterRegistry = new RecordingMeterRegistry(); final BuiltInExecutorBuilders builtInExecutorBuilders = new DefaultBuiltInExecutorBuilders(); From cb531b979e2745f38eaceed2126e7174387edd8c Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Fri, 25 Jul 2025 23:04:54 -0700 Subject: [PATCH 04/20] Update docs/changelog/131898.yaml --- docs/changelog/131898.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/131898.yaml diff --git a/docs/changelog/131898.yaml b/docs/changelog/131898.yaml new file mode 100644 index 0000000000000..f90985795879c --- /dev/null +++ b/docs/changelog/131898.yaml @@ -0,0 +1,5 @@ +pr: 131898 +summary: Time framed thread-pool utilization +area: Allocation +type: enhancement +issues: [] From ab04cc68977548b10b29978c75dad669166a6da4 Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Fri, 25 Jul 2025 23:12:13 -0700 Subject: [PATCH 05/20] assertion fix --- .../TaskExecutionTimeTrackingEsThreadPoolExecutor.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java index 5d3e363892d4d..be9ec17192f06 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java @@ -267,13 +267,13 @@ static class FramedTimeTracker { // for testing FramedTimeTracker(long intervalNano, Supplier timeNow) { - assert intervalNano > 0; + assert intervalNano >= 0; this.interval = intervalNano; this.timeNow = timeNow; } FramedTimeTracker(long intervalNano) { - assert intervalNano > 0; + assert intervalNano >= 0; this.interval = intervalNano; this.timeNow = System::nanoTime; } From 454a871e307a1cb55b4129aa3fa7ce4c40e07218 Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Fri, 25 Jul 2025 23:53:36 -0700 Subject: [PATCH 06/20] duration toNanos --- .../TaskExecutionTimeTrackingEsThreadPoolExecutor.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java index be9ec17192f06..cda2a85d86c1a 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java @@ -70,7 +70,7 @@ public TaskExecutionTimeTrackingEsThreadPoolExecutor( this.runnableWrapper = runnableWrapper; this.executionEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.executionTimeEwmaAlpha(), 0); this.trackingConfig = trackingConfig; - this.framedTimeTracker = new FramedTimeTracker(trackingConfig.utilizationInterval().getNano()); + this.framedTimeTracker = new FramedTimeTracker(trackingConfig.utilizationInterval().toNanos()); } public List setupMetrics(MeterRegistry meterRegistry, String threadPoolName) { @@ -267,13 +267,13 @@ static class FramedTimeTracker { // for testing FramedTimeTracker(long intervalNano, Supplier timeNow) { - assert intervalNano >= 0; + assert intervalNano > 0; this.interval = intervalNano; this.timeNow = timeNow; } FramedTimeTracker(long intervalNano) { - assert intervalNano >= 0; + assert intervalNano > 0; this.interval = intervalNano; this.timeNow = System::nanoTime; } From a87a7c47e4f4c946264bbcf5a51edc77632cf2b3 Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Sat, 26 Jul 2025 16:05:03 -0700 Subject: [PATCH 07/20] comments --- .../common/util/concurrent/EsExecutors.java | 1 + ...utionTimeTrackingEsThreadPoolExecutor.java | 20 ++++++++++++------- .../DefaultBuiltInExecutorBuilders.java | 1 - 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index e65d4da8bfe5d..14e2e59bbf3df 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -632,6 +632,7 @@ public Builder() {} public Builder trackExecutionTime(double alpha) { trackExecutionTime = true; + trackUtilization = true; ewmaAlpha = alpha; return this; } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java index cda2a85d86c1a..94d2d2b580afa 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java @@ -153,7 +153,7 @@ public TaskTrackingConfig trackingConfig() { /** * Returns thread-pool utilization from last completed time interval(frame) {@link TaskTrackingConfig#utilizationInterval()}. * Utilization is measured as {@code all-threads-total-execution-time / (total-thread-count * interval)}. - * This metric is updated once on per interval, and returns last completed measurement. For example: + * This metric is updated once per interval, and returns last completed measurement. For example: * if interval is 30 seconds, at clock time 00:30-01:00 it will return utilization from 00:00-00:30. * Thou there is no synchronization with clocks and system time. * @@ -283,11 +283,15 @@ synchronized void updateFrame() { } /** - * Update frames to current time. When it's called first time or after a long period (>> interval) current and previous frames - * are going to be stale. But we know all ongoing tasks and can assume they still running, unless explicitly ended. For any - * ongoing task we always assume they will run indefinitely and apply "credit" to currentTime, if task is finished in - * currentFrame we deduct remaining balance. That means currentFrame can overestimate usage, but when current decay to previous - * it is always accurate, because task can end only in currentFrame. + * Update frames to current time. There are no guaranties that it will be invoked frequently. + * For example when there are no tasks and no requests for previousFrameTime. + * + * When it's invoked frequently, at least once per frame, we move currentTime into previousTime. + * That concludes currentTime and it's accurate. + * + * When it's invoked infrequently, once in multiple frames, current and previous frames are going to be stale. + * Which is ok, that means there were no changes in tasks(start/end), all ongoing tasks are still running. + * That means ongoing tasks fully utilized previous frames. And we can accurately tell previous frame usage. */ private void updateFrame0(long nowTime) { var now = nowTime / interval; @@ -319,11 +323,13 @@ synchronized void startTask() { synchronized void endTask() { var now = timeNow.get(); updateFrame0(now); - // we already assumed that task will run till end of interval, here we subtract whats left currentTime -= (currentFrame + 1) * interval - now; --ongoingTasks; } + /** + * Returns previous frame total execution time. + */ synchronized long previousFrameTime() { updateFrame0(timeNow.get()); return previousTime; diff --git a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java index 194c3feeced36..a625f15f0b557 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java +++ b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java @@ -61,7 +61,6 @@ public Map getBuilders(Settings settings, int allocated .trackOngoingTasks() .trackMaxQueueLatency() .trackExecutionTime(indexAutoscalingEWMA) - .trackUtilization(Duration.ofSeconds(30)) .build() ) ); From 8850789cd64b2bc52a047b325fc9218466623b0a Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Sat, 26 Jul 2025 23:19:56 +0000 Subject: [PATCH 08/20] [CI] Auto commit changes from spotless --- .../elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java index a625f15f0b557..4ebcae1cc2ac0 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java +++ b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java @@ -16,7 +16,6 @@ import org.elasticsearch.index.engine.ThreadPoolMergeScheduler; import org.elasticsearch.threadpool.internal.BuiltInExecutorBuilders; -import java.time.Duration; import java.util.HashMap; import java.util.Map; From 3d66e8659750276af109167e54914f0961cdbd58 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Mon, 28 Jul 2025 18:34:30 +1000 Subject: [PATCH 09/20] Micro-ize the benchmark --- .../ThreadPoolUtilizationBenchmark.java | 120 ++++++++---------- ...utionTimeTrackingEsThreadPoolExecutor.java | 12 +- 2 files changed, 57 insertions(+), 75 deletions(-) diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java index 2635545a784af..06ad70a111dd4 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java @@ -9,14 +9,11 @@ package org.elasticsearch.benchmark.common.util.concurrent; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Group; import org.openjdk.jmh.annotations.Measurement; import org.openjdk.jmh.annotations.Mode; import org.openjdk.jmh.annotations.OutputTimeUnit; @@ -24,95 +21,80 @@ import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; -import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.infra.Blackhole; -import java.time.Duration; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -@Warmup(iterations = 1, time = 1, timeUnit = TimeUnit.SECONDS) -@Measurement(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS) -@BenchmarkMode(Mode.AverageTime) -@OutputTimeUnit(TimeUnit.MILLISECONDS) +@Threads(12) +@Warmup(iterations = 3, time = 200, timeUnit = TimeUnit.MILLISECONDS) +@Measurement(iterations = 5, time = 600, timeUnit = TimeUnit.MILLISECONDS) +@BenchmarkMode(Mode.SampleTime) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +@State(Scope.Benchmark) @Fork(1) -@State(Scope.Thread) public class ThreadPoolUtilizationBenchmark { - @Param({ "false", "true" }) - private boolean trackUtilization; + @Param({ "0", "100", "1000" }) + private int callIntervalTicks; - @Param({ "4", "8", "16" }) - private int poolSize; - - @Param({ "1000000" }) - private int tasksNum; - - @Param({ "10" }) // 10ms is aggressive interval, it increases frame updates on FramedTimeTracker, normally we run at 30/60 - // seconds + /** + * This makes very little difference, all the overhead is in the synchronization + */ + @Param({ "10" }) private int utilizationIntervalMs; - private EsThreadPoolExecutor executor; + @State(Scope.Thread) + public static class TaskState { + boolean running = false; - private EsThreadPoolExecutor newExecutor(boolean tracking) { - var conf = EsExecutors.TaskTrackingConfig.builder(); - if (tracking) { - conf.trackExecutionTime(0.3).trackUtilization(Duration.ofMillis(utilizationIntervalMs)); + boolean shouldStart() { + return (running = running == false); } - return EsExecutors.newFixed( - "bench", - poolSize, - tasksNum, - Executors.defaultThreadFactory(), - new ThreadContext(Settings.EMPTY), - conf.build() - ); } + private TaskExecutionTimeTrackingEsThreadPoolExecutor.FramedTimeTracker timeTracker; + @Setup public void setup() { - if (trackUtilization) { - var exec = newExecutor(true); - if (exec instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor trackingExecutor) { - if (trackingExecutor.trackingConfig().trackUtilization() == false) { - throw new IllegalStateException("utilization tracking must be enabled"); - } else { - executor = trackingExecutor; - } - } else { - throw new IllegalStateException("must be tracking executor"); - } + timeTracker = new TaskExecutionTimeTrackingEsThreadPoolExecutor.FramedTimeTracker( + TimeUnit.MILLISECONDS.toNanos(utilizationIntervalMs), + System::nanoTime + ); + } + + @Benchmark + public void baseline() { + Blackhole.consumeCPU(callIntervalTicks); + } + + @Group("ReadAndWrite") + @Benchmark + public void startAndStopTasks(TaskState state) { + Blackhole.consumeCPU(callIntervalTicks); + if (state.shouldStart()) { + timeTracker.startTask(); } else { - var exec = newExecutor(false); - if (exec instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor) { - throw new IllegalStateException("must be non-tracking executor"); - } - executor = exec; + timeTracker.endTask(); } } - @TearDown - public void tearDown() throws InterruptedException { - executor.shutdown(); - executor.awaitTermination(0, TimeUnit.MILLISECONDS); + @Benchmark + @Group("ReadAndWrite") + public void readPrevious(Blackhole blackhole) { + Blackhole.consumeCPU(callIntervalTicks); + blackhole.consume(timeTracker.previousFrameTime()); } @Benchmark - public void run(Blackhole bh) throws InterruptedException { - var completedTasks = new CountDownLatch(tasksNum); - for (var i = 0; i < tasksNum; i++) { - executor.execute(() -> { - // busy cycles for cpu - var r = 0; - for (var j = 0; j < 1000; j++) { - r += j * 2; - } - bh.consume(r); - completedTasks.countDown(); - }); + @Group("JustWrite") + public void startAndStopTasksOnly(TaskState state) { + Blackhole.consumeCPU(callIntervalTicks); + if (state.shouldStart()) { + timeTracker.startTask(); + } else { + timeTracker.endTask(); } - completedTasks.await(); } } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java index 94d2d2b580afa..e7e855e5bfbee 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java @@ -257,7 +257,7 @@ public boolean trackingMaxQueueLatency() { * * Can be extended to remember multiple past frames. */ - static class FramedTimeTracker { + public static class FramedTimeTracker { final long interval; private final Supplier timeNow; long ongoingTasks; @@ -266,7 +266,7 @@ static class FramedTimeTracker { long previousTime; // for testing - FramedTimeTracker(long intervalNano, Supplier timeNow) { + public FramedTimeTracker(long intervalNano, Supplier timeNow) { assert intervalNano > 0; this.interval = intervalNano; this.timeNow = timeNow; @@ -278,7 +278,7 @@ static class FramedTimeTracker { this.timeNow = System::nanoTime; } - synchronized void updateFrame() { + public synchronized void updateFrame() { updateFrame0(timeNow.get()); } @@ -310,7 +310,7 @@ private void updateFrame0(long nowTime) { * Start tracking new task, assume that task runs indefinitely, or at least till end of frame. * If task finishes sooner than end of interval {@link FramedTimeTracker#endTask()} will deduct remaining time. */ - synchronized void startTask() { + public synchronized void startTask() { var now = timeNow.get(); updateFrame0(now); currentTime += (currentFrame + 1) * interval - now; @@ -320,7 +320,7 @@ synchronized void startTask() { /** * Stop task tracking. We already assumed that task runs till end of frame, here we deduct not used time. */ - synchronized void endTask() { + public synchronized void endTask() { var now = timeNow.get(); updateFrame0(now); currentTime -= (currentFrame + 1) * interval - now; @@ -330,7 +330,7 @@ synchronized void endTask() { /** * Returns previous frame total execution time. */ - synchronized long previousFrameTime() { + public synchronized long previousFrameTime() { updateFrame0(timeNow.get()); return previousTime; } From efa48b46148ae51bef3572f670dd46cfb3d7a4b6 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Mon, 28 Jul 2025 18:54:56 +1000 Subject: [PATCH 10/20] Use average instead of sample --- .../util/concurrent/ThreadPoolUtilizationBenchmark.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java index 06ad70a111dd4..c412ee2f2a2ee 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java @@ -30,13 +30,13 @@ @Threads(12) @Warmup(iterations = 3, time = 200, timeUnit = TimeUnit.MILLISECONDS) @Measurement(iterations = 5, time = 600, timeUnit = TimeUnit.MILLISECONDS) -@BenchmarkMode(Mode.SampleTime) +@BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MICROSECONDS) @State(Scope.Benchmark) @Fork(1) public class ThreadPoolUtilizationBenchmark { - @Param({ "0", "100", "1000" }) + @Param({ "0", "10000", "100000" }) private int callIntervalTicks; /** From 33173fb05c1df1e508736aedc7a938b3245f5269 Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Tue, 29 Jul 2025 00:02:05 -0700 Subject: [PATCH 11/20] non-locking counting --- .../ThreadPoolUtilizationBenchmark.java | 42 ++++----------- ...utionTimeTrackingEsThreadPoolExecutor.java | 52 +++++++++++-------- .../concurrent/FramedTimeTrackerTests.java | 2 +- ...TimeTrackingEsThreadPoolExecutorTests.java | 4 +- 4 files changed, 43 insertions(+), 57 deletions(-) diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java index c412ee2f2a2ee..dc694fe9fed0f 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java @@ -30,13 +30,13 @@ @Threads(12) @Warmup(iterations = 3, time = 200, timeUnit = TimeUnit.MILLISECONDS) @Measurement(iterations = 5, time = 600, timeUnit = TimeUnit.MILLISECONDS) -@BenchmarkMode(Mode.AverageTime) +@BenchmarkMode(Mode.SampleTime) @OutputTimeUnit(TimeUnit.MICROSECONDS) @State(Scope.Benchmark) @Fork(1) public class ThreadPoolUtilizationBenchmark { - @Param({ "0", "10000", "100000" }) + @Param({ "10000" }) private int callIntervalTicks; /** @@ -44,16 +44,6 @@ public class ThreadPoolUtilizationBenchmark { */ @Param({ "10" }) private int utilizationIntervalMs; - - @State(Scope.Thread) - public static class TaskState { - boolean running = false; - - boolean shouldStart() { - return (running = running == false); - } - } - private TaskExecutionTimeTrackingEsThreadPoolExecutor.FramedTimeTracker timeTracker; @Setup @@ -69,32 +59,20 @@ public void baseline() { Blackhole.consumeCPU(callIntervalTicks); } - @Group("ReadAndWrite") + @Group("StartAndEnd") @Benchmark public void startAndStopTasks(TaskState state) { + timeTracker.startTask(); Blackhole.consumeCPU(callIntervalTicks); - if (state.shouldStart()) { - timeTracker.startTask(); - } else { - timeTracker.endTask(); - } + timeTracker.endTask(); } - @Benchmark - @Group("ReadAndWrite") - public void readPrevious(Blackhole blackhole) { - Blackhole.consumeCPU(callIntervalTicks); - blackhole.consume(timeTracker.previousFrameTime()); - } + @State(Scope.Thread) + public static class TaskState { + boolean running = false; - @Benchmark - @Group("JustWrite") - public void startAndStopTasksOnly(TaskState state) { - Blackhole.consumeCPU(callIntervalTicks); - if (state.shouldStart()) { - timeTracker.startTask(); - } else { - timeTracker.endTask(); + boolean shouldStart() { + return (running = running == false); } } } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java index e7e855e5bfbee..ce671b9fca8e8 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java @@ -27,6 +27,8 @@ import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAccumulator; import java.util.concurrent.atomic.LongAdder; import java.util.function.Function; @@ -260,10 +262,11 @@ public boolean trackingMaxQueueLatency() { public static class FramedTimeTracker { final long interval; private final Supplier timeNow; - long ongoingTasks; - long currentFrame; - long currentTime; - long previousTime; + private final AtomicLong ongoingTasks = new AtomicLong(); + private final AtomicLong currentFrame = new AtomicLong(); + private final AtomicLong currentTime = new AtomicLong(); + private final AtomicLong previousTime = new AtomicLong(); + private final AtomicBoolean updatingFrame = new AtomicBoolean(); // for testing public FramedTimeTracker(long intervalNano, Supplier timeNow) { @@ -278,10 +281,6 @@ public FramedTimeTracker(long intervalNano, Supplier timeNow) { this.timeNow = System::nanoTime; } - public synchronized void updateFrame() { - updateFrame0(timeNow.get()); - } - /** * Update frames to current time. There are no guaranties that it will be invoked frequently. * For example when there are no tasks and no requests for previousFrameTime. @@ -295,14 +294,23 @@ public synchronized void updateFrame() { */ private void updateFrame0(long nowTime) { var now = nowTime / interval; - if (currentFrame < now) { - if (currentFrame == now - 1) { - previousTime = currentTime; // + var current = currentFrame.get(); + if (current < now) { + if (updatingFrame.compareAndSet(false, true)) { + var tasks = ongoingTasks.get(); + if (current == now - 1) { + previousTime.set(currentTime.get()); + } else { + previousTime.set(tasks * interval); + } + currentTime.set(tasks * interval); + currentFrame.set(now); + updatingFrame.set(false); } else { - previousTime = ongoingTasks * interval; + while (currentFrame.get() != now) { + Thread.onSpinWait(); + } } - currentTime = ongoingTasks * interval; - currentFrame = now; } } @@ -310,29 +318,29 @@ private void updateFrame0(long nowTime) { * Start tracking new task, assume that task runs indefinitely, or at least till end of frame. * If task finishes sooner than end of interval {@link FramedTimeTracker#endTask()} will deduct remaining time. */ - public synchronized void startTask() { + public void startTask() { var now = timeNow.get(); updateFrame0(now); - currentTime += (currentFrame + 1) * interval - now; - ++ongoingTasks; + ongoingTasks.incrementAndGet(); + currentTime.updateAndGet((t) -> t + (currentFrame.get() + 1) * interval - now); } /** * Stop task tracking. We already assumed that task runs till end of frame, here we deduct not used time. */ - public synchronized void endTask() { + public void endTask() { var now = timeNow.get(); updateFrame0(now); - currentTime -= (currentFrame + 1) * interval - now; - --ongoingTasks; + ongoingTasks.decrementAndGet(); + currentTime.updateAndGet((t) -> t - (currentFrame.get() + 1) * interval + now); } /** * Returns previous frame total execution time. */ - public synchronized long previousFrameTime() { + public long previousFrameTime() { updateFrame0(timeNow.get()); - return previousTime; + return previousTime.get(); } } } diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/FramedTimeTrackerTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/FramedTimeTrackerTests.java index 7fbf05f7c701c..348d7f2ccb97f 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/FramedTimeTrackerTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/FramedTimeTrackerTests.java @@ -32,7 +32,7 @@ public void setup() { public void testNoTasks() { var tracker = newTracker(1); - tracker.updateFrame(); + tracker.previousFrameTime(); assertEquals(0, tracker.previousFrameTime()); fakeTime.time += between(1, 100); assertEquals(0, tracker.previousFrameTime()); diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java index 197013cf4d988..e6890262aed39 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java @@ -244,7 +244,7 @@ public void testUtilization() throws InterruptedException { final Runnable waitTillNextFrame = () -> { var now = System.nanoTime(); - var waitTillNext = (now / interval.getNano() + 1) * interval.getNano() - now; + var waitTillNext = (now / interval.toNanos() + 1) * interval.toNanos() - now; trySleep.accept(Duration.ofNanos(waitTillNext)); }; @@ -274,7 +274,7 @@ public void testUtilization() throws InterruptedException { waitTillNextFrame.run(); DoubleStream.of(0.1, 0.3, 0.5, 3.).forEach(loadFactor -> { - var sleepTime = (long) (interval.getNano() * loadFactor); + var sleepTime = (long) (interval.toNanos() * loadFactor); executor.submit(sleepTaskFn.apply(Duration.ofNanos(sleepTime))); }); From f3c81db6b7538ee55c86b4a466420f97a101d280 Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Tue, 29 Jul 2025 08:41:29 -0700 Subject: [PATCH 12/20] rwlock --- ...utionTimeTrackingEsThreadPoolExecutor.java | 35 +++++++++++++------ .../concurrent/FramedTimeTrackerTests.java | 12 +++---- 2 files changed, 30 insertions(+), 17 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java index ce671b9fca8e8..e5f2ac03f60ba 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java @@ -27,10 +27,10 @@ import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAccumulator; import java.util.concurrent.atomic.LongAdder; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; import java.util.function.Supplier; @@ -260,13 +260,13 @@ public boolean trackingMaxQueueLatency() { * Can be extended to remember multiple past frames. */ public static class FramedTimeTracker { - final long interval; + private final long interval; private final Supplier timeNow; + private final ReentrantReadWriteLock rwlock = new ReentrantReadWriteLock(); private final AtomicLong ongoingTasks = new AtomicLong(); private final AtomicLong currentFrame = new AtomicLong(); private final AtomicLong currentTime = new AtomicLong(); private final AtomicLong previousTime = new AtomicLong(); - private final AtomicBoolean updatingFrame = new AtomicBoolean(); // for testing public FramedTimeTracker(long intervalNano, Supplier timeNow) { @@ -281,6 +281,10 @@ public FramedTimeTracker(long intervalNano, Supplier timeNow) { this.timeNow = System::nanoTime; } + public long interval() { + return interval; + } + /** * Update frames to current time. There are no guaranties that it will be invoked frequently. * For example when there are no tasks and no requests for previousFrameTime. @@ -296,7 +300,10 @@ private void updateFrame0(long nowTime) { var now = nowTime / interval; var current = currentFrame.get(); if (current < now) { - if (updatingFrame.compareAndSet(false, true)) { + rwlock.readLock().unlock(); + rwlock.writeLock().lock(); + current = currentFrame.get(); // make sure it didnt change during lock acquisition + if (current < now) { var tasks = ongoingTasks.get(); if (current == now - 1) { previousTime.set(currentTime.get()); @@ -305,12 +312,9 @@ private void updateFrame0(long nowTime) { } currentTime.set(tasks * interval); currentFrame.set(now); - updatingFrame.set(false); - } else { - while (currentFrame.get() != now) { - Thread.onSpinWait(); - } } + rwlock.readLock().lock(); + rwlock.writeLock().unlock(); } } @@ -319,28 +323,37 @@ private void updateFrame0(long nowTime) { * If task finishes sooner than end of interval {@link FramedTimeTracker#endTask()} will deduct remaining time. */ public void startTask() { + rwlock.readLock().lock(); var now = timeNow.get(); updateFrame0(now); ongoingTasks.incrementAndGet(); currentTime.updateAndGet((t) -> t + (currentFrame.get() + 1) * interval - now); + rwlock.readLock().unlock(); } /** * Stop task tracking. We already assumed that task runs till end of frame, here we deduct not used time. */ public void endTask() { + rwlock.readLock().lock(); var now = timeNow.get(); updateFrame0(now); ongoingTasks.decrementAndGet(); currentTime.updateAndGet((t) -> t - (currentFrame.get() + 1) * interval + now); + rwlock.readLock().unlock(); } /** * Returns previous frame total execution time. */ public long previousFrameTime() { - updateFrame0(timeNow.get()); - return previousTime.get(); + try { + rwlock.readLock().lock(); + updateFrame0(timeNow.get()); + return previousTime.get(); + } finally { + rwlock.readLock().unlock(); + } } } } diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/FramedTimeTrackerTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/FramedTimeTrackerTests.java index 348d7f2ccb97f..be68e80bc24eb 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/FramedTimeTrackerTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/FramedTimeTrackerTests.java @@ -44,19 +44,19 @@ public void testSingleFrameTask() { tracker.startTask(); fakeTime.time += 10; tracker.endTask(); - fakeTime.time += tracker.interval; + fakeTime.time += tracker.interval(); assertEquals(10, tracker.previousFrameTime()); } public void testTwoFrameTask() { var tracker = newTracker(100); var startTime = between(0, 100); - var taskDuration = tracker.interval; + var taskDuration = tracker.interval(); fakeTime.time += startTime; tracker.startTask(); fakeTime.time += taskDuration; tracker.endTask(); - assertEquals(tracker.interval - startTime, tracker.previousFrameTime()); + assertEquals(tracker.interval() - startTime, tracker.previousFrameTime()); } public void testMultiFrameTask() { @@ -66,7 +66,7 @@ public void testMultiFrameTask() { var taskDuration = between(3, 100) * interval; fakeTime.time += taskDuration; tracker.endTask(); - assertEquals(tracker.interval, tracker.previousFrameTime()); + assertEquals(tracker.interval(), tracker.previousFrameTime()); } public void testOngoingTask() { @@ -74,8 +74,8 @@ public void testOngoingTask() { var tracker = newTracker(interval); tracker.startTask(); for (int i = 0; i < between(10, 100); i++) { - fakeTime.time += tracker.interval; - assertEquals(tracker.interval, tracker.previousFrameTime()); + fakeTime.time += tracker.interval(); + assertEquals(tracker.interval(), tracker.previousFrameTime()); } } From 5771c467ad29badf080e14a7beef71184885dcbb Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Tue, 29 Jul 2025 11:45:26 -0700 Subject: [PATCH 13/20] cleanup --- .../util/concurrent/ThreadPoolUtilizationBenchmark.java | 6 +++--- .../TaskExecutionTimeTrackingEsThreadPoolExecutor.java | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java index dc694fe9fed0f..2474321beb588 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java @@ -27,7 +27,7 @@ import java.util.concurrent.TimeUnit; -@Threads(12) +@Threads(Threads.MAX) @Warmup(iterations = 3, time = 200, timeUnit = TimeUnit.MILLISECONDS) @Measurement(iterations = 5, time = 600, timeUnit = TimeUnit.MILLISECONDS) @BenchmarkMode(Mode.SampleTime) @@ -36,13 +36,13 @@ @Fork(1) public class ThreadPoolUtilizationBenchmark { - @Param({ "10000" }) + @Param({ "1000" }) private int callIntervalTicks; /** * This makes very little difference, all the overhead is in the synchronization */ - @Param({ "10" }) + @Param({ "30000" }) private int utilizationIntervalMs; private TaskExecutionTimeTrackingEsThreadPoolExecutor.FramedTimeTracker timeTracker; diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java index e5f2ac03f60ba..e021a8fc439d2 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java @@ -157,7 +157,7 @@ public TaskTrackingConfig trackingConfig() { * Utilization is measured as {@code all-threads-total-execution-time / (total-thread-count * interval)}. * This metric is updated once per interval, and returns last completed measurement. For example: * if interval is 30 seconds, at clock time 00:30-01:00 it will return utilization from 00:00-00:30. - * Thou there is no synchronization with clocks and system time. + * There is no synchronization with clocks and system time. * * If caller needs longer intervals it should poll on every tracker-interval and aggregate on it's own. Another option is to extend * framedTimeTracker to remember multiple past frames, and return aggregated view from here. @@ -254,7 +254,7 @@ public boolean trackingMaxQueueLatency() { } /** - * Tracks treads execution in continuous, non-overlapping, and even time frames. Provides accurate total execution time measurement + * Tracks threads execution in continuous, non-overlapping, and even time frames. Provides accurate total execution time measurement * for past frames, specifically previous frame (now - 1 frame) to measure utilization. * * Can be extended to remember multiple past frames. @@ -327,7 +327,7 @@ public void startTask() { var now = timeNow.get(); updateFrame0(now); ongoingTasks.incrementAndGet(); - currentTime.updateAndGet((t) -> t + (currentFrame.get() + 1) * interval - now); + currentTime.addAndGet((now + 1) * interval - now); rwlock.readLock().unlock(); } @@ -339,7 +339,7 @@ public void endTask() { var now = timeNow.get(); updateFrame0(now); ongoingTasks.decrementAndGet(); - currentTime.updateAndGet((t) -> t - (currentFrame.get() + 1) * interval + now); + currentTime.addAndGet(-((now + 1) * interval - now)); rwlock.readLock().unlock(); } From 12ef5e45819c363b4055613b68278456b1f5fa4a Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Tue, 29 Jul 2025 15:10:48 -0700 Subject: [PATCH 14/20] back to syncronized --- .../ThreadPoolUtilizationBenchmark.java | 17 ++---- ...utionTimeTrackingEsThreadPoolExecutor.java | 61 +++++++------------ 2 files changed, 25 insertions(+), 53 deletions(-) diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java index 2474321beb588..09da5a06642dd 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java @@ -29,20 +29,20 @@ @Threads(Threads.MAX) @Warmup(iterations = 3, time = 200, timeUnit = TimeUnit.MILLISECONDS) -@Measurement(iterations = 5, time = 600, timeUnit = TimeUnit.MILLISECONDS) +@Measurement(iterations = 1, time = 60, timeUnit = TimeUnit.SECONDS) @BenchmarkMode(Mode.SampleTime) @OutputTimeUnit(TimeUnit.MICROSECONDS) @State(Scope.Benchmark) @Fork(1) public class ThreadPoolUtilizationBenchmark { - @Param({ "1000" }) + @Param({ "10000" }) private int callIntervalTicks; /** * This makes very little difference, all the overhead is in the synchronization */ - @Param({ "30000" }) + @Param({ "100" }) private int utilizationIntervalMs; private TaskExecutionTimeTrackingEsThreadPoolExecutor.FramedTimeTracker timeTracker; @@ -61,18 +61,9 @@ public void baseline() { @Group("StartAndEnd") @Benchmark - public void startAndStopTasks(TaskState state) { + public void startAndStopTasks() { timeTracker.startTask(); Blackhole.consumeCPU(callIntervalTicks); timeTracker.endTask(); } - - @State(Scope.Thread) - public static class TaskState { - boolean running = false; - - boolean shouldStart() { - return (running = running == false); - } - } } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java index e021a8fc439d2..e88ca142fd237 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java @@ -260,13 +260,12 @@ public boolean trackingMaxQueueLatency() { * Can be extended to remember multiple past frames. */ public static class FramedTimeTracker { - private final long interval; + final long interval; private final Supplier timeNow; - private final ReentrantReadWriteLock rwlock = new ReentrantReadWriteLock(); - private final AtomicLong ongoingTasks = new AtomicLong(); - private final AtomicLong currentFrame = new AtomicLong(); - private final AtomicLong currentTime = new AtomicLong(); - private final AtomicLong previousTime = new AtomicLong(); + private long ongoingTasks; + private long currentFrame; + private long currentTime; + private long previousTime; // for testing public FramedTimeTracker(long intervalNano, Supplier timeNow) { @@ -298,23 +297,14 @@ public long interval() { */ private void updateFrame0(long nowTime) { var now = nowTime / interval; - var current = currentFrame.get(); - if (current < now) { - rwlock.readLock().unlock(); - rwlock.writeLock().lock(); - current = currentFrame.get(); // make sure it didnt change during lock acquisition - if (current < now) { - var tasks = ongoingTasks.get(); - if (current == now - 1) { - previousTime.set(currentTime.get()); - } else { - previousTime.set(tasks * interval); - } - currentTime.set(tasks * interval); - currentFrame.set(now); + if (currentFrame < now) { + if (currentFrame == now - 1) { + previousTime = currentTime; // + } else { + previousTime = ongoingTasks * interval; } - rwlock.readLock().lock(); - rwlock.writeLock().unlock(); + currentTime = ongoingTasks * interval; + currentFrame = now; } } @@ -322,38 +312,29 @@ private void updateFrame0(long nowTime) { * Start tracking new task, assume that task runs indefinitely, or at least till end of frame. * If task finishes sooner than end of interval {@link FramedTimeTracker#endTask()} will deduct remaining time. */ - public void startTask() { - rwlock.readLock().lock(); + public synchronized void startTask() { var now = timeNow.get(); updateFrame0(now); - ongoingTasks.incrementAndGet(); - currentTime.addAndGet((now + 1) * interval - now); - rwlock.readLock().unlock(); + currentTime += (currentFrame + 1) * interval - now; + ++ongoingTasks; } /** * Stop task tracking. We already assumed that task runs till end of frame, here we deduct not used time. */ - public void endTask() { - rwlock.readLock().lock(); + public synchronized void endTask() { var now = timeNow.get(); updateFrame0(now); - ongoingTasks.decrementAndGet(); - currentTime.addAndGet(-((now + 1) * interval - now)); - rwlock.readLock().unlock(); + currentTime -= (currentFrame + 1) * interval - now; + --ongoingTasks; } /** * Returns previous frame total execution time. */ - public long previousFrameTime() { - try { - rwlock.readLock().lock(); - updateFrame0(timeNow.get()); - return previousTime.get(); - } finally { - rwlock.readLock().unlock(); - } + public synchronized long previousFrameTime() { + updateFrame0(timeNow.get()); + return previousTime; } } } From d006ed59441f0fcb964c7338073c113cad6a5959 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Tue, 29 Jul 2025 22:21:47 +0000 Subject: [PATCH 15/20] [CI] Auto commit changes from spotless --- .../TaskExecutionTimeTrackingEsThreadPoolExecutor.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java index e88ca142fd237..4ce554d8eb37e 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java @@ -27,10 +27,8 @@ import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAccumulator; import java.util.concurrent.atomic.LongAdder; -import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; import java.util.function.Supplier; From 3816a5007b1ae783b09e7873b197ab32c0f58067 Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Wed, 30 Jul 2025 14:45:42 -0700 Subject: [PATCH 16/20] non-locking frame windows --- .../ThreadPoolUtilizationBenchmark.java | 6 +- ...utionTimeTrackingEsThreadPoolExecutor.java | 153 +++++++++++++----- 2 files changed, 118 insertions(+), 41 deletions(-) diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java index 09da5a06642dd..1948777ed415b 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java @@ -14,7 +14,6 @@ import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; import org.openjdk.jmh.annotations.Group; -import org.openjdk.jmh.annotations.Measurement; import org.openjdk.jmh.annotations.Mode; import org.openjdk.jmh.annotations.OutputTimeUnit; import org.openjdk.jmh.annotations.Param; @@ -22,14 +21,11 @@ import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.Threads; -import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.infra.Blackhole; import java.util.concurrent.TimeUnit; @Threads(Threads.MAX) -@Warmup(iterations = 3, time = 200, timeUnit = TimeUnit.MILLISECONDS) -@Measurement(iterations = 1, time = 60, timeUnit = TimeUnit.SECONDS) @BenchmarkMode(Mode.SampleTime) @OutputTimeUnit(TimeUnit.MICROSECONDS) @State(Scope.Benchmark) @@ -42,7 +38,7 @@ public class ThreadPoolUtilizationBenchmark { /** * This makes very little difference, all the overhead is in the synchronization */ - @Param({ "100" }) + @Param({ "10" }) private int utilizationIntervalMs; private TaskExecutionTimeTrackingEsThreadPoolExecutor.FramedTimeTracker timeTracker; diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java index 4ce554d8eb37e..040f3fe945296 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java @@ -27,6 +27,9 @@ import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAccumulator; import java.util.concurrent.atomic.LongAdder; import java.util.function.Function; @@ -258,12 +261,11 @@ public boolean trackingMaxQueueLatency() { * Can be extended to remember multiple past frames. */ public static class FramedTimeTracker { - final long interval; + private final long interval; private final Supplier timeNow; - private long ongoingTasks; - private long currentFrame; - private long currentTime; - private long previousTime; + private final AtomicReference frameWindowRef = new AtomicReference<>(new FrameWindow()); + private final AtomicBoolean updatingFrame = new AtomicBoolean(); + private final AtomicLong currentFrameNum = new AtomicLong(); // for testing public FramedTimeTracker(long intervalNano, Supplier timeNow) { @@ -283,56 +285,135 @@ public long interval() { } /** - * Update frames to current time. There are no guaranties that it will be invoked frequently. - * For example when there are no tasks and no requests for previousFrameTime. - * - * When it's invoked frequently, at least once per frame, we move currentTime into previousTime. - * That concludes currentTime and it's accurate. - * - * When it's invoked infrequently, once in multiple frames, current and previous frames are going to be stale. - * Which is ok, that means there were no changes in tasks(start/end), all ongoing tasks are still running. - * That means ongoing tasks fully utilized previous frames. And we can accurately tell previous frame usage. + * Returns current FrameWindow. If window is stale, it will slide to current time. + * @param now - current frame */ - private void updateFrame0(long nowTime) { - var now = nowTime / interval; - if (currentFrame < now) { - if (currentFrame == now - 1) { - previousTime = currentTime; // + private FrameWindow getWindow(long now) { + var current = currentFrameNum.get(); + // first time in new frame + if (current < now) { + // only one thread will perform frame update, others spinWait + if (updatingFrame.compareAndSet(false, true)) { + final var moveOffset = now - current; + final var newWindow = frameWindowRef.get().moveBy(moveOffset); + frameWindowRef.set(newWindow); + currentFrameNum.set(now); + updatingFrame.set(false); } else { - previousTime = ongoingTasks * interval; + while (updatingFrame.get()) { + Thread.onSpinWait(); + } + // an edge case when all the following happen: + // 1. window was stale, at least 1 frame + // 2. two or more threads try to update window + // 3. it's happening at the end of the frame, beginning new frame + // for example, lets say interval is 10 + // and there are two concurrent calls getWindow(9)->frame0 and getWindow(10)->frame1 + // both need to update window, but those are different windows, + // two things might happen: + // 1. getWindow(9) updates window and uses it, but getWindow(10) need to update window again + // 2. getWindow(10) updates window, then getWindow(9) will see a newer window, so we record task in a newer frame, + // basically rounding-up frame when it's happening. + if (currentFrameNum.get() < now) { + return getWindow(now); + } } - currentTime = ongoingTasks * interval; - currentFrame = now; } + return frameWindowRef.get(); } /** * Start tracking new task, assume that task runs indefinitely, or at least till end of frame. * If task finishes sooner than end of interval {@link FramedTimeTracker#endTask()} will deduct remaining time. */ - public synchronized void startTask() { - var now = timeNow.get(); - updateFrame0(now); - currentTime += (currentFrame + 1) * interval - now; - ++ongoingTasks; + public void startTask() { + final var nowTime = timeNow.get(); + final var now = nowTime / interval; + final var frameWindow = getWindow(now); + frameWindow.now().ongoingTasks.increment(); + frameWindow.now().startEndDiff.add((now + 1) * interval - nowTime); } /** * Stop task tracking. We already assumed that task runs till end of frame, here we deduct not used time. */ - public synchronized void endTask() { - var now = timeNow.get(); - updateFrame0(now); - currentTime -= (currentFrame + 1) * interval - now; - --ongoingTasks; + public void endTask() { + final var nowTime = timeNow.get(); + final var now = nowTime / interval; + final var frameWindow = getWindow(now); + frameWindow.now().ongoingTasks.decrement(); + frameWindow.now().startEndDiff.add(-((now + 1) * interval - nowTime)); } /** - * Returns previous frame total execution time. + * Returns previous frame total execution time */ - public synchronized long previousFrameTime() { - updateFrame0(timeNow.get()); - return previousTime; + public long previousFrameTime() { + final var now = timeNow.get() / interval; + final var frameWindow = getWindow(now); + // total time is sum of ongoing tasks in frame N-1 and all starts and ends in N frame + // so for the previous frame (now-1), it would be (now-2) ongoing tasks + (now -1) start/end tasks + final var ongoingTasks = frameWindow.now(-2).ongoingTasks.sum(); + final var startEndDiff = frameWindow.now(-1).startEndDiff.sum(); + return ongoingTasks * interval + startEndDiff; + } + + /** + * A single frame that tracks how many tasks are still running at the end of frame + * and diffs from task start and end. + */ + record Frame(LongAdder ongoingTasks, LongAdder startEndDiff) { + Frame() { + this(new LongAdder(), new LongAdder()); + } + } + + /** + * A frame window represent 3 consecutive frames. frames[0] is now, frames[1] is now-1. + */ + record FrameWindow(Frame[] frames) { + FrameWindow() { + this(new Frame(), new Frame(), new Frame()); + } + + FrameWindow(Frame past2, Frame past1, Frame now) { + this(new Frame[] { now, past1, past2 }); + } + + FrameWindow { + assert frames.length == 3; + } + + /** + * Creates a new window by sliding current by moveFrames. If new window overlaps with current Frames are reused. + * So there is no risk of losing data when start/end update frame in a past window. + */ + FrameWindow moveBy(long moveFrames) { + // a new frame always starts with previous ongoing tasks + final var ongoingTasks = now().ongoingTasks.sum(); + final FrameWindow newWindow; + if (moveFrames == 1) { + newWindow = new FrameWindow(now(-1), now(), new Frame()); + } else if (moveFrames == 2) { + newWindow = new FrameWindow(now(), new Frame(), new Frame()); + } else { + newWindow = new FrameWindow(); + } + // propagate ongoing tasks to all new frames + for (var newFrame = 0; newFrame < Math.min(moveFrames, 3); newFrame++) { + newWindow.frames[newFrame].ongoingTasks.add(ongoingTasks); + } + return newWindow; + } + + Frame now() { + return frames[0]; + } + + Frame now(int offset) { + assert offset >= -2 && offset <= 0; + return frames[-offset]; + } } } } From 225280a96d6e9e4bfe4f597b19adba6e59639fbd Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Wed, 30 Jul 2025 14:56:46 -0700 Subject: [PATCH 17/20] nits --- .../TaskExecutionTimeTrackingEsThreadPoolExecutor.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java index 040f3fe945296..f438e10bef3e4 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java @@ -385,8 +385,8 @@ record FrameWindow(Frame[] frames) { } /** - * Creates a new window by sliding current by moveFrames. If new window overlaps with current Frames are reused. - * So there is no risk of losing data when start/end update frame in a past window. + * Creates a new window by sliding current by moveFrames. If new window overlaps with current window Frames are reused. + * So there is no risk of losing data when start/endTask updates counters in the past window frame. */ FrameWindow moveBy(long moveFrames) { // a new frame always starts with previous ongoing tasks From 36374fc3383ee5aa7e31364709ae288d66b8b9cd Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Wed, 30 Jul 2025 20:06:35 -0700 Subject: [PATCH 18/20] window and frames --- .../ThreadPoolUtilizationBenchmark.java | 14 +- .../common/util/concurrent/EsExecutors.java | 20 ++- ...utionTimeTrackingEsThreadPoolExecutor.java | 127 +++++++++--------- .../concurrent/FramedTimeTrackerTests.java | 93 ++++++------- ...TimeTrackingEsThreadPoolExecutorTests.java | 3 +- 5 files changed, 140 insertions(+), 117 deletions(-) diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java index 1948777ed415b..a01bf77905f17 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java @@ -14,6 +14,7 @@ import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; import org.openjdk.jmh.annotations.Group; +import org.openjdk.jmh.annotations.Measurement; import org.openjdk.jmh.annotations.Mode; import org.openjdk.jmh.annotations.OutputTimeUnit; import org.openjdk.jmh.annotations.Param; @@ -23,11 +24,13 @@ import org.openjdk.jmh.annotations.Threads; import org.openjdk.jmh.infra.Blackhole; +import java.time.Duration; import java.util.concurrent.TimeUnit; @Threads(Threads.MAX) @BenchmarkMode(Mode.SampleTime) @OutputTimeUnit(TimeUnit.MICROSECONDS) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.MINUTES) @State(Scope.Benchmark) @Fork(1) public class ThreadPoolUtilizationBenchmark { @@ -38,14 +41,19 @@ public class ThreadPoolUtilizationBenchmark { /** * This makes very little difference, all the overhead is in the synchronization */ - @Param({ "10" }) - private int utilizationIntervalMs; + @Param({ "1000" }) + private int frameDurationMs; + + @Param({"10000"}) + private int reportingDurationMs; + private TaskExecutionTimeTrackingEsThreadPoolExecutor.FramedTimeTracker timeTracker; @Setup public void setup() { timeTracker = new TaskExecutionTimeTrackingEsThreadPoolExecutor.FramedTimeTracker( - TimeUnit.MILLISECONDS.toNanos(utilizationIntervalMs), + Duration.ofMillis(reportingDurationMs).toNanos(), + Duration.ofMillis(frameDurationMs).toNanos(), System::nanoTime ); } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index 14e2e59bbf3df..cd155754f8d62 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -579,16 +579,17 @@ public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) { /** * @param trackExecutionTime Whether to track execution stats - * @param trackUtilization enables thread-pool utilization metrics - * @param utilizationInterval when utilization is enabled, specifies interval for measurement + * @param trackUtilization Enables thread-pool utilization metrics + * @param utilizationReportingInterval When utilization is enabled, specifies interval for measurement * @param trackOngoingTasks Whether to track ongoing task execution time, not just finished tasks - * @param trackMaxQueueLatency Whether to track max queue latency. - * @param executionTimeEwmaAlpha The alpha seed for execution time EWMA (ExponentiallyWeightedMovingAverage). + * @param trackMaxQueueLatency Whether to track max queue latency + * @param executionTimeEwmaAlpha The alpha seed for execution time EWMA (ExponentiallyWeightedMovingAverage) */ public record TaskTrackingConfig( boolean trackExecutionTime, boolean trackUtilization, - Duration utilizationInterval, + Duration utilizationReportingInterval, + Duration utilizationSamplingInterval, boolean trackOngoingTasks, boolean trackMaxQueueLatency, double executionTimeEwmaAlpha @@ -597,11 +598,13 @@ public record TaskTrackingConfig( // This is a random starting point alpha. public static final double DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST = 0.3; public static final Duration DEFAULT_UTILIZATION_INTERVAL = Duration.ofSeconds(30); + public static final Duration DEFAULT_UTILIZATION_SAMPLING_INTERVAL = Duration.ofSeconds(1); public static final TaskTrackingConfig DO_NOT_TRACK = new TaskTrackingConfig( false, false, DEFAULT_UTILIZATION_INTERVAL, + DEFAULT_UTILIZATION_SAMPLING_INTERVAL, false, false, DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST @@ -611,6 +614,7 @@ public record TaskTrackingConfig( true, true, DEFAULT_UTILIZATION_INTERVAL, + DEFAULT_UTILIZATION_SAMPLING_INTERVAL, false, false, DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST @@ -627,6 +631,7 @@ public static class Builder { private boolean trackMaxQueueLatency = false; private double ewmaAlpha = DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST; private Duration utilizationInterval = DEFAULT_UTILIZATION_INTERVAL; + private Duration utilizationSamplingInterval = DEFAULT_UTILIZATION_SAMPLING_INTERVAL; public Builder() {} @@ -637,9 +642,11 @@ public Builder trackExecutionTime(double alpha) { return this; } - public Builder trackUtilization(Duration interval) { + public Builder trackUtilization(Duration interval, Duration samplingInterval) { + assert interval.dividedBy(samplingInterval) > 0 : "interval should be same or larger than sampling interval"; trackUtilization = true; utilizationInterval = interval; + utilizationSamplingInterval = samplingInterval; return this; } @@ -658,6 +665,7 @@ public TaskTrackingConfig build() { trackExecutionTime, trackUtilization, utilizationInterval, + utilizationSamplingInterval, trackOngoingTasks, trackMaxQueueLatency, ewmaAlpha diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java index f438e10bef3e4..f1284677beb32 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java @@ -19,6 +19,7 @@ import org.elasticsearch.telemetry.metric.MeterRegistry; import org.elasticsearch.threadpool.ThreadPool; +import java.time.Duration; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -73,7 +74,10 @@ public TaskExecutionTimeTrackingEsThreadPoolExecutor( this.runnableWrapper = runnableWrapper; this.executionEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.executionTimeEwmaAlpha(), 0); this.trackingConfig = trackingConfig; - this.framedTimeTracker = new FramedTimeTracker(trackingConfig.utilizationInterval().toNanos()); + this.framedTimeTracker = new FramedTimeTracker( + trackingConfig.utilizationReportingInterval(), + trackingConfig.utilizationSamplingInterval() + ); } public List setupMetrics(MeterRegistry meterRegistry, String threadPoolName) { @@ -154,7 +158,7 @@ public TaskTrackingConfig trackingConfig() { } /** - * Returns thread-pool utilization from last completed time interval(frame) {@link TaskTrackingConfig#utilizationInterval()}. + * Returns thread-pool utilization from last completed time interval(frame) {@link TaskTrackingConfig#utilizationReportingInterval()}. * Utilization is measured as {@code all-threads-total-execution-time / (total-thread-count * interval)}. * This metric is updated once per interval, and returns last completed measurement. For example: * if interval is 30 seconds, at clock time 00:30-01:00 it will return utilization from 00:00-00:30. @@ -164,7 +168,7 @@ public TaskTrackingConfig trackingConfig() { * framedTimeTracker to remember multiple past frames, and return aggregated view from here. */ public double utilization() { - return (double) framedTimeTracker.previousFrameTime() / (double) getMaximumPoolSize() / (double) framedTimeTracker.interval; + return (double) framedTimeTracker.totalTime() / (double) getMaximumPoolSize() / (double) framedTimeTracker.reportingInterval(); } @Override @@ -255,33 +259,35 @@ public boolean trackingMaxQueueLatency() { } /** - * Tracks threads execution in continuous, non-overlapping, and even time frames. Provides accurate total execution time measurement - * for past frames, specifically previous frame (now - 1 frame) to measure utilization. - * - * Can be extended to remember multiple past frames. + * Tracks threads execution in continuous, non-overlapping, and even time frames. */ public static class FramedTimeTracker { - private final long interval; + private final long reportingInterval; + private final long frameDuration; private final Supplier timeNow; - private final AtomicReference frameWindowRef = new AtomicReference<>(new FrameWindow()); + private final AtomicReference frameWindowRef; private final AtomicBoolean updatingFrame = new AtomicBoolean(); private final AtomicLong currentFrameNum = new AtomicLong(); // for testing - public FramedTimeTracker(long intervalNano, Supplier timeNow) { - assert intervalNano > 0; - this.interval = intervalNano; + public FramedTimeTracker(long reportingInterval, long frameDuration, Supplier timeNow) { + assert reportingInterval / frameDuration > 0; + this.reportingInterval = reportingInterval; + this.frameDuration = frameDuration; this.timeNow = timeNow; + this.frameWindowRef = new AtomicReference<>(FrameWindow.empty((int) (reportingInterval/frameDuration))); } - FramedTimeTracker(long intervalNano) { - assert intervalNano > 0; - this.interval = intervalNano; - this.timeNow = System::nanoTime; + FramedTimeTracker(Duration reportingInterval, Duration frameInterval) { + this( + reportingInterval.toNanos(), + frameInterval.toNanos(), + System::nanoTime + ); } - public long interval() { - return interval; + public long reportingInterval() { + return reportingInterval; } /** @@ -328,10 +334,10 @@ private FrameWindow getWindow(long now) { */ public void startTask() { final var nowTime = timeNow.get(); - final var now = nowTime / interval; + final var now = nowTime / frameDuration; final var frameWindow = getWindow(now); - frameWindow.now().ongoingTasks.increment(); - frameWindow.now().startEndDiff.add((now + 1) * interval - nowTime); + frameWindow.frames[0].ongoingTasks.increment(); + frameWindow.frames[0].startEndDiff.add((now + 1) * frameDuration - nowTime); } /** @@ -339,23 +345,27 @@ public void startTask() { */ public void endTask() { final var nowTime = timeNow.get(); - final var now = nowTime / interval; + final var now = nowTime / frameDuration; final var frameWindow = getWindow(now); - frameWindow.now().ongoingTasks.decrement(); - frameWindow.now().startEndDiff.add(-((now + 1) * interval - nowTime)); + frameWindow.frames[0].ongoingTasks.decrement(); + frameWindow.frames[0].startEndDiff.add(-((now + 1) * frameDuration - nowTime)); } /** - * Returns previous frame total execution time + * Returns total execution time from last interval. */ - public long previousFrameTime() { - final var now = timeNow.get() / interval; + public long totalTime() { + final var now = timeNow.get() / frameDuration; final var frameWindow = getWindow(now); // total time is sum of ongoing tasks in frame N-1 and all starts and ends in N frame // so for the previous frame (now-1), it would be (now-2) ongoing tasks + (now -1) start/end tasks - final var ongoingTasks = frameWindow.now(-2).ongoingTasks.sum(); - final var startEndDiff = frameWindow.now(-1).startEndDiff.sum(); - return ongoingTasks * interval + startEndDiff; + var totalTime = 0L; + for (var i = 1; i < frameWindow.frames.length - 1; i++) { // first and last frames are not used, see FrameWindow description + final var ongoingTasks = frameWindow.frames[i + 1].ongoingTasks.sum(); + final var startEndDiff = frameWindow.frames[i].startEndDiff.sum(); + totalTime += ongoingTasks * frameDuration + startEndDiff; + } + return totalTime; } /** @@ -369,19 +379,20 @@ record Frame(LongAdder ongoingTasks, LongAdder startEndDiff) { } /** - * A frame window represent 3 consecutive frames. frames[0] is now, frames[1] is now-1. + * A frame window of consecutive frames. frames[0] is now, frames[1] is now-1. + * To calculate frame time usage we need to know ongoing-tasks from previous frame and all starts and ends in current frame. + * That means we cannot calculate time for first and last frames in the array. First one is in-progress, last one has no information + * about previous ongoing tasks. So completed frames are between 1..length-2. */ record FrameWindow(Frame[] frames) { - FrameWindow() { - this(new Frame(), new Frame(), new Frame()); - } - - FrameWindow(Frame past2, Frame past1, Frame now) { - this(new Frame[] { now, past1, past2 }); - } - FrameWindow { - assert frames.length == 3; + static FrameWindow empty(int size) { + // first and last frames are incomplete, adding two more frames + final var frames = new Frame[size + 2]; + for (var i = 0; i < frames.length; i++) { + frames[i] = new Frame(); + } + return new FrameWindow(frames); } /** @@ -389,30 +400,22 @@ record FrameWindow(Frame[] frames) { * So there is no risk of losing data when start/endTask updates counters in the past window frame. */ FrameWindow moveBy(long moveFrames) { - // a new frame always starts with previous ongoing tasks - final var ongoingTasks = now().ongoingTasks.sum(); - final FrameWindow newWindow; - if (moveFrames == 1) { - newWindow = new FrameWindow(now(-1), now(), new Frame()); - } else if (moveFrames == 2) { - newWindow = new FrameWindow(now(), new Frame(), new Frame()); - } else { - newWindow = new FrameWindow(); + if (moveFrames == 0) { + return this; } - // propagate ongoing tasks to all new frames - for (var newFrame = 0; newFrame < Math.min(moveFrames, 3); newFrame++) { - newWindow.frames[newFrame].ongoingTasks.add(ongoingTasks); + final var newFramesNum = (int) Math.min(frames.length, moveFrames); // non-overlapping frames with current window + final var newFrames = new Frame[frames.length]; + // copy overlapping frames to the end of array + System.arraycopy(frames, 0, newFrames, newFramesNum, frames.length - newFramesNum); + // initialize new frames in the beginning of array + // a new frame always starts with last known ongoing tasks + final var ongoingTasks = frames[0].ongoingTasks.sum(); + for (var i=0; i= -2 && offset <= 0; - return frames[-offset]; + return new FrameWindow(newFrames); } } } diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/FramedTimeTrackerTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/FramedTimeTrackerTests.java index be68e80bc24eb..cc49b11a463c8 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/FramedTimeTrackerTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/FramedTimeTrackerTests.java @@ -21,8 +21,15 @@ public class FramedTimeTrackerTests extends ESTestCase { private FakeTime fakeTime; - FramedTimeTracker newTracker(long interval) { - return new FramedTimeTracker(interval, fakeTime); + private final int frame = 10; + private final int windowLen = 30; + private final int window = windowLen * frame; + + /** + * creates new tracker with frame interval=1 and and windowSize = 100 frames + */ + FramedTimeTracker newTracker() { + return new FramedTimeTracker(window, frame, fakeTime); } @Before @@ -31,70 +38,66 @@ public void setup() { } public void testNoTasks() { - var tracker = newTracker(1); - tracker.previousFrameTime(); - assertEquals(0, tracker.previousFrameTime()); - fakeTime.time += between(1, 100); - assertEquals(0, tracker.previousFrameTime()); + var tracker = newTracker(); + assertEquals(0, tracker.totalTime()); + fakeTime.time += randomNonNegativeInt(); + assertEquals(0, tracker.totalTime()); } - public void testSingleFrameTask() { - var tracker = newTracker(100); - fakeTime.time += 10; - tracker.startTask(); - fakeTime.time += 10; - tracker.endTask(); - fakeTime.time += tracker.interval(); - assertEquals(10, tracker.previousFrameTime()); - } - - public void testTwoFrameTask() { - var tracker = newTracker(100); - var startTime = between(0, 100); - var taskDuration = tracker.interval(); - fakeTime.time += startTime; + public void testSingleWindow() { + var tracker = newTracker(); + var startOffset = between(0, window / 2); + fakeTime.time += startOffset; tracker.startTask(); + var taskDuration = between(0, window / 2); fakeTime.time += taskDuration; tracker.endTask(); - assertEquals(tracker.interval() - startTime, tracker.previousFrameTime()); + fakeTime.time += frame; + assertEquals(taskDuration, tracker.totalTime()); } - public void testMultiFrameTask() { - var interval = 10; - var tracker = newTracker(interval); + public void testMultiWindow() { + var tracker = newTracker(); + var startTime = between(0, frame); + var taskDuration = between(1, 10) * window; + fakeTime.time += startTime; tracker.startTask(); - var taskDuration = between(3, 100) * interval; fakeTime.time += taskDuration; tracker.endTask(); - assertEquals(tracker.interval(), tracker.previousFrameTime()); + fakeTime.time += frame; + assertEquals("must run for the whole window, except last frame", window - (frame - startTime), (int) tracker.totalTime()); } public void testOngoingTask() { - var interval = 10; - var tracker = newTracker(interval); + var tracker = newTracker(); tracker.startTask(); - for (int i = 0; i < between(10, 100); i++) { - fakeTime.time += tracker.interval(); - assertEquals(tracker.interval(), tracker.previousFrameTime()); + // fill first window + for (int i = 0; i < windowLen; i++) { + assertEquals(frame * i, tracker.totalTime()); + fakeTime.time += frame; + } + // after first window is filled, it's always full then + for (int i = 0; i < between(0, 1000); i++) { + assertEquals(window, tracker.totalTime()); + fakeTime.time += frame; } } public void testMultipleTasks() { - var interval = between(1, 100) * 2; // using integer division by 2 below - var tracker = newTracker(interval); - var halfIntervalTasks = between(1, 10); + var tracker = newTracker(); + var halfWindowTasks = between(1, 10); var notEndingTasks = between(1, 10); - range(0, halfIntervalTasks + notEndingTasks).forEach(t -> tracker.startTask()); - fakeTime.time += interval / 2; - range(0, halfIntervalTasks).forEach(t -> tracker.endTask()); - fakeTime.time += interval / 2; - var firstFrameTotalTime = interval * halfIntervalTasks / 2 + interval * notEndingTasks; - assertEquals(firstFrameTotalTime, tracker.previousFrameTime()); + range(0, halfWindowTasks + notEndingTasks).forEach(t -> tracker.startTask()); + fakeTime.time += window / 2; + range(0, halfWindowTasks).forEach(t -> tracker.endTask()); + fakeTime.time += window / 2; + var firstFrameTotalTime = window * halfWindowTasks / 2 + window * notEndingTasks; + assertEquals(firstFrameTotalTime, tracker.totalTime()); - fakeTime.time += interval; - var secondFrameTotalTime = interval * notEndingTasks; - assertEquals(secondFrameTotalTime, tracker.previousFrameTime()); + fakeTime.time += window; + var secondFrameTotalTime = window * notEndingTasks; + assertEquals(secondFrameTotalTime, tracker.totalTime()); } static class FakeTime implements Supplier { diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java index e6890262aed39..cd75dd3b0bf7e 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java @@ -233,6 +233,7 @@ public void testGetOngoingTasks() throws Exception { public void testUtilization() throws InterruptedException { final var interval = Duration.ofMillis(100); + final var samplingInterval = Duration.ofMillis(10); final Consumer trySleep = (d) -> { try { @@ -256,7 +257,7 @@ public void testUtilization() throws InterruptedException { 4, Executors.defaultThreadFactory(), new ThreadContext(Settings.EMPTY), - EsExecutors.TaskTrackingConfig.builder().trackExecutionTime(0.3).trackUtilization(interval).build() + EsExecutors.TaskTrackingConfig.builder().trackExecutionTime(0.3).trackUtilization(interval, samplingInterval).build() ); try { From 3dd755ec08f4a7ce133072a09e2239ec6c47458c Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Thu, 31 Jul 2025 03:20:11 +0000 Subject: [PATCH 19/20] [CI] Auto commit changes from spotless --- .../concurrent/ThreadPoolUtilizationBenchmark.java | 2 +- .../TaskExecutionTimeTrackingEsThreadPoolExecutor.java | 10 +++------- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java index a01bf77905f17..62aed0f196690 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java @@ -44,7 +44,7 @@ public class ThreadPoolUtilizationBenchmark { @Param({ "1000" }) private int frameDurationMs; - @Param({"10000"}) + @Param({ "10000" }) private int reportingDurationMs; private TaskExecutionTimeTrackingEsThreadPoolExecutor.FramedTimeTracker timeTracker; diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java index f1284677beb32..37a1ea9279647 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java @@ -275,15 +275,11 @@ public FramedTimeTracker(long reportingInterval, long frameDuration, Supplier(FrameWindow.empty((int) (reportingInterval/frameDuration))); + this.frameWindowRef = new AtomicReference<>(FrameWindow.empty((int) (reportingInterval / frameDuration))); } FramedTimeTracker(Duration reportingInterval, Duration frameInterval) { - this( - reportingInterval.toNanos(), - frameInterval.toNanos(), - System::nanoTime - ); + this(reportingInterval.toNanos(), frameInterval.toNanos(), System::nanoTime); } public long reportingInterval() { @@ -410,7 +406,7 @@ FrameWindow moveBy(long moveFrames) { // initialize new frames in the beginning of array // a new frame always starts with last known ongoing tasks final var ongoingTasks = frames[0].ongoingTasks.sum(); - for (var i=0; i Date: Wed, 30 Jul 2025 21:09:37 -0700 Subject: [PATCH 20/20] fix --- .../common/util/concurrent/EsExecutors.java | 3 ++- .../common/util/concurrent/FramedTimeTrackerTests.java | 9 ++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index cd155754f8d62..ee18d71922445 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -580,7 +580,8 @@ public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) { /** * @param trackExecutionTime Whether to track execution stats * @param trackUtilization Enables thread-pool utilization metrics - * @param utilizationReportingInterval When utilization is enabled, specifies interval for measurement + * @param utilizationReportingInterval When utilization is enabled, specifies interval for reporting utilization, default 30 seconds + * @param utilizationSamplingInterval When utilization is enabled, specifies sample interval, default 1 second * @param trackOngoingTasks Whether to track ongoing task execution time, not just finished tasks * @param trackMaxQueueLatency Whether to track max queue latency * @param executionTimeEwmaAlpha The alpha seed for execution time EWMA (ExponentiallyWeightedMovingAverage) diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/FramedTimeTrackerTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/FramedTimeTrackerTests.java index cc49b11a463c8..6af5542630441 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/FramedTimeTrackerTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/FramedTimeTrackerTests.java @@ -19,11 +19,10 @@ public class FramedTimeTrackerTests extends ESTestCase { - private FakeTime fakeTime; - private final int frame = 10; private final int windowLen = 30; private final int window = windowLen * frame; + private FakeTime fakeTime; /** * creates new tracker with frame interval=1 and and windowSize = 100 frames @@ -46,10 +45,10 @@ public void testNoTasks() { public void testSingleWindow() { var tracker = newTracker(); - var startOffset = between(0, window / 2); + var startOffset = between(0, window / 2 - 1); fakeTime.time += startOffset; tracker.startTask(); - var taskDuration = between(0, window / 2); + var taskDuration = between(0, window / 2 - 1); fakeTime.time += taskDuration; tracker.endTask(); fakeTime.time += frame; @@ -58,7 +57,7 @@ public void testSingleWindow() { public void testMultiWindow() { var tracker = newTracker(); - var startTime = between(0, frame); + var startTime = between(0, frame - 1); var taskDuration = between(1, 10) * window; fakeTime.time += startTime; tracker.startTask();