@@ -648,7 +648,8 @@ void shouldRecoverEvenIfManagementIsClosed(TestInfo info) {
648
648
try (Connection connection = connection ()) {
649
649
Management management = connection .management ();
650
650
Management .QueueInfo queueInfo = management .queue ().exclusive (true ).declare ();
651
- Publisher publisher = connection .publisherBuilder ().queue (queueInfo .name ()).build ();
651
+ String q = queueInfo .name ();
652
+ Publisher publisher = connection .publisherBuilder ().queue (q ).build ();
652
653
Sync publishSync = TestUtils .sync ();
653
654
Publisher .Callback callback =
654
655
ctx -> {
@@ -661,10 +662,13 @@ void shouldRecoverEvenIfManagementIsClosed(TestInfo info) {
661
662
};
662
663
publisher .publish (publisher .message (), callback );
663
664
assertThat (publishSync ).completes ();
665
+
666
+ waitAtMost (() -> management .queueInfo (q ).messageCount () == 1 );
667
+
664
668
Sync consumeSync = TestUtils .sync ();
665
669
connection
666
670
.consumerBuilder ()
667
- .queue (queueInfo . name () )
671
+ .queue (q )
668
672
.messageHandler (
669
673
(ctx , message ) -> {
670
674
consumeSync .down ();
@@ -673,7 +677,7 @@ void shouldRecoverEvenIfManagementIsClosed(TestInfo info) {
673
677
.build ();
674
678
675
679
assertThat (consumeSync ).completes ();
676
- waitAtMost (() -> management .queueInfo (queueInfo . name () ).messageCount () == 0 );
680
+ waitAtMost (() -> management .queueInfo (q ).messageCount () == 0 );
677
681
management .close ();
678
682
679
683
publishSync .reset ();
@@ -682,9 +686,9 @@ void shouldRecoverEvenIfManagementIsClosed(TestInfo info) {
682
686
closeConnectionAndWaitForRecovery ();
683
687
publisher .publish (publisher .message (), callback );
684
688
assertThat (consumeSync ).completes ();
685
- assertThatThrownBy (() -> management .queueInfo (queueInfo . name () ))
689
+ assertThatThrownBy (() -> management .queueInfo (q ))
686
690
.isInstanceOf (AmqpResourceClosedException .class );
687
- assertThat (connection .management ().queueInfo (queueInfo . name () )).isEmpty ();
691
+ assertThat (connection .management ().queueInfo (q )).isEmpty ();
688
692
}
689
693
}
690
694
0 commit comments