-
Notifications
You must be signed in to change notification settings - Fork 25.6k
allocation: time compute rounds by start time over last convergence #137418
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 7 commits
ff3d2a0
77e545a
40b3895
c16062c
078db98
f966381
6b12b33
377dc97
33af652
14b15c9
840f788
b3042a4
99e0488
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -90,6 +90,7 @@ public class DesiredBalanceComputer { | |
| private long numIterationsSinceLastConverged; | ||
| private long lastConvergedTimeMillis; | ||
| private long lastNotConvergedLogMessageTimeMillis; | ||
| private long firstComputeSinceConvergedTimeMillis; | ||
| private Level convergenceLogMsgLevel; | ||
| private ShardRouting lastTrackedUnassignedShard; | ||
|
|
||
|
|
@@ -106,6 +107,7 @@ public DesiredBalanceComputer( | |
| this.numIterationsSinceLastConverged = 0; | ||
| this.lastConvergedTimeMillis = timeProvider.relativeTimeInMillis(); | ||
| this.lastNotConvergedLogMessageTimeMillis = lastConvergedTimeMillis; | ||
| this.firstComputeSinceConvergedTimeMillis = lastConvergedTimeMillis; | ||
| this.convergenceLogMsgLevel = Level.DEBUG; | ||
| clusterSettings.initializeAndWatch(PROGRESS_LOG_INTERVAL_SETTING, value -> this.progressLogInterval = value); | ||
| clusterSettings.initializeAndWatch( | ||
|
|
@@ -330,7 +332,12 @@ public DesiredBalance compute( | |
| final int iterationCountReportInterval = computeIterationCountReportInterval(routingAllocation); | ||
| final long timeWarningInterval = progressLogInterval.millis(); | ||
| final long computationStartedTime = timeProvider.relativeTimeInMillis(); | ||
| long nextReportTime = Math.max(lastNotConvergedLogMessageTimeMillis, lastConvergedTimeMillis) + timeWarningInterval; | ||
| if (lastConvergedTimeMillis > firstComputeSinceConvergedTimeMillis) { | ||
|
||
| // mark our first effort at compute since a convergence, | ||
| // so that a logging period for non-convergence warning are based from this time | ||
| firstComputeSinceConvergedTimeMillis = computationStartedTime; | ||
| } | ||
| long nextReportTime = Math.max(lastNotConvergedLogMessageTimeMillis, firstComputeSinceConvergedTimeMillis) + timeWarningInterval; | ||
|
|
||
| int i = 0; | ||
| boolean hasChanges = false; | ||
|
|
@@ -391,24 +398,29 @@ public DesiredBalance compute( | |
| convergenceLogMsgLevel, | ||
| () -> Strings.format( | ||
| """ | ||
| Desired balance computation for [%d] converged after [%s] and [%d] iterations, \ | ||
| resumed computation [%d] times with [%d] iterations since the last resumption [%s] ago""", | ||
| Desired balance computation for [%d] converged after [%s] and [%d] iterations this round, \ | ||
| resumed computation [%s] ago with [%d] iterations over [%d] rounds since the last convergence \ | ||
| [%s] ago""", | ||
|
||
| desiredBalanceInput.index(), | ||
| TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString(), | ||
| TimeValue.timeValueMillis(currentTime - computationStartedTime).toString(), | ||
| iterations, | ||
| TimeValue.timeValueMillis(currentTime - firstComputeSinceConvergedTimeMillis).toString(), | ||
| numIterationsSinceLastConverged, | ||
| numComputeCallsSinceLastConverged, | ||
| iterations, | ||
| TimeValue.timeValueMillis(currentTime - computationStartedTime).toString() | ||
| TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString() | ||
| ) | ||
| ); | ||
| } else { | ||
| logger.log( | ||
| convergenceLogMsgLevel, | ||
| () -> Strings.format( | ||
| "Desired balance computation for [%d] converged after [%s] and [%d] iterations", | ||
| """ | ||
| Desired balance computation for [%d] converged after [%s] and [%d] iterations this round, \ | ||
| last convergence was [%s] ago""", | ||
| desiredBalanceInput.index(), | ||
| TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString(), | ||
| numIterationsSinceLastConverged | ||
| TimeValue.timeValueMillis(currentTime - computationStartedTime).toString(), | ||
| iterations, | ||
| TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString() | ||
| ) | ||
| ); | ||
| } | ||
|
|
@@ -452,24 +464,29 @@ public DesiredBalance compute( | |
| logLevel, | ||
| () -> Strings.format( | ||
| """ | ||
| Desired balance computation for [%d] is still not converged after [%s] and [%d] iterations, \ | ||
| resumed computation [%d] times with [%d] iterations since the last resumption [%s] ago""", | ||
| Desired balance computation for [%d] is still not converged after [%s] and [%d] iterations this round, \ | ||
| resumed computation [%s] ago with [%d] iterations over [%d] rounds since the last convergence \ | ||
| [%s] ago""", | ||
| desiredBalanceInput.index(), | ||
| TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString(), | ||
| TimeValue.timeValueMillis(currentTime - computationStartedTime).toString(), | ||
| iterations, | ||
| TimeValue.timeValueMillis(currentTime - firstComputeSinceConvergedTimeMillis).toString(), | ||
| numIterationsSinceLastConverged, | ||
| numComputeCallsSinceLastConverged, | ||
| iterations, | ||
| TimeValue.timeValueMillis(currentTime - computationStartedTime).toString() | ||
| TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString() | ||
|
||
| ) | ||
| ); | ||
| } else { | ||
| logger.log( | ||
| logLevel, | ||
| () -> Strings.format( | ||
| "Desired balance computation for [%d] is still not converged after [%s] and [%d] iterations", | ||
| """ | ||
| Desired balance computation for [%d] is still not converged after [%s] and [%d] iterations this round, \ | ||
| last convergence was [%s] ago""", | ||
| desiredBalanceInput.index(), | ||
| TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString(), | ||
| numIterationsSinceLastConverged | ||
| TimeValue.timeValueMillis(currentTime - computationStartedTime).toString(), | ||
| iterations, | ||
| TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString() | ||
| ) | ||
| ); | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -119,6 +119,45 @@ public class DesiredBalanceComputerTests extends ESAllocationTestCase { | |
|
|
||
| static final String TEST_INDEX = "test-index"; | ||
|
|
||
| public void testShouldNotLogLongBalanceComputation() { | ||
|
||
| final var clusterSettings = new ClusterSettings( | ||
| Settings.builder().put(DesiredBalanceComputer.PROGRESS_LOG_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(5L)).build(), | ||
| ClusterSettings.BUILT_IN_CLUSTER_SETTINGS | ||
| ); | ||
| final var timeInMillis = new AtomicLong(-1L); | ||
| final var desiredBalance = new AtomicReference<DesiredBalance>(DesiredBalance.BECOME_MASTER_INITIAL); | ||
|
|
||
| final var computer = new DesiredBalanceComputer( | ||
| clusterSettings, | ||
| TimeProviderUtils.create(timeInMillis::incrementAndGet), | ||
| new BalancedShardsAllocator(Settings.EMPTY), | ||
| TEST_ONLY_EXPLAINER | ||
| ); | ||
|
|
||
| final var allocation = new RoutingAllocation( | ||
| randomAllocationDeciders(Settings.EMPTY, clusterSettings), | ||
| createInitialClusterState(1, 1, 0), | ||
| ClusterInfo.EMPTY, | ||
| SnapshotShardSizeInfo.EMPTY, | ||
| 0L | ||
| ); | ||
|
|
||
| computer.compute(desiredBalance.get(), DesiredBalanceInput.create(1, allocation), queue(), ignore -> true); | ||
|
|
||
| timeInMillis.set(60 * 60 * 1000L); | ||
|
|
||
| assertLoggerExpectationsFor(() -> { | ||
| computer.compute(desiredBalance.get(), DesiredBalanceInput.create(2, allocation), queue(), ignore -> true); | ||
| }, | ||
| new MockLog.UnseenEventExpectation( | ||
| "should NOT log long balance computation", | ||
| DesiredBalanceComputer.class.getCanonicalName(), | ||
| Level.INFO, | ||
| "* still not converged after *" | ||
| ) | ||
| ); | ||
| } | ||
|
|
||
| public void testComputeBalance() { | ||
| var desiredBalanceComputer = createDesiredBalanceComputer(); | ||
| var clusterState = createInitialClusterState(3); | ||
|
|
@@ -1348,7 +1387,7 @@ public void testShouldLogComputationIteration() { | |
| "Should not report long computation too early", | ||
| DesiredBalanceComputer.class.getCanonicalName(), | ||
| Level.INFO, | ||
| "Desired balance computation for [*] is still not converged after [*] and [*] iterations" | ||
| "Desired balance computation for [*] is still not converged after [*] and [*] iterations *" | ||
| ) | ||
| ); | ||
|
|
||
|
|
@@ -1359,7 +1398,8 @@ public void testShouldLogComputationIteration() { | |
| "Should report long computation based on iteration count", | ||
| DesiredBalanceComputer.class.getCanonicalName(), | ||
| Level.INFO, | ||
| "Desired balance computation for [*] is still not converged after [10s] and [1000] iterations" | ||
| "Desired balance computation for [*] is still not converged after [10s] and [1000] iterations this round, " | ||
| + "last convergence was [10s] ago" | ||
| ) | ||
| ); | ||
|
|
||
|
|
@@ -1370,7 +1410,8 @@ public void testShouldLogComputationIteration() { | |
| "Should report long computation based on time", | ||
| DesiredBalanceComputer.class.getCanonicalName(), | ||
| Level.INFO, | ||
| "Desired balance computation for [*] is still not converged after [1m] and [59] iterations" | ||
| "Desired balance computation for [*] is still not converged after [59s] and [59] iterations this round, " | ||
| + "last convergence was [1m] ago" | ||
| ) | ||
| ); | ||
| } | ||
|
|
@@ -1468,36 +1509,45 @@ record ExpectedLastConvergenceInfo(int numComputeCalls, int numTotalIterations, | |
|
|
||
| record LogExpectationData( | ||
| boolean isConverged, | ||
| String timeSinceConverged, | ||
| String currentDuration, | ||
| int currentIterations, | ||
| String timeSinceRecompute, | ||
| int totalIterations, | ||
| int totalComputeCalls, | ||
| int currentIterations, | ||
| String currentDuration | ||
| String timeSinceConverged | ||
| ) { | ||
| LogExpectationData(boolean isConverged, String timeSinceConverged, int totalIterations) { | ||
| this(isConverged, timeSinceConverged, totalIterations, 0, 0, ""); | ||
| LogExpectationData(boolean isConverged, String currentDuration, int currentIterations, String timeSinceConverged) { | ||
| this(isConverged, currentDuration, currentIterations, "", 0, 0, timeSinceConverged); | ||
| } | ||
| } | ||
|
|
||
| Function<LogExpectationData, MockLog.SeenEventExpectation> getLogExpectation = data -> { | ||
| final var singleComputeCallMsg = "Desired balance computation for [%d] " | ||
| + (data.isConverged ? "" : "is still not ") | ||
| + "converged after [%s] and [%d] iterations"; | ||
| + "converged after [%s] and [%d] iterations this round, "; | ||
| return new MockLog.SeenEventExpectation( | ||
| "expected a " + (data.isConverged ? "converged" : "not converged") + " log message", | ||
| DesiredBalanceComputer.class.getCanonicalName(), | ||
| Level.INFO, | ||
| (data.totalComputeCalls > 1 | ||
| ? Strings.format( | ||
| singleComputeCallMsg + ", resumed computation [%d] times with [%d] iterations since the last resumption [%s] ago", | ||
| singleComputeCallMsg | ||
| + "resumed computation [%s] ago with [%d] iterations over [%d] rounds since the last convergence [%s] ago", | ||
| indexSequence.get(), | ||
| data.timeSinceConverged, | ||
| data.currentDuration, | ||
| data.currentIterations, | ||
| data.timeSinceRecompute, | ||
| data.totalIterations, | ||
| data.totalComputeCalls, | ||
| data.currentIterations, | ||
| data.currentDuration | ||
| data.timeSinceConverged | ||
| ) | ||
| : Strings.format(singleComputeCallMsg, indexSequence.get(), data.timeSinceConverged, data.totalIterations)) | ||
| : Strings.format( | ||
| singleComputeCallMsg + "last convergence was [%s] ago", | ||
| indexSequence.get(), | ||
| data.currentDuration, | ||
| data.currentIterations, | ||
| data.timeSinceConverged | ||
| )) | ||
| ); | ||
| }; | ||
|
|
||
|
|
@@ -1515,7 +1565,7 @@ record LogExpectationData( | |
| // Converges right away, verify the debug level convergence message. | ||
| assertLoggerExpectationsFor( | ||
| getComputeRunnableForIsFreshPredicate.apply(ignored -> true), | ||
| getLogExpectation.apply(new LogExpectationData(true, "3ms", 2)) | ||
| getLogExpectation.apply(new LogExpectationData(true, "2ms", 2, "3ms")) | ||
| ); | ||
| assertFinishReason.accept(DesiredBalance.ComputationFinishReason.CONVERGED); | ||
| final var lastConvergenceTimestampMillis = computer.getLastConvergedTimeMillis(); | ||
|
|
@@ -1526,7 +1576,7 @@ record LogExpectationData( | |
| // This INFO is triggered from the interval since last convergence timestamp. | ||
| assertLoggerExpectationsFor( | ||
| getComputeRunnableForIsFreshPredicate.apply(ignored -> iterationCounter.get() < 6), | ||
| getLogExpectation.apply(new LogExpectationData(false, "5ms", 4)) | ||
| getLogExpectation.apply(new LogExpectationData(false, "5ms", 5, "6ms")) | ||
| ); | ||
| assertFinishReason.accept(DesiredBalance.ComputationFinishReason.YIELD_TO_NEW_INPUT); | ||
| assertLastConvergenceInfo.accept(new ExpectedLastConvergenceInfo(1, 6, lastConvergenceTimestampMillis)); | ||
|
|
@@ -1535,33 +1585,58 @@ record LogExpectationData( | |
| // The next INFO is triggered from the interval since last INFO message logged, and then another after the interval period. | ||
| assertLoggerExpectationsFor( | ||
| getComputeRunnableForIsFreshPredicate.apply(ignored -> iterationCounter.get() < 8), | ||
| getLogExpectation.apply(new LogExpectationData(false, "10ms", 8, 2, 2, "2ms")), | ||
| getLogExpectation.apply(new LogExpectationData(false, "15ms", 13, 2, 7, "7ms")) | ||
| getLogExpectation.apply(new LogExpectationData(false, "3ms", 3, "10ms", 9, 2, "11ms")) | ||
| ); | ||
| assertFinishReason.accept(DesiredBalance.ComputationFinishReason.YIELD_TO_NEW_INPUT); | ||
| assertLastConvergenceInfo.accept(new ExpectedLastConvergenceInfo(2, 14, lastConvergenceTimestampMillis)); | ||
|
|
||
| assertLoggerExpectationsFor( | ||
| getComputeRunnableForIsFreshPredicate.apply(ignored -> true), | ||
| getLogExpectation.apply(new LogExpectationData(false, "20ms", 17, 3, 3, "3ms")), | ||
| getLogExpectation.apply(new LogExpectationData(false, "25ms", 22, 3, 8, "8ms")), | ||
| getLogExpectation.apply(new LogExpectationData(true, "27ms", 24, 3, 10, "10ms")) | ||
| getLogExpectation.apply(new LogExpectationData(false, "1ms", 1, "17ms", 15, 3, "18ms")), | ||
| getLogExpectation.apply(new LogExpectationData(false, "6ms", 6, "22ms", 20, 3, "23ms")), | ||
| getLogExpectation.apply(new LogExpectationData(true, "10ms", 10, "26ms", 24, 3, "27ms")) | ||
|
||
| ); | ||
| assertFinishReason.accept(DesiredBalance.ComputationFinishReason.CONVERGED); | ||
|
|
||
| // First INFO is triggered from interval since last converged, second is triggered from the inverval since the last INFO log. | ||
| assertLoggerExpectationsFor( | ||
| getComputeRunnableForIsFreshPredicate.apply(ignored -> true), | ||
| getLogExpectation.apply(new LogExpectationData(false, "5ms", 4)), | ||
| getLogExpectation.apply(new LogExpectationData(false, "10ms", 9)), | ||
| getLogExpectation.apply(new LogExpectationData(true, "11ms", 10)) | ||
| getLogExpectation.apply(new LogExpectationData(false, "5ms", 5, "6ms")), | ||
| getLogExpectation.apply(new LogExpectationData(true, "10ms", 10, "11ms")) | ||
| ); | ||
| assertFinishReason.accept(DesiredBalance.ComputationFinishReason.CONVERGED); | ||
|
|
||
| // Verify the final assignment mappings after converging. | ||
| final var index = clusterState.metadata().getProject(Metadata.DEFAULT_PROJECT_ID).index(TEST_INDEX).getIndex(); | ||
| final var expectedAssignmentsMap = Map.of(new ShardId(index, 0), new ShardAssignment(Set.of("node-0"), 1, 0, 0)); | ||
| assertDesiredAssignments(desiredBalance.get(), expectedAssignmentsMap); | ||
|
|
||
| // Verify that if some time elapses and then another computation starts then we do not count the idle time | ||
| timeInMillis.addAndGet(100L); | ||
| iterationCounter.set(0); | ||
| requiredIterations.set(2); | ||
| assertLoggerExpectationsFor( | ||
| getComputeRunnableForIsFreshPredicate.apply(ignored -> iterationCounter.get() < 2), | ||
| new MockLog.UnseenEventExpectation( | ||
| "no log messages", | ||
| DesiredBalanceComputer.class.getCanonicalName(), | ||
| Level.INFO, | ||
| "* still not converged after *" | ||
|
||
| ) | ||
| ); | ||
|
|
||
| // test a non-convergence and convergence message with a significant time delta, | ||
| // to check the time-since-convergence and time-since-last-compute order | ||
| iterationCounter.set(0); | ||
| timeInMillis.addAndGet(100L); | ||
| getComputeRunnableForIsFreshPredicate.apply(ignored -> false).run(); | ||
| timeInMillis.addAndGet(100L); | ||
|
|
||
| assertLoggerExpectationsFor( | ||
| getComputeRunnableForIsFreshPredicate.apply(ignored -> true), | ||
| getLogExpectation.apply(new LogExpectationData(false, "1ms", 1, "103ms", 2, 2, "204ms")), | ||
| getLogExpectation.apply(new LogExpectationData(true, "2ms", 2, "104ms", 3, 2, "205ms")) | ||
| ); | ||
| } | ||
|
|
||
| @TestLogging( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
naming nit: could we include the word "started" in this name somehow? We're tracking the start of the first computation since we converged.