File tree Expand file tree Collapse file tree 1 file changed +18
-1
lines changed
src/test/java/com/rabbitmq/client/amqp/impl Expand file tree Collapse file tree 1 file changed +18
-1
lines changed Original file line number Diff line number Diff line change 27
27
import com .rabbitmq .client .amqp .*;
28
28
import com .rabbitmq .client .amqp .impl .TestUtils .Sync ;
29
29
import java .util .List ;
30
+ import java .util .Set ;
31
+ import java .util .concurrent .ConcurrentHashMap ;
30
32
import java .util .concurrent .atomic .AtomicInteger ;
31
33
import java .util .function .Consumer ;
32
34
import org .junit .jupiter .api .*;
@@ -131,7 +133,22 @@ void publishToMovingQq() {
131
133
publisher .publish (publisher .message ().messageId (3L ), ctx -> publishSync .down ());
132
134
assertThat (publishSync ).completes ();
133
135
134
- assertThat (queueInfo ()).hasMessageCount (3 );
136
+ int messageCount = 3 ;
137
+ assertThat (queueInfo ()).hasMessageCount (messageCount );
138
+ Sync consumeSync = sync (messageCount );
139
+ Set <Long > messageIds = ConcurrentHashMap .newKeySet (3 );
140
+ connection
141
+ .consumerBuilder ()
142
+ .queue (q )
143
+ .messageHandler (
144
+ (ctx , msg ) -> {
145
+ messageIds .add (msg .messageIdAsLong ());
146
+ consumeSync .down ();
147
+ ctx .accept ();
148
+ })
149
+ .build ();
150
+ assertThat (consumeSync ).completes ();
151
+ assertThat (messageIds ).containsOnly (1L , 2L , 3L );
135
152
} finally {
136
153
management .queueDeletion ().delete (q );
137
154
}
You can’t perform that action at this time.
0 commit comments