Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public class DesiredBalanceComputer {
private long numIterationsSinceLastConverged;
private long lastConvergedTimeMillis;
private long lastNotConvergedLogMessageTimeMillis;
private long firstComputeSinceConvergedTimeMillis;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

naming nit: could we include the word "started" in this name somehow? We're tracking the start of the first computation since we converged.

private Level convergenceLogMsgLevel;
private ShardRouting lastTrackedUnassignedShard;

Expand All @@ -106,6 +107,7 @@ public DesiredBalanceComputer(
this.numIterationsSinceLastConverged = 0;
this.lastConvergedTimeMillis = timeProvider.relativeTimeInMillis();
this.lastNotConvergedLogMessageTimeMillis = lastConvergedTimeMillis;
this.firstComputeSinceConvergedTimeMillis = lastConvergedTimeMillis;
this.convergenceLogMsgLevel = Level.DEBUG;
clusterSettings.initializeAndWatch(PROGRESS_LOG_INTERVAL_SETTING, value -> this.progressLogInterval = value);
clusterSettings.initializeAndWatch(
Expand Down Expand Up @@ -330,7 +332,12 @@ public DesiredBalance compute(
final int iterationCountReportInterval = computeIterationCountReportInterval(routingAllocation);
final long timeWarningInterval = progressLogInterval.millis();
final long computationStartedTime = timeProvider.relativeTimeInMillis();
long nextReportTime = Math.max(lastNotConvergedLogMessageTimeMillis, lastConvergedTimeMillis) + timeWarningInterval;
if (lastConvergedTimeMillis > firstComputeSinceConvergedTimeMillis) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the previous computation converged before the clock advanced (e.g. it took <1ms) then we would have lastConvergedTimeMillis == firstComputeSinceConvergedTimeMillis and hence wouldn't update firstComputeSinceConvergedTimeMillis here, so we'd still be counting the idle time in between that computation and the present one. Really we need to know if the previous computation converged or not regardless of how long it took.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah this is a great catch -- thanks for finding this.

// mark our first effort at compute since a convergence,
// so that a logging period for non-convergence warning are based from this time
firstComputeSinceConvergedTimeMillis = computationStartedTime;
}
long nextReportTime = Math.max(lastNotConvergedLogMessageTimeMillis, firstComputeSinceConvergedTimeMillis) + timeWarningInterval;

int i = 0;
boolean hasChanges = false;
Expand Down Expand Up @@ -391,24 +398,29 @@ public DesiredBalance compute(
convergenceLogMsgLevel,
() -> Strings.format(
"""
Desired balance computation for [%d] converged after [%s] and [%d] iterations, \
resumed computation [%d] times with [%d] iterations since the last resumption [%s] ago""",
Desired balance computation for [%d] converged after [%s] and [%d] iterations this round, \
resumed computation [%s] ago with [%d] iterations over [%d] rounds since the last convergence \
[%s] ago""",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these rewordings are basically a good idea but would much rather we split them out into a separate PR and keep this one focussed on fixing the bug that counts the idle time. It's just a bit much to keep track of which test changes relate to the cosmetics and which ones are genuine behaviour changes.

desiredBalanceInput.index(),
TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString(),
TimeValue.timeValueMillis(currentTime - computationStartedTime).toString(),
iterations,
TimeValue.timeValueMillis(currentTime - firstComputeSinceConvergedTimeMillis).toString(),
numIterationsSinceLastConverged,
numComputeCallsSinceLastConverged,
iterations,
TimeValue.timeValueMillis(currentTime - computationStartedTime).toString()
TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString()
)
);
} else {
logger.log(
convergenceLogMsgLevel,
() -> Strings.format(
"Desired balance computation for [%d] converged after [%s] and [%d] iterations",
"""
Desired balance computation for [%d] converged after [%s] and [%d] iterations this round, \
last convergence was [%s] ago""",
desiredBalanceInput.index(),
TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString(),
numIterationsSinceLastConverged
TimeValue.timeValueMillis(currentTime - computationStartedTime).toString(),
iterations,
TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString()
)
);
}
Expand Down Expand Up @@ -452,24 +464,29 @@ public DesiredBalance compute(
logLevel,
() -> Strings.format(
"""
Desired balance computation for [%d] is still not converged after [%s] and [%d] iterations, \
resumed computation [%d] times with [%d] iterations since the last resumption [%s] ago""",
Desired balance computation for [%d] is still not converged after [%s] and [%d] iterations this round, \
resumed computation [%s] ago with [%d] iterations over [%d] rounds since the last convergence \
[%s] ago""",
desiredBalanceInput.index(),
TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString(),
TimeValue.timeValueMillis(currentTime - computationStartedTime).toString(),
iterations,
TimeValue.timeValueMillis(currentTime - firstComputeSinceConvergedTimeMillis).toString(),
numIterationsSinceLastConverged,
numComputeCallsSinceLastConverged,
iterations,
TimeValue.timeValueMillis(currentTime - computationStartedTime).toString()
TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This changes the message to report the time since the compute call began up front, and clarifies the time since last convergence.

There is now a mix of different markers, and I'm not sure the iterations are now represented quite right either: the "converged after [<duration this compute round>] and [<iterations since convergence>]" are mismatched.

How about this?

  • "still not converged after [%s] and [%d] iterations": the time and iteration count in this compute run. And maybe something about "this round"?
  • "resumed computation [%d] times with [%d] iterations since [%s]": the number of compute calls, iterations, and time since our first compute effort since last convergence (this message differs from the current template)
  • "since the last convergence [%s] ago": the time since last convergence

Copy link
Contributor Author

@schase-es schase-es Nov 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The debug log message above is one I was curious about -- this is "Desired balance computation for [{}] interrupted after [{}] and [{}] iterations as newer cluster state received. Publishing intermediate desired balance and restarting computation." on line 419/429.

I am wondering how frequent this message is, and if it should be logged at info if enough time has passed since compute restarted.

)
);
} else {
logger.log(
logLevel,
() -> Strings.format(
"Desired balance computation for [%d] is still not converged after [%s] and [%d] iterations",
"""
Desired balance computation for [%d] is still not converged after [%s] and [%d] iterations this round, \
last convergence was [%s] ago""",
desiredBalanceInput.index(),
TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString(),
numIterationsSinceLastConverged
TimeValue.timeValueMillis(currentTime - computationStartedTime).toString(),
iterations,
TimeValue.timeValueMillis(currentTime - lastConvergedTimeMillis).toString()
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,45 @@ public class DesiredBalanceComputerTests extends ESAllocationTestCase {

static final String TEST_INDEX = "test-index";

public void testShouldNotLogLongBalanceComputation() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style nit: test suites are easiest to read if they start from the simpler test cases and work down towards the more complex ones. There might be some other structure to the tests too, but in practice new test cases should often be put near the end. This one in particular is much less basic than testComputeBalance and testStopsComputingWhenStale and so on so I'd prefer it went lower down (probably somewhere near testLoggingOfComputeCallsAndIterationsSinceConvergence since they're both about logging)

final var clusterSettings = new ClusterSettings(
Settings.builder().put(DesiredBalanceComputer.PROGRESS_LOG_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(5L)).build(),
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS
);
final var timeInMillis = new AtomicLong(-1L);
final var desiredBalance = new AtomicReference<DesiredBalance>(DesiredBalance.BECOME_MASTER_INITIAL);

final var computer = new DesiredBalanceComputer(
clusterSettings,
TimeProviderUtils.create(timeInMillis::incrementAndGet),
new BalancedShardsAllocator(Settings.EMPTY),
TEST_ONLY_EXPLAINER
);

final var allocation = new RoutingAllocation(
randomAllocationDeciders(Settings.EMPTY, clusterSettings),
createInitialClusterState(1, 1, 0),
ClusterInfo.EMPTY,
SnapshotShardSizeInfo.EMPTY,
0L
);

computer.compute(desiredBalance.get(), DesiredBalanceInput.create(1, allocation), queue(), ignore -> true);

timeInMillis.set(60 * 60 * 1000L);

assertLoggerExpectationsFor(() -> {
computer.compute(desiredBalance.get(), DesiredBalanceInput.create(2, allocation), queue(), ignore -> true);
},
new MockLog.UnseenEventExpectation(
"should NOT log long balance computation",
DesiredBalanceComputer.class.getCanonicalName(),
Level.INFO,
"* still not converged after *"
)
);
}

public void testComputeBalance() {
var desiredBalanceComputer = createDesiredBalanceComputer();
var clusterState = createInitialClusterState(3);
Expand Down Expand Up @@ -1348,7 +1387,7 @@ public void testShouldLogComputationIteration() {
"Should not report long computation too early",
DesiredBalanceComputer.class.getCanonicalName(),
Level.INFO,
"Desired balance computation for [*] is still not converged after [*] and [*] iterations"
"Desired balance computation for [*] is still not converged after [*] and [*] iterations *"
)
);

Expand All @@ -1359,7 +1398,8 @@ public void testShouldLogComputationIteration() {
"Should report long computation based on iteration count",
DesiredBalanceComputer.class.getCanonicalName(),
Level.INFO,
"Desired balance computation for [*] is still not converged after [10s] and [1000] iterations"
"Desired balance computation for [*] is still not converged after [10s] and [1000] iterations this round, "
+ "last convergence was [10s] ago"
)
);

Expand All @@ -1370,7 +1410,8 @@ public void testShouldLogComputationIteration() {
"Should report long computation based on time",
DesiredBalanceComputer.class.getCanonicalName(),
Level.INFO,
"Desired balance computation for [*] is still not converged after [1m] and [59] iterations"
"Desired balance computation for [*] is still not converged after [59s] and [59] iterations this round, "
+ "last convergence was [1m] ago"
)
);
}
Expand Down Expand Up @@ -1468,36 +1509,45 @@ record ExpectedLastConvergenceInfo(int numComputeCalls, int numTotalIterations,

record LogExpectationData(
boolean isConverged,
String timeSinceConverged,
String currentDuration,
int currentIterations,
String timeSinceRecompute,
int totalIterations,
int totalComputeCalls,
int currentIterations,
String currentDuration
String timeSinceConverged
) {
LogExpectationData(boolean isConverged, String timeSinceConverged, int totalIterations) {
this(isConverged, timeSinceConverged, totalIterations, 0, 0, "");
LogExpectationData(boolean isConverged, String currentDuration, int currentIterations, String timeSinceConverged) {
this(isConverged, currentDuration, currentIterations, "", 0, 0, timeSinceConverged);
}
}

Function<LogExpectationData, MockLog.SeenEventExpectation> getLogExpectation = data -> {
final var singleComputeCallMsg = "Desired balance computation for [%d] "
+ (data.isConverged ? "" : "is still not ")
+ "converged after [%s] and [%d] iterations";
+ "converged after [%s] and [%d] iterations this round, ";
return new MockLog.SeenEventExpectation(
"expected a " + (data.isConverged ? "converged" : "not converged") + " log message",
DesiredBalanceComputer.class.getCanonicalName(),
Level.INFO,
(data.totalComputeCalls > 1
? Strings.format(
singleComputeCallMsg + ", resumed computation [%d] times with [%d] iterations since the last resumption [%s] ago",
singleComputeCallMsg
+ "resumed computation [%s] ago with [%d] iterations over [%d] rounds since the last convergence [%s] ago",
indexSequence.get(),
data.timeSinceConverged,
data.currentDuration,
data.currentIterations,
data.timeSinceRecompute,
data.totalIterations,
data.totalComputeCalls,
data.currentIterations,
data.currentDuration
data.timeSinceConverged
)
: Strings.format(singleComputeCallMsg, indexSequence.get(), data.timeSinceConverged, data.totalIterations))
: Strings.format(
singleComputeCallMsg + "last convergence was [%s] ago",
indexSequence.get(),
data.currentDuration,
data.currentIterations,
data.timeSinceConverged
))
);
};

Expand All @@ -1515,7 +1565,7 @@ record LogExpectationData(
// Converges right away, verify the debug level convergence message.
assertLoggerExpectationsFor(
getComputeRunnableForIsFreshPredicate.apply(ignored -> true),
getLogExpectation.apply(new LogExpectationData(true, "3ms", 2))
getLogExpectation.apply(new LogExpectationData(true, "2ms", 2, "3ms"))
);
assertFinishReason.accept(DesiredBalance.ComputationFinishReason.CONVERGED);
final var lastConvergenceTimestampMillis = computer.getLastConvergedTimeMillis();
Expand All @@ -1526,7 +1576,7 @@ record LogExpectationData(
// This INFO is triggered from the interval since last convergence timestamp.
assertLoggerExpectationsFor(
getComputeRunnableForIsFreshPredicate.apply(ignored -> iterationCounter.get() < 6),
getLogExpectation.apply(new LogExpectationData(false, "5ms", 4))
getLogExpectation.apply(new LogExpectationData(false, "5ms", 5, "6ms"))
);
assertFinishReason.accept(DesiredBalance.ComputationFinishReason.YIELD_TO_NEW_INPUT);
assertLastConvergenceInfo.accept(new ExpectedLastConvergenceInfo(1, 6, lastConvergenceTimestampMillis));
Expand All @@ -1535,33 +1585,58 @@ record LogExpectationData(
// The next INFO is triggered from the interval since last INFO message logged, and then another after the interval period.
assertLoggerExpectationsFor(
getComputeRunnableForIsFreshPredicate.apply(ignored -> iterationCounter.get() < 8),
getLogExpectation.apply(new LogExpectationData(false, "10ms", 8, 2, 2, "2ms")),
getLogExpectation.apply(new LogExpectationData(false, "15ms", 13, 2, 7, "7ms"))
getLogExpectation.apply(new LogExpectationData(false, "3ms", 3, "10ms", 9, 2, "11ms"))
);
assertFinishReason.accept(DesiredBalance.ComputationFinishReason.YIELD_TO_NEW_INPUT);
assertLastConvergenceInfo.accept(new ExpectedLastConvergenceInfo(2, 14, lastConvergenceTimestampMillis));

assertLoggerExpectationsFor(
getComputeRunnableForIsFreshPredicate.apply(ignored -> true),
getLogExpectation.apply(new LogExpectationData(false, "20ms", 17, 3, 3, "3ms")),
getLogExpectation.apply(new LogExpectationData(false, "25ms", 22, 3, 8, "8ms")),
getLogExpectation.apply(new LogExpectationData(true, "27ms", 24, 3, 10, "10ms"))
getLogExpectation.apply(new LogExpectationData(false, "1ms", 1, "17ms", 15, 3, "18ms")),
getLogExpectation.apply(new LogExpectationData(false, "6ms", 6, "22ms", 20, 3, "23ms")),
getLogExpectation.apply(new LogExpectationData(true, "10ms", 10, "26ms", 24, 3, "27ms"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to go through these test changes in detail once we've removed the message rewordings from this PR.

);
assertFinishReason.accept(DesiredBalance.ComputationFinishReason.CONVERGED);

// First INFO is triggered from interval since last converged, second is triggered from the inverval since the last INFO log.
assertLoggerExpectationsFor(
getComputeRunnableForIsFreshPredicate.apply(ignored -> true),
getLogExpectation.apply(new LogExpectationData(false, "5ms", 4)),
getLogExpectation.apply(new LogExpectationData(false, "10ms", 9)),
getLogExpectation.apply(new LogExpectationData(true, "11ms", 10))
getLogExpectation.apply(new LogExpectationData(false, "5ms", 5, "6ms")),
getLogExpectation.apply(new LogExpectationData(true, "10ms", 10, "11ms"))
);
assertFinishReason.accept(DesiredBalance.ComputationFinishReason.CONVERGED);

// Verify the final assignment mappings after converging.
final var index = clusterState.metadata().getProject(Metadata.DEFAULT_PROJECT_ID).index(TEST_INDEX).getIndex();
final var expectedAssignmentsMap = Map.of(new ShardId(index, 0), new ShardAssignment(Set.of("node-0"), 1, 0, 0));
assertDesiredAssignments(desiredBalance.get(), expectedAssignmentsMap);

// Verify that if some time elapses and then another computation starts then we do not count the idle time
timeInMillis.addAndGet(100L);
iterationCounter.set(0);
requiredIterations.set(2);
assertLoggerExpectationsFor(
getComputeRunnableForIsFreshPredicate.apply(ignored -> iterationCounter.get() < 2),
new MockLog.UnseenEventExpectation(
"no log messages",
DesiredBalanceComputer.class.getCanonicalName(),
Level.INFO,
"* still not converged after *"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 a pattern is required here because we do emit a log message, otherwise I'd suggest *. But "no log messages" is misleading, we do expect one log message, just not a still not converged one. Can we assert that we do see the one we expect to see here?

)
);

// test a non-convergence and convergence message with a significant time delta,
// to check the time-since-convergence and time-since-last-compute order
iterationCounter.set(0);
timeInMillis.addAndGet(100L);
getComputeRunnableForIsFreshPredicate.apply(ignored -> false).run();
timeInMillis.addAndGet(100L);

assertLoggerExpectationsFor(
getComputeRunnableForIsFreshPredicate.apply(ignored -> true),
getLogExpectation.apply(new LogExpectationData(false, "1ms", 1, "103ms", 2, 2, "204ms")),
getLogExpectation.apply(new LogExpectationData(true, "2ms", 2, "104ms", 3, 2, "205ms"))
);
}

@TestLogging(
Expand Down