Skip to content

Commit f05c54e

Browse files
author
DanielePalaia
committed
adding purge test
1 parent 43efb26 commit f05c54e

File tree

3 files changed

+78
-4
lines changed

3 files changed

+78
-4
lines changed

rabbitmq_amqp_python_client/management.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ def _request(
105105
def declare_exchange(
106106
self, exchange_specification: ExchangeSpecification
107107
) -> ExchangeSpecification:
108-
logger.debug("delete_exchange operation called")
108+
logger.debug("declare_exchange operation called")
109109
body = {}
110110
body["auto_delete"] = exchange_specification.is_auto_delete
111111
body["durable"] = exchange_specification.is_durable
@@ -326,11 +326,13 @@ def unbind(self, binding_exchange_queue_path: str) -> None:
326326
],
327327
)
328328

329-
def purge_queue(self, queue_name: str) -> None:
329+
def purge_queue(self, queue_name: str) -> int:
330330
logger.debug("purge_queue operation called")
331331
path = purge_queue_address(queue_name)
332332

333-
self.request(
333+
print("path: " + path)
334+
335+
response = self.request(
334336
None,
335337
path,
336338
CommonValues.command_delete.value,
@@ -339,6 +341,8 @@ def purge_queue(self, queue_name: str) -> None:
339341
],
340342
)
341343

344+
return int(response.body["message_count"])
345+
342346
def queue_info(self, queue_name: str) -> QueueInfo:
343347
logger.debug("queue_info operation called")
344348
path = queue_address(queue_name)

tests/test_management.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,25 @@ def test_queue_info_with_validations() -> None:
110110
assert queue_info.is_durable == queue_specification.is_durable
111111
assert queue_info.message_count == 0
112112

113+
def test_queue_info_for_stream_with_validations() -> None:
114+
connection = Connection("amqp://guest:guest@localhost:5672/")
115+
connection.dial()
116+
117+
stream_name = "test_stream_info_with_validation"
118+
management = connection.management()
119+
120+
queue_specification = StreamSpecification(
121+
name=stream_name,
122+
)
123+
management.declare_queue(queue_specification)
124+
125+
stream_info = management.queue_info(queue_name=stream_name)
126+
127+
management.delete_queue(stream_name)
128+
129+
assert stream_info.name == stream_name
130+
assert stream_info.queue_type == queue_specification.queue_type
131+
assert stream_info.message_count == 0
113132

114133
def test_queue_precondition_fail() -> None:
115134
connection = Connection("amqp://guest:guest@localhost:5672/")
@@ -199,6 +218,30 @@ def test_declare_classic_queue_with_args() -> None:
199218
management.delete_queue(queue_name)
200219

201220

221+
def test_declare_classic_queue_with_invalid_args() -> None:
222+
connection = Connection("amqp://guest:guest@localhost:5672/")
223+
connection.dial()
224+
225+
queue_name = "test-queue_with_args"
226+
management = connection.management()
227+
test_failure = True
228+
229+
queue_specification = ClassicQueueSpecification(
230+
name=queue_name,
231+
queue_type=QueueType.classic,
232+
max_len=-5,
233+
)
234+
235+
try:
236+
management.declare_queue(queue_specification)
237+
except ValidationCodeException:
238+
test_failure = False
239+
240+
management.delete_queue(queue_name)
241+
242+
assert test_failure is False
243+
244+
202245
def test_declare_stream_with_args() -> None:
203246
connection = Connection("amqp://guest:guest@localhost:5672/")
204247
connection.dial()

tests/test_publisher.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
)
66

77

8-
def test_bind_exchange_to_queue() -> None:
8+
def test_publish_exchange() -> None:
99
connection = Connection("amqp://guest:guest@localhost:5672/")
1010
connection.dial()
1111

@@ -27,3 +27,30 @@ def test_bind_exchange_to_queue() -> None:
2727
publisher.close()
2828

2929
management.delete_queue(queue_name)
30+
31+
32+
def test_publish_purge() -> None:
33+
connection = Connection("amqp://guest:guest@localhost:5672/")
34+
connection.dial()
35+
36+
queue_name = "test-queue"
37+
management = connection.management()
38+
39+
management.declare_queue(QuorumQueueSpecification(name=queue_name))
40+
41+
raised = False
42+
43+
try:
44+
publisher = connection.publisher("/queues/" + queue_name)
45+
publisher.publish(Message(body="test"))
46+
except Exception:
47+
raised = True
48+
49+
message_purged = management.purge_queue(queue_name)
50+
51+
assert raised is False
52+
assert message_purged == 1
53+
54+
publisher.close()
55+
56+
management.delete_queue(queue_name)

0 commit comments

Comments
 (0)