13
13
ExchangeSpecification ,
14
14
Message ,
15
15
QuorumQueueSpecification ,
16
+ Management ,
17
+ Publisher ,
18
+ Consumer ,
16
19
)
17
20
18
- connection = None
19
- management = None
20
- publisher = None
21
- consumer = None
21
+ from typing import Optional
22
+ from dataclasses import dataclass
23
+
24
+
25
+ # here we keep track of the objects we need to reconnect
26
+ @dataclass
27
+ class ConnectionConfiguration :
28
+ connection : Optional [Connection ] = None
29
+ management : Optional [Management ] = None
30
+ publisher : Optional [Publisher ] = None
31
+ consumer : Optional [Consumer ] = None
32
+
33
+
34
+ connection_configuration = ConnectionConfiguration ()
35
+ messages_to_publish = 50000
22
36
23
37
24
38
# disconnection callback
@@ -30,22 +44,27 @@ def on_disconnection():
30
44
queue_name = "example-queue"
31
45
routing_key = "routing-key"
32
46
33
- global connection
34
- global management
35
- global publisher
36
- global consumer
47
+ global connection_configuration
37
48
38
49
addr = AddressHelper .exchange_address (exchange_name , routing_key )
39
50
addr_queue = AddressHelper .queue_address (queue_name )
40
51
41
- if connection is not None :
42
- connection = create_connection ()
43
- if management is not None :
44
- management = connection .management ()
45
- if publisher is not None :
46
- publisher = connection .publisher (addr )
47
- if consumer is not None :
48
- consumer = connection .consumer (addr_queue , handler = MyMessageHandler ())
52
+ if connection_configuration .connection is not None :
53
+ connection_configuration .connection = create_connection ()
54
+ if connection_configuration .management is not None :
55
+ connection_configuration .management = (
56
+ connection_configuration .connection .management ()
57
+ )
58
+ if connection_configuration .publisher is not None :
59
+ connection_configuration .publisher = (
60
+ connection_configuration .connection .publisher (addr )
61
+ )
62
+ if connection_configuration .consumer is not None :
63
+ connection_configuration .consumer = (
64
+ connection_configuration .connection .consumer (
65
+ addr_queue , handler = MyMessageHandler ()
66
+ )
67
+ )
49
68
50
69
51
70
class MyMessageHandler (AMQPMessagingHandler ):
@@ -55,7 +74,8 @@ def __init__(self):
55
74
self ._count = 0
56
75
57
76
def on_message (self , event : Event ):
58
- print ("received message: " + str (event .message .annotations ))
77
+ if self ._count % 1000 == 0 :
78
+ print ("received 100 message: " + str (event .message .body ))
59
79
60
80
# accepting
61
81
self .delivery_context .accept (event )
@@ -74,11 +94,9 @@ def on_message(self, event: Event):
74
94
# in case of rejection with annotations added
75
95
# self.delivery_context.discard_with_annotations(event)
76
96
77
- print ("count " + str (self ._count ))
78
-
79
97
self ._count = self ._count + 1
80
98
81
- if self ._count == 100 :
99
+ if self ._count == messages_to_publish :
82
100
print ("closing receiver" )
83
101
# if you want you can add cleanup operations here
84
102
# event.receiver.close()
@@ -115,30 +133,30 @@ def main() -> None:
115
133
exchange_name = "test-exchange"
116
134
queue_name = "example-queue"
117
135
routing_key = "routing-key"
118
- messages_to_publish = 50000
119
136
120
- global connection
121
- global management
122
- global publisher
123
- global consumer
137
+ global connection_configuration
124
138
125
139
print ("connection to amqp server" )
126
- if connection is None :
127
- connection = create_connection ()
140
+ if connection_configuration . connection is None :
141
+ connection_configuration . connection = create_connection ()
128
142
129
- if management is None :
130
- management = connection .management ()
143
+ if connection_configuration .management is None :
144
+ connection_configuration .management = (
145
+ connection_configuration .connection .management ()
146
+ )
131
147
132
148
print ("declaring exchange and queue" )
133
- management .declare_exchange (ExchangeSpecification (name = exchange_name , arguments = {}))
149
+ connection_configuration .management .declare_exchange (
150
+ ExchangeSpecification (name = exchange_name , arguments = {})
151
+ )
134
152
135
- management .declare_queue (
153
+ connection_configuration . management .declare_queue (
136
154
QuorumQueueSpecification (name = queue_name )
137
155
# QuorumQueueSpecification(name=queue_name, dead_letter_exchange="dead-letter")
138
156
)
139
157
140
158
print ("binding queue to exchange" )
141
- bind_name = management .bind (
159
+ bind_name = connection_configuration . management .bind (
142
160
BindingSpecification (
143
161
source_exchange = exchange_name ,
144
162
destination_queue = queue_name ,
@@ -151,30 +169,34 @@ def main() -> None:
151
169
addr_queue = AddressHelper .queue_address (queue_name )
152
170
153
171
print ("create a publisher and publish a test message" )
154
- if publisher is None :
155
- publisher = connection .publisher (addr )
172
+ if connection_configuration .publisher is None :
173
+ connection_configuration .publisher = (
174
+ connection_configuration .connection .publisher (addr )
175
+ )
156
176
157
177
print ("purging the queue" )
158
- messages_purged = management .purge_queue (queue_name )
178
+ messages_purged = connection_configuration . management .purge_queue (queue_name )
159
179
160
180
print ("messages purged: " + str (messages_purged ))
161
181
# management.close()
162
182
163
- # publish 10 messages
183
+ # publishing messages
164
184
while True :
165
185
for i in range (messages_to_publish ):
166
186
167
187
if i % 1000 == 0 :
168
- print ("publishing" )
188
+ print ("publishing 1000 messages... " )
169
189
try :
170
- publisher .publish (Message (body = "test" ))
190
+ if connection_configuration .publisher is not None :
191
+ connection_configuration .publisher .publish (Message (body = "test" ))
171
192
except ConnectionClosed :
172
193
print ("publisher closing exception, resubmitting" )
173
194
continue
174
195
175
- print ("closing" )
196
+ print ("closing publisher " )
176
197
try :
177
- publisher .close ()
198
+ if connection_configuration .publisher is not None :
199
+ connection_configuration .publisher .close ()
178
200
except ConnectionClosed :
179
201
print ("publisher closing exception, resubmitting" )
180
202
continue
@@ -183,12 +205,16 @@ def main() -> None:
183
205
print (
184
206
"create a consumer and consume the test message - press control + c to terminate to consume"
185
207
)
186
- if consumer is None :
187
- consumer = connection .consumer (addr_queue , handler = MyMessageHandler ())
208
+ if connection_configuration .consumer is None :
209
+ connection_configuration .consumer = (
210
+ connection_configuration .connection .consumer (
211
+ addr_queue , handler = MyMessageHandler ()
212
+ )
213
+ )
188
214
189
215
while True :
190
216
try :
191
- consumer .run ()
217
+ connection_configuration . consumer .run ()
192
218
except KeyboardInterrupt :
193
219
pass
194
220
except ConnectionClosed :
@@ -200,24 +226,24 @@ def main() -> None:
200
226
break
201
227
202
228
print ("cleanup" )
203
- consumer .close ()
229
+ connection_configuration . consumer .close ()
204
230
# once we finish consuming if we close the connection we need to create a new one
205
231
# connection = create_connection()
206
232
# management = connection.management()
207
233
208
234
print ("unbind" )
209
- management .unbind (bind_name )
235
+ connection_configuration . management .unbind (bind_name )
210
236
211
237
print ("delete queue" )
212
- management .delete_queue (queue_name )
238
+ connection_configuration . management .delete_queue (queue_name )
213
239
214
240
print ("delete exchange" )
215
- management .delete_exchange (exchange_name )
241
+ connection_configuration . management .delete_exchange (exchange_name )
216
242
217
243
print ("closing connections" )
218
- management .close ()
244
+ connection_configuration . management .close ()
219
245
print ("after management closing" )
220
- connection .close ()
246
+ connection_configuration . connection .close ()
221
247
print ("after connection closing" )
222
248
223
249
0 commit comments