Skip to content

Commit 11bed13

Browse files
author
DanielePalaia
committed
improving disconnection management
1 parent d4ed42d commit 11bed13

File tree

9 files changed

+119
-160
lines changed

9 files changed

+119
-160
lines changed

examples/getting_started/getting_started.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ def __init__(self):
2323
super().__init__()
2424
self._count = 0
2525

26-
def on_message(self, event: Event):
26+
def on_amqp_message(self, event: Event):
2727
print("received message: " + str(event.message.body))
2828

2929
# accepting
@@ -147,7 +147,7 @@ def main() -> None:
147147
consumer.close()
148148
# once we finish consuming if we close the connection we need to create a new one
149149
# connection = create_connection()
150-
# management = connection.management()
150+
management = connection.management()
151151

152152
print("unbind")
153153
management.unbind(bind_name)

examples/reconnection/reconnection_example.py

Lines changed: 31 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -1,76 +1,22 @@
11
# type: ignore
2-
3-
4-
import time
5-
from dataclasses import dataclass
6-
from typing import Optional
7-
82
from rabbitmq_amqp_python_client import (
93
AddressHelper,
104
AMQPMessagingHandler,
115
Connection,
126
ConnectionClosed,
13-
Consumer,
147
Environment,
158
Event,
169
ExchangeSpecification,
1710
ExchangeToQueueBindingSpecification,
18-
Management,
1911
Message,
20-
Publisher,
2112
QuorumQueueSpecification,
2213
)
2314

24-
2515
# here we keep track of the objects we need to reconnect
26-
@dataclass
27-
class ConnectionConfiguration:
28-
connection: Optional[Connection] = None
29-
management: Optional[Management] = None
30-
publisher: Optional[Publisher] = None
31-
consumer: Optional[Consumer] = None
32-
16+
MESSAGES_TO_PUBLISH = 50000
3317

34-
connection_configuration = ConnectionConfiguration()
35-
MESSAGES_TO_PUBLSH = 50000
3618

37-
38-
# disconnection callback
39-
# here you can cleanup or reconnect
40-
def on_disconnection():
41-
42-
print("disconnected")
43-
global environment
44-
exchange_name = "test-exchange"
45-
queue_name = "example-queue"
46-
routing_key = "routing-key"
47-
48-
global connection_configuration
49-
50-
addr = AddressHelper.exchange_address(exchange_name, routing_key)
51-
addr_queue = AddressHelper.queue_address(queue_name)
52-
53-
if connection_configuration.connection is not None:
54-
connection_configuration.connection = create_connection()
55-
if connection_configuration.management is not None:
56-
connection_configuration.management = (
57-
connection_configuration.connection.management()
58-
)
59-
if connection_configuration.publisher is not None:
60-
connection_configuration.publisher = (
61-
connection_configuration.connection.publisher(addr)
62-
)
63-
if connection_configuration.consumer is not None:
64-
connection_configuration.consumer = (
65-
connection_configuration.connection.consumer(
66-
addr_queue, message_handler=MyMessageHandler()
67-
)
68-
)
69-
70-
71-
environment = Environment(
72-
uri="amqp://guest:guest@localhost:5672/", on_disconnection_handler=on_disconnection
73-
)
19+
environment = Environment(uri="amqp://guest:guest@localhost:5672/", reconnect=True)
7420

7521

7622
class MyMessageHandler(AMQPMessagingHandler):
@@ -102,7 +48,7 @@ def on_message(self, event: Event):
10248

10349
self._count = self._count + 1
10450

105-
if self._count == MESSAGES_TO_PUBLSH:
51+
if self._count == MESSAGES_TO_PUBLISH:
10652
print("closing receiver")
10753
# if you want you can add cleanup operations here
10854

@@ -136,29 +82,22 @@ def main() -> None:
13682
queue_name = "example-queue"
13783
routing_key = "routing-key"
13884

139-
global connection_configuration
140-
14185
print("connection to amqp server")
142-
if connection_configuration.connection is None:
143-
connection_configuration.connection = create_connection()
144-
145-
if connection_configuration.management is None:
146-
connection_configuration.management = (
147-
connection_configuration.connection.management()
148-
)
86+
connection = create_connection()
87+
management = connection.management()
88+
publisher = None
89+
consumer = None
14990

15091
print("declaring exchange and queue")
151-
connection_configuration.management.declare_exchange(
152-
ExchangeSpecification(name=exchange_name)
153-
)
92+
management.declare_exchange(ExchangeSpecification(name=exchange_name))
15493

155-
connection_configuration.management.declare_queue(
94+
management.declare_queue(
15695
QuorumQueueSpecification(name=queue_name)
15796
# QuorumQueueSpecification(name=queue_name, dead_letter_exchange="dead-letter")
15897
)
15998

16099
print("binding queue to exchange")
161-
bind_name = connection_configuration.management.bind(
100+
bind_name = management.bind(
162101
ExchangeToQueueBindingSpecification(
163102
source_exchange=exchange_name,
164103
destination_queue=queue_name,
@@ -171,34 +110,32 @@ def main() -> None:
171110
addr_queue = AddressHelper.queue_address(queue_name)
172111

173112
print("create a publisher and publish a test message")
174-
if connection_configuration.publisher is None:
175-
connection_configuration.publisher = (
176-
connection_configuration.connection.publisher(addr)
177-
)
113+
if publisher is None:
114+
publisher = connection.publisher(addr)
178115

179116
print("purging the queue")
180-
messages_purged = connection_configuration.management.purge_queue(queue_name)
117+
messages_purged = management.purge_queue(queue_name)
181118

182119
print("messages purged: " + str(messages_purged))
183-
# management.close()
184120

185121
# publishing messages
186122
while True:
187-
for i in range(MESSAGES_TO_PUBLSH):
123+
for i in range(MESSAGES_TO_PUBLISH):
188124

189125
if i % 1000 == 0:
190126
print("published 1000 messages...")
191127
try:
192-
if connection_configuration.publisher is not None:
193-
connection_configuration.publisher.publish(Message(body="test"))
128+
if publisher is not None:
129+
publisher.publish(Message(body="test"))
194130
except ConnectionClosed:
195131
print("publisher closing exception, resubmitting")
132+
publisher = connection.publisher(addr)
196133
continue
197134

198135
print("closing publisher")
199136
try:
200-
if connection_configuration.publisher is not None:
201-
connection_configuration.publisher.close()
137+
if publisher is not None:
138+
publisher.close()
202139
except ConnectionClosed:
203140
print("publisher closing exception, resubmitting")
204141
continue
@@ -207,43 +144,39 @@ def main() -> None:
207144
print(
208145
"create a consumer and consume the test message - press control + c to terminate to consume"
209146
)
210-
if connection_configuration.consumer is None:
211-
connection_configuration.consumer = (
212-
connection_configuration.connection.consumer(
213-
addr_queue, message_handler=MyMessageHandler()
214-
)
215-
)
147+
if consumer is None:
148+
consumer = connection.consumer(addr_queue, message_handler=MyMessageHandler())
216149

217150
while True:
218151
try:
219-
connection_configuration.consumer.run()
152+
consumer.run()
220153
except KeyboardInterrupt:
221154
pass
222155
except ConnectionClosed:
223-
time.sleep(1)
156+
consumer = connection.consumer(
157+
addr_queue, message_handler=MyMessageHandler()
158+
)
224159
continue
225160
except Exception as e:
226161
print("consumer exited for exception " + str(e))
227162

228163
break
229164

230165
print("cleanup")
231-
connection_configuration.consumer.close()
232-
# once we finish consuming if we close the connection we need to create a new one
233-
# connection = create_connection()
234-
# management = connection.management()
166+
consumer.close()
235167

168+
management = connection.management()
236169
print("unbind")
237-
connection_configuration.management.unbind(bind_name)
170+
management.unbind(bind_name)
238171

239172
print("delete queue")
240-
connection_configuration.management.delete_queue(queue_name)
173+
management.delete_queue(queue_name)
241174

242175
print("delete exchange")
243-
connection_configuration.management.delete_exchange(exchange_name)
176+
management.delete_exchange(exchange_name)
244177

245178
print("closing connections")
246-
connection_configuration.management.close()
179+
management.close()
247180
print("after management closing")
248181
environment.close()
249182
print("after connection closing")

rabbitmq_amqp_python_client/amqp_consumer_handler.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from .delivery_context import DeliveryContext
2+
from .qpid.proton._events import Event
23
from .qpid.proton.handlers import MessagingHandler
34

45
"""
@@ -20,3 +21,10 @@ def __init__(self, auto_accept: bool = False, auto_settle: bool = True):
2021
"""
2122
super().__init__(auto_accept=auto_accept, auto_settle=auto_settle)
2223
self.delivery_context: DeliveryContext = DeliveryContext()
24+
25+
def on_amqp_message(self, event: Event) -> None:
26+
pass
27+
28+
def on_message(self, event: Event) -> None:
29+
print("first level callback")
30+
self.on_amqp_message(event)

rabbitmq_amqp_python_client/connection.py

Lines changed: 52 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ def __init__(
5353
ssl_context: Union[
5454
PosixSslConfigurationContext, WinSslConfigurationContext, None
5555
] = None,
56-
on_disconnection_handler: Optional[CB] = None, # type: ignore
56+
reconnect: bool = False,
5757
):
5858
"""
5959
Initialize a new Connection instance.
@@ -77,13 +77,15 @@ def __init__(
7777
self._addrs: Optional[list[str]] = uris
7878
self._conn: BlockingConnection
7979
self._management: Management
80-
self._on_disconnection_handler = on_disconnection_handler
8180
self._conf_ssl_context: Union[
8281
PosixSslConfigurationContext, WinSslConfigurationContext, None
8382
] = ssl_context
83+
self._reconnect = reconnect
8484
self._ssl_domain = None
8585
self._connections = [] # type: ignore
8686
self._index: int = -1
87+
self._publishers: list[Publisher] = []
88+
self._consumers: list[Consumer] = []
8789

8890
def _set_environment_connection_list(self, connections: []): # type: ignore
8991
self._connections = connections
@@ -141,12 +143,21 @@ def dial(self) -> None:
141143
client_key,
142144
password,
143145
)
144-
self._conn = BlockingConnection(
145-
url=self._addr,
146-
urls=self._addrs,
147-
ssl_domain=self._ssl_domain,
148-
on_disconnection_handler=self._on_disconnection_handler,
149-
)
146+
147+
if self._reconnect is False:
148+
self._conn = BlockingConnection(
149+
url=self._addr,
150+
urls=self._addrs,
151+
ssl_domain=self._ssl_domain,
152+
)
153+
else:
154+
self._conn = BlockingConnection(
155+
url=self._addr,
156+
urls=self._addrs,
157+
ssl_domain=self._ssl_domain,
158+
on_disconnection_handler=self._on_disconnection,
159+
)
160+
150161
self._open()
151162
logger.debug("Connection to the server established")
152163

@@ -185,6 +196,10 @@ def close(self) -> None:
185196
"""
186197
logger.debug("Closing connection")
187198
try:
199+
for publisher in self._publishers:
200+
publisher.close()
201+
for consumer in self._consumers:
202+
consumer.close()
188203
self._conn.close()
189204
except Exception as e:
190205
logger.error(f"Error closing connection: {e}")
@@ -213,7 +228,8 @@ def publisher(self, destination: str = "") -> Publisher:
213228
"destination address must start with /queues or /exchanges"
214229
)
215230
publisher = Publisher(self._conn, destination)
216-
return publisher
231+
self._publishers.append(publisher)
232+
return self._publishers[self._publishers.index(publisher)]
217233

218234
def consumer(
219235
self,
@@ -244,4 +260,31 @@ def consumer(
244260
consumer = Consumer(
245261
self._conn, destination, message_handler, stream_filter_options, credit
246262
)
263+
self._consumers.append(consumer)
247264
return consumer
265+
266+
def _on_disconnection(self) -> None:
267+
268+
print("disconnected")
269+
270+
if self in self._connections:
271+
self._connections.remove(self)
272+
273+
print("reconnecting")
274+
self._conn = BlockingConnection(
275+
url=self._addr,
276+
urls=self._addrs,
277+
ssl_domain=self._ssl_domain,
278+
on_disconnection_handler=self._on_disconnection,
279+
)
280+
self._open()
281+
self._connections.append(self)
282+
283+
for index, publisher in enumerate(self._publishers):
284+
# publisher = self._publishers.pop(index)
285+
# address = publisher.address
286+
self._publishers.remove(publisher)
287+
# self._publishers.insert(index, Publisher(self._conn, address))
288+
289+
for i, consumer in enumerate(self._consumers):
290+
self._consumers.remove(consumer)

0 commit comments

Comments
 (0)