Skip to content

Commit 29ff443

Browse files
committed
Add test for consumer with moving quorum queue
1 parent 0ea4048 commit 29ff443

File tree

2 files changed

+80
-2
lines changed

2 files changed

+80
-2
lines changed

src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@
2121

2222
import com.rabbitmq.client.amqp.Management;
2323
import java.time.Duration;
24+
import java.util.List;
2425
import java.util.concurrent.CountDownLatch;
2526
import java.util.concurrent.TimeUnit;
2627
import java.util.concurrent.atomic.AtomicReference;
28+
import java.util.stream.Collectors;
2729
import org.assertj.core.api.AbstractObjectAssert;
2830

2931
final class Assertions {
@@ -218,6 +220,7 @@ private ConnectionAssert(AmqpConnection connection) {
218220
}
219221

220222
ConnectionAssert hasNodename(String nodename) {
223+
Assert.notNull(nodename, "Expected nodename cannot be null");
221224
isNotNull();
222225
if (!actual.connectionNodename().equals(nodename)) {
223226
fail(
@@ -226,5 +229,19 @@ ConnectionAssert hasNodename(String nodename) {
226229
}
227230
return this;
228231
}
232+
233+
ConnectionAssert isOnFollower(Management.QueueInfo info) {
234+
Assert.notNull(info, "Queue info cannot be null");
235+
List<String> followers =
236+
info.replicas().stream()
237+
.filter(n -> !n.equals(info.leader()))
238+
.collect(Collectors.toList());
239+
if (!followers.contains(actual.connectionNodename())) {
240+
fail(
241+
"Connection is expected to be on follower node(s) '%s' but is on '%s'",
242+
followers, actual.connectionNodename());
243+
}
244+
return this;
245+
}
229246
}
230247
}

src/test/java/com/rabbitmq/client/amqp/impl/ClusterTest.java

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,64 @@ void publishToMovingQq() {
148148
})
149149
.build();
150150
assertThat(consumeSync).completes();
151-
assertThat(messageIds).containsOnly(1L, 2L, 3L);
151+
assertThat(messageIds).containsExactlyInAnyOrder(1L, 2L, 3L);
152+
} finally {
153+
management.queueDeletion().delete(q);
154+
}
155+
}
156+
157+
@Test
158+
void consumeFromMovingQq() {
159+
try {
160+
management.queue(q).type(Management.QueueType.QUORUM).declare();
161+
162+
AmqpConnection consumeConnection = connection(b -> b.affinity().queue(q).operation(CONSUME));
163+
assertThat(consumeConnection).isOnFollower(queueInfo());
164+
165+
Set<Long> messageIds = ConcurrentHashMap.newKeySet();
166+
Sync consumeSync = sync();
167+
consumeConnection
168+
.consumerBuilder()
169+
.queue(q)
170+
.messageHandler(
171+
(ctx, msg) -> {
172+
messageIds.add(msg.messageIdAsLong());
173+
consumeSync.down();
174+
ctx.accept();
175+
})
176+
.build();
177+
178+
Publisher publisher = connection.publisherBuilder().queue(q).build();
179+
Sync publishSync = sync();
180+
publisher.publish(publisher.message().messageId(1L), ctx -> publishSync.down());
181+
assertThat(publishSync).completes();
182+
publishSync.reset();
183+
184+
assertThat(consumeSync).completes();
185+
assertThat(messageIds).containsExactlyInAnyOrder(1L);
186+
consumeSync.reset();
187+
188+
String follower = consumeConnection.connectionNodename();
189+
190+
deleteQqMember(follower);
191+
192+
publisher.publish(publisher.message().messageId(2L), ctx -> publishSync.down());
193+
assertThat(publishSync).completes();
194+
publishSync.reset();
195+
196+
assertThat(consumeSync).completes();
197+
assertThat(messageIds).containsExactlyInAnyOrder(1L, 2L);
198+
consumeSync.reset();
199+
200+
addQqMember(follower);
201+
202+
publisher.publish(publisher.message().messageId(3L), ctx -> publishSync.down());
203+
assertThat(publishSync).completes();
204+
publishSync.reset();
205+
206+
assertThat(consumeSync).completes();
207+
assertThat(messageIds).containsExactlyInAnyOrder(1L, 2L, 3L);
208+
consumeSync.reset();
152209
} finally {
153210
management.queueDeletion().delete(q);
154211
}
@@ -166,12 +223,16 @@ String deleteQqLeader() {
166223
Management.QueueInfo info = queueInfo();
167224
String initialLeader = info.leader();
168225
int initialReplicaCount = info.replicas().size();
169-
Cli.deleteQuorumQueueMember(q, initialLeader);
226+
deleteQqMember(initialLeader);
170227
TestUtils.waitAtMost(() -> !queueInfo().leader().equals(initialLeader));
171228
assertThat(queueInfo().replicas()).hasSize(initialReplicaCount - 1);
172229
return initialLeader;
173230
}
174231

232+
void deleteQqMember(String member) {
233+
Cli.deleteQuorumQueueMember(q, member);
234+
}
235+
175236
void addQqMember(String newMember) {
176237
Management.QueueInfo info = queueInfo();
177238
int initialReplicaCount = info.replicas().size();

0 commit comments

Comments
 (0)