1
1
import time
2
2
3
3
from rabbitmq_amqp_python_client import (
4
+ BindingSpecification ,
4
5
Connection ,
6
+ ExchangeSpecification ,
5
7
Message ,
6
8
QuorumQueueSpecification ,
9
+ exchange_address ,
7
10
)
8
11
9
12
10
- def test_publish_exchange (connection : Connection ) -> None :
13
+ def test_publish_queue (connection : Connection ) -> None :
11
14
12
15
queue_name = "test-queue"
13
16
management = connection .management ()
@@ -29,6 +32,43 @@ def test_publish_exchange(connection: Connection) -> None:
29
32
management .delete_queue (queue_name )
30
33
31
34
35
+ def test_publish_exchange (connection : Connection ) -> None :
36
+
37
+ exchange_name = "test-exchange"
38
+ queue_name = "test-queue"
39
+ management = connection .management ()
40
+ routing_key = "routing-key"
41
+
42
+ management .declare_exchange (ExchangeSpecification (name = exchange_name , arguments = {}))
43
+
44
+ management .declare_queue (QuorumQueueSpecification (name = queue_name ))
45
+
46
+ management .bind (
47
+ BindingSpecification (
48
+ source_exchange = exchange_name ,
49
+ destination_queue = queue_name ,
50
+ binding_key = routing_key ,
51
+ )
52
+ )
53
+
54
+ addr = exchange_address (exchange_name , routing_key )
55
+
56
+ raised = False
57
+
58
+ try :
59
+ publisher = connection .publisher (addr )
60
+ publisher .publish (Message (body = "test" ))
61
+ except Exception :
62
+ raised = True
63
+
64
+ assert raised is False
65
+
66
+ publisher .close ()
67
+
68
+ management .delete_exchange (exchange_name )
69
+ management .delete_queue (queue_name )
70
+
71
+
32
72
def test_publish_purge (connection : Connection ) -> None :
33
73
connection = Connection ("amqp://guest:guest@localhost:5672/" )
34
74
connection .dial ()
0 commit comments