4
4
AddressHelper ,
5
5
AmqpMessage ,
6
6
ArgumentOutOfRangeException ,
7
- BindingSpecification ,
8
7
Connection ,
9
8
ConnectionClosed ,
10
9
Environment ,
11
- ExchangeSpecification ,
10
+ OutcomeState ,
12
11
QuorumQueueSpecification ,
13
12
StreamSpecification ,
14
13
)
15
14
16
15
from .http_requests import delete_all_connections
16
+ from .utils import create_binding , publish_per_message
17
17
18
18
19
19
def test_publish_queue (connection : Connection ) -> None :
@@ -31,18 +31,111 @@ def test_publish_queue(connection: Connection) -> None:
31
31
try :
32
32
publisher = connection .publisher ("/queues/" + queue_name )
33
33
status = publisher .publish (AmqpMessage (body = "test" ))
34
- if status .ACCEPTED :
34
+ if status .remote_state == OutcomeState .ACCEPTED :
35
+ accepted = True
36
+ except Exception :
37
+ raised = True
38
+
39
+ if publisher is not None :
40
+ publisher .close ()
41
+
42
+ management .delete_queue (queue_name )
43
+ management .close ()
44
+
45
+ assert accepted is True
46
+ assert raised is False
47
+
48
+
49
+ def test_publish_per_message (connection : Connection ) -> None :
50
+
51
+ queue_name = "test-queue-1"
52
+ queue_name_2 = "test-queue-2"
53
+ management = connection .management ()
54
+
55
+ management .declare_queue (QuorumQueueSpecification (name = queue_name ))
56
+ management .declare_queue (QuorumQueueSpecification (name = queue_name_2 ))
57
+
58
+ raised = False
59
+
60
+ publisher = None
61
+ accepted = False
62
+ accepted_2 = True
63
+
64
+ try :
65
+ publisher = connection .publisher ()
66
+ status = publish_per_message (
67
+ publisher , addr = AddressHelper .queue_address (queue_name )
68
+ )
69
+ if status .remote_state == OutcomeState .ACCEPTED :
35
70
accepted = True
71
+ status = publish_per_message (
72
+ publisher , addr = AddressHelper .queue_address (queue_name_2 )
73
+ )
74
+ if status .remote_state == OutcomeState .ACCEPTED :
75
+ accepted_2 = True
36
76
except Exception :
37
77
raised = True
38
78
39
79
if publisher is not None :
40
80
publisher .close ()
41
81
82
+ purged_messages_queue_1 = management .purge_queue (queue_name )
83
+ purged_messages_queue_2 = management .purge_queue (queue_name_2 )
42
84
management .delete_queue (queue_name )
85
+ management .delete_queue (queue_name_2 )
43
86
management .close ()
44
87
45
88
assert accepted is True
89
+ assert accepted_2 is True
90
+ assert purged_messages_queue_1 == 1
91
+ assert purged_messages_queue_2 == 1
92
+ assert raised is False
93
+
94
+
95
+ def test_publish_per_message_to_exchange (connection : Connection ) -> None :
96
+
97
+ # exchange_name = "test-exchange-per-message"
98
+ # queue_name = "test-queue-per-message"
99
+ # management = connection.management()
100
+ # routing_key = "routing-key-per-message"
101
+
102
+ # create_binding(management, exchange_name, queue_name, routing_key)
103
+
104
+ raised = False
105
+
106
+ # publisher = None
107
+ # accepted = False
108
+ # accepted_2 = True
109
+
110
+ """
111
+ try:
112
+ publisher = connection.publisher()
113
+ status = publish_per_message(
114
+ publisher, addr=AddressHelper.exchange_address(exchange_name, routing_key)
115
+ )
116
+ if status.remote_state == OutcomeState.ACCEPTED:
117
+ accepted = True
118
+ status = publish_per_message(
119
+ publisher, addr=AddressHelper.queue_address(queue_name)
120
+ )
121
+ if status.remote_state == OutcomeState.ACCEPTED:
122
+ accepted_2 = True
123
+ except Exception:
124
+ raised = True
125
+
126
+ # if publisher is not None:
127
+ publisher.close()
128
+
129
+ purged_messages_queue = management.purge_queue(queue_name)
130
+ management.unbind(bind_name)
131
+ management.delete_exchange(exchange_name)
132
+ management.delete_queue(queue_name)
133
+ management.close()
134
+ """
135
+
136
+ # assert accepted is True
137
+ # assert accepted_2 is True
138
+ # assert purged_messages_queue == 2
46
139
assert raised is False
47
140
48
141
@@ -90,24 +183,36 @@ def test_publish_to_invalid_destination(connection: Connection) -> None:
90
183
assert raised is True
91
184
92
185
186
+ def test_publish_per_message_to_invalid_destination (connection : Connection ) -> None :
187
+
188
+ queue_name = "test-queue-1"
189
+ raised = False
190
+
191
+ message = AmqpMessage (body = "test" )
192
+ message .to_address ("/invalid_destination/" + queue_name )
193
+ publisher = connection .publisher ()
194
+
195
+ try :
196
+ publisher .publish (message )
197
+ except ArgumentOutOfRangeException :
198
+ raised = True
199
+ except Exception :
200
+ raised = False
201
+
202
+ if publisher is not None :
203
+ publisher .close ()
204
+
205
+ assert raised is True
206
+
207
+
93
208
def test_publish_exchange (connection : Connection ) -> None :
94
209
95
210
exchange_name = "test-exchange"
96
211
queue_name = "test-queue"
97
212
management = connection .management ()
98
213
routing_key = "routing-key"
99
214
100
- management .declare_exchange (ExchangeSpecification (name = exchange_name ))
101
-
102
- management .declare_queue (QuorumQueueSpecification (name = queue_name ))
103
-
104
- management .bind (
105
- BindingSpecification (
106
- source_exchange = exchange_name ,
107
- destination_queue = queue_name ,
108
- binding_key = routing_key ,
109
- )
110
- )
215
+ bind_name = create_binding (management , exchange_name , queue_name , routing_key )
111
216
112
217
addr = AddressHelper .exchange_address (exchange_name , routing_key )
113
218
@@ -124,6 +229,7 @@ def test_publish_exchange(connection: Connection) -> None:
124
229
125
230
publisher .close ()
126
231
232
+ management .unbind (bind_name )
127
233
management .delete_exchange (exchange_name )
128
234
management .delete_queue (queue_name )
129
235
management .close ()
0 commit comments