Skip to content

Commit 06bec76

Browse files
committed
Use latest state
1 parent cc461af commit 06bec76

File tree

4 files changed

+16
-65
lines changed

4 files changed

+16
-65
lines changed

muted-tests.yml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -178,8 +178,6 @@ tests:
178178
- class: org.elasticsearch.xpack.security.FileSettingsRoleMappingsRestartIT
179179
method: testFileSettingsReprocessedOnRestartWithoutVersionChange
180180
issue: https://github.com/elastic/elasticsearch/issues/120964
181-
- class: org.elasticsearch.xpack.ml.integration.PyTorchModelIT
182-
issue: https://github.com/elastic/elasticsearch/issues/121165
183181
- class: org.elasticsearch.xpack.security.authc.ldap.ActiveDirectorySessionFactoryTests
184182
issue: https://github.com/elastic/elasticsearch/issues/121285
185183
- class: org.elasticsearch.test.rest.yaml.CcsCommonYamlTestSuiteIT

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterService.java

Lines changed: 14 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@
1111
import org.apache.logging.log4j.Logger;
1212
import org.elasticsearch.ElasticsearchStatusException;
1313
import org.elasticsearch.ResourceNotFoundException;
14-
import org.elasticsearch.TransportVersion;
15-
import org.elasticsearch.TransportVersions;
1614
import org.elasticsearch.action.ActionListener;
1715
import org.elasticsearch.action.ActionRequestValidationException;
1816
import org.elasticsearch.action.support.master.AcknowledgedResponse;
@@ -80,9 +78,6 @@ public class TrainedModelAssignmentClusterService implements ClusterStateListene
8078

8179
private static final Logger logger = LogManager.getLogger(TrainedModelAssignmentClusterService.class);
8280

83-
private static final TransportVersion RENAME_ALLOCATION_TO_ASSIGNMENT_TRANSPORT_VERSION = TransportVersions.V_8_3_0;
84-
public static final TransportVersion DISTRIBUTED_MODEL_ALLOCATION_TRANSPORT_VERSION = TransportVersions.V_8_4_0;
85-
8681
private final ClusterService clusterService;
8782
private final ThreadPool threadPool;
8883
private final NodeLoadDetector nodeLoadDetector;
@@ -170,14 +165,6 @@ public void clusterChanged(ClusterChangedEvent event) {
170165
return;
171166
}
172167

173-
if (eventStateMinTransportVersionIsBeforeDistributedModelAllocationTransportVersion(event)) {
174-
// we should not try to rebalance assignments while there may be nodes running on a version
175-
// prior to introducing distributed model allocation.
176-
// But we should remove routing to removed or shutting down nodes.
177-
removeRoutingToRemovedOrShuttingDownNodes(event);
178-
return;
179-
}
180-
181168
if (event.nodesAdded()) {
182169
logMlNodeHeterogeneity();
183170
}
@@ -204,10 +191,6 @@ public void clusterChanged(ClusterChangedEvent event) {
204191
}
205192
}
206193

207-
boolean eventStateMinTransportVersionIsBeforeDistributedModelAllocationTransportVersion(ClusterChangedEvent event) {
208-
return event.state().getMinTransportVersion().before(DISTRIBUTED_MODEL_ALLOCATION_TRANSPORT_VERSION);
209-
}
210-
211194
boolean eventStateHasGlobalBlockStateNotRecoveredBlock(ClusterChangedEvent event) {
212195
return event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK);
213196
}
@@ -401,18 +384,6 @@ public void createNewModelAssignment(
401384
CreateTrainedModelAssignmentAction.Request request,
402385
ActionListener<TrainedModelAssignment> listener
403386
) {
404-
if (clusterService.state().getMinTransportVersion().before(DISTRIBUTED_MODEL_ALLOCATION_TRANSPORT_VERSION)) {
405-
listener.onFailure(
406-
new ElasticsearchStatusException(
407-
"cannot create new assignment [{}] for model [{}] while cluster upgrade is in progress",
408-
RestStatus.CONFLICT,
409-
request.getTaskParams().getDeploymentId(),
410-
request.getTaskParams().getModelId()
411-
)
412-
);
413-
return;
414-
}
415-
416387
if (MlMetadata.getMlMetadata(clusterService.state()).isResetMode()) {
417388
listener.onFailure(
418389
new ElasticsearchStatusException(
@@ -524,12 +495,8 @@ private static ClusterState update(ClusterState currentState, TrainedModelAssign
524495
private static ClusterState forceUpdate(ClusterState currentState, TrainedModelAssignmentMetadata.Builder modelAssignments) {
525496
logger.debug(() -> format("updated assignments: %s", modelAssignments.build()));
526497
Metadata.Builder metadata = Metadata.builder(currentState.metadata());
527-
if (currentState.getMinTransportVersion().onOrAfter(RENAME_ALLOCATION_TO_ASSIGNMENT_TRANSPORT_VERSION)) {
528-
metadata.putCustom(TrainedModelAssignmentMetadata.NAME, modelAssignments.build())
529-
.removeProjectCustom(TrainedModelAssignmentMetadata.DEPRECATED_NAME);
530-
} else {
531-
metadata.putCustom(TrainedModelAssignmentMetadata.DEPRECATED_NAME, modelAssignments.buildOld());
532-
}
498+
metadata.putCustom(TrainedModelAssignmentMetadata.NAME, modelAssignments.build())
499+
.removeProjectCustom(TrainedModelAssignmentMetadata.DEPRECATED_NAME);
533500
return ClusterState.builder(currentState).metadata(metadata).build();
534501
}
535502

@@ -847,7 +814,7 @@ private void updateDeployment(
847814
}
848815
boolean hasUpdates = hasUpdates(numberOfAllocations, adaptiveAllocationsSettingsUpdates, existingAssignment);
849816
if (hasUpdates == false) {
850-
logger.info("no updates");
817+
logger.debug("no updates to be made for deployment [{}]", deploymentId);
851818
listener.onResponse(existingAssignment);
852819
return;
853820
}
@@ -861,27 +828,17 @@ private void updateDeployment(
861828
);
862829
return;
863830
}
864-
if (clusterState.getMinTransportVersion().before(DISTRIBUTED_MODEL_ALLOCATION_TRANSPORT_VERSION)) {
865-
listener.onFailure(
866-
new ElasticsearchStatusException(
867-
"cannot update deployment with model id [{}] while cluster upgrade is in progress.",
868-
RestStatus.CONFLICT,
869-
deploymentId
870-
)
871-
);
872-
return;
873-
}
874831

875-
ActionListener<ClusterState> updatedStateListener = ActionListener.wrap(
876-
updatedState -> submitUnbatchedTask("update model deployment", new ClusterStateUpdateTask() {
832+
ActionListener<TrainedModelAssignmentMetadata.Builder> updatedAssignmentListener = ActionListener.wrap(
833+
updatedAssignment -> submitUnbatchedTask("update model deployment", new ClusterStateUpdateTask() {
877834

878835
private volatile boolean isUpdated;
879836

880837
@Override
881838
public ClusterState execute(ClusterState currentState) {
882839
if (areClusterStatesCompatibleForRebalance(clusterState, currentState)) {
883840
isUpdated = true;
884-
return updatedState;
841+
return update(currentState, updatedAssignment);
885842
}
886843
logger.debug(() -> format("[%s] Retrying update as cluster state has been modified", deploymentId));
887844
updateDeployment(currentState, deploymentId, numberOfAllocations, adaptiveAllocationsSettings, isInternal, listener);
@@ -913,7 +870,7 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
913870
listener::onFailure
914871
);
915872

916-
updateAssignment(clusterState, existingAssignment, numberOfAllocations, adaptiveAllocationsSettings, updatedStateListener);
873+
updateAssignment(clusterState, existingAssignment, numberOfAllocations, adaptiveAllocationsSettings, updatedAssignmentListener);
917874
}
918875

919876
static boolean hasUpdates(
@@ -947,7 +904,7 @@ private void updateAssignment(
947904
TrainedModelAssignment assignment,
948905
Integer numberOfAllocations,
949906
AdaptiveAllocationsSettings adaptiveAllocationsSettings,
950-
ActionListener<ClusterState> listener
907+
ActionListener<TrainedModelAssignmentMetadata.Builder> listener
951908
) {
952909
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> {
953910
if (numberOfAllocations == null || numberOfAllocations == assignment.getTaskParams().getNumberOfAllocations()) {
@@ -964,21 +921,21 @@ private void updateAndKeepNumberOfAllocations(
964921
ClusterState clusterState,
965922
TrainedModelAssignment assignment,
966923
AdaptiveAllocationsSettings adaptiveAllocationsSettings,
967-
ActionListener<ClusterState> listener
924+
ActionListener<TrainedModelAssignmentMetadata.Builder> listener
968925
) {
969926
TrainedModelAssignment.Builder updatedAssignment = TrainedModelAssignment.Builder.fromAssignment(assignment)
970927
.setAdaptiveAllocationsSettings(adaptiveAllocationsSettings);
971928
TrainedModelAssignmentMetadata.Builder builder = TrainedModelAssignmentMetadata.builder(clusterState);
972929
builder.updateAssignment(assignment.getDeploymentId(), updatedAssignment);
973-
listener.onResponse(update(clusterState, builder));
930+
listener.onResponse(builder);
974931
}
975932

976933
private void increaseNumberOfAllocations(
977934
ClusterState clusterState,
978935
TrainedModelAssignment assignment,
979936
int numberOfAllocations,
980937
AdaptiveAllocationsSettings adaptiveAllocationsSettings,
981-
ActionListener<ClusterState> listener
938+
ActionListener<TrainedModelAssignmentMetadata.Builder> listener
982939
) {
983940
try {
984941
TrainedModelAssignment.Builder updatedAssignment = TrainedModelAssignment.Builder.fromAssignment(assignment)
@@ -998,7 +955,7 @@ private void increaseNumberOfAllocations(
998955
)
999956
);
1000957
} else {
1001-
listener.onResponse(update(clusterState, rebalancedMetadata));
958+
listener.onResponse(rebalancedMetadata);
1002959
}
1003960
} catch (Exception e) {
1004961
listener.onFailure(e);
@@ -1010,7 +967,7 @@ private void decreaseNumberOfAllocations(
1010967
TrainedModelAssignment assignment,
1011968
int numberOfAllocations,
1012969
AdaptiveAllocationsSettings adaptiveAllocationsSettings,
1013-
ActionListener<ClusterState> listener
970+
ActionListener<TrainedModelAssignmentMetadata.Builder> listener
1014971
) {
1015972
TrainedModelAssignment.Builder updatedAssignment = numberOfAllocations < assignment.totalTargetAllocations()
1016973
? new AllocationReducer(assignment, nodeAvailabilityZoneMapper.buildMlNodesByAvailabilityZone(clusterState)).reduceTo(
@@ -1025,7 +982,7 @@ private void decreaseNumberOfAllocations(
1025982
}
1026983
TrainedModelAssignmentMetadata.Builder builder = TrainedModelAssignmentMetadata.builder(clusterState);
1027984
builder.updateAssignment(assignment.getDeploymentId(), updatedAssignment);
1028-
listener.onResponse(update(clusterState, builder));
985+
listener.onResponse(builder);
1029986
}
1030987

1031988
static ClusterState setToStopping(ClusterState clusterState, String deploymentId, String reason) {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentNodeService.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -375,20 +375,17 @@ public void clusterChanged(ClusterChangedEvent event) {
375375
final boolean isResetMode = MlMetadata.getMlMetadata(event.state()).isResetMode();
376376
TrainedModelAssignmentMetadata modelAssignmentMetadata = TrainedModelAssignmentMetadata.fromState(event.state());
377377
final String currentNode = event.state().nodes().getLocalNodeId();
378-
final boolean isNewAllocationSupported = event.state()
379-
.getMinTransportVersion()
380-
.onOrAfter(TrainedModelAssignmentClusterService.DISTRIBUTED_MODEL_ALLOCATION_TRANSPORT_VERSION);
381378
final Set<String> shuttingDownNodes = Collections.unmodifiableSet(event.state().metadata().nodeShutdowns().getAllNodeIds());
382379

383-
if (isResetMode == false && isNewAllocationSupported) {
380+
if (isResetMode == false) {
384381
updateNumberOfAllocations(modelAssignmentMetadata);
385382
}
386383

387384
for (TrainedModelAssignment trainedModelAssignment : modelAssignmentMetadata.allAssignments().values()) {
388385
RoutingInfo routingInfo = trainedModelAssignment.getNodeRoutingTable().get(currentNode);
389386
if (routingInfo != null) {
390387
// Add new models to start loading if the assignment is not stopping
391-
if (isNewAllocationSupported && trainedModelAssignment.getAssignmentState() != AssignmentState.STOPPING) {
388+
if (trainedModelAssignment.getAssignmentState() != AssignmentState.STOPPING) {
392389
if (shouldAssignmentBeRestarted(routingInfo, trainedModelAssignment.getDeploymentId())) {
393390
prepareAssignmentForRestart(trainedModelAssignment);
394391
}

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterServiceTests.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,6 @@ public void testClusterChanged_GivenNodesAdded_ThenLogMlNodeHeterogeneityCalled(
208208
TrainedModelAssignmentClusterService serviceSpy = spy(createClusterService(randomInt(5)));
209209
doNothing().when(serviceSpy).logMlNodeHeterogeneity();
210210
doReturn(false).when(serviceSpy).eventStateHasGlobalBlockStateNotRecoveredBlock(any());
211-
doReturn(false).when(serviceSpy).eventStateMinTransportVersionIsBeforeDistributedModelAllocationTransportVersion(any());
212211

213212
ClusterChangedEvent mockNodesAddedEvent = mock(ClusterChangedEvent.class);
214213
ClusterState mockState = mock(ClusterState.class);

0 commit comments

Comments
 (0)