21
21
import static com .rabbitmq .client .amqp .ConnectionSettings .Affinity .Operation .PUBLISH ;
22
22
import static com .rabbitmq .client .amqp .impl .Assertions .assertThat ;
23
23
import static com .rabbitmq .client .amqp .impl .TestUtils .sync ;
24
+ import static com .rabbitmq .client .amqp .impl .TestUtils .waitAtMost ;
24
25
import static java .time .Duration .ofMillis ;
25
26
import static org .assertj .core .api .Assertions .assertThat ;
26
27
@@ -211,6 +212,90 @@ void consumeFromMovingQq() {
211
212
}
212
213
}
213
214
215
+ @ Test
216
+ void publishToRestartedStream () {
217
+ try {
218
+ management .queue (q ).type (Management .QueueType .STREAM ).declare ();
219
+
220
+ AmqpConnection publishConnection = connection (b -> b .affinity ().queue (q ).operation (PUBLISH ));
221
+ assertThat (publishConnection ).isOnLeader (queueInfo ());
222
+
223
+ Publisher publisher = publishConnection .publisherBuilder ().queue (q ).build ();
224
+ Sync publishSync = sync ();
225
+ publisher .publish (publisher .message ().messageId (1L ), ctx -> publishSync .down ());
226
+ assertThat (publishSync ).completes ();
227
+
228
+ restartStream ();
229
+
230
+ publishSync .reset ();
231
+ publisher .publish (publisher .message ().messageId (2L ), ctx -> publishSync .down ());
232
+ assertThat (publishSync ).completes ();
233
+
234
+ int messageCount = 2 ;
235
+ waitAtMost (() -> queueInfo ().messageCount () == messageCount );
236
+ Sync consumeSync = sync (messageCount );
237
+ Set <Long > messageIds = ConcurrentHashMap .newKeySet (messageCount );
238
+ connection .consumerBuilder ().queue (q ).stream ()
239
+ .offset (ConsumerBuilder .StreamOffsetSpecification .FIRST )
240
+ .builder ()
241
+ .messageHandler (
242
+ (ctx , msg ) -> {
243
+ messageIds .add (msg .messageIdAsLong ());
244
+ consumeSync .down ();
245
+ ctx .accept ();
246
+ })
247
+ .build ();
248
+ assertThat (consumeSync ).completes ();
249
+ assertThat (messageIds ).containsExactlyInAnyOrder (1L , 2L );
250
+ } finally {
251
+ management .queueDeletion ().delete (q );
252
+ }
253
+ }
254
+
255
+ @ Test
256
+ void consumeFromRestartedStream () {
257
+ try {
258
+ management .queue (q ).type (Management .QueueType .STREAM ).declare ();
259
+
260
+ AmqpConnection consumeConnection = connection (b -> b .affinity ().queue (q ).operation (CONSUME ));
261
+ assertThat (consumeConnection ).isOnFollower (queueInfo ());
262
+
263
+ Set <Long > messageIds = ConcurrentHashMap .newKeySet ();
264
+ Sync consumeSync = sync ();
265
+ consumeConnection .consumerBuilder ().queue (q ).stream ()
266
+ .offset (ConsumerBuilder .StreamOffsetSpecification .FIRST )
267
+ .builder ()
268
+ .messageHandler (
269
+ (ctx , msg ) -> {
270
+ messageIds .add (msg .messageIdAsLong ());
271
+ consumeSync .down ();
272
+ ctx .accept ();
273
+ })
274
+ .build ();
275
+
276
+ Publisher publisher = connection .publisherBuilder ().queue (q ).build ();
277
+ Sync publishSync = sync ();
278
+ publisher .publish (publisher .message ().messageId (1L ), ctx -> publishSync .down ());
279
+ assertThat (publishSync ).completes ();
280
+ publishSync .reset ();
281
+
282
+ assertThat (consumeSync ).completes ();
283
+ assertThat (messageIds ).containsExactlyInAnyOrder (1L );
284
+ consumeSync .reset ();
285
+
286
+ restartStream ();
287
+
288
+ publisher .publish (publisher .message ().messageId (2L ), ctx -> publishSync .down ());
289
+ assertThat (publishSync ).completes ();
290
+ publishSync .reset ();
291
+
292
+ assertThat (consumeSync ).completes ();
293
+ assertThat (messageIds ).containsExactlyInAnyOrder (1L , 2L );
294
+ } finally {
295
+ management .queueDeletion ().delete (q );
296
+ }
297
+ }
298
+
214
299
String moveQqLeader () {
215
300
String initialLeader = deleteQqLeader ();
216
301
addQqMember (initialLeader );
@@ -220,23 +305,47 @@ String moveQqLeader() {
220
305
}
221
306
222
307
String deleteQqLeader () {
308
+ return deleteLeader (this ::deleteQqMember );
309
+ }
310
+
311
+ String deleteStreamLeader () {
312
+ return deleteLeader (leader -> Cli .deleteStreamMember (q , leader ));
313
+ }
314
+
315
+ String deleteLeader (Consumer <String > deleteMemberOperation ) {
223
316
Management .QueueInfo info = queueInfo ();
224
317
String initialLeader = info .leader ();
225
318
int initialReplicaCount = info .replicas ().size ();
226
- deleteQqMember (initialLeader );
319
+ deleteMemberOperation . accept (initialLeader );
227
320
TestUtils .waitAtMost (() -> !queueInfo ().leader ().equals (initialLeader ));
228
321
assertThat (queueInfo ().replicas ()).hasSize (initialReplicaCount - 1 );
229
322
return initialLeader ;
230
323
}
231
324
325
+ void restartStream () {
326
+ Cli .restartStream (this .q );
327
+ }
328
+
232
329
void deleteQqMember (String member ) {
233
330
Cli .deleteQuorumQueueMember (q , member );
234
331
}
235
332
333
+ void deleteStreamMember (String member ) {
334
+ Cli .deleteStreamMember (q , member );
335
+ }
336
+
236
337
void addQqMember (String newMember ) {
338
+ addMember (() -> Cli .addQuorumQueueMember (q , newMember ));
339
+ }
340
+
341
+ void addStreamMember (String newMember ) {
342
+ addMember (() -> Cli .addStreamMember (q , newMember ));
343
+ }
344
+
345
+ void addMember (Runnable addMemberOperation ) {
237
346
Management .QueueInfo info = queueInfo ();
238
347
int initialReplicaCount = info .replicas ().size ();
239
- Cli . addQuorumQueueMember ( q , newMember );
348
+ addMemberOperation . run ( );
240
349
TestUtils .waitAtMost (() -> queueInfo ().replicas ().size () == initialReplicaCount + 1 );
241
350
}
242
351
0 commit comments