19
19
20
20
import static com .rabbitmq .client .amqp .Management .ExchangeType .DIRECT ;
21
21
import static com .rabbitmq .client .amqp .Management .ExchangeType .FANOUT ;
22
+ import static com .rabbitmq .client .amqp .Publisher .Status .ACCEPTED ;
22
23
import static com .rabbitmq .client .amqp .impl .Assertions .assertThat ;
23
24
import static com .rabbitmq .client .amqp .impl .TestUtils .waitAtMost ;
24
25
import static java .time .Duration .ofMillis ;
35
36
import com .rabbitmq .client .amqp .Management ;
36
37
import com .rabbitmq .client .amqp .Publisher ;
37
38
import com .rabbitmq .client .amqp .Resource ;
39
+ import com .rabbitmq .client .amqp .impl .TestUtils .Sync ;
38
40
import java .util .List ;
39
41
import java .util .Set ;
40
42
import java .util .concurrent .*;
44
46
import org .junit .jupiter .api .*;
45
47
import org .junit .jupiter .params .ParameterizedTest ;
46
48
import org .junit .jupiter .params .provider .ValueSource ;
49
+ import org .slf4j .Logger ;
50
+ import org .slf4j .LoggerFactory ;
47
51
48
52
@ TestUtils .DisabledIfRabbitMqCtlNotSet
49
53
@ AmqpTestInfrastructure
50
54
public class TopologyRecoveryTest {
51
55
56
+ private static final Logger LOGGER = LoggerFactory .getLogger (TopologyRecoveryTest .class );
57
+
52
58
static final BackOffDelayPolicy BACK_OFF_DELAY_POLICY = BackOffDelayPolicy .fixed (ofMillis (100 ));
53
59
static Environment environment ;
54
60
TestInfo testInfo ;
55
61
String connectionName ;
56
- TestUtils . Sync recoveredSync ;
62
+ Sync recoveredSync ;
57
63
AtomicInteger connectionAttemptCount ;
58
64
59
65
@ BeforeEach
@@ -247,7 +253,7 @@ void autoDeleteExchangeAndExclusiveQueueShouldBeRedeclared(boolean isolateResour
247
253
connection .management ().queue (q ).autoDelete (true ).exclusive (true ).declare ();
248
254
connection .management ().binding ().sourceExchange (e ).key ("foo" ).destinationQueue (q ).bind ();
249
255
250
- TestUtils . Sync consumeSync = TestUtils .sync ();
256
+ Sync consumeSync = TestUtils .sync ();
251
257
Publisher publisher = connection .publisherBuilder ().exchange (e ).key ("foo" ).build ();
252
258
connection
253
259
.consumerBuilder ()
@@ -288,7 +294,7 @@ void autoDeleteExchangeAndExclusiveQueueWithE2eBindingShouldBeRedeclared(
288
294
connection .management ().binding ().sourceExchange (e1 ).destinationExchange (e2 ).bind ();
289
295
connection .management ().binding ().sourceExchange (e2 ).destinationQueue (q ).bind ();
290
296
291
- TestUtils . Sync consumeSync = TestUtils .sync ();
297
+ Sync consumeSync = TestUtils .sync ();
292
298
Publisher publisher = connection .publisherBuilder ().exchange (e1 ).build ();
293
299
connection
294
300
.consumerBuilder ()
@@ -326,7 +332,7 @@ void deletedQueueBindingIsNotRecovered(boolean isolateResources) {
326
332
connection .management ().queue (q ).declare ();
327
333
connection .management ().binding ().sourceExchange (e ).destinationQueue (q ).bind ();
328
334
329
- TestUtils . Sync consumeSync = TestUtils .sync ();
335
+ Sync consumeSync = TestUtils .sync ();
330
336
Publisher publisher = connection .publisherBuilder ().exchange (e ).build ();
331
337
Consumer consumer =
332
338
connection
@@ -382,7 +388,7 @@ void deletedExchangeBindingIsNotRecovered(boolean isolateResources) {
382
388
connection .management ().binding ().sourceExchange (e1 ).destinationExchange (e2 ).bind ();
383
389
connection .management ().binding ().sourceExchange (e2 ).destinationQueue (q ).bind ();
384
390
385
- TestUtils . Sync consumeSync = TestUtils .sync ();
391
+ Sync consumeSync = TestUtils .sync ();
386
392
Publisher publisher = connection .publisherBuilder ().exchange (e1 ).build ();
387
393
Consumer consumer =
388
394
connection
@@ -486,7 +492,7 @@ void recoverConsumers(boolean isolateResources) {
486
492
connection .management ().queue (q ).declare ();
487
493
connection .management ().binding ().sourceExchange (e ).destinationQueue (q ).bind ();
488
494
int consumerCount = 60 ;
489
- TestUtils . Sync consumeSync = TestUtils .sync (consumerCount );
495
+ Sync consumeSync = TestUtils .sync (consumerCount );
490
496
Consumer .MessageHandler handler =
491
497
(ctx , m ) -> {
492
498
consumeSync .down ();
@@ -526,7 +532,7 @@ void recoverPublisherConsumerSeveralTimes(boolean isolateResources) {
526
532
connection .management ().binding ().sourceExchange (e ).destinationQueue (q ).bind ();
527
533
528
534
Publisher publisher = connection .publisherBuilder ().exchange (e ).build ();
529
- TestUtils . Sync consumeSync = TestUtils .sync ();
535
+ Sync consumeSync = TestUtils .sync ();
530
536
connection
531
537
.consumerBuilder ()
532
538
.queue (q )
@@ -614,7 +620,7 @@ void autoDeleteClientNamedQueueShouldBeRecovered(boolean isolateResources) {
614
620
615
621
String queueName = queueInfo .name ();
616
622
617
- TestUtils . Sync consumeSync = TestUtils .sync ();
623
+ Sync consumeSync = TestUtils .sync ();
618
624
Publisher publisher = connection .publisherBuilder ().queue (queueName ).build ();
619
625
connection
620
626
.consumerBuilder ()
@@ -638,31 +644,43 @@ void autoDeleteClientNamedQueueShouldBeRecovered(boolean isolateResources) {
638
644
}
639
645
640
646
@ Test
641
- void shouldRecoverEvenIfManagementIsClosed () {
647
+ void shouldRecoverEvenIfManagementIsClosed (TestInfo info ) {
642
648
try (Connection connection = connection ()) {
643
649
Management management = connection .management ();
644
650
Management .QueueInfo queueInfo = management .queue ().exclusive (true ).declare ();
645
651
Publisher publisher = connection .publisherBuilder ().queue (queueInfo .name ()).build ();
646
- publisher .publish (publisher .message (), ctx -> {});
647
- TestUtils .Sync consumeSync = TestUtils .sync ();
652
+ Sync publishSync = TestUtils .sync ();
653
+ Publisher .Callback callback =
654
+ ctx -> {
655
+ if (ctx .status () == ACCEPTED ) {
656
+ publishSync .down ();
657
+ } else {
658
+ LOGGER .warn (
659
+ "Unexpected status: {} ({})" , ctx .status (), info .getTestMethod ().get ().getName ());
660
+ }
661
+ };
662
+ publisher .publish (publisher .message (), callback );
663
+ assertThat (publishSync ).completes ();
664
+ Sync consumeSync = TestUtils .sync ();
648
665
connection
649
666
.consumerBuilder ()
650
667
.queue (queueInfo .name ())
651
668
.messageHandler (
652
669
(ctx , message ) -> {
653
- ctx .accept ();
654
670
consumeSync .down ();
671
+ ctx .accept ();
655
672
})
656
673
.build ();
657
674
658
675
assertThat (consumeSync ).completes ();
659
676
waitAtMost (() -> management .queueInfo (queueInfo .name ()).messageCount () == 0 );
660
677
management .close ();
661
678
679
+ publishSync .reset ();
662
680
consumeSync .reset ();
663
681
664
682
closeConnectionAndWaitForRecovery ();
665
- publisher .publish (publisher .message (), ctx -> {} );
683
+ publisher .publish (publisher .message (), callback );
666
684
assertThat (consumeSync ).completes ();
667
685
assertThatThrownBy (() -> management .queueInfo (queueInfo .name ()))
668
686
.isInstanceOf (AmqpResourceClosedException .class );
@@ -710,7 +728,7 @@ Connection connection() {
710
728
Connection connection (
711
729
String name ,
712
730
boolean isolateResources ,
713
- TestUtils . Sync recoveredSync ,
731
+ Sync recoveredSync ,
714
732
java .util .function .Consumer <AmqpConnectionBuilder > builderCallback ) {
715
733
AmqpConnectionBuilder builder = (AmqpConnectionBuilder ) environment .connectionBuilder ();
716
734
builder
0 commit comments