4
4
AddressHelper ,
5
5
AmqpMessage ,
6
6
ArgumentOutOfRangeException ,
7
- BindingSpecification ,
8
7
Connection ,
9
8
ConnectionClosed ,
10
9
Environment ,
11
- ExchangeSpecification ,
10
+ OutcomeState ,
12
11
QuorumQueueSpecification ,
13
12
StreamSpecification ,
14
13
)
15
14
16
15
from .http_requests import delete_all_connections
16
+ from .utils import create_binding , publish_per_message
17
17
18
18
19
19
def test_publish_queue (connection : Connection ) -> None :
@@ -31,21 +31,116 @@ def test_publish_queue(connection: Connection) -> None:
31
31
try :
32
32
publisher = connection .publisher ("/queues/" + queue_name )
33
33
status = publisher .publish (AmqpMessage (body = "test" ))
34
- if status .ACCEPTED :
34
+ if status .remote_state == OutcomeState .ACCEPTED :
35
+ accepted = True
36
+ except Exception :
37
+ raised = True
38
+
39
+ if publisher is not None :
40
+ publisher .close ()
41
+
42
+ management .delete_queue (queue_name )
43
+ management .close ()
44
+
45
+ assert accepted is True
46
+ assert raised is False
47
+
48
+
49
+ def test_publish_per_message (connection : Connection ) -> None :
50
+
51
+ queue_name = "test-queue-1"
52
+ queue_name_2 = "test-queue-2"
53
+ management = connection .management ()
54
+
55
+ management .declare_queue (QuorumQueueSpecification (name = queue_name ))
56
+ management .declare_queue (QuorumQueueSpecification (name = queue_name_2 ))
57
+
58
+ raised = False
59
+
60
+ publisher = None
61
+ accepted = False
62
+ accepted_2 = True
63
+
64
+ try :
65
+ publisher = connection .publisher ()
66
+ status = publish_per_message (
67
+ publisher , addr = AddressHelper .queue_address (queue_name )
68
+ )
69
+ if status .remote_state == OutcomeState .ACCEPTED :
35
70
accepted = True
71
+ status = publish_per_message (
72
+ publisher , addr = AddressHelper .queue_address (queue_name_2 )
73
+ )
74
+ if status .remote_state == OutcomeState .ACCEPTED :
75
+ accepted_2 = True
36
76
except Exception :
37
77
raised = True
38
78
39
79
if publisher is not None :
40
80
publisher .close ()
41
81
82
+ purged_messages_queue_1 = management .purge_queue (queue_name )
83
+ purged_messages_queue_2 = management .purge_queue (queue_name_2 )
42
84
management .delete_queue (queue_name )
85
+ management .delete_queue (queue_name_2 )
43
86
management .close ()
44
87
45
88
assert accepted is True
89
+ assert accepted_2 is True
90
+ assert purged_messages_queue_1 == 1
91
+ assert purged_messages_queue_2 == 1
46
92
assert raised is False
47
93
48
94
95
+ '''
96
+ def test_publish_per_message_to_exchange(connection: Connection) -> None:
97
+
98
+ exchange_name = "test-bind-exchange-to-queue-exchange"
99
+ queue_name = "test-bind-exchange-to-queue-queue"
100
+ routing_key = "routing-key"
101
+
102
+ management = connection.management()
103
+
104
+ binding_exchange_queue_path = create_binding(
105
+ management, exchange_name, queue_name, routing_key
106
+ )
107
+
108
+ raised = False
109
+
110
+ publisher = None
111
+ accepted = False
112
+ accepted_2 = True
113
+
114
+ try:
115
+ publisher = connection.publisher()
116
+ status = publish_per_message(
117
+ publisher, addr=AddressHelper.exchange_address(exchange_name, routing_key)
118
+ )
119
+ if status.remote_state == OutcomeState.ACCEPTED:
120
+ accepted = True
121
+ status = publish_per_message(
122
+ publisher, addr=AddressHelper.queue_address(queue_name)
123
+ )
124
+ if status.remote_state == OutcomeState.ACCEPTED:
125
+ accepted_2 = True
126
+ except Exception:
127
+ raised = True
128
+
129
+ if publisher is not None:
130
+ publisher.close()
131
+
132
+ purged_messages_queue = management.purge_queue(queue_name)
133
+ management.unbind(binding_exchange_queue_path)
134
+ management.delete_exchange(exchange_name)
135
+ management.delete_queue(queue_name)
136
+ management.close()
137
+
138
+ assert accepted is True
139
+ assert accepted_2 is True
140
+ assert purged_messages_queue == 2
141
+ assert raised is False
142
+ '''
143
+
49
144
def test_publish_ssl (connection_ssl : Connection ) -> None :
50
145
51
146
queue_name = "test-queue"
@@ -90,24 +185,37 @@ def test_publish_to_invalid_destination(connection: Connection) -> None:
90
185
assert raised is True
91
186
92
187
188
+ def test_publish_per_message_to_invalid_destination (connection : Connection ) -> None :
189
+
190
+ queue_name = "test-queue-1"
191
+ raised = False
192
+
193
+ publisher = None
194
+ message = AmqpMessage (body = "test" )
195
+ message .to_address ("/invalid_destination/" + queue_name )
196
+
197
+ try :
198
+ publisher = connection .publisher ()
199
+ publisher .publish (message )
200
+ except ArgumentOutOfRangeException :
201
+ raised = True
202
+ except Exception :
203
+ raised = False
204
+
205
+ if publisher is not None :
206
+ publisher .close ()
207
+
208
+ assert raised is True
209
+
210
+
93
211
def test_publish_exchange (connection : Connection ) -> None :
94
212
95
213
exchange_name = "test-exchange"
96
214
queue_name = "test-queue"
97
215
management = connection .management ()
98
216
routing_key = "routing-key"
99
217
100
- management .declare_exchange (ExchangeSpecification (name = exchange_name ))
101
-
102
- management .declare_queue (QuorumQueueSpecification (name = queue_name ))
103
-
104
- management .bind (
105
- BindingSpecification (
106
- source_exchange = exchange_name ,
107
- destination_queue = queue_name ,
108
- binding_key = routing_key ,
109
- )
110
- )
218
+ create_binding (management , exchange_name , queue_name , routing_key )
111
219
112
220
addr = AddressHelper .exchange_address (exchange_name , routing_key )
113
221
0 commit comments