diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java new file mode 100644 index 0000000000000..62aed0f196690 --- /dev/null +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java @@ -0,0 +1,73 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.benchmark.common.util.concurrent; + +import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Group; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.infra.Blackhole; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +@Threads(Threads.MAX) +@BenchmarkMode(Mode.SampleTime) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.MINUTES) +@State(Scope.Benchmark) +@Fork(1) +public class ThreadPoolUtilizationBenchmark { + + @Param({ 
"10000" }) + private int callIntervalTicks; + + /** + * This makes very little difference, all the overhead is in the synchronization + */ + @Param({ "1000" }) + private int frameDurationMs; + + @Param({ "10000" }) + private int reportingDurationMs; + + private TaskExecutionTimeTrackingEsThreadPoolExecutor.FramedTimeTracker timeTracker; + + @Setup + public void setup() { + timeTracker = new TaskExecutionTimeTrackingEsThreadPoolExecutor.FramedTimeTracker( + Duration.ofMillis(reportingDurationMs).toNanos(), + Duration.ofMillis(frameDurationMs).toNanos(), + System::nanoTime + ); + } + + @Benchmark + public void baseline() { + Blackhole.consumeCPU(callIntervalTicks); + } + + @Group("StartAndEnd") + @Benchmark + public void startAndStopTasks() { + timeTracker.startTask(); + Blackhole.consumeCPU(callIntervalTicks); + timeTracker.endTask(); + } +} diff --git a/docs/changelog/131898.yaml b/docs/changelog/131898.yaml new file mode 100644 index 0000000000000..f90985795879c --- /dev/null +++ b/docs/changelog/131898.yaml @@ -0,0 +1,5 @@ +pr: 131898 +summary: Time framed thread-pool utilization +area: Allocation +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index c39ce209bf875..ee18d71922445 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -17,6 +17,7 @@ import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.node.Node; +import java.time.Duration; import java.util.List; import java.util.Optional; import java.util.concurrent.AbstractExecutorService; @@ -576,80 +577,80 @@ public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) { } } - public static class TaskTrackingConfig { + /** + * @param trackExecutionTime Whether to track execution stats + * @param 
trackUtilization Enables thread-pool utilization metrics + * @param utilizationReportingInterval When utilization is enabled, specifies interval for reporting utilization, default 30 seconds + * @param utilizationSamplingInterval When utilization is enabled, specifies sample interval, default 1 second + * @param trackOngoingTasks Whether to track ongoing task execution time, not just finished tasks + * @param trackMaxQueueLatency Whether to track max queue latency + * @param executionTimeEwmaAlpha The alpha seed for execution time EWMA (ExponentiallyWeightedMovingAverage) + */ + public record TaskTrackingConfig( + boolean trackExecutionTime, + boolean trackUtilization, + Duration utilizationReportingInterval, + Duration utilizationSamplingInterval, + boolean trackOngoingTasks, + boolean trackMaxQueueLatency, + double executionTimeEwmaAlpha + ) { + // This is a random starting point alpha. public static final double DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST = 0.3; - - private final boolean trackExecutionTime; - private final boolean trackOngoingTasks; - private final boolean trackMaxQueueLatency; - private final double executionTimeEwmaAlpha; + public static final Duration DEFAULT_UTILIZATION_INTERVAL = Duration.ofSeconds(30); + public static final Duration DEFAULT_UTILIZATION_SAMPLING_INTERVAL = Duration.ofSeconds(1); public static final TaskTrackingConfig DO_NOT_TRACK = new TaskTrackingConfig( false, false, + DEFAULT_UTILIZATION_INTERVAL, + DEFAULT_UTILIZATION_SAMPLING_INTERVAL, + false, false, DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST ); + public static final TaskTrackingConfig DEFAULT = new TaskTrackingConfig( true, + true, + DEFAULT_UTILIZATION_INTERVAL, + DEFAULT_UTILIZATION_SAMPLING_INTERVAL, false, false, DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST ); - /** - * @param trackExecutionTime Whether to track execution stats - * @param trackOngoingTasks Whether to track ongoing task execution time, not just finished tasks - * @param trackMaxQueueLatency Whether 
to track max queue latency. - * @param executionTimeEWMAAlpha The alpha seed for execution time EWMA (ExponentiallyWeightedMovingAverage). - */ - private TaskTrackingConfig( - boolean trackExecutionTime, - boolean trackOngoingTasks, - boolean trackMaxQueueLatency, - double executionTimeEWMAAlpha - ) { - this.trackExecutionTime = trackExecutionTime; - this.trackOngoingTasks = trackOngoingTasks; - this.trackMaxQueueLatency = trackMaxQueueLatency; - this.executionTimeEwmaAlpha = executionTimeEWMAAlpha; - } - - public boolean trackExecutionTime() { - return trackExecutionTime; - } - - public boolean trackOngoingTasks() { - return trackOngoingTasks; - } - - public boolean trackMaxQueueLatency() { - return trackMaxQueueLatency; - } - - public double getExecutionTimeEwmaAlpha() { - return executionTimeEwmaAlpha; - } - public static Builder builder() { return new Builder(); } public static class Builder { private boolean trackExecutionTime = false; + private boolean trackUtilization = false; private boolean trackOngoingTasks = false; private boolean trackMaxQueueLatency = false; private double ewmaAlpha = DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST; + private Duration utilizationInterval = DEFAULT_UTILIZATION_INTERVAL; + private Duration utilizationSamplingInterval = DEFAULT_UTILIZATION_SAMPLING_INTERVAL; public Builder() {} public Builder trackExecutionTime(double alpha) { trackExecutionTime = true; + trackUtilization = true; ewmaAlpha = alpha; return this; } + public Builder trackUtilization(Duration interval, Duration samplingInterval) { + assert interval.dividedBy(samplingInterval) > 0 : "interval should be same or larger than sampling interval"; + trackUtilization = true; + utilizationInterval = interval; + utilizationSamplingInterval = samplingInterval; + return this; + } + public Builder trackOngoingTasks() { trackOngoingTasks = true; return this; @@ -661,7 +662,15 @@ public Builder trackMaxQueueLatency() { } public TaskTrackingConfig build() { - return new 
TaskTrackingConfig(trackExecutionTime, trackOngoingTasks, trackMaxQueueLatency, ewmaAlpha); + return new TaskTrackingConfig( + trackExecutionTime, + trackUtilization, + utilizationInterval, + utilizationSamplingInterval, + trackOngoingTasks, + trackMaxQueueLatency, + ewmaAlpha + ); } } } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java index 762a8c280b7f3..37a1ea9279647 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java @@ -19,6 +19,7 @@ import org.elasticsearch.telemetry.metric.MeterRegistry; import org.elasticsearch.threadpool.ThreadPool; +import java.time.Duration; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -27,9 +28,13 @@ import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAccumulator; import java.util.concurrent.atomic.LongAdder; import java.util.function.Function; +import java.util.function.Supplier; import static org.elasticsearch.threadpool.ThreadPool.THREAD_POOL_METRIC_NAME_QUEUE_TIME; import static org.elasticsearch.threadpool.ThreadPool.THREAD_POOL_METRIC_NAME_UTILIZATION; @@ -44,22 +49,14 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea private final Function runnableWrapper; private final ExponentiallyWeightedMovingAverage executionEWMA; private final LongAdder totalExecutionTime = new LongAdder(); - private final boolean trackOngoingTasks; 
// The set of currently running tasks and the timestamp of when they started execution in the Executor. private final Map ongoingTasks = new ConcurrentHashMap<>(); private final ExponentialBucketHistogram queueLatencyMillisHistogram = new ExponentialBucketHistogram(QUEUE_LATENCY_HISTOGRAM_BUCKETS); - private final boolean trackMaxQueueLatency; + private final TaskTrackingConfig trackingConfig; + private final FramedTimeTracker framedTimeTracker; private LongAccumulator maxQueueLatencyMillisSinceLastPoll = new LongAccumulator(Long::max, 0); - public enum UtilizationTrackingPurpose { - APM, - ALLOCATION, - } - - private volatile UtilizationTracker apmUtilizationTracker = new UtilizationTracker(); - private volatile UtilizationTracker allocationUtilizationTracker = new UtilizationTracker(); - - TaskExecutionTimeTrackingEsThreadPoolExecutor( + public TaskExecutionTimeTrackingEsThreadPoolExecutor( String name, int corePoolSize, int maximumPoolSize, @@ -75,9 +72,12 @@ public enum UtilizationTrackingPurpose { super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler, contextHolder); this.runnableWrapper = runnableWrapper; - this.executionEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getExecutionTimeEwmaAlpha(), 0); - this.trackOngoingTasks = trackingConfig.trackOngoingTasks(); - this.trackMaxQueueLatency = trackingConfig.trackMaxQueueLatency(); + this.executionEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.executionTimeEwmaAlpha(), 0); + this.trackingConfig = trackingConfig; + this.framedTimeTracker = new FramedTimeTracker( + trackingConfig.utilizationReportingInterval(), + trackingConfig.utilizationSamplingInterval() + ); } public List setupMetrics(MeterRegistry meterRegistry, String threadPoolName) { @@ -105,7 +105,7 @@ public List setupMetrics(MeterRegistry meterRegistry, String threadP ThreadPool.THREAD_POOL_METRIC_PREFIX + threadPoolName + THREAD_POOL_METRIC_NAME_UTILIZATION, "fraction of maximum 
thread time utilized for " + threadPoolName, "fraction", - () -> new DoubleWithAttributes(pollUtilization(UtilizationTrackingPurpose.APM), Map.of()) + () -> new DoubleWithAttributes(utilization(), Map.of()) ) ); } @@ -147,34 +147,33 @@ public int getCurrentQueueSize() { } public long getMaxQueueLatencyMillisSinceLastPollAndReset() { - if (trackMaxQueueLatency == false) { + if (trackingConfig.trackMaxQueueLatency() == false) { return 0; } return maxQueueLatencyMillisSinceLastPoll.getThenReset(); } + public TaskTrackingConfig trackingConfig() { + return trackingConfig; + } + /** - * Returns the fraction of the maximum possible thread time that was actually used since the last time this method was called. - * There are two periodic pulling mechanisms that access utilization reporting: {@link UtilizationTrackingPurpose} distinguishes the - * caller. + * Returns thread-pool utilization from last completed time interval (frame) {@link TaskTrackingConfig#utilizationReportingInterval()}. + * Utilization is measured as {@code all-threads-total-execution-time / (total-thread-count * interval)}. + * This metric is updated once per interval, and returns last completed measurement. For example: + * if interval is 30 seconds, at clock time 00:30-01:00 it will return utilization from 00:00-00:30. + * There is no synchronization with clocks and system time. * - * @return the utilization as a fraction, in the range [0, 1]. This may return >1 if a task completed in the time range but started - * earlier, contributing a larger execution time. + * If caller needs longer intervals it should poll on every tracker-interval and aggregate on its own. Another option is to extend + * framedTimeTracker to remember multiple past frames, and return aggregated view from here. 
*/ - public double pollUtilization(UtilizationTrackingPurpose utilizationTrackingPurpose) { - switch (utilizationTrackingPurpose) { - case APM: - return apmUtilizationTracker.pollUtilization(); - case ALLOCATION: - return allocationUtilizationTracker.pollUtilization(); - default: - throw new IllegalStateException("No operation defined for [" + utilizationTrackingPurpose + "]"); - } + public double utilization() { + return (double) framedTimeTracker.totalTime() / (double) getMaximumPoolSize() / (double) framedTimeTracker.reportingInterval(); } @Override protected void beforeExecute(Thread t, Runnable r) { - if (trackOngoingTasks) { + if (trackingConfig.trackOngoingTasks()) { ongoingTasks.put(r, System.nanoTime()); } @@ -186,9 +185,12 @@ protected void beforeExecute(Thread t, Runnable r) { var queueLatencyMillis = TimeUnit.NANOSECONDS.toMillis(taskQueueLatency); queueLatencyMillisHistogram.addObservation(queueLatencyMillis); - if (trackMaxQueueLatency) { + if (trackingConfig.trackMaxQueueLatency()) { maxQueueLatencyMillisSinceLastPoll.accumulate(queueLatencyMillis); } + if (trackingConfig.trackUtilization()) { + framedTimeTracker.startTask(); + } } @Override @@ -213,10 +215,13 @@ protected void afterExecute(Runnable r, Throwable t) { executionEWMA.addValue(taskExecutionNanos); totalExecutionTime.add(taskExecutionNanos); } + if (trackingConfig.trackUtilization()) { + framedTimeTracker.endTask(); + } } finally { // if trackOngoingTasks is false -> ongoingTasks must be empty - assert trackOngoingTasks || ongoingTasks.isEmpty(); - if (trackOngoingTasks) { + assert trackingConfig.trackOngoingTasks() || ongoingTasks.isEmpty(); + if (trackingConfig.trackOngoingTasks()) { ongoingTasks.remove(r); } } @@ -240,7 +245,7 @@ protected void appendThreadPoolExecutorDetails(StringBuilder sb) { * task is reflected in at least one of those two values. */ public Map getOngoingTasks() { - return trackOngoingTasks ? 
Map.copyOf(ongoingTasks) : Map.of(); + return trackingConfig.trackOngoingTasks() ? Map.copyOf(ongoingTasks) : Map.of(); } // Used for testing @@ -250,33 +255,164 @@ public double getExecutionEwmaAlpha() { // Used for testing public boolean trackingMaxQueueLatency() { - return trackMaxQueueLatency; + return trackingConfig.trackMaxQueueLatency(); } /** - * Supports periodic polling for thread pool utilization. Tracks state since the last polling request so that the average utilization - * since the last poll can be calculated for the next polling request. - * - * Uses the difference of {@link #totalExecutionTime} since the last polling request to determine how much activity has occurred. + * Tracks threads execution in continuous, non-overlapping, and even time frames. */ - private class UtilizationTracker { - long lastPollTime = System.nanoTime(); - long lastTotalExecutionTime = 0; + public static class FramedTimeTracker { + private final long reportingInterval; + private final long frameDuration; + private final Supplier timeNow; + private final AtomicReference frameWindowRef; + private final AtomicBoolean updatingFrame = new AtomicBoolean(); + private final AtomicLong currentFrameNum = new AtomicLong(); + + // for testing + public FramedTimeTracker(long reportingInterval, long frameDuration, Supplier timeNow) { + assert reportingInterval / frameDuration > 0; + this.reportingInterval = reportingInterval; + this.frameDuration = frameDuration; + this.timeNow = timeNow; + this.frameWindowRef = new AtomicReference<>(FrameWindow.empty((int) (reportingInterval / frameDuration))); + } - public synchronized double pollUtilization() { - final long currentTotalExecutionTimeNanos = totalExecutionTime.sum(); - final long currentPollTimeNanos = System.nanoTime(); + FramedTimeTracker(Duration reportingInterval, Duration frameInterval) { + this(reportingInterval.toNanos(), frameInterval.toNanos(), System::nanoTime); + } - final long totalExecutionTimeSinceLastPollNanos = 
currentTotalExecutionTimeNanos - lastTotalExecutionTime; - final long timeSinceLastPoll = currentPollTimeNanos - lastPollTime; + public long reportingInterval() { + return reportingInterval; + } - final long maximumExecutionTimeSinceLastPollNanos = timeSinceLastPoll * getMaximumPoolSize(); - final double utilizationSinceLastPoll = (double) totalExecutionTimeSinceLastPollNanos / maximumExecutionTimeSinceLastPollNanos; + /** + * Returns current FrameWindow. If window is stale, it will slide to current time. + * @param now - current frame number (time divided by frame duration) + */ + private FrameWindow getWindow(long now) { + var current = currentFrameNum.get(); + // first time in new frame + if (current < now) { + // only one thread will perform frame update, others spinWait + if (updatingFrame.compareAndSet(false, true)) { + final var moveOffset = now - current; + final var newWindow = frameWindowRef.get().moveBy(moveOffset); + frameWindowRef.set(newWindow); + currentFrameNum.set(now); + updatingFrame.set(false); + } else { + while (updatingFrame.get()) { + Thread.onSpinWait(); + } + // an edge case when all the following happen: + // 1. window was stale, at least 1 frame + // 2. two or more threads try to update window + // 3. it's happening at the end of the frame, beginning new frame + // for example, let's say interval is 10 + // and there are two concurrent calls getWindow(9)->frame0 and getWindow(10)->frame1 + // both need to update window, but those are different windows, + // two things might happen: + // 1. getWindow(9) updates window and uses it, but getWindow(10) needs to update window again + // 2. getWindow(10) updates window, then getWindow(9) will see a newer window, so we record task in a newer frame, + // basically rounding-up frame when it's happening. 
+ if (currentFrameNum.get() < now) { + return getWindow(now); + } + } + } + return frameWindowRef.get(); + } - lastTotalExecutionTime = currentTotalExecutionTimeNanos; - lastPollTime = currentPollTimeNanos; + /** + * Start tracking a new task, assuming that the task runs indefinitely, or at least till the end of the frame. + * If the task finishes before the end of the frame, {@link FramedTimeTracker#endTask()} will deduct the remaining time. + */ + public void startTask() { + final var nowTime = timeNow.get(); + final var now = nowTime / frameDuration; + final var frameWindow = getWindow(now); + frameWindow.frames[0].ongoingTasks.increment(); + frameWindow.frames[0].startEndDiff.add((now + 1) * frameDuration - nowTime); + } + + /** + * Stop task tracking. We already assumed that the task runs till the end of the frame; here we deduct the unused time. + */ + public void endTask() { + final var nowTime = timeNow.get(); + final var now = nowTime / frameDuration; + final var frameWindow = getWindow(now); + frameWindow.frames[0].ongoingTasks.decrement(); + frameWindow.frames[0].startEndDiff.add(-((now + 1) * frameDuration - nowTime)); + } + + /** + * Returns total execution time from last interval. 
+ */ + public long totalTime() { + final var now = timeNow.get() / frameDuration; + final var frameWindow = getWindow(now); + // total time is sum of ongoing tasks in frame N-1 and all starts and ends in N frame + // so for the previous frame (now-1), it would be (now-2) ongoing tasks + (now -1) start/end tasks + var totalTime = 0L; + for (var i = 1; i < frameWindow.frames.length - 1; i++) { // first and last frames are not used, see FrameWindow description + final var ongoingTasks = frameWindow.frames[i + 1].ongoingTasks.sum(); + final var startEndDiff = frameWindow.frames[i].startEndDiff.sum(); + totalTime += ongoingTasks * frameDuration + startEndDiff; + } + return totalTime; + } - return utilizationSinceLastPoll; + /** + * A single frame that tracks how many tasks are still running at the end of frame + * and diffs from task start and end. + */ + record Frame(LongAdder ongoingTasks, LongAdder startEndDiff) { + Frame() { + this(new LongAdder(), new LongAdder()); + } + } + + /** + * A frame window of consecutive frames. frames[0] is now, frames[1] is now-1. + * To calculate frame time usage we need to know ongoing-tasks from previous frame and all starts and ends in current frame. + * That means we cannot calculate time for first and last frames in the array. First one is in-progress, last one has no information + * about previous ongoing tasks. So completed frames are between 1..length-2. + */ + record FrameWindow(Frame[] frames) { + + static FrameWindow empty(int size) { + // first and last frames are incomplete, adding two more frames + final var frames = new Frame[size + 2]; + for (var i = 0; i < frames.length; i++) { + frames[i] = new Frame(); + } + return new FrameWindow(frames); + } + + /** + * Creates a new window by sliding current by moveFrames. If new window overlaps with current window Frames are reused. + * So there is no risk of losing data when start/endTask updates counters in the past window frame. 
+ */ + FrameWindow moveBy(long moveFrames) { + if (moveFrames == 0) { + return this; + } + final var newFramesNum = (int) Math.min(frames.length, moveFrames); // non-overlapping frames with current window + final var newFrames = new Frame[frames.length]; + // copy overlapping frames to the end of array + System.arraycopy(frames, 0, newFrames, newFramesNum, frames.length - newFramesNum); + // initialize new frames in the beginning of array + // a new frame always starts with last known ongoing tasks + final var ongoingTasks = frames[0].ongoingTasks.sum(); + for (var i = 0; i < newFramesNum; i++) { + final var frame = new Frame(); + frame.ongoingTasks.add(ongoingTasks); + newFrames[i] = frame; + } + return new FrameWindow(newFrames); + } } } } diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/FramedTimeTrackerTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/FramedTimeTrackerTests.java new file mode 100644 index 0000000000000..6af5542630441 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/FramedTimeTrackerTests.java @@ -0,0 +1,110 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.common.util.concurrent; + +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; + +import java.util.function.Supplier; + +import static java.util.stream.IntStream.range; +import static org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor.FramedTimeTracker; + +public class FramedTimeTrackerTests extends ESTestCase { + + private final int frame = 10; + private final int windowLen = 30; + private final int window = windowLen * frame; + private FakeTime fakeTime; + + /** + * creates a new tracker with frame duration = 10 and a reporting window of 30 frames (300 time units) + */ + FramedTimeTracker newTracker() { + return new FramedTimeTracker(window, frame, fakeTime); + } + + @Before + public void setup() { + fakeTime = new FakeTime(); + } + + public void testNoTasks() { + var tracker = newTracker(); + assertEquals(0, tracker.totalTime()); + fakeTime.time += randomNonNegativeInt(); + assertEquals(0, tracker.totalTime()); + } + + public void testSingleWindow() { + var tracker = newTracker(); + var startOffset = between(0, window / 2 - 1); + fakeTime.time += startOffset; + tracker.startTask(); + var taskDuration = between(0, window / 2 - 1); + fakeTime.time += taskDuration; + tracker.endTask(); + fakeTime.time += frame; + assertEquals(taskDuration, tracker.totalTime()); + } + + public void testMultiWindow() { + var tracker = newTracker(); + var startTime = between(0, frame - 1); + var taskDuration = between(1, 10) * window; + fakeTime.time += startTime; + tracker.startTask(); + fakeTime.time += taskDuration; + tracker.endTask(); + fakeTime.time += frame; + assertEquals("must run for the whole window, except last frame", window - (frame - startTime), (int) tracker.totalTime()); + } + + public void testOngoingTask() { + var tracker = newTracker(); + tracker.startTask(); + // fill first window + for (int i = 0; i < windowLen; i++) { + assertEquals(frame * i, tracker.totalTime()); + fakeTime.time += frame; + } + // 
after first window is filled, it's always full then + for (int i = 0; i < between(0, 1000); i++) { + assertEquals(window, tracker.totalTime()); + fakeTime.time += frame; + } + } + + public void testMultipleTasks() { + var tracker = newTracker(); + var halfWindowTasks = between(1, 10); + var notEndingTasks = between(1, 10); + + range(0, halfWindowTasks + notEndingTasks).forEach(t -> tracker.startTask()); + fakeTime.time += window / 2; + range(0, halfWindowTasks).forEach(t -> tracker.endTask()); + fakeTime.time += window / 2; + var firstFrameTotalTime = window * halfWindowTasks / 2 + window * notEndingTasks; + assertEquals(firstFrameTotalTime, tracker.totalTime()); + + fakeTime.time += window; + var secondFrameTotalTime = window * notEndingTasks; + assertEquals(secondFrameTotalTime, tracker.totalTime()); + } + + static class FakeTime implements Supplier { + long time = 0; + + @Override + public Long get() { + return time; + } + } +} diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java index 505c26409a702..cd75dd3b0bf7e 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java @@ -17,13 +17,18 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; +import java.time.Duration; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.function.Function; +import java.util.stream.DoubleStream; +import static 
java.util.stream.IntStream.range; import static org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig.DEFAULT_EXECUTION_TIME_EWMA_ALPHA_FOR_TEST; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -226,6 +231,66 @@ public void testGetOngoingTasks() throws Exception { executor.awaitTermination(10, TimeUnit.SECONDS); } + public void testUtilization() throws InterruptedException { + final var interval = Duration.ofMillis(100); + final var samplingInterval = Duration.ofMillis(10); + + final Consumer trySleep = (d) -> { + try { + Thread.sleep(d); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }; + + final Runnable waitTillNextFrame = () -> { + var now = System.nanoTime(); + var waitTillNext = (now / interval.toNanos() + 1) * interval.toNanos() - now; + trySleep.accept(Duration.ofNanos(waitTillNext)); + }; + + final Function sleepTaskFn = (d) -> () -> trySleep.accept(d); + + var executor = (TaskExecutionTimeTrackingEsThreadPoolExecutor) EsExecutors.newFixed( + "utilization", + 4, + 4, + Executors.defaultThreadFactory(), + new ThreadContext(Settings.EMPTY), + EsExecutors.TaskTrackingConfig.builder().trackExecutionTime(0.3).trackUtilization(interval, samplingInterval).build() + ); + + try { + // warm-up executor, reduces utilization metric jitter + waitTillNextFrame.run(); + range(0, 4).forEach(i -> executor.submit(sleepTaskFn.apply(Duration.ofMillis(1)))); + + // create 4 tasks that use different thread usage per interval and start at the beginning of the frame + // 1. small task with 10% + // 2. medium task with 30% + // 3. 
larger task with 50% + // 4. long-running task that spans over multiple intervals (3), i.e. 100% utilization per frame + // total utilization for 4-threads-pool = (10 + 30 + 50 + 100) / (4 * 100) = 190/400 = 0.475 + // assuming overhead can be up to 10%, final range would be 0.475 - 0.575 + waitTillNextFrame.run(); + + DoubleStream.of(0.1, 0.3, 0.5, 3.).forEach(loadFactor -> { + var sleepTime = (long) (interval.toNanos() * loadFactor); + executor.submit(sleepTaskFn.apply(Duration.ofNanos(sleepTime))); + }); + + waitTillNextFrame.run(); + assertEquals(0.475, executor.utilization(), 0.1); + + // the long running task should still use 100% of a single thread, so 1/4=25% utilization + waitTillNextFrame.run(); + assertEquals(0.25, executor.utilization(), 0.1); + } finally { + executor.shutdown(); + executor.awaitTermination(0, TimeUnit.SECONDS); + } + } + public void testQueueLatencyHistogramMetrics() { RecordingMeterRegistry meterRegistry = new RecordingMeterRegistry(); final var threadPoolName = randomIdentifier(); diff --git a/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java b/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java index 2cd166e002637..25bb95e9532be 100644 --- a/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java +++ b/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java @@ -20,7 +20,6 @@ import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor; -import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor.UtilizationTrackingPurpose; import org.elasticsearch.core.TimeValue; import org.elasticsearch.node.Node; import org.elasticsearch.telemetry.InstrumentType; @@ -38,7 +37,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import 
java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import static org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig.DEFAULT; @@ -49,10 +47,8 @@ import static org.elasticsearch.threadpool.ThreadPool.getMaxSnapshotThreadPoolSize; import static org.elasticsearch.threadpool.ThreadPool.halfAllocatedProcessorsMaxFive; import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.lessThan; public class ThreadPoolTests extends ESTestCase { @@ -489,85 +485,6 @@ public void testScheduledFixedDelayForceExecution() { } } - public void testDetailedUtilizationMetric() throws Exception { - final RecordingMeterRegistry meterRegistry = new RecordingMeterRegistry(); - final BuiltInExecutorBuilders builtInExecutorBuilders = new DefaultBuiltInExecutorBuilders(); - - final ThreadPool threadPool = new ThreadPool( - Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test").build(), - meterRegistry, - builtInExecutorBuilders - ); - try { - // write thread pool is tracked - final String threadPoolName = ThreadPool.Names.WRITE; - final MetricAsserter metricAsserter = new MetricAsserter(meterRegistry, threadPoolName); - final ThreadPool.Info threadPoolInfo = threadPool.info(threadPoolName); - final TaskExecutionTimeTrackingEsThreadPoolExecutor executor = asInstanceOf( - TaskExecutionTimeTrackingEsThreadPoolExecutor.class, - threadPool.executor(threadPoolName) - ); - - final long beforePreviousCollectNanos = System.nanoTime(); - meterRegistry.getRecorder().collect(); - double allocationUtilization = executor.pollUtilization(UtilizationTrackingPurpose.ALLOCATION); - final long afterPreviousCollectNanos = System.nanoTime(); - - var metricValue = metricAsserter.assertLatestMetricValueMatches( - InstrumentType.DOUBLE_GAUGE, - 
ThreadPool.THREAD_POOL_METRIC_NAME_UTILIZATION, - Measurement::getDouble, - equalTo(0.0d) - ); - logger.info("---> Utilization metric data points, APM: " + metricValue + ", Allocation: " + allocationUtilization); - assertThat(allocationUtilization, equalTo(0.0d)); - - final AtomicLong minimumDurationNanos = new AtomicLong(Long.MAX_VALUE); - final long beforeStartNanos = System.nanoTime(); - final CyclicBarrier barrier = new CyclicBarrier(2); - Future future = executor.submit(() -> { - long innerStartTimeNanos = System.nanoTime(); - safeSleep(100); - safeAwait(barrier); - minimumDurationNanos.set(System.nanoTime() - innerStartTimeNanos); - }); - safeAwait(barrier); - safeGet(future); - final long maxDurationNanos = System.nanoTime() - beforeStartNanos; - - // Wait for TaskExecutionTimeTrackingEsThreadPoolExecutor#afterExecute to run - assertBusy(() -> assertThat(executor.getTotalTaskExecutionTime(), greaterThan(0L))); - - final long beforeMetricsCollectedNanos = System.nanoTime(); - meterRegistry.getRecorder().collect(); - allocationUtilization = executor.pollUtilization(UtilizationTrackingPurpose.ALLOCATION); - final long afterMetricsCollectedNanos = System.nanoTime(); - - // Calculate upper bound on utilisation metric - final long minimumPollIntervalNanos = beforeMetricsCollectedNanos - afterPreviousCollectNanos; - final long minimumMaxExecutionTimeNanos = minimumPollIntervalNanos * threadPoolInfo.getMax(); - final double maximumUtilization = (double) maxDurationNanos / minimumMaxExecutionTimeNanos; - - // Calculate lower bound on utilisation metric - final long maximumPollIntervalNanos = afterMetricsCollectedNanos - beforePreviousCollectNanos; - final long maximumMaxExecutionTimeNanos = maximumPollIntervalNanos * threadPoolInfo.getMax(); - final double minimumUtilization = (double) minimumDurationNanos.get() / maximumMaxExecutionTimeNanos; - - logger.info("Utilization must be in [{}, {}]", minimumUtilization, maximumUtilization); - Matcher matcher = 
allOf(greaterThan(minimumUtilization), lessThan(maximumUtilization)); - metricValue = metricAsserter.assertLatestMetricValueMatches( - InstrumentType.DOUBLE_GAUGE, - ThreadPool.THREAD_POOL_METRIC_NAME_UTILIZATION, - Measurement::getDouble, - matcher - ); - logger.info("---> Utilization metric data points, APM: " + metricValue + ", Allocation: " + allocationUtilization); - assertThat(allocationUtilization, matcher); - } finally { - ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); - } - } - public void testThreadCountMetrics() throws Exception { final RecordingMeterRegistry meterRegistry = new RecordingMeterRegistry(); final BuiltInExecutorBuilders builtInExecutorBuilders = new DefaultBuiltInExecutorBuilders();