From 11bed139fb433c1298074e08097f2279157813df Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Wed, 26 Feb 2025 15:25:37 +0100 Subject: [PATCH 1/9] improving disconnection management --- examples/getting_started/getting_started.py | 4 +- examples/reconnection/reconnection_example.py | 129 +++++------------- .../amqp_consumer_handler.py | 8 ++ rabbitmq_amqp_python_client/connection.py | 61 +++++++-- rabbitmq_amqp_python_client/consumer.py | 11 ++ rabbitmq_amqp_python_client/environment.py | 6 +- rabbitmq_amqp_python_client/publisher.py | 6 + tests/test_connection.py | 21 +-- tests/test_publisher.py | 33 +---- 9 files changed, 119 insertions(+), 160 deletions(-) diff --git a/examples/getting_started/getting_started.py b/examples/getting_started/getting_started.py index 589f2e5..bbd6f60 100644 --- a/examples/getting_started/getting_started.py +++ b/examples/getting_started/getting_started.py @@ -23,7 +23,7 @@ def __init__(self): super().__init__() self._count = 0 - def on_message(self, event: Event): + def on_amqp_message(self, event: Event): print("received message: " + str(event.message.body)) # accepting @@ -147,7 +147,7 @@ def main() -> None: consumer.close() # once we finish consuming if we close the connection we need to create a new one # connection = create_connection() - # management = connection.management() + management = connection.management() print("unbind") management.unbind(bind_name) diff --git a/examples/reconnection/reconnection_example.py b/examples/reconnection/reconnection_example.py index 6504f11..2a6fb6b 100644 --- a/examples/reconnection/reconnection_example.py +++ b/examples/reconnection/reconnection_example.py @@ -1,76 +1,22 @@ # type: ignore - - -import time -from dataclasses import dataclass -from typing import Optional - from rabbitmq_amqp_python_client import ( AddressHelper, AMQPMessagingHandler, Connection, ConnectionClosed, - Consumer, Environment, Event, ExchangeSpecification, ExchangeToQueueBindingSpecification, - Management, Message, - Publisher, QuorumQueueSpecification, ) - # here we keep track of the objects we need to reconnect -@dataclass -class ConnectionConfiguration: - connection: Optional[Connection] = None - management: Optional[Management] = None - publisher: Optional[Publisher] = None - consumer: Optional[Consumer] = None - +MESSAGES_TO_PUBLISH = 50000 -connection_configuration = ConnectionConfiguration() -MESSAGES_TO_PUBLSH = 50000 - -# disconnection callback -# here you can cleanup or reconnect -def on_disconnection(): - - print("disconnected") - global environment - exchange_name = "test-exchange" - queue_name = "example-queue" - routing_key = "routing-key" - - global connection_configuration - - addr = AddressHelper.exchange_address(exchange_name, routing_key) - addr_queue = AddressHelper.queue_address(queue_name) - - if connection_configuration.connection is not None: - connection_configuration.connection = create_connection() - if connection_configuration.management is not None: - connection_configuration.management = ( - connection_configuration.connection.management() - ) - if connection_configuration.publisher is not None: - connection_configuration.publisher = ( - connection_configuration.connection.publisher(addr) - ) - if connection_configuration.consumer is not None: - connection_configuration.consumer = ( - connection_configuration.connection.consumer( - addr_queue, message_handler=MyMessageHandler() - ) - ) - - -environment = Environment( - uri="amqp://guest:guest@localhost:5672/", on_disconnection_handler=on_disconnection -) +environment = Environment(uri="amqp://guest:guest@localhost:5672/", reconnect=True) class MyMessageHandler(AMQPMessagingHandler): @@ -102,7 +48,7 @@ def on_message(self, event: Event): self._count = self._count + 1 - if self._count == MESSAGES_TO_PUBLSH: + if self._count == MESSAGES_TO_PUBLISH: print("closing receiver") # if you want you can add cleanup operations here @@ -136,29 +82,22 @@ def main() -> None: queue_name = "example-queue" routing_key = "routing-key" - global connection_configuration - print("connection to amqp server") - if connection_configuration.connection is None: - connection_configuration.connection = create_connection() - - if connection_configuration.management is None: - connection_configuration.management = ( - connection_configuration.connection.management() - ) + connection = create_connection() + management = connection.management() + publisher = None + consumer = None print("declaring exchange and queue") - connection_configuration.management.declare_exchange( - ExchangeSpecification(name=exchange_name) - ) + management.declare_exchange(ExchangeSpecification(name=exchange_name)) - connection_configuration.management.declare_queue( + management.declare_queue( QuorumQueueSpecification(name=queue_name) # QuorumQueueSpecification(name=queue_name, dead_letter_exchange="dead-letter") ) print("binding queue to exchange") - bind_name = connection_configuration.management.bind( + bind_name = management.bind( ExchangeToQueueBindingSpecification( source_exchange=exchange_name, destination_queue=queue_name, @@ -171,34 +110,32 @@ def main() -> None: addr_queue = AddressHelper.queue_address(queue_name) print("create a publisher and publish a test message") - if connection_configuration.publisher is None: - connection_configuration.publisher = ( - connection_configuration.connection.publisher(addr) - ) + if publisher is None: + publisher = connection.publisher(addr) print("purging the queue") - messages_purged = connection_configuration.management.purge_queue(queue_name) + messages_purged = management.purge_queue(queue_name) print("messages purged: " + str(messages_purged)) - # management.close() # publishing messages while True: - for i in range(MESSAGES_TO_PUBLSH): + for i in range(MESSAGES_TO_PUBLISH): if i % 1000 == 0: print("published 1000 messages...") try: - if connection_configuration.publisher is not None: - connection_configuration.publisher.publish(Message(body="test")) + if publisher is not None: + publisher.publish(Message(body="test")) except ConnectionClosed: print("publisher closing exception, resubmitting") + publisher = connection.publisher(addr) continue print("closing publisher") try: - if connection_configuration.publisher is not None: - connection_configuration.publisher.close() + if publisher is not None: + publisher.close() except ConnectionClosed: print("publisher closing exception, resubmitting") continue @@ -207,20 +144,18 @@ def main() -> None: print( "create a consumer and consume the test message - press control + c to terminate to consume" ) - if connection_configuration.consumer is None: - connection_configuration.consumer = ( - connection_configuration.connection.consumer( - addr_queue, message_handler=MyMessageHandler() - ) - ) + if consumer is None: + consumer = connection.consumer(addr_queue, message_handler=MyMessageHandler()) while True: try: - connection_configuration.consumer.run() + consumer.run() except KeyboardInterrupt: pass except ConnectionClosed: - time.sleep(1) + consumer = connection.consumer( + addr_queue, message_handler=MyMessageHandler() + ) continue except Exception as e: print("consumer exited for exception " + str(e)) @@ -228,22 +163,20 @@ def main() -> None: break print("cleanup") - connection_configuration.consumer.close() - # once we finish consuming if we close the connection we need to create a new one - # connection = create_connection() - # management = connection.management() + consumer.close() + management = connection.management() print("unbind") - connection_configuration.management.unbind(bind_name) + management.unbind(bind_name) print("delete queue") - connection_configuration.management.delete_queue(queue_name) + management.delete_queue(queue_name) print("delete exchange") - connection_configuration.management.delete_exchange(exchange_name) + management.delete_exchange(exchange_name) print("closing connections") - connection_configuration.management.close() + management.close() print("after management closing") environment.close() print("after connection closing") diff --git a/rabbitmq_amqp_python_client/amqp_consumer_handler.py b/rabbitmq_amqp_python_client/amqp_consumer_handler.py index a1c4612..d748090 100644 --- a/rabbitmq_amqp_python_client/amqp_consumer_handler.py +++ b/rabbitmq_amqp_python_client/amqp_consumer_handler.py @@ -1,4 +1,5 @@ from .delivery_context import DeliveryContext +from .qpid.proton._events import Event from .qpid.proton.handlers import MessagingHandler """ @@ -20,3 +21,10 @@ def __init__(self, auto_accept: bool = False, auto_settle: bool = True): """ super().__init__(auto_accept=auto_accept, auto_settle=auto_settle) self.delivery_context: DeliveryContext = DeliveryContext() + + def on_amqp_message(self, event: Event) -> None: + pass + + def on_message(self, event: Event) -> None: + print("first level callback") + self.on_amqp_message(event) diff --git a/rabbitmq_amqp_python_client/connection.py b/rabbitmq_amqp_python_client/connection.py index 49c0064..a4a65b6 100644 --- a/rabbitmq_amqp_python_client/connection.py +++ b/rabbitmq_amqp_python_client/connection.py @@ -53,7 +53,7 @@ def __init__( ssl_context: Union[ PosixSslConfigurationContext, WinSslConfigurationContext, None ] = None, - on_disconnection_handler: Optional[CB] = None, # type: ignore + reconnect: bool = False, ): """ Initialize a new Connection instance. @@ -77,13 +77,15 @@ def __init__( self._addrs: Optional[list[str]] = uris self._conn: BlockingConnection self._management: Management - self._on_disconnection_handler = on_disconnection_handler self._conf_ssl_context: Union[ PosixSslConfigurationContext, WinSslConfigurationContext, None ] = ssl_context + self._reconnect = reconnect self._ssl_domain = None self._connections = [] # type: ignore self._index: int = -1 + self._publishers: list[Publisher] = [] + self._consumers: list[Consumer] = [] def _set_environment_connection_list(self, connections: []): # type: ignore self._connections = connections @@ -141,12 +143,21 @@ def dial(self) -> None: client_key, password, ) - self._conn = BlockingConnection( - url=self._addr, - urls=self._addrs, - ssl_domain=self._ssl_domain, - on_disconnection_handler=self._on_disconnection_handler, - ) + + if self._reconnect is False: + self._conn = BlockingConnection( + url=self._addr, + urls=self._addrs, + ssl_domain=self._ssl_domain, + ) + else: + self._conn = BlockingConnection( + url=self._addr, + urls=self._addrs, + ssl_domain=self._ssl_domain, + on_disconnection_handler=self._on_disconnection, + ) + self._open() logger.debug("Connection to the server established") @@ -185,6 +196,10 @@ def close(self) -> None: """ logger.debug("Closing connection") try: + for publisher in self._publishers: + publisher.close() + for consumer in self._consumers: + consumer.close() self._conn.close() except Exception as e: logger.error(f"Error closing connection: {e}") @@ -213,7 +228,8 @@ def publisher(self, destination: str = "") -> Publisher: "destination address must start with /queues or /exchanges" ) publisher = Publisher(self._conn, destination) - return publisher + self._publishers.append(publisher) + return self._publishers[self._publishers.index(publisher)] def consumer( self, @@ -244,4 +260,31 @@ def consumer( consumer = Consumer( self._conn, destination, message_handler, stream_filter_options, credit ) + self._consumers.append(consumer) return consumer + + def _on_disconnection(self) -> None: + + print("disconnected") + + if self in self._connections: + self._connections.remove(self) + + print("reconnecting") + self._conn = BlockingConnection( + url=self._addr, + urls=self._addrs, + ssl_domain=self._ssl_domain, + on_disconnection_handler=self._on_disconnection, + ) + self._open() + self._connections.append(self) + + for index, publisher in enumerate(self._publishers): + # publisher = self._publishers.pop(index) + # address = publisher.address + self._publishers.remove(publisher) + # self._publishers.insert(index, Publisher(self._conn, address)) + + for i, consumer in enumerate(self._consumers): + self._consumers.remove(consumer) diff --git a/rabbitmq_amqp_python_client/consumer.py b/rabbitmq_amqp_python_client/consumer.py index 3da4dec..8f36546 100644 --- a/rabbitmq_amqp_python_client/consumer.py +++ b/rabbitmq_amqp_python_client/consumer.py @@ -57,6 +57,7 @@ def __init__( self._handler = handler self._stream_options = stream_options self._credit = credit + self._consumers: list[Consumer] = [] self._open() def _open(self) -> None: @@ -64,6 +65,9 @@ def _open(self) -> None: logger.debug("Creating Sender") self._receiver = self._create_receiver(self._addr) + def _set_consumers_list(self, consumers: []) -> None: # type: ignore + self._consumers = consumers + def consume(self, timeout: Union[None, Literal[False], float] = False) -> Message: """ Consume a message from the queue. @@ -93,6 +97,8 @@ def close(self) -> None: logger.debug("Closing the receiver") if self._receiver is not None: self._receiver.close() + if self in self._consumers: + self._consumers.remove(self) def run(self) -> None: """ @@ -134,3 +140,8 @@ def _create_receiver(self, addr: str) -> BlockingReceiver: receiver.credit = self._credit return receiver + + @property + def address(self) -> str: + """Get the current publisher address.""" + return self._addr diff --git a/rabbitmq_amqp_python_client/environment.py b/rabbitmq_amqp_python_client/environment.py index ad7be8b..a8d51da 100644 --- a/rabbitmq_amqp_python_client/environment.py +++ b/rabbitmq_amqp_python_client/environment.py @@ -40,7 +40,7 @@ def __init__( ssl_context: Union[ PosixSslConfigurationContext, WinSslConfigurationContext, None ] = None, - on_disconnection_handler: Optional[CB] = None, # type: ignore + reconnect: bool = False, ): """ Initialize a new Environment instance. @@ -63,7 +63,7 @@ def __init__( self._uri = uri self._uris = uris self._ssl_context = ssl_context - self._on_disconnection_handler = on_disconnection_handler + self._reconnect = reconnect self._connections: list[Connection] = [] def connection( @@ -85,7 +85,7 @@ def connection( uri=self._uri, uris=self._uris, ssl_context=self._ssl_context, - on_disconnection_handler=self._on_disconnection_handler, + reconnect=self._reconnect, ) logger.debug("Environment: Creating and returning a new connection") self._connections.append(connection) diff --git a/rabbitmq_amqp_python_client/publisher.py b/rabbitmq_amqp_python_client/publisher.py index 5c057df..8ada467 100644 --- a/rabbitmq_amqp_python_client/publisher.py +++ b/rabbitmq_amqp_python_client/publisher.py @@ -43,6 +43,7 @@ def __init__(self, conn: BlockingConnection, addr: str = ""): self._sender: Optional[BlockingSender] = None self._conn = conn self._addr = addr + self._publishers: list[Publisher] = [] self._open() def _open(self) -> None: @@ -50,6 +51,9 @@ def _open(self) -> None: logger.debug("Creating Sender") self._sender = self._create_sender(self._addr) + def _set_publishers_list(self, publishers: []) -> None: # type: ignore + self._publishers = publishers + def publish(self, message: Message) -> Delivery: """ Publish a message to RabbitMQ. @@ -94,6 +98,8 @@ def close(self) -> None: logger.debug("Closing Sender") if self.is_open: self._sender.close() # type: ignore + if self in self._publishers: + self._publishers.remove(self) def _create_sender(self, addr: str) -> BlockingSender: return self._conn.create_sender(addr, options=SenderOptionUnseattle(addr)) diff --git a/tests/test_connection.py b/tests/test_connection.py index e006fd7..bff98c6 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -71,25 +71,9 @@ def test_environment_connections_management() -> None: def test_connection_reconnection() -> None: - reconnected = False - connection = None disconnected = False - def on_disconnected(): - - nonlocal connection - - # reconnect - if connection is not None: - connection = environment.connection() - connection.dial() - - nonlocal reconnected - reconnected = True - - environment = Environment( - "amqp://guest:guest@localhost:5672/", on_disconnection_handler=on_disconnected - ) + environment = Environment("amqp://guest:guest@localhost:5672/", reconnect=True) connection = environment.connection() connection.dial() @@ -114,8 +98,7 @@ def on_disconnected(): management = connection.management() management.declare_queue(queue_specification) management.delete_queue(stream_name) - environment.close() management.close() + environment.close() assert disconnected is True - assert reconnected is True diff --git a/tests/test_publisher.py b/tests/test_publisher.py index 20abe10..fd0e1e3 100644 --- a/tests/test_publisher.py +++ b/tests/test_publisher.py @@ -260,36 +260,9 @@ def test_publish_purge(connection: Connection) -> None: def test_disconnection_reconnection() -> None: disconnected = False - reconnected = False generic_exception_raised = False - publisher = None - queue_name = "test-queue" - connection_test = None - environment = None - - def on_disconnected(): - - nonlocal publisher - nonlocal queue_name - nonlocal connection_test - nonlocal environment - # reconnect - if connection_test is not None: - connection_test = environment.connection() - connection_test.dial() - - if publisher is not None: - publisher = connection_test.publisher( - destination=AddressHelper.queue_address(queue_name) - ) - - nonlocal reconnected - reconnected = True - - environment = Environment( - "amqp://guest:guest@localhost:5672/", on_disconnection_handler=on_disconnected - ) + environment = Environment("amqp://guest:guest@localhost:5672/", reconnect=True) connection_test = environment.connection() @@ -318,6 +291,9 @@ def on_disconnected(): except ConnectionClosed: disconnected = True + publisher = connection_test.publisher( + destination=AddressHelper.queue_address(queue_name) + ) continue except Exception: @@ -345,7 +321,6 @@ def on_disconnected(): assert generic_exception_raised is False assert disconnected is True - assert reconnected is True assert message_purged == messages_to_publish - 1 From c09162d412e46cd14ee9f5dee141fcd398dce88d Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Tue, 4 Mar 2025 16:13:43 +0100 Subject: [PATCH 2/9] reconnecting managements, publishers and consumers --- examples/reconnection/reconnection_example.py | 6 +- rabbitmq_amqp_python_client/connection.py | 50 ++++++++----- rabbitmq_amqp_python_client/consumer.py | 4 ++ rabbitmq_amqp_python_client/management.py | 7 ++ rabbitmq_amqp_python_client/publisher.py | 4 ++ tests/test_connection.py | 5 +- tests/test_publisher.py | 71 ++++++++++++++----- 7 files changed, 105 insertions(+), 42 deletions(-) diff --git a/examples/reconnection/reconnection_example.py b/examples/reconnection/reconnection_example.py index 2a6fb6b..568df6f 100644 --- a/examples/reconnection/reconnection_example.py +++ b/examples/reconnection/reconnection_example.py @@ -129,7 +129,7 @@ def main() -> None: publisher.publish(Message(body="test")) except ConnectionClosed: print("publisher closing exception, resubmitting") - publisher = connection.publisher(addr) + # publisher = connection.publisher(addr) continue print("closing publisher") @@ -153,9 +153,6 @@ def main() -> None: except KeyboardInterrupt: pass except ConnectionClosed: - consumer = connection.consumer( - addr_queue, message_handler=MyMessageHandler() - ) continue except Exception as e: print("consumer exited for exception " + str(e)) @@ -165,7 +162,6 @@ def main() -> None: print("cleanup") consumer.close() - management = connection.management() print("unbind") management.unbind(bind_name) diff --git a/rabbitmq_amqp_python_client/connection.py b/rabbitmq_amqp_python_client/connection.py index a4a65b6..7a23583 100644 --- a/rabbitmq_amqp_python_client/connection.py +++ b/rabbitmq_amqp_python_client/connection.py @@ -62,7 +62,7 @@ def __init__( uri: Single node connection URI uris: List of URIs for multi-node setup ssl_context: SSL configuration for secure connections - on_disconnection_handler: Callback for handling disconnection events + reconnect: Ability to automatically reconnect in case of disconnections from the server Raises: ValueError: If neither uri nor uris is provided @@ -76,10 +76,10 @@ def __init__( self._addr: Optional[str] = uri self._addrs: Optional[list[str]] = uris self._conn: BlockingConnection - self._management: Management self._conf_ssl_context: Union[ PosixSslConfigurationContext, WinSslConfigurationContext, None ] = ssl_context + self._managements: list[Management] = [] self._reconnect = reconnect self._ssl_domain = None self._connections = [] # type: ignore @@ -158,7 +158,7 @@ def dial(self) -> None: on_disconnection_handler=self._on_disconnection, ) - self._open() + # self._open() logger.debug("Connection to the server established") def _win_store_to_cert( @@ -185,7 +185,12 @@ def management(self) -> Management: Returns: Management: The management interface for performing administrative tasks """ - return self._management + if len(self._managements) == 0: + management = Management(self._conn) + management.open() + self._managements.append(management) + + return self._managements[0] # closes the connection to the AMQP 1.0 server. def close(self) -> None: @@ -196,9 +201,9 @@ def close(self) -> None: """ logger.debug("Closing connection") try: - for publisher in self._publishers: + for publisher in self._publishers[:]: publisher.close() - for consumer in self._consumers: + for consumer in self._consumers[:]: consumer.close() self._conn.close() except Exception as e: @@ -228,8 +233,9 @@ def publisher(self, destination: str = "") -> Publisher: "destination address must start with /queues or /exchanges" ) publisher = Publisher(self._conn, destination) + publisher._set_publishers_list(self._publishers) self._publishers.append(publisher) - return self._publishers[self._publishers.index(publisher)] + return publisher def consumer( self, @@ -265,26 +271,36 @@ def consumer( def _on_disconnection(self) -> None: - print("disconnected") - if self in self._connections: self._connections.remove(self) - print("reconnecting") self._conn = BlockingConnection( url=self._addr, urls=self._addrs, ssl_domain=self._ssl_domain, on_disconnection_handler=self._on_disconnection, ) - self._open() + self._connections.append(self) - for index, publisher in enumerate(self._publishers): - # publisher = self._publishers.pop(index) - # address = publisher.address - self._publishers.remove(publisher) - # self._publishers.insert(index, Publisher(self._conn, address)) + for i, management in enumerate(self._managements): + # Update the broken connection and sender in the management + self._managements[i]._update_connection(self._conn) + + for i, publisher in enumerate(self._publishers): + # Update the broken connection and sender in the publisher + self._publishers[i]._update_connection(self._conn) for i, consumer in enumerate(self._consumers): - self._consumers.remove(consumer) + # Update the broken connection and sender in the consumer + self._consumers[i]._update_connection(self._conn) + + @property + def active_producers(self) -> int: + """Returns the number of active publishers""" + return len(self._publishers) + + @property + def active_consumers(self) -> int: + """Returns the number of active consumers""" + return len(self._consumers) diff --git a/rabbitmq_amqp_python_client/consumer.py b/rabbitmq_amqp_python_client/consumer.py index 8f36546..c03eb40 100644 --- a/rabbitmq_amqp_python_client/consumer.py +++ b/rabbitmq_amqp_python_client/consumer.py @@ -65,6 +65,10 @@ def _open(self) -> None: logger.debug("Creating Sender") self._receiver = self._create_receiver(self._addr) + def _update_connection(self, conn: BlockingConnection) -> None: + self._conn = conn + self._receiver = self._create_receiver(self._addr) + def _set_consumers_list(self, consumers: []) -> None: # type: ignore self._consumers = consumers diff --git a/rabbitmq_amqp_python_client/management.py b/rabbitmq_amqp_python_client/management.py index 0ea7a83..1c6550b 100644 --- a/rabbitmq_amqp_python_client/management.py +++ b/rabbitmq_amqp_python_client/management.py @@ -53,6 +53,13 @@ def __init__(self, conn: BlockingConnection): self._receiver: Optional[BlockingReceiver] = None self._conn = conn + def _update_connection(self, conn: BlockingConnection) -> None: + self._conn = conn + self._sender = self._create_sender(CommonValues.management_node_address.value) + self._receiver = self._create_receiver( + CommonValues.management_node_address.value, + ) + def open(self) -> None: """ Open the management connection by creating sender and receiver. diff --git a/rabbitmq_amqp_python_client/publisher.py b/rabbitmq_amqp_python_client/publisher.py index 8ada467..dc63d5d 100644 --- a/rabbitmq_amqp_python_client/publisher.py +++ b/rabbitmq_amqp_python_client/publisher.py @@ -46,6 +46,10 @@ def __init__(self, conn: BlockingConnection, addr: str = ""): self._publishers: list[Publisher] = [] self._open() + def _update_connection(self, conn: BlockingConnection) -> None: + self._conn = conn + self._sender = self._create_sender(self._addr) + def _open(self) -> None: if self._sender is None: logger.debug("Creating Sender") diff --git a/tests/test_connection.py b/tests/test_connection.py index bff98c6..24652e9 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -81,9 +81,11 @@ def test_connection_reconnection() -> None: # delay time.sleep(5) # simulate a disconnection - delete_all_connections() # raise a reconnection management = connection.management() + + delete_all_connections() + stream_name = "test_stream_info_with_validation" queue_specification = StreamSpecification( name=stream_name, @@ -95,7 +97,6 @@ def test_connection_reconnection() -> None: disconnected = True # check that we reconnected - management = connection.management() management.declare_queue(queue_specification) management.delete_queue(stream_name) management.close() diff --git a/tests/test_publisher.py b/tests/test_publisher.py index fd0e1e3..bd6c882 100644 --- a/tests/test_publisher.py +++ b/tests/test_publisher.py @@ -270,13 +270,11 @@ def test_disconnection_reconnection() -> None: # delay time.sleep(5) messages_to_publish = 10000 - queue_name = "test-queue" + queue_name = "test-queue-reconnection" management = connection_test.management() management.declare_queue(QuorumQueueSpecification(name=queue_name)) - management.close() - publisher = connection_test.publisher( destination=AddressHelper.queue_address(queue_name) ) @@ -291,9 +289,6 @@ def test_disconnection_reconnection() -> None: except ConnectionClosed: disconnected = True - publisher = connection_test.publisher( - destination=AddressHelper.queue_address(queue_name) - ) continue except Exception: @@ -303,14 +298,6 @@ def test_disconnection_reconnection() -> None: publisher.close() - # cleanup, we need to create a new connection as the previous one - # was closed by the test - - connection_test = environment.connection() - connection_test.dial() - - management = connection_test.management() - # purge the queue and check number of published messages message_purged = management.purge_queue(queue_name) @@ -335,14 +322,10 @@ def test_queue_info_for_stream_with_validations(connection: Connection) -> None: management = connection.management() management.declare_queue(queue_specification) - print("before creating publisher") - publisher = connection.publisher( destination=AddressHelper.queue_address(stream_name) ) - print("after creating publisher") - for i in range(messages_to_send): publisher.publish(Message(body="test")) @@ -392,3 +375,55 @@ def test_publish_per_message_exchange(connection: Connection) -> None: assert accepted_2 is True assert purged_messages_queue == 2 assert raised is False + + +def test_multiple_publishers(environment: Environment) -> None: + + stream_name = "test_multiple_publisher_1" + stream_name_2 = "test_multiple_publisher_2" + connection = environment.connection() + connection.dial() + + stream_specification = StreamSpecification( + name=stream_name, + ) + management = connection.management() + management.declare_queue(stream_specification) + + stream_specification = StreamSpecification( + name=stream_name_2, + ) + management.declare_queue(stream_specification) + + destination = AddressHelper.queue_address(stream_name) + destination_2 = AddressHelper.queue_address(stream_name_2) + connection.publisher(destination) + + assert connection.active_producers == 1 + + publisher_2 = connection.publisher(destination_2) + + assert connection.active_producers == 2 + + publisher_2.close() + + assert connection.active_producers == 1 + + connection.publisher(destination_2) + + assert connection.active_producers == 2 + + connection.close() + + assert connection.active_producers == 0 + + # cleanup + connection = environment.connection() + connection.dial() + management = connection.management() + + management.delete_queue(stream_name) + + management.delete_queue(stream_name_2) + + management.close() From 60d4763706fade64ea13c45d4f53956181d40acd Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Thu, 6 Mar 2025 10:06:40 +0100 Subject: [PATCH 3/9] supporting reconnection of streams by last consumed offset --- examples/streams/example_with_streams.py | 25 ++++++++---- .../amqp_consumer_handler.py | 8 ++++ rabbitmq_amqp_python_client/connection.py | 1 + rabbitmq_amqp_python_client/consumer.py | 26 ++++++++++-- tests/conftest.py | 18 +++++++++ tests/test_streams.py | 40 +++++++++++++++++++ 6 files changed, 107 insertions(+), 11 deletions(-) diff --git a/examples/streams/example_with_streams.py b/examples/streams/example_with_streams.py index 8cd41c7..57cc7c0 100644 --- a/examples/streams/example_with_streams.py +++ b/examples/streams/example_with_streams.py @@ -4,6 +4,7 @@ AddressHelper, AMQPMessagingHandler, Connection, + ConnectionClosed, Environment, Event, Message, @@ -12,7 +13,7 @@ StreamSpecification, ) -MESSAGES_TO_PUBLISH = 100 +MESSAGES_TO_PUBLISH = 1 class MyMessageHandler(AMQPMessagingHandler): @@ -21,7 +22,7 @@ def __init__(self): super().__init__() self._count = 0 - def on_message(self, event: Event): + def on_amqp_message(self, event: Event): # just messages with banana filters get received print( "received message from stream: " @@ -86,7 +87,7 @@ def main() -> None: queue_name = "example-queue" print("connection to amqp server") - environment = Environment("amqp://guest:guest@localhost:5672/") + environment = Environment("amqp://guest:guest@localhost:5672/", reconnect=True) connection = create_connection(environment) management = connection.management() @@ -134,14 +135,22 @@ def main() -> None: publisher.close() - try: - consumer.run() - except KeyboardInterrupt: - pass + while True: + try: + consumer.run() + except KeyboardInterrupt: + pass + except ConnectionClosed: + print("connection closed") + continue + except Exception as e: + print("consumer exited for exception " + str(e)) + + break # print("delete queue") - management.delete_queue(queue_name) + # management.delete_queue(queue_name) print("closing connections") management.close() diff --git a/rabbitmq_amqp_python_client/amqp_consumer_handler.py b/rabbitmq_amqp_python_client/amqp_consumer_handler.py index d748090..6ae33cd 100644 --- a/rabbitmq_amqp_python_client/amqp_consumer_handler.py +++ b/rabbitmq_amqp_python_client/amqp_consumer_handler.py @@ -21,10 +21,18 @@ def __init__(self, auto_accept: bool = False, auto_settle: bool = True): """ super().__init__(auto_accept=auto_accept, auto_settle=auto_settle) self.delivery_context: DeliveryContext = DeliveryContext() + self._offset = 0 def on_amqp_message(self, event: Event) -> None: pass def on_message(self, event: Event) -> None: print("first level callback") + if "x-stream-offset" in event.message.annotations: + print("setting offset") + self._offset = int(event.message.annotations["x-stream-offset"]) self.on_amqp_message(event) + + @property + def offset(self) -> int: + return self._offset diff --git a/rabbitmq_amqp_python_client/connection.py b/rabbitmq_amqp_python_client/connection.py index 7a23583..a908adb 100644 --- a/rabbitmq_amqp_python_client/connection.py +++ b/rabbitmq_amqp_python_client/connection.py @@ -292,6 +292,7 @@ def _on_disconnection(self) -> None: self._publishers[i]._update_connection(self._conn) for i, consumer in enumerate(self._consumers): + print("reconnecting consumer") # Update the broken connection and sender in the consumer self._consumers[i]._update_connection(self._conn) diff --git a/rabbitmq_amqp_python_client/consumer.py b/rabbitmq_amqp_python_client/consumer.py index c03eb40..31126bc 100644 --- a/rabbitmq_amqp_python_client/consumer.py +++ b/rabbitmq_amqp_python_client/consumer.py @@ -1,12 +1,12 @@ import logging from typing import Literal, Optional, Union, cast +from .amqp_consumer_handler import AMQPMessagingHandler from .entities import StreamOptions from .options import ( ReceiverOptionUnsettled, ReceiverOptionUnsettledWithFilters, ) -from .qpid.proton._handlers import MessagingHandler from .qpid.proton._message import Message from .qpid.proton.utils import ( BlockingConnection, @@ -37,7 +37,7 @@ def __init__( self, conn: BlockingConnection, addr: str, - handler: Optional[MessagingHandler] = None, + handler: Optional[AMQPMessagingHandler] = None, stream_options: Optional[StreamOptions] = None, credit: Optional[int] = None, ): @@ -67,7 +67,23 @@ def _open(self) -> None: def _update_connection(self, conn: BlockingConnection) -> None: self._conn = conn - self._receiver = self._create_receiver(self._addr) + if self._stream_options is None: + print("creating new receiver without stream") + self._receiver = self._conn.create_receiver( + self._addr, + options=ReceiverOptionUnsettled(self._addr), + handler=self._handler, + ) + else: + print("creating new stream receiver") + self._stream_options.offset(self._handler.offset - 1) # type: ignore + self._receiver = self._conn.create_receiver( + self._addr, + options=ReceiverOptionUnsettledWithFilters( + self._addr, self._stream_options + ), + handler=self._handler, + ) def _set_consumers_list(self, consumers: []) -> None: # type: ignore self._consumers = consumers @@ -149,3 +165,7 @@ def _create_receiver(self, addr: str) -> BlockingReceiver: def address(self) -> str: """Get the current publisher address.""" return self._addr + + @property + def handler(self) -> Optional[AMQPMessagingHandler]: + return self._handler diff --git a/tests/conftest.py b/tests/conftest.py index 4df3ff6..baf57c5 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -20,6 +20,8 @@ FriendlyName, ) +from .http_requests import delete_all_connections + os.chdir(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) @@ -150,6 +152,22 @@ def on_message(self, event: Event): raise ConsumerTestException("consumed") +class MyMessageHandlerAcceptStreamOffsetReconnect(AMQPMessagingHandler): + + def __init__(self, starting_offset: Optional[int] = None): + super().__init__() + self._received = 0 + self._starting_offset = starting_offset + + def on_message(self, event: Event): + if self._received == 5: + delete_all_connections() + self.delivery_context.accept(event) + self._received = self._received + 1 + if self._received == 10: + raise ConsumerTestException("consumed") + + class MyMessageHandlerNoack(AMQPMessagingHandler): def __init__(self): diff --git a/tests/test_streams.py b/tests/test_streams.py index 535874a..5c86b20 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -10,6 +10,7 @@ from .conftest import ( ConsumerTestException, MyMessageHandlerAcceptStreamOffset, + MyMessageHandlerAcceptStreamOffsetReconnect, ) from .utils import publish_messages @@ -370,3 +371,42 @@ def test_stream_match_unfiltered( consumer.close() management.delete_queue(stream_name) + + +def test_stream_reconnection(connection: Connection, environment: Environment) -> None: + + consumer = None + stream_name = "test_stream_info_with_filtering" + messages_to_send = 10 + + queue_specification = StreamSpecification( + name=stream_name, + ) + management = connection.management() + management.declare_queue(queue_specification) + + addr_queue = AddressHelper.queue_address(stream_name) + + # consume and then publish + try: + stream_filter_options = StreamOptions() + stream_filter_options.filter_values(["banana"]) + stream_filter_options.filter_match_unfiltered(True) + connection_consumer = environment.connection() + connection_consumer.dial() + consumer = connection_consumer.consumer( + addr_queue, + # disconnection and check happens here + message_handler=MyMessageHandlerAcceptStreamOffsetReconnect(), + stream_filter_options=stream_filter_options, + ) + # send with annotations filter banana + publish_messages(connection, messages_to_send, stream_name) + consumer.run() + # ack to terminate the consumer + except ConsumerTestException: + pass + + consumer.close() + + management.delete_queue(stream_name) From 5a5b1dbad7742491299a67630a1e6b882eb064c1 Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Fri, 7 Mar 2025 10:09:21 +0100 Subject: [PATCH 4/9] updating readme --- README.md | 3 +-- examples/reconnection/README.md | 13 +++++++++++++ 2 files changed, 14 insertions(+), 2 deletions(-) create mode 100644 examples/reconnection/README.md diff --git a/README.md b/README.md index 5785d09..1892079 100644 --- a/README.md +++ b/README.md @@ -147,8 +147,7 @@ You can check the [`ssl example`](./examples/tls/tls_example.py) to see how to e ### Managing disconnections -At this stage the client doesn't support auto-reconnect but a callback is invoked everytime a remote disconnection is detected. -You can use this callback to implement your own logic and eventually attempt a reconnection. +The client supports automatic reconnection with the ability to reconnect Managements, Producers and Consumers You can check the [`reconnection example`](./examples/reconnection/reconnection_example.py) to see how to manage disconnections and eventually attempt a reconnection diff --git a/examples/reconnection/README.md b/examples/reconnection/README.md new file mode 100644 index 0000000..88133a3 --- /dev/null +++ b/examples/reconnection/README.md @@ -0,0 +1,13 @@ +Automatic reconnection +=== + +You can use this example to test automatic reconnection. + +The scenario is publishing and consuming a lot of messages in a queue. + +From the RabbitMQ UI you can break a connection to see the automatic reconnection happening. + +Same for Consumers. + +In case of streams we connection will restart consuming from the last consumed offset. + From 3f1ac5c26d8c5f2e8676f67fbbd82c6a5bfcf7f7 Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Mon, 10 Mar 2025 15:28:07 +0100 Subject: [PATCH 5/9] removing print --- rabbitmq_amqp_python_client/amqp_consumer_handler.py | 2 -- rabbitmq_amqp_python_client/connection.py | 1 - 2 files changed, 3 deletions(-) diff --git a/rabbitmq_amqp_python_client/amqp_consumer_handler.py b/rabbitmq_amqp_python_client/amqp_consumer_handler.py index 6ae33cd..85b3e54 100644 --- a/rabbitmq_amqp_python_client/amqp_consumer_handler.py +++ b/rabbitmq_amqp_python_client/amqp_consumer_handler.py @@ -27,9 +27,7 @@ def on_amqp_message(self, event: Event) -> None: pass def on_message(self, event: Event) -> None: - print("first level callback") if "x-stream-offset" in event.message.annotations: - print("setting offset") self._offset = int(event.message.annotations["x-stream-offset"]) self.on_amqp_message(event) diff --git a/rabbitmq_amqp_python_client/connection.py b/rabbitmq_amqp_python_client/connection.py index a908adb..7a23583 100644 --- a/rabbitmq_amqp_python_client/connection.py +++ b/rabbitmq_amqp_python_client/connection.py @@ -292,7 +292,6 @@ def _on_disconnection(self) -> None: self._publishers[i]._update_connection(self._conn) for i, consumer in enumerate(self._consumers): - print("reconnecting consumer") # Update the broken connection and sender in the consumer self._consumers[i]._update_connection(self._conn) From 54d07152fe0fd08c28a66dc6fcb69319aaaad663 Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Mon, 10 Mar 2025 16:56:31 +0100 Subject: [PATCH 6/9] implement BackOff and MaxReconnectAttempts --- examples/reconnection/reconnection_example.py | 6 +- rabbitmq_amqp_python_client/__init__.py | 2 + rabbitmq_amqp_python_client/connection.py | 74 ++++++++++++++----- rabbitmq_amqp_python_client/entities.py | 25 +++++++ rabbitmq_amqp_python_client/environment.py | 7 +- .../qpid/proton/_tracing.py | 6 +- .../qpid/proton/_transport.py | 3 +- tests/conftest.py | 16 ++++ tests/test_connection.py | 7 +- tests/test_publisher.py | 6 +- tests/test_streams.py | 8 +- 11 files changed, 127 insertions(+), 33 deletions(-) diff --git a/examples/reconnection/reconnection_example.py b/examples/reconnection/reconnection_example.py index 568df6f..0df5757 100644 --- a/examples/reconnection/reconnection_example.py +++ b/examples/reconnection/reconnection_example.py @@ -10,13 +10,17 @@ ExchangeToQueueBindingSpecification, Message, QuorumQueueSpecification, + RecoveryConfiguration, ) # here we keep track of the objects we need to reconnect MESSAGES_TO_PUBLISH = 50000 -environment = Environment(uri="amqp://guest:guest@localhost:5672/", reconnect=True) +environment = Environment( + uri="amqp://guest:guest@localhost:5672/", + recovery_configuration=RecoveryConfiguration(active_recovery=True), +) class MyMessageHandler(AMQPMessagingHandler): diff --git a/rabbitmq_amqp_python_client/__init__.py b/rabbitmq_amqp_python_client/__init__.py index 72742eb..a41a13d 100644 --- a/rabbitmq_amqp_python_client/__init__.py +++ b/rabbitmq_amqp_python_client/__init__.py @@ -11,6 +11,7 @@ ExchangeToExchangeBindingSpecification, ExchangeToQueueBindingSpecification, OffsetSpecification, + RecoveryConfiguration, StreamOptions, ) from .environment import Environment @@ -87,4 +88,5 @@ "OutcomeState", "Environment", "ExchangeCustomSpecification", + "RecoveryConfiguration", ] diff --git a/rabbitmq_amqp_python_client/connection.py b/rabbitmq_amqp_python_client/connection.py index 7a23583..12bac72 100644 --- a/rabbitmq_amqp_python_client/connection.py +++ b/rabbitmq_amqp_python_client/connection.py @@ -1,4 +1,7 @@ import logging +import random +import time +from datetime import timedelta from typing import ( Annotated, Callable, @@ -11,10 +14,11 @@ from .address_helper import validate_address from .consumer import Consumer -from .entities import StreamOptions +from .entities import RecoveryConfiguration, StreamOptions from .exceptions import ArgumentOutOfRangeException from .management import Management from .publisher import Publisher +from .qpid.proton._exceptions import ConnectionException from .qpid.proton._handlers import MessagingHandler from .qpid.proton._transport import SSLDomain from .qpid.proton.utils import BlockingConnection @@ -53,7 +57,7 @@ def __init__( ssl_context: Union[ PosixSslConfigurationContext, WinSslConfigurationContext, None ] = None, - reconnect: bool = False, + recovery_configuration: Optional[RecoveryConfiguration] = None, ): """ Initialize a new Connection instance. @@ -80,7 +84,7 @@ def __init__( PosixSslConfigurationContext, WinSslConfigurationContext, None ] = ssl_context self._managements: list[Management] = [] - self._reconnect = reconnect + self._recovery_configuration = recovery_configuration self._ssl_domain = None self._connections = [] # type: ignore self._index: int = -1 @@ -144,7 +148,10 @@ def dial(self) -> None: password, ) - if self._reconnect is False: + if ( + self._recovery_configuration is None + or self._recovery_configuration.active_recovery is False + ): self._conn = BlockingConnection( url=self._addr, urls=self._addrs, @@ -274,26 +281,53 @@ def _on_disconnection(self) -> None: if self in self._connections: self._connections.remove(self) - self._conn = BlockingConnection( - url=self._addr, - urls=self._addrs, - ssl_domain=self._ssl_domain, - on_disconnection_handler=self._on_disconnection, - ) + base_delay = self._recovery_configuration.back_off_reconnect_interval # type: ignore + max_delay = timedelta(minutes=1) + + for attempt in range(self._recovery_configuration.MaxReconnectAttempts): # type: ignore + + jitter = timedelta(milliseconds=random.randint(0, 500)) + delay = base_delay + jitter + + if delay > max_delay: + delay = max_delay + + time.sleep(delay.total_seconds()) - self._connections.append(self) + try: + self._conn = BlockingConnection( + url=self._addr, + urls=self._addrs, + ssl_domain=self._ssl_domain, + on_disconnection_handler=self._on_disconnection, + ) + + self._connections.append(self) + + for i, management in enumerate(self._managements): + # Update the broken connection and sender in the management + self._managements[i]._update_connection(self._conn) - for i, management in enumerate(self._managements): - # Update the broken connection and sender in the management - self._managements[i]._update_connection(self._conn) + for i, publisher in enumerate(self._publishers): + # Update the broken connection and sender in the publisher + self._publishers[i]._update_connection(self._conn) - for i, publisher in enumerate(self._publishers): - # Update the broken connection and sender in the publisher - self._publishers[i]._update_connection(self._conn) + for i, consumer in enumerate(self._consumers): + # Update the broken connection and sender in the consumer + self._consumers[i]._update_connection(self._conn) + + except ConnectionException as e: + base_delay *= 2 + logger.error( + "Reconnection attempt failed", + "attempt", + attempt, + "error", + str(e), + ) + continue - for i, consumer in enumerate(self._consumers): - # Update the broken connection and sender in the consumer - self._consumers[i]._update_connection(self._conn) + break @property def active_producers(self) -> int: diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index 8e1d04a..8e5b9fc 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -1,4 +1,5 @@ from dataclasses import dataclass, field +from datetime import timedelta from enum import Enum from typing import Any, Dict, Optional, Union @@ -208,3 +209,27 @@ def filter_set(self) -> Dict[symbol, Described]: Dict[symbol, Described]: The current filter set configuration """ return self._filter_set + + +@dataclass +class RecoveryConfiguration: + """ + Configuration options for automatic reconnection. + + This dataclass contains parameters to manage automatic reconnection + + Attributes: + active_recovery: Define if the recovery is activated. If is not activated the connection will not try to reconnect + back_off_reconnect_interval: the time to wait before trying to createSender after a connection is closed. + time will be increased exponentially with each attempt. + Default is 5 seconds, each attempt will double the time. + The minimum value is 1 second. Avoid setting a value low values since it can cause a high + number of reconnection attempts. + MaxReconnectAttempts: MaxReconnectAttempts The maximum number of reconnection attempts. + Default is 5. + The minimum value is 1. + """ + + active_recovery: bool = True + back_off_reconnect_interval: timedelta = timedelta(0.5) + MaxReconnectAttempts: int = 5 diff --git a/rabbitmq_amqp_python_client/environment.py b/rabbitmq_amqp_python_client/environment.py index a8d51da..0c22642 100644 --- a/rabbitmq_amqp_python_client/environment.py +++ b/rabbitmq_amqp_python_client/environment.py @@ -9,6 +9,7 @@ ) from .connection import Connection +from .entities import RecoveryConfiguration from .ssl_configuration import ( PosixSslConfigurationContext, WinSslConfigurationContext, @@ -40,7 +41,7 @@ def __init__( ssl_context: Union[ PosixSslConfigurationContext, WinSslConfigurationContext, None ] = None, - reconnect: bool = False, + recovery_configuration: Optional[RecoveryConfiguration] = None, ): """ Initialize a new Environment instance. @@ -63,7 +64,7 @@ def __init__( self._uri = uri self._uris = uris self._ssl_context = ssl_context - self._reconnect = reconnect + self._recovery_configuration = recovery_configuration self._connections: list[Connection] = [] def connection( @@ -85,7 +86,7 @@ def connection( uri=self._uri, uris=self._uris, ssl_context=self._ssl_context, - reconnect=self._reconnect, + recovery_configuration=self._recovery_configuration, ) logger.debug("Environment: Creating and returning a new connection") self._connections.append(connection) diff --git a/rabbitmq_amqp_python_client/qpid/proton/_tracing.py b/rabbitmq_amqp_python_client/qpid/proton/_tracing.py index 0c02d12..ec1649b 100644 --- a/rabbitmq_amqp_python_client/qpid/proton/_tracing.py +++ b/rabbitmq_amqp_python_client/qpid/proton/_tracing.py @@ -32,8 +32,10 @@ import proton from proton import Sender as ProtonSender -from proton.handlers import IncomingMessageHandler as ProtonIncomingMessageHandler -from proton.handlers import OutgoingMessageHandler as ProtonOutgoingMessageHandler +from proton.handlers import \ + IncomingMessageHandler as ProtonIncomingMessageHandler +from proton.handlers import \ + OutgoingMessageHandler as ProtonOutgoingMessageHandler _tracer = None _trace_key = proton.symbol("x-opt-qpid-tracestate") diff --git a/rabbitmq_amqp_python_client/qpid/proton/_transport.py b/rabbitmq_amqp_python_client/qpid/proton/_transport.py index d4a1702..1e5a31e 100644 --- a/rabbitmq_amqp_python_client/qpid/proton/_transport.py +++ b/rabbitmq_amqp_python_client/qpid/proton/_transport.py @@ -139,7 +139,8 @@ if TYPE_CHECKING: from ._condition import Condition - from ._endpoints import Connection # would produce circular import + from ._endpoints import \ + Connection # would produce circular import class TraceAdapter: diff --git a/tests/conftest.py b/tests/conftest.py index baf57c5..859340c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -12,6 +12,7 @@ PKCS12Store, PosixClientCert, PosixSslConfigurationContext, + RecoveryConfiguration, WinClientCert, WinSslConfigurationContext, symbol, @@ -47,6 +48,21 @@ def connection(pytestconfig): environment.close() +@pytest.fixture() +def connection_with_reconnect(pytestconfig): + environment = Environment( + uri="amqp://guest:guest@localhost:5672/", + recovery_configuration=RecoveryConfiguration(active_recovery=True), + ) + connection = environment.connection() + connection.dial() + try: + yield connection + + finally: + environment.close() + + @pytest.fixture() def ssl_context(pytestconfig): if sys.platform == "win32": diff --git a/tests/test_connection.py b/tests/test_connection.py index 24652e9..f3bdb97 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -3,6 +3,7 @@ from rabbitmq_amqp_python_client import ( ConnectionClosed, Environment, + RecoveryConfiguration, StreamSpecification, ) @@ -11,7 +12,6 @@ def on_disconnected(): - print("disconnected") global disconnected disconnected = True @@ -73,7 +73,10 @@ def test_connection_reconnection() -> None: disconnected = False - environment = Environment("amqp://guest:guest@localhost:5672/", reconnect=True) + environment = Environment( + "amqp://guest:guest@localhost:5672/", + recovery_configuration=RecoveryConfiguration(active_recovery=True), + ) connection = environment.connection() connection.dial() diff --git a/tests/test_publisher.py b/tests/test_publisher.py index bd6c882..4d21c7c 100644 --- a/tests/test_publisher.py +++ b/tests/test_publisher.py @@ -9,6 +9,7 @@ Message, OutcomeState, QuorumQueueSpecification, + RecoveryConfiguration, StreamSpecification, ValidationCodeException, ) @@ -262,7 +263,10 @@ def test_disconnection_reconnection() -> None: disconnected = False generic_exception_raised = False - environment = Environment("amqp://guest:guest@localhost:5672/", reconnect=True) + environment = Environment( + "amqp://guest:guest@localhost:5672/", + recovery_configuration=RecoveryConfiguration(active_recovery=True), + ) connection_test = environment.connection() diff --git a/tests/test_streams.py b/tests/test_streams.py index 5c86b20..890ab94 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -373,7 +373,9 @@ def test_stream_match_unfiltered( management.delete_queue(stream_name) -def test_stream_reconnection(connection: Connection, environment: Environment) -> None: +def test_stream_reconnection( + connection_with_reconnect: Connection, environment: Environment +) -> None: consumer = None stream_name = "test_stream_info_with_filtering" @@ -382,7 +384,7 @@ def test_stream_reconnection(connection: Connection, environment: Environment) - queue_specification = StreamSpecification( name=stream_name, ) - management = connection.management() + management = connection_with_reconnect.management() management.declare_queue(queue_specification) addr_queue = AddressHelper.queue_address(stream_name) @@ -401,7 +403,7 @@ def test_stream_reconnection(connection: Connection, environment: Environment) - stream_filter_options=stream_filter_options, ) # send with annotations filter banana - publish_messages(connection, messages_to_send, stream_name) + publish_messages(connection_with_reconnect, messages_to_send, stream_name) consumer.run() # ack to terminate the consumer except ConsumerTestException: From 067b3f296f5f4c39c842d5d7a361c23acde539c1 Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Tue, 11 Mar 2025 10:10:33 +0100 Subject: [PATCH 7/9] few improvements --- examples/reconnection/README.md | 5 ++++- rabbitmq_amqp_python_client/connection.py | 16 +++++++++++++--- rabbitmq_amqp_python_client/entities.py | 2 +- .../qpid/proton/_tracing.py | 6 ++---- .../qpid/proton/_transport.py | 3 +-- 5 files changed, 21 insertions(+), 11 deletions(-) diff --git a/examples/reconnection/README.md b/examples/reconnection/README.md index 88133a3..0f10133 100644 --- a/examples/reconnection/README.md +++ b/examples/reconnection/README.md @@ -9,5 +9,8 @@ From the RabbitMQ UI you can break a connection to see the automatic reconnectio Same for Consumers. -In case of streams we connection will restart consuming from the last consumed offset. +In case of streams the connection will restart consuming from the last consumed offset. + +You can control some reconnection parameters with the RecoveryConfiguration dataclass, where you can specify +the backoff interval and the maximum_retries before the client gives up. diff --git a/rabbitmq_amqp_python_client/connection.py b/rabbitmq_amqp_python_client/connection.py index 12bac72..85d103b 100644 --- a/rabbitmq_amqp_python_client/connection.py +++ b/rabbitmq_amqp_python_client/connection.py @@ -278,6 +278,7 @@ def consumer( def _on_disconnection(self) -> None: + logger.debug("_on_disconnection: disconnection detected") if self in self._connections: self._connections.remove(self) @@ -286,6 +287,7 @@ def _on_disconnection(self) -> None: for attempt in range(self._recovery_configuration.MaxReconnectAttempts): # type: ignore + logger.debug("attempting a reconnection") jitter = timedelta(milliseconds=random.randint(0, 500)) delay = base_delay + jitter @@ -318,16 +320,24 @@ def _on_disconnection(self) -> None: except ConnectionException as e: base_delay *= 2 - logger.error( + logger.debug( "Reconnection attempt failed", "attempt", attempt, "error", str(e), ) - continue + # maximum attempts reached without establishing a connection + if attempt == self._recovery_configuration.MaxReconnectAttempts - 1: # type: ignore + logger.debug("Not able to reconnect") + raise ConnectionException + else: + continue - break + # connection established + else: + logger.debug("reconnected successful") + return @property def active_producers(self) -> int: diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index 8e5b9fc..9878a6a 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -231,5 +231,5 @@ class RecoveryConfiguration: """ active_recovery: bool = True - back_off_reconnect_interval: timedelta = timedelta(0.5) + back_off_reconnect_interval: timedelta = timedelta(seconds=5) MaxReconnectAttempts: int = 5 diff --git a/rabbitmq_amqp_python_client/qpid/proton/_tracing.py b/rabbitmq_amqp_python_client/qpid/proton/_tracing.py index ec1649b..0c02d12 100644 --- a/rabbitmq_amqp_python_client/qpid/proton/_tracing.py +++ b/rabbitmq_amqp_python_client/qpid/proton/_tracing.py @@ -32,10 +32,8 @@ import proton from proton import Sender as ProtonSender -from proton.handlers import \ - IncomingMessageHandler as ProtonIncomingMessageHandler -from proton.handlers import \ - OutgoingMessageHandler as ProtonOutgoingMessageHandler +from proton.handlers import IncomingMessageHandler as ProtonIncomingMessageHandler +from proton.handlers import OutgoingMessageHandler as ProtonOutgoingMessageHandler _tracer = None _trace_key = proton.symbol("x-opt-qpid-tracestate") diff --git a/rabbitmq_amqp_python_client/qpid/proton/_transport.py b/rabbitmq_amqp_python_client/qpid/proton/_transport.py index 1e5a31e..d4a1702 100644 --- a/rabbitmq_amqp_python_client/qpid/proton/_transport.py +++ b/rabbitmq_amqp_python_client/qpid/proton/_transport.py @@ -139,8 +139,7 @@ if TYPE_CHECKING: from ._condition import Condition - from ._endpoints import \ - Connection # would produce circular import + from ._endpoints import Connection # would produce circular import class TraceAdapter: From dc85a1f00c11ab6e79600ec99278e5706965ff38 Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Tue, 11 Mar 2025 11:11:05 +0100 Subject: [PATCH 8/9] removing prints --- rabbitmq_amqp_python_client/consumer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rabbitmq_amqp_python_client/consumer.py b/rabbitmq_amqp_python_client/consumer.py index 31126bc..15ef4c9 100644 --- a/rabbitmq_amqp_python_client/consumer.py +++ b/rabbitmq_amqp_python_client/consumer.py @@ -68,14 +68,14 @@ def _open(self) -> None: def _update_connection(self, conn: BlockingConnection) -> None: self._conn = conn if self._stream_options is None: - print("creating new receiver without stream") + logger.debug("creating new receiver without stream") self._receiver = self._conn.create_receiver( self._addr, options=ReceiverOptionUnsettled(self._addr), handler=self._handler, ) else: - print("creating new stream receiver") + logger.debug("creating new stream receiver") self._stream_options.offset(self._handler.offset - 1) # type: ignore self._receiver = self._conn.create_receiver( self._addr, From c5dae5f56d003a66b149ed1d9306bec02283b725 Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Tue, 11 Mar 2025 14:06:19 +0100 Subject: [PATCH 9/9] address PR comments --- examples/reconnection/README.md | 2 + examples/reconnection/reconnection_example.py | 2 - rabbitmq_amqp_python_client/connection.py | 93 ++++++++++--------- rabbitmq_amqp_python_client/environment.py | 2 +- tests/test_connection.py | 22 +++++ 5 files changed, 76 insertions(+), 45 deletions(-) diff --git a/examples/reconnection/README.md b/examples/reconnection/README.md index 0f10133..2a31a6b 100644 --- a/examples/reconnection/README.md +++ b/examples/reconnection/README.md @@ -14,3 +14,5 @@ In case of streams the connection will restart consuming from the last consumed You can control some reconnection parameters with the RecoveryConfiguration dataclass, where you can specify the backoff interval and the maximum_retries before the client gives up. +To disable automatic reconnection you can set active_recovery of RecoveryConfiguration to False + diff --git a/examples/reconnection/reconnection_example.py b/examples/reconnection/reconnection_example.py index 0df5757..161b8d4 100644 --- a/examples/reconnection/reconnection_example.py +++ b/examples/reconnection/reconnection_example.py @@ -10,7 +10,6 @@ ExchangeToQueueBindingSpecification, Message, QuorumQueueSpecification, - RecoveryConfiguration, ) # here we keep track of the objects we need to reconnect @@ -19,7 +18,6 @@ environment = Environment( uri="amqp://guest:guest@localhost:5672/", - recovery_configuration=RecoveryConfiguration(active_recovery=True), ) diff --git a/rabbitmq_amqp_python_client/connection.py b/rabbitmq_amqp_python_client/connection.py index 85d103b..d43e2a4 100644 --- a/rabbitmq_amqp_python_client/connection.py +++ b/rabbitmq_amqp_python_client/connection.py @@ -15,7 +15,10 @@ from .address_helper import validate_address from .consumer import Consumer from .entities import RecoveryConfiguration, StreamOptions -from .exceptions import ArgumentOutOfRangeException +from .exceptions import ( + ArgumentOutOfRangeException, + ValidationCodeException, +) from .management import Management from .publisher import Publisher from .qpid.proton._exceptions import ConnectionException @@ -57,7 +60,7 @@ def __init__( ssl_context: Union[ PosixSslConfigurationContext, WinSslConfigurationContext, None ] = None, - recovery_configuration: Optional[RecoveryConfiguration] = None, + recovery_configuration: RecoveryConfiguration = RecoveryConfiguration(), ): """ Initialize a new Connection instance. @@ -84,16 +87,55 @@ def __init__( PosixSslConfigurationContext, WinSslConfigurationContext, None ] = ssl_context self._managements: list[Management] = [] - self._recovery_configuration = recovery_configuration + self._recovery_configuration: RecoveryConfiguration = recovery_configuration self._ssl_domain = None self._connections = [] # type: ignore self._index: int = -1 self._publishers: list[Publisher] = [] self._consumers: list[Consumer] = [] + # Some recovery_configuration validation + if recovery_configuration.back_off_reconnect_interval < timedelta(seconds=1): + raise ValidationCodeException( + "back_off_reconnect_interval must be > 1 second" + ) + + if recovery_configuration.MaxReconnectAttempts < 1: + raise ValidationCodeException("MaxReconnectAttempts must be at least 1") + def _set_environment_connection_list(self, connections: []): # type: ignore self._connections = connections + def _open_connections(self, reconnect_handlers: bool = False) -> None: + + if self._recovery_configuration.active_recovery is False: + self._conn = BlockingConnection( + url=self._addr, + urls=self._addrs, + ssl_domain=self._ssl_domain, + ) + else: + self._conn = BlockingConnection( + url=self._addr, + urls=self._addrs, + ssl_domain=self._ssl_domain, + on_disconnection_handler=self._on_disconnection, + ) + + if reconnect_handlers is True: + + for i, management in enumerate(self._managements): + # Update the broken connection and sender in the management + self._managements[i]._update_connection(self._conn) + + for i, publisher in enumerate(self._publishers): + # Update the broken connection and sender in the publisher + self._publishers[i]._update_connection(self._conn) + + for i, consumer in enumerate(self._consumers): + # Update the broken connection and sender in the consumer + self._consumers[i]._update_connection(self._conn) + def dial(self) -> None: """ Establish a connection to the AMQP server. @@ -148,24 +190,7 @@ def dial(self) -> None: password, ) - if ( - self._recovery_configuration is None - or self._recovery_configuration.active_recovery is False - ): - self._conn = BlockingConnection( - url=self._addr, - urls=self._addrs, - ssl_domain=self._ssl_domain, - ) - else: - self._conn = BlockingConnection( - url=self._addr, - urls=self._addrs, - ssl_domain=self._ssl_domain, - on_disconnection_handler=self._on_disconnection, - ) - - # self._open() + self._open_connections() logger.debug("Connection to the server established") def _win_store_to_cert( @@ -282,10 +307,10 @@ def _on_disconnection(self) -> None: if self in self._connections: self._connections.remove(self) - base_delay = self._recovery_configuration.back_off_reconnect_interval # type: ignore + base_delay = self._recovery_configuration.back_off_reconnect_interval max_delay = timedelta(minutes=1) - for attempt in range(self._recovery_configuration.MaxReconnectAttempts): # type: ignore + for attempt in range(self._recovery_configuration.MaxReconnectAttempts): logger.debug("attempting a reconnection") jitter = timedelta(milliseconds=random.randint(0, 500)) @@ -297,26 +322,10 @@ def _on_disconnection(self) -> None: time.sleep(delay.total_seconds()) try: - self._conn = BlockingConnection( - url=self._addr, - urls=self._addrs, - ssl_domain=self._ssl_domain, - on_disconnection_handler=self._on_disconnection, - ) - - self._connections.append(self) - for i, management in enumerate(self._managements): - # Update the broken connection and sender in the management - self._managements[i]._update_connection(self._conn) + self._open_connections(reconnect_handlers=True) - for i, publisher in enumerate(self._publishers): - # Update the broken connection and sender in the publisher - self._publishers[i]._update_connection(self._conn) - - for i, consumer in enumerate(self._consumers): - # Update the broken connection and sender in the consumer - self._consumers[i]._update_connection(self._conn) + self._connections.append(self) except ConnectionException as e: base_delay *= 2 @@ -328,7 +337,7 @@ def _on_disconnection(self) -> None: str(e), ) # maximum attempts reached without establishing a connection - if attempt == self._recovery_configuration.MaxReconnectAttempts - 1: # type: ignore + if attempt == self._recovery_configuration.MaxReconnectAttempts - 1: logger.debug("Not able to reconnect") raise ConnectionException else: diff --git a/rabbitmq_amqp_python_client/environment.py b/rabbitmq_amqp_python_client/environment.py index 0c22642..c2adef9 100644 --- a/rabbitmq_amqp_python_client/environment.py +++ b/rabbitmq_amqp_python_client/environment.py @@ -41,7 +41,7 @@ def __init__( ssl_context: Union[ PosixSslConfigurationContext, WinSslConfigurationContext, None ] = None, - recovery_configuration: Optional[RecoveryConfiguration] = None, + recovery_configuration: RecoveryConfiguration = RecoveryConfiguration(), ): """ Initialize a new Environment instance. diff --git a/tests/test_connection.py b/tests/test_connection.py index f3bdb97..0f18410 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -1,10 +1,12 @@ import time +from datetime import timedelta from rabbitmq_amqp_python_client import ( ConnectionClosed, Environment, RecoveryConfiguration, StreamSpecification, + ValidationCodeException, ) from .http_requests import delete_all_connections @@ -106,3 +108,23 @@ def test_connection_reconnection() -> None: environment.close() assert disconnected is True + + +def test_reconnection_parameters() -> None: + + exception = False + + environment = Environment( + "amqp://guest:guest@localhost:5672/", + recovery_configuration=RecoveryConfiguration( + active_recovery=True, + back_off_reconnect_interval=timedelta(milliseconds=100), + ), + ) + + try: + environment.connection() + except ValidationCodeException: + exception = True + + assert exception is True