Skip to content

Commit 12b73d7

Browse files
author
DanielePalaia
committed
adding purge test
1 parent 43efb26 commit 12b73d7

File tree

3 files changed

+80
-4
lines changed

3 files changed

+80
-4
lines changed

rabbitmq_amqp_python_client/management.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ def _request(
105105
def declare_exchange(
106106
self, exchange_specification: ExchangeSpecification
107107
) -> ExchangeSpecification:
108-
logger.debug("delete_exchange operation called")
108+
logger.debug("declare_exchange operation called")
109109
body = {}
110110
body["auto_delete"] = exchange_specification.is_auto_delete
111111
body["durable"] = exchange_specification.is_durable
@@ -326,11 +326,13 @@ def unbind(self, binding_exchange_queue_path: str) -> None:
326326
],
327327
)
328328

329-
def purge_queue(self, queue_name: str) -> None:
329+
def purge_queue(self, queue_name: str) -> int:
330330
logger.debug("purge_queue operation called")
331331
path = purge_queue_address(queue_name)
332332

333-
self.request(
333+
print("path: " + path)
334+
335+
response = self.request(
334336
None,
335337
path,
336338
CommonValues.command_delete.value,
@@ -339,6 +341,8 @@ def purge_queue(self, queue_name: str) -> None:
339341
],
340342
)
341343

344+
return int(response.body["message_count"])
345+
342346
def queue_info(self, queue_name: str) -> QueueInfo:
343347
logger.debug("queue_info operation called")
344348
path = queue_address(queue_name)

tests/test_management.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,27 @@ def test_queue_info_with_validations() -> None:
111111
assert queue_info.message_count == 0
112112

113113

114+
def test_queue_info_for_stream_with_validations() -> None:
115+
connection = Connection("amqp://guest:guest@localhost:5672/")
116+
connection.dial()
117+
118+
stream_name = "test_stream_info_with_validation"
119+
management = connection.management()
120+
121+
queue_specification = StreamSpecification(
122+
name=stream_name,
123+
)
124+
management.declare_queue(queue_specification)
125+
126+
stream_info = management.queue_info(queue_name=stream_name)
127+
128+
management.delete_queue(stream_name)
129+
130+
assert stream_info.name == stream_name
131+
assert stream_info.queue_type == queue_specification.queue_type
132+
assert stream_info.message_count == 0
133+
134+
114135
def test_queue_precondition_fail() -> None:
115136
connection = Connection("amqp://guest:guest@localhost:5672/")
116137
connection.dial()
@@ -199,6 +220,30 @@ def test_declare_classic_queue_with_args() -> None:
199220
management.delete_queue(queue_name)
200221

201222

223+
def test_declare_classic_queue_with_invalid_args() -> None:
224+
connection = Connection("amqp://guest:guest@localhost:5672/")
225+
connection.dial()
226+
227+
queue_name = "test-queue_with_args"
228+
management = connection.management()
229+
test_failure = True
230+
231+
queue_specification = ClassicQueueSpecification(
232+
name=queue_name,
233+
queue_type=QueueType.classic,
234+
max_len=-5,
235+
)
236+
237+
try:
238+
management.declare_queue(queue_specification)
239+
except ValidationCodeException:
240+
test_failure = False
241+
242+
management.delete_queue(queue_name)
243+
244+
assert test_failure is False
245+
246+
202247
def test_declare_stream_with_args() -> None:
203248
connection = Connection("amqp://guest:guest@localhost:5672/")
204249
connection.dial()

tests/test_publisher.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
)
66

77

8-
def test_bind_exchange_to_queue() -> None:
8+
def test_publish_exchange() -> None:
99
connection = Connection("amqp://guest:guest@localhost:5672/")
1010
connection.dial()
1111

@@ -27,3 +27,30 @@ def test_bind_exchange_to_queue() -> None:
2727
publisher.close()
2828

2929
management.delete_queue(queue_name)
30+
31+
32+
def test_publish_purge() -> None:
33+
connection = Connection("amqp://guest:guest@localhost:5672/")
34+
connection.dial()
35+
36+
queue_name = "test-queue"
37+
management = connection.management()
38+
39+
management.declare_queue(QuorumQueueSpecification(name=queue_name))
40+
41+
raised = False
42+
43+
try:
44+
publisher = connection.publisher("/queues/" + queue_name)
45+
publisher.publish(Message(body="test"))
46+
except Exception:
47+
raised = True
48+
49+
message_purged = management.purge_queue(queue_name)
50+
51+
assert raised is False
52+
assert message_purged == 1
53+
54+
publisher.close()
55+
56+
management.delete_queue(queue_name)

0 commit comments

Comments
 (0)