9
9
ConnectionClosed ,
10
10
Environment ,
11
11
ExchangeSpecification ,
12
+ OutcomeState ,
12
13
QuorumQueueSpecification ,
13
14
StreamSpecification ,
14
15
)
15
16
16
17
from .http_requests import delete_all_connections
18
+ from .utils import publish_per_message , create_binding
17
19
18
20
19
21
def test_publish_queue (connection : Connection ) -> None :
@@ -31,18 +33,112 @@ def test_publish_queue(connection: Connection) -> None:
31
33
try :
32
34
publisher = connection .publisher ("/queues/" + queue_name )
33
35
status = publisher .publish (AmqpMessage (body = "test" ))
34
- if status .ACCEPTED :
36
+ if status .remote_state == OutcomeState .ACCEPTED :
37
+ accepted = True
38
+ except Exception :
39
+ raised = True
40
+
41
+ if publisher is not None :
42
+ publisher .close ()
43
+
44
+ management .delete_queue (queue_name )
45
+ management .close ()
46
+
47
+ assert accepted is True
48
+ assert raised is False
49
+
50
+
51
+ def test_publish_per_message (connection : Connection ) -> None :
52
+
53
+ queue_name = "test-queue-1"
54
+ queue_name_2 = "test-queue-2"
55
+ management = connection .management ()
56
+
57
+ management .declare_queue (QuorumQueueSpecification (name = queue_name ))
58
+ management .declare_queue (QuorumQueueSpecification (name = queue_name_2 ))
59
+
60
+ raised = False
61
+
62
+ publisher = None
63
+ accepted = False
64
+ accepted_2 = True
65
+
66
+ try :
67
+ publisher = connection .publisher ()
68
+ status = publish_per_message (
69
+ publisher , addr = AddressHelper .queue_address (queue_name )
70
+ )
71
+ if status .remote_state == OutcomeState .ACCEPTED :
72
+ accepted = True
73
+ status = publish_per_message (
74
+ publisher , addr = AddressHelper .queue_address (queue_name_2 )
75
+ )
76
+ if status .remote_state == OutcomeState .ACCEPTED :
77
+ accepted_2 = True
78
+ except Exception :
79
+ raised = True
80
+
81
+ if publisher is not None :
82
+ publisher .close ()
83
+
84
+ purged_messages_queue_1 = management .purge_queue (queue_name )
85
+ purged_messages_queue_2 = management .purge_queue (queue_name_2 )
86
+ management .delete_queue (queue_name )
87
+ management .delete_queue (queue_name_2 )
88
+ management .close ()
89
+
90
+ assert accepted is True
91
+ assert accepted_2 is True
92
+ assert purged_messages_queue_1 == 1
93
+ assert purged_messages_queue_2 == 1
94
+ assert raised is False
95
+
96
+
97
+ def test_publish_per_message_to_exchange (connection : Connection ) -> None :
98
+
99
+ exchange_name = "test-bind-exchange-to-queue-exchange"
100
+ queue_name = "test-bind-exchange-to-queue-queue"
101
+ routing_key = "routing-key"
102
+
103
+ management = connection .management ()
104
+
105
+ binding_exchange_queue_path = create_binding (
106
+ management , exchange_name , queue_name , routing_key
107
+ )
108
+
109
+ raised = False
110
+
111
+ publisher = None
112
+ accepted = False
113
+ accepted_2 = True
114
+
115
+ try :
116
+ publisher = connection .publisher ()
117
+ status = publish_per_message (
118
+ publisher , addr = AddressHelper .exchange_address (exchange_name , routing_key )
119
+ )
120
+ if status .remote_state == OutcomeState .ACCEPTED :
35
121
accepted = True
122
+ status = publish_per_message (
123
+ publisher , addr = AddressHelper .queue_address (queue_name )
124
+ )
125
+ if status .remote_state == OutcomeState .ACCEPTED :
126
+ accepted_2 = True
36
127
except Exception :
37
128
raised = True
38
129
39
130
if publisher is not None :
40
131
publisher .close ()
41
132
133
+ purged_messages_queue = management .purge_queue (queue_name )
134
+ management .unbind (binding_exchange_queue_path )
135
+ management .delete_exchange (exchange_name )
42
136
management .delete_queue (queue_name )
43
137
management .close ()
44
138
45
139
assert accepted is True
140
+ assert accepted_2 is True
141
+ assert purged_messages_queue == 2
46
142
assert raised is False
47
143
48
144
@@ -90,24 +186,37 @@ def test_publish_to_invalid_destination(connection: Connection) -> None:
90
186
assert raised is True
91
187
92
188
189
+ def test_publish_per_message_to_invalid_destination (connection : Connection ) -> None :
190
+
191
+ queue_name = "test-queue-1"
192
+ raised = False
193
+
194
+ publisher = None
195
+ message = AmqpMessage (body = "test" )
196
+ message .to_address ("/invalid_destination/" + queue_name )
197
+
198
+ try :
199
+ publisher = connection .publisher ()
200
+ publisher .publish (message )
201
+ except ArgumentOutOfRangeException :
202
+ raised = True
203
+ except Exception :
204
+ raised = False
205
+
206
+ if publisher is not None :
207
+ publisher .close ()
208
+
209
+ assert raised is True
210
+
211
+
93
212
def test_publish_exchange (connection : Connection ) -> None :
94
213
95
214
exchange_name = "test-exchange"
96
215
queue_name = "test-queue"
97
216
management = connection .management ()
98
217
routing_key = "routing-key"
99
218
100
- management .declare_exchange (ExchangeSpecification (name = exchange_name ))
101
-
102
- management .declare_queue (QuorumQueueSpecification (name = queue_name ))
103
-
104
- management .bind (
105
- BindingSpecification (
106
- source_exchange = exchange_name ,
107
- destination_queue = queue_name ,
108
- binding_key = routing_key ,
109
- )
110
- )
219
+ create_binding (management , exchange_name , queue_name , routing_key )
111
220
112
221
addr = AddressHelper .exchange_address (exchange_name , routing_key )
113
222
0 commit comments