Skip to content

Commit c7b6a0a

Browse files
committed
Return PurgeStatus from queuePurge
It only contains the number of messages deleted from the queue, but there may be more information in the future. Fixes #151
1 parent 98f17c7 commit c7b6a0a

File tree

3 files changed

+32
-4
lines changed

3 files changed

+32
-4
lines changed

src/main/java/com/rabbitmq/client/amqp/Management.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,9 @@ public interface Management extends AutoCloseable {
7171
* Purge (delete all messages) from a queue.
7272
*
7373
* @param queue queue to delete messages from
74+
* @return the status of the purge operation
7475
*/
75-
void queuePurge(String queue);
76+
PurgeStatus queuePurge(String queue);
7677

7778
/**
7879
* Start exchange specification.
@@ -948,4 +949,14 @@ interface QueueInfo {
948949
*/
949950
int consumerCount();
950951
}
952+
953+
interface PurgeStatus {
954+
955+
/**
956+
* The number of messages purged from the queue.
957+
*
958+
* @return the number of messages purged
959+
*/
960+
long messageCount();
961+
}
951962
}

src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java

+18-2
Original file line numberDiff line numberDiff line change
@@ -183,11 +183,13 @@ public UnbindSpecification unbind() {
183183
}
184184

185185
@Override
186-
public void queuePurge(String queue) {
186+
public PurgeStatus queuePurge(String queue) {
187187
Map<String, Object> responseBody = delete(queueLocation(queue) + "/messages", CODE_200);
188-
if (!responseBody.containsKey("message_count")) {
188+
if (!responseBody.containsKey("message_count")
189+
&& !(responseBody.get("message_count") instanceof Number)) {
189190
throw new AmqpException("Response body should contain message_count");
190191
}
192+
return new DefaultPurgeStatus(((Number) responseBody.get("message_count")).longValue());
191193
}
192194

193195
void setToken(String token) {
@@ -856,4 +858,18 @@ enum State {
856858
UNAVAILABLE,
857859
CLOSED
858860
}
861+
862+
private static final class DefaultPurgeStatus implements PurgeStatus {
863+
864+
private final long messageCount;
865+
866+
private DefaultPurgeStatus(long messageCount) {
867+
this.messageCount = messageCount;
868+
}
869+
870+
@Override
871+
public long messageCount() {
872+
return this.messageCount;
873+
}
874+
}
859875
}

src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -725,7 +725,8 @@ void queuePurgeShouldRemoveAllMessages(TestInfo info) {
725725
.forEach(ignored -> publisher.publish(publisher.message(), callback));
726726
assertThat(publishSync).completes();
727727
assertThat(management.queueInfo(q)).hasMessageCount(messageCount);
728-
management.queuePurge(q);
728+
Management.PurgeStatus purgeStatus = management.queuePurge(q);
729+
org.assertj.core.api.Assertions.assertThat(purgeStatus.messageCount()).isEqualTo(messageCount);
729730
assertThat(management.queueInfo(q)).isEmpty();
730731
}
731732

0 commit comments

Comments
 (0)