Skip to content

Commit 524b78a

Browse files
author
DanielePalaia
committed
managing disconnections
1 parent d3d93ec commit 524b78a

File tree

4 files changed

+235
-3
lines changed

4 files changed

+235
-3
lines changed
Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
# type: ignore
2+
3+
4+
import time
5+
6+
from rabbitmq_amqp_python_client import (
7+
AddressHelper,
8+
AMQPMessagingHandler,
9+
BindingSpecification,
10+
Connection,
11+
ConnectionClosed,
12+
Event,
13+
ExchangeSpecification,
14+
Message,
15+
QuorumQueueSpecification,
16+
)
17+
18+
connection = None
19+
management = None
20+
publisher = None
21+
consumer = None
22+
23+
24+
def on_disconnected():
25+
26+
print("disconnected")
27+
exchange_name = "test-exchange"
28+
queue_name = "example-queue"
29+
routing_key = "routing-key"
30+
31+
global connection
32+
global management
33+
global publisher
34+
global consumer
35+
36+
addr = AddressHelper.exchange_address(exchange_name, routing_key)
37+
addr_queue = AddressHelper.queue_address(queue_name)
38+
39+
connection = create_connection()
40+
if management is not None:
41+
management = connection.management()
42+
if publisher is not None:
43+
publisher = connection.publisher(addr)
44+
if consumer is not None:
45+
consumer = connection.consumer(addr_queue, handler=MyMessageHandler())
46+
47+
48+
class MyMessageHandler(AMQPMessagingHandler):
49+
50+
def __init__(self):
51+
super().__init__()
52+
self._count = 0
53+
54+
def on_message(self, event: Event):
55+
print("received message: " + str(event.message.annotations))
56+
57+
# accepting
58+
self.delivery_context.accept(event)
59+
60+
# in case of rejection (+eventually deadlettering)
61+
# self.delivery_context.discard(event)
62+
63+
# in case of requeuing
64+
# self.delivery_context.requeue(event)
65+
66+
# annotations = {}
67+
# annotations[symbol('x-opt-string')] = 'x-test1'
68+
# in case of requeuing with annotations added
69+
# self.delivery_context.requeue_with_annotations(event, annotations)
70+
71+
# in case of rejection with annotations added
72+
# self.delivery_context.discard_with_annotations(event)
73+
74+
print("count " + str(self._count))
75+
76+
self._count = self._count + 1
77+
78+
if self._count == 100:
79+
print("closing receiver")
80+
# if you want you can add cleanup operations here
81+
# event.receiver.close()
82+
# event.connection.close()
83+
84+
def on_connection_closed(self, event: Event):
85+
# if you want you can add cleanup operations here
86+
print("connection closed")
87+
88+
def on_link_closed(self, event: Event) -> None:
89+
# if you want you can add cleanup operations here
90+
print("link closed")
91+
92+
93+
def create_connection() -> Connection:
94+
connection = Connection(
95+
"amqp://guest:guest@localhost:5672/", on_disconnection_handler=on_disconnected
96+
)
97+
connection.dial()
98+
99+
return connection
100+
101+
102+
def main() -> None:
103+
104+
exchange_name = "test-exchange"
105+
queue_name = "example-queue"
106+
routing_key = "routing-key"
107+
messages_to_publish = 50000
108+
109+
global connection
110+
global management
111+
global publisher
112+
global consumer
113+
114+
print("connection to amqp server")
115+
if connection is None:
116+
connection = create_connection()
117+
118+
if management is None:
119+
management = connection.management()
120+
121+
print("declaring exchange and queue")
122+
management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))
123+
124+
management.declare_queue(
125+
QuorumQueueSpecification(name=queue_name)
126+
# QuorumQueueSpecification(name=queue_name, dead_letter_exchange="dead-letter")
127+
)
128+
129+
print("binding queue to exchange")
130+
bind_name = management.bind(
131+
BindingSpecification(
132+
source_exchange=exchange_name,
133+
destination_queue=queue_name,
134+
binding_key=routing_key,
135+
)
136+
)
137+
138+
addr = AddressHelper.exchange_address(exchange_name, routing_key)
139+
140+
addr_queue = AddressHelper.queue_address(queue_name)
141+
142+
print("create a publisher and publish a test message")
143+
if publisher is None:
144+
publisher = connection.publisher(addr)
145+
146+
print("purging the queue")
147+
messages_purged = management.purge_queue(queue_name)
148+
149+
print("messages purged: " + str(messages_purged))
150+
# management.close()
151+
152+
# publish 10 messages
153+
while True:
154+
for i in range(messages_to_publish):
155+
156+
if i % 1000 == 0:
157+
print("publishing")
158+
try:
159+
publisher.publish(Message(body="test"))
160+
except ConnectionClosed:
161+
print("publisher closing exception, resubmitting")
162+
continue
163+
164+
print("closing")
165+
try:
166+
publisher.close()
167+
except ConnectionClosed:
168+
print("publisher closing exception, resubmitting")
169+
continue
170+
break
171+
172+
print(
173+
"create a consumer and consume the test message - press control + c to terminate to consume"
174+
)
175+
if consumer is None:
176+
consumer = connection.consumer(addr_queue, handler=MyMessageHandler())
177+
178+
while True:
179+
try:
180+
consumer.run()
181+
except KeyboardInterrupt:
182+
pass
183+
except ConnectionClosed:
184+
time.sleep(1)
185+
continue
186+
except Exception as e:
187+
print("consumer exited for exception " + str(e))
188+
189+
break
190+
191+
print("cleanup")
192+
consumer.close()
193+
# once we finish consuming if we close the connection we need to create a new one
194+
# connection = create_connection()
195+
# management = connection.management()
196+
197+
print("unbind")
198+
management.unbind(bind_name)
199+
200+
print("delete queue")
201+
management.delete_queue(queue_name)
202+
203+
print("delete exchange")
204+
management.delete_exchange(exchange_name)
205+
206+
print("closing connections")
207+
management.close()
208+
print("after management closing")
209+
connection.close()
210+
print("after connection closing")
211+
212+
213+
if __name__ == "__main__":
214+
main()

rabbitmq_amqp_python_client/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from .qpid.proton._delivery import Delivery
1717
from .qpid.proton._events import Event
1818
from .qpid.proton._message import Message
19+
from .qpid.proton._utils import ConnectionClosed
1920
from .qpid.proton.handlers import MessagingHandler
2021
from .queues import (
2122
ClassicQueueSpecification,
@@ -53,4 +54,5 @@
5354
"AMQPMessagingHandler",
5455
"ArgumentOutOfRangeException",
5556
"Delivery",
57+
"ConnectionClosed",
5658
]

rabbitmq_amqp_python_client/connection.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import logging
2-
from typing import Optional
2+
from typing import Annotated, Callable, Optional, TypeVar
33

44
from .address_helper import validate_address
55
from .consumer import Consumer
@@ -11,16 +11,22 @@
1111

1212
logger = logging.getLogger(__name__)
1313

14+
MT = TypeVar("MT")
15+
CB = Annotated[Callable[[MT], None], "Message callback type"]
16+
1417

1518
class Connection:
16-
def __init__(self, addr: str):
19+
def __init__(self, addr: str, on_disconnection_handler: Optional[CB] = None): # type: ignore
1720
self._addr: str = addr
1821
self._conn: BlockingConnection
1922
self._management: Management
23+
self._on_disconnection_handler = on_disconnection_handler
2024

2125
def dial(self) -> None:
2226
logger.debug("Establishing a connection to the amqp server")
23-
self._conn = BlockingConnection(self._addr)
27+
self._conn = BlockingConnection(
28+
self._addr, on_disconnection_handler=self._on_disconnection_handler
29+
)
2430
self._open()
2531
logger.debug("Connection to the server established")
2632

rabbitmq_amqp_python_client/qpid/proton/_utils.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,11 @@ class Literal(metaclass=GenericMeta):
7171
)
7272
from ._transport import SSLDomain
7373

74+
from typing import Annotated, TypeVar
75+
76+
MT = TypeVar("MT")
77+
CB = Annotated[Callable[[MT], None], "Message callback type"]
78+
7479

7580
class BlockingLink:
7681
def __init__(
@@ -423,6 +428,7 @@ def __init__(
423428
heartbeat: Optional[float] = None,
424429
urls: Optional[List[str]] = None,
425430
reconnect: Union[None, Literal[False], "Backoff"] = None,
431+
on_disconnection_handler: Optional[CB] = None,
426432
**kwargs
427433
) -> None:
428434
self.disconnected = False
@@ -432,6 +438,7 @@ def __init__(
432438
self.container.start()
433439
self.conn = None
434440
self.closing = False
441+
self._on_disconnection_handler = on_disconnection_handler
435442
# Preserve previous behaviour if neither reconnect nor urls are supplied
436443
if url is not None and urls is None and reconnect is None:
437444
reconnect = False
@@ -631,7 +638,10 @@ def on_connection_remote_close(self, event: "Event") -> None:
631638
"""
632639
Event callback for when the link peer closes the connection.
633640
"""
641+
print("connection remote closed")
634642
if event.connection.state & Endpoint.LOCAL_ACTIVE:
643+
if self._on_disconnection_handler is not None:
644+
event.container.schedule(0, self._on_disconnection_handler())
635645
event.connection.close()
636646
if not self.closing:
637647
raise ConnectionClosed(event.connection)

0 commit comments

Comments
 (0)