Skip to content

Commit 2c35e5c

Browse files
authored
[Test] Ensure assertion run only once (elastic#131174) (elastic#131256)
The assertion runs inside the cluster state listener may run a second time if the listener is not removed in time. It is possible that the assertion no longer holds on the 2nd time due to cluster state changes. There is no need to assert it more than once. This PR ensures assertion runs only once by removing the listener right after it. Resolves: elastic#130274
1 parent e6f3188 commit 2c35e5c

File tree

1 file changed

+28
-26
lines changed

1 file changed

+28
-26
lines changed

x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.action.bulk.BulkRequestBuilder;
2222
import org.elasticsearch.action.get.GetResponse;
2323
import org.elasticsearch.action.index.IndexRequest;
24+
import org.elasticsearch.action.support.ActionTestUtils;
2425
import org.elasticsearch.action.support.ActiveShardCount;
2526
import org.elasticsearch.action.support.IndicesOptions;
2627
import org.elasticsearch.action.support.PlainActionFuture;
@@ -51,6 +52,7 @@
5152
import org.elasticsearch.snapshots.SnapshotId;
5253
import org.elasticsearch.snapshots.SnapshotShardSizeInfo;
5354
import org.elasticsearch.snapshots.SnapshotsInfoService;
55+
import org.elasticsearch.test.ClusterServiceUtils;
5456
import org.elasticsearch.test.transport.MockTransportService;
5557
import org.elasticsearch.transport.TransportActionProxy;
5658
import org.elasticsearch.transport.TransportService;
@@ -655,39 +657,39 @@ public void testCcrRepositoryFailsToFetchSnapshotShardSizes() throws Exception {
655657
try {
656658
final SnapshotsInfoService snapshotsInfoService = getFollowerCluster().getCurrentMasterNodeInstance(SnapshotsInfoService.class);
657659

660+
final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class);
658661
final PlainActionFuture<Void> waitForAllShardSnapshotSizesFailures = new PlainActionFuture<>();
659-
final ClusterStateListener listener = event -> {
660-
if (RestoreInProgress.get(event.state()).isEmpty() == false && event.state().routingTable().hasIndex(followerIndex)) {
661-
try {
662-
final IndexRoutingTable indexRoutingTable = event.state().routingTable().index(followerIndex);
663-
// this assertBusy completes because the listener is added after the InternalSnapshotsInfoService
664-
// and ClusterService preserves the order of listeners.
665-
assertBusy(() -> {
666-
List<Long> sizes = indexRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED)
667-
.stream()
668-
.filter(shard -> shard.unassignedInfo().lastAllocationStatus() == AllocationStatus.FETCHING_SHARD_DATA)
669-
.sorted(Comparator.comparingInt(ShardRouting::getId))
670-
.map(shard -> snapshotsInfoService.snapshotShardSizes().getShardSize(shard))
671-
.filter(Objects::nonNull)
672-
.filter(size -> ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE == size)
673-
.collect(Collectors.toList());
674-
assertThat(sizes, hasSize(numberOfShards));
675-
});
676-
waitForAllShardSnapshotSizesFailures.onResponse(null);
677-
} catch (Exception e) {
678-
throw new AssertionError("Failed to retrieve all snapshot shard sizes", e);
679-
}
662+
ClusterServiceUtils.addTemporaryStateListener(
663+
clusterService,
664+
state -> RestoreInProgress.get(state).isEmpty() == false && state.routingTable().hasIndex(followerIndex)
665+
).addListener(ActionTestUtils.assertNoFailureListener(ignore -> {
666+
try {
667+
// This listener runs synchronously in the same thread so that clusterService.state() returns the same state
668+
// that satisfied the predicate.
669+
final IndexRoutingTable indexRoutingTable = clusterService.state().routingTable().index(followerIndex);
670+
// this assertBusy completes because the listener is added after the InternalSnapshotsInfoService
671+
// and ClusterService preserves the order of listeners.
672+
assertBusy(() -> {
673+
List<Long> sizes = indexRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED)
674+
.stream()
675+
.filter(shard -> shard.unassignedInfo().lastAllocationStatus() == AllocationStatus.FETCHING_SHARD_DATA)
676+
.sorted(Comparator.comparingInt(ShardRouting::getId))
677+
.map(shard -> snapshotsInfoService.snapshotShardSizes().getShardSize(shard))
678+
.filter(Objects::nonNull)
679+
.filter(size -> ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE == size)
680+
.collect(Collectors.toList());
681+
assertThat(sizes, hasSize(numberOfShards));
682+
});
683+
waitForAllShardSnapshotSizesFailures.onResponse(null);
684+
} catch (Exception e) {
685+
throw new AssertionError("Failed to retrieve all snapshot shard sizes", e);
680686
}
681-
};
682-
683-
final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class);
684-
clusterService.addListener(listener);
687+
}));
685688

686689
logger.debug("--> creating follower index [{}]", followerIndex);
687690
followerClient().execute(PutFollowAction.INSTANCE, putFollow(leaderIndex, followerIndex, ActiveShardCount.NONE));
688691

689692
waitForAllShardSnapshotSizesFailures.get(30L, TimeUnit.SECONDS);
690-
clusterService.removeListener(listener);
691693

692694
assertThat(simulatedFailures.get(), equalTo(numberOfShards));
693695

0 commit comments

Comments
 (0)