diff --git a/README.md b/README.md index 5785d09..1892079 100644 --- a/README.md +++ b/README.md @@ -147,8 +147,7 @@ You can check the [`ssl example`](./examples/tls/tls_example.py) to see how to e ### Managing disconnections -At this stage the client doesn't support auto-reconnect but a callback is invoked everytime a remote disconnection is detected. -You can use this callback to implement your own logic and eventually attempt a reconnection. +The client supports automatic reconnection with the ability to reconnect Managements, Producers and Consumers You can check the [`reconnection example`](./examples/reconnection/reconnection_example.py) to see how to manage disconnections and eventually attempt a reconnection diff --git a/examples/getting_started/getting_started.py b/examples/getting_started/getting_started.py index 589f2e5..bbd6f60 100644 --- a/examples/getting_started/getting_started.py +++ b/examples/getting_started/getting_started.py @@ -23,7 +23,7 @@ def __init__(self): super().__init__() self._count = 0 - def on_message(self, event: Event): + def on_amqp_message(self, event: Event): print("received message: " + str(event.message.body)) # accepting @@ -147,7 +147,7 @@ def main() -> None: consumer.close() # once we finish consuming if we close the connection we need to create a new one # connection = create_connection() - # management = connection.management() + management = connection.management() print("unbind") management.unbind(bind_name) diff --git a/examples/reconnection/README.md b/examples/reconnection/README.md new file mode 100644 index 0000000..2a31a6b --- /dev/null +++ b/examples/reconnection/README.md @@ -0,0 +1,18 @@ +Automatic reconnection +=== + +You can use this example to test automatic reconnection. + +The scenario is publishing and consuming a lot of messages in a queue. + +From the RabbitMQ UI you can break a connection to see the automatic reconnection happening. + +Same for Consumers. + +In case of streams the connection will restart consuming from the last consumed offset. + +You can control some reconnection parameters with the RecoveryConfiguration dataclass, where you can specify +the backoff interval and the maximum_retries before the client gives up. + +To disable automatic reconnection you can set active_recovery of RecoveryConfiguration to False + diff --git a/examples/reconnection/reconnection_example.py b/examples/reconnection/reconnection_example.py index 6504f11..161b8d4 100644 --- a/examples/reconnection/reconnection_example.py +++ b/examples/reconnection/reconnection_example.py @@ -1,75 +1,23 @@ # type: ignore - - -import time -from dataclasses import dataclass -from typing import Optional - from rabbitmq_amqp_python_client import ( AddressHelper, AMQPMessagingHandler, Connection, ConnectionClosed, - Consumer, Environment, Event, ExchangeSpecification, ExchangeToQueueBindingSpecification, - Management, Message, - Publisher, QuorumQueueSpecification, ) - # here we keep track of the objects we need to reconnect -@dataclass -class ConnectionConfiguration: - connection: Optional[Connection] = None - management: Optional[Management] = None - publisher: Optional[Publisher] = None - consumer: Optional[Consumer] = None - - -connection_configuration = ConnectionConfiguration() -MESSAGES_TO_PUBLSH = 50000 - - -# disconnection callback -# here you can cleanup or reconnect -def on_disconnection(): - - print("disconnected") - global environment - exchange_name = "test-exchange" - queue_name = "example-queue" - routing_key = "routing-key" - - global connection_configuration - - addr = AddressHelper.exchange_address(exchange_name, routing_key) - addr_queue = AddressHelper.queue_address(queue_name) - - if connection_configuration.connection is not None: - connection_configuration.connection = create_connection() - if connection_configuration.management is not None: - connection_configuration.management = ( - connection_configuration.connection.management() - ) - if connection_configuration.publisher is not None: - connection_configuration.publisher = ( - connection_configuration.connection.publisher(addr) - ) - if connection_configuration.consumer is not None: - connection_configuration.consumer = ( - connection_configuration.connection.consumer( - addr_queue, message_handler=MyMessageHandler() - ) - ) +MESSAGES_TO_PUBLISH = 50000 environment = Environment( - uri="amqp://guest:guest@localhost:5672/", on_disconnection_handler=on_disconnection + uri="amqp://guest:guest@localhost:5672/", ) @@ -102,7 +50,7 @@ def on_message(self, event: Event): self._count = self._count + 1 - if self._count == MESSAGES_TO_PUBLSH: + if self._count == MESSAGES_TO_PUBLISH: print("closing receiver") # if you want you can add cleanup operations here @@ -136,29 +84,22 @@ def main() -> None: queue_name = "example-queue" routing_key = "routing-key" - global connection_configuration - print("connection to amqp server") - if connection_configuration.connection is None: - connection_configuration.connection = create_connection() - - if connection_configuration.management is None: - connection_configuration.management = ( - connection_configuration.connection.management() - ) + connection = create_connection() + management = connection.management() + publisher = None + consumer = None print("declaring exchange and queue") - connection_configuration.management.declare_exchange( - ExchangeSpecification(name=exchange_name) - ) + management.declare_exchange(ExchangeSpecification(name=exchange_name)) - connection_configuration.management.declare_queue( + management.declare_queue( QuorumQueueSpecification(name=queue_name) # QuorumQueueSpecification(name=queue_name, dead_letter_exchange="dead-letter") ) print("binding queue to exchange") - bind_name = connection_configuration.management.bind( + bind_name = management.bind( ExchangeToQueueBindingSpecification( source_exchange=exchange_name, destination_queue=queue_name, @@ -171,34 +112,32 @@ def main() -> None: addr_queue = AddressHelper.queue_address(queue_name) print("create a publisher and publish a test message") - if connection_configuration.publisher is None: - connection_configuration.publisher = ( - connection_configuration.connection.publisher(addr) - ) + if publisher is None: + publisher = connection.publisher(addr) print("purging the queue") - messages_purged = connection_configuration.management.purge_queue(queue_name) + messages_purged = management.purge_queue(queue_name) print("messages purged: " + str(messages_purged)) - # management.close() # publishing messages while True: - for i in range(MESSAGES_TO_PUBLSH): + for i in range(MESSAGES_TO_PUBLISH): if i % 1000 == 0: print("published 1000 messages...") try: - if connection_configuration.publisher is not None: - connection_configuration.publisher.publish(Message(body="test")) + if publisher is not None: + publisher.publish(Message(body="test")) except ConnectionClosed: print("publisher closing exception, resubmitting") + # publisher = connection.publisher(addr) continue print("closing publisher") try: - if connection_configuration.publisher is not None: - connection_configuration.publisher.close() + if publisher is not None: + publisher.close() except ConnectionClosed: print("publisher closing exception, resubmitting") continue @@ -207,20 +146,15 @@ def main() -> None: print( "create a consumer and consume the test message - press control + c to terminate to consume" ) - if connection_configuration.consumer is None: - connection_configuration.consumer = ( - connection_configuration.connection.consumer( - addr_queue, message_handler=MyMessageHandler() - ) - ) + if consumer is None: + consumer = connection.consumer(addr_queue, message_handler=MyMessageHandler()) while True: try: - connection_configuration.consumer.run() + consumer.run() except KeyboardInterrupt: pass except ConnectionClosed: - time.sleep(1) continue except Exception as e: print("consumer exited for exception " + str(e)) @@ -228,22 +162,19 @@ def main() -> None: break print("cleanup") - connection_configuration.consumer.close() - # once we finish consuming if we close the connection we need to create a new one - # connection = create_connection() - # management = connection.management() + consumer.close() print("unbind") - connection_configuration.management.unbind(bind_name) + management.unbind(bind_name) print("delete queue") - connection_configuration.management.delete_queue(queue_name) + management.delete_queue(queue_name) print("delete exchange") - connection_configuration.management.delete_exchange(exchange_name) + management.delete_exchange(exchange_name) print("closing connections") - connection_configuration.management.close() + management.close() print("after management closing") environment.close() print("after connection closing") diff --git a/examples/streams/example_with_streams.py b/examples/streams/example_with_streams.py index 8cd41c7..57cc7c0 100644 --- a/examples/streams/example_with_streams.py +++ b/examples/streams/example_with_streams.py @@ -4,6 +4,7 @@ AddressHelper, AMQPMessagingHandler, Connection, + ConnectionClosed, Environment, Event, Message, @@ -12,7 +13,7 @@ StreamSpecification, ) -MESSAGES_TO_PUBLISH = 100 +MESSAGES_TO_PUBLISH = 1 class MyMessageHandler(AMQPMessagingHandler): @@ -21,7 +22,7 @@ def __init__(self): super().__init__() self._count = 0 - def on_message(self, event: Event): + def on_amqp_message(self, event: Event): # just messages with banana filters get received print( "received message from stream: " @@ -86,7 +87,7 @@ def main() -> None: queue_name = "example-queue" print("connection to amqp server") - environment = Environment("amqp://guest:guest@localhost:5672/") + environment = Environment("amqp://guest:guest@localhost:5672/", reconnect=True) connection = create_connection(environment) management = connection.management() @@ -134,14 +135,22 @@ def main() -> None: publisher.close() - try: - consumer.run() - except KeyboardInterrupt: - pass + while True: + try: + consumer.run() + except KeyboardInterrupt: + pass + except ConnectionClosed: + print("connection closed") + continue + except Exception as e: + print("consumer exited for exception " + str(e)) + + break # print("delete queue") - management.delete_queue(queue_name) + # management.delete_queue(queue_name) print("closing connections") management.close() diff --git a/rabbitmq_amqp_python_client/__init__.py b/rabbitmq_amqp_python_client/__init__.py index 72742eb..a41a13d 100644 --- a/rabbitmq_amqp_python_client/__init__.py +++ b/rabbitmq_amqp_python_client/__init__.py @@ -11,6 +11,7 @@ ExchangeToExchangeBindingSpecification, ExchangeToQueueBindingSpecification, OffsetSpecification, + RecoveryConfiguration, StreamOptions, ) from .environment import Environment @@ -87,4 +88,5 @@ "OutcomeState", "Environment", "ExchangeCustomSpecification", + "RecoveryConfiguration", ] diff --git a/rabbitmq_amqp_python_client/amqp_consumer_handler.py b/rabbitmq_amqp_python_client/amqp_consumer_handler.py index a1c4612..85b3e54 100644 --- a/rabbitmq_amqp_python_client/amqp_consumer_handler.py +++ b/rabbitmq_amqp_python_client/amqp_consumer_handler.py @@ -1,4 +1,5 @@ from .delivery_context import DeliveryContext +from .qpid.proton._events import Event from .qpid.proton.handlers import MessagingHandler """ @@ -20,3 +21,16 @@ def __init__(self, auto_accept: bool = False, auto_settle: bool = True): """ super().__init__(auto_accept=auto_accept, auto_settle=auto_settle) self.delivery_context: DeliveryContext = DeliveryContext() + self._offset = 0 + + def on_amqp_message(self, event: Event) -> None: + pass + + def on_message(self, event: Event) -> None: + if "x-stream-offset" in event.message.annotations: + self._offset = int(event.message.annotations["x-stream-offset"]) + self.on_amqp_message(event) + + @property + def offset(self) -> int: + return self._offset diff --git a/rabbitmq_amqp_python_client/connection.py b/rabbitmq_amqp_python_client/connection.py index 49c0064..d43e2a4 100644 --- a/rabbitmq_amqp_python_client/connection.py +++ b/rabbitmq_amqp_python_client/connection.py @@ -1,4 +1,7 @@ import logging +import random +import time +from datetime import timedelta from typing import ( Annotated, Callable, @@ -11,10 +14,14 @@ from .address_helper import validate_address from .consumer import Consumer -from .entities import StreamOptions -from .exceptions import ArgumentOutOfRangeException +from .entities import RecoveryConfiguration, StreamOptions +from .exceptions import ( + ArgumentOutOfRangeException, + ValidationCodeException, +) from .management import Management from .publisher import Publisher +from .qpid.proton._exceptions import ConnectionException from .qpid.proton._handlers import MessagingHandler from .qpid.proton._transport import SSLDomain from .qpid.proton.utils import BlockingConnection @@ -53,7 +60,7 @@ def __init__( ssl_context: Union[ PosixSslConfigurationContext, WinSslConfigurationContext, None ] = None, - on_disconnection_handler: Optional[CB] = None, # type: ignore + recovery_configuration: RecoveryConfiguration = RecoveryConfiguration(), ): """ Initialize a new Connection instance. @@ -62,7 +69,7 @@ def __init__( uri: Single node connection URI uris: List of URIs for multi-node setup ssl_context: SSL configuration for secure connections - on_disconnection_handler: Callback for handling disconnection events + reconnect: Ability to automatically reconnect in case of disconnections from the server Raises: ValueError: If neither uri nor uris is provided @@ -76,18 +83,59 @@ def __init__( self._addr: Optional[str] = uri self._addrs: Optional[list[str]] = uris self._conn: BlockingConnection - self._management: Management - self._on_disconnection_handler = on_disconnection_handler self._conf_ssl_context: Union[ PosixSslConfigurationContext, WinSslConfigurationContext, None ] = ssl_context + self._managements: list[Management] = [] + self._recovery_configuration: RecoveryConfiguration = recovery_configuration self._ssl_domain = None self._connections = [] # type: ignore self._index: int = -1 + self._publishers: list[Publisher] = [] + self._consumers: list[Consumer] = [] + + # Some recovery_configuration validation + if recovery_configuration.back_off_reconnect_interval < timedelta(seconds=1): + raise ValidationCodeException( + "back_off_reconnect_interval must be > 1 second" + ) + + if recovery_configuration.MaxReconnectAttempts < 1: + raise ValidationCodeException("MaxReconnectAttempts must be at least 1") def _set_environment_connection_list(self, connections: []): # type: ignore self._connections = connections + def _open_connections(self, reconnect_handlers: bool = False) -> None: + + if self._recovery_configuration.active_recovery is False: + self._conn = BlockingConnection( + url=self._addr, + urls=self._addrs, + ssl_domain=self._ssl_domain, + ) + else: + self._conn = BlockingConnection( + url=self._addr, + urls=self._addrs, + ssl_domain=self._ssl_domain, + on_disconnection_handler=self._on_disconnection, + ) + + if reconnect_handlers is True: + + for i, management in enumerate(self._managements): + # Update the broken connection and sender in the management + self._managements[i]._update_connection(self._conn) + + for i, publisher in enumerate(self._publishers): + # Update the broken connection and sender in the publisher + self._publishers[i]._update_connection(self._conn) + + for i, consumer in enumerate(self._consumers): + # Update the broken connection and sender in the consumer + self._consumers[i]._update_connection(self._conn) + def dial(self) -> None: """ Establish a connection to the AMQP server. @@ -141,13 +189,8 @@ def dial(self) -> None: client_key, password, ) - self._conn = BlockingConnection( - url=self._addr, - urls=self._addrs, - ssl_domain=self._ssl_domain, - on_disconnection_handler=self._on_disconnection_handler, - ) - self._open() + + self._open_connections() logger.debug("Connection to the server established") def _win_store_to_cert( @@ -174,7 +217,12 @@ def management(self) -> Management: Returns: Management: The management interface for performing administrative tasks """ - return self._management + if len(self._managements) == 0: + management = Management(self._conn) + management.open() + self._managements.append(management) + + return self._managements[0] # closes the connection to the AMQP 1.0 server. def close(self) -> None: @@ -185,6 +233,10 @@ def close(self) -> None: """ logger.debug("Closing connection") try: + for publisher in self._publishers[:]: + publisher.close() + for consumer in self._consumers[:]: + consumer.close() self._conn.close() except Exception as e: logger.error(f"Error closing connection: {e}") @@ -213,6 +265,8 @@ def publisher(self, destination: str = "") -> Publisher: "destination address must start with /queues or /exchanges" ) publisher = Publisher(self._conn, destination) + publisher._set_publishers_list(self._publishers) + self._publishers.append(publisher) return publisher def consumer( @@ -244,4 +298,62 @@ def consumer( consumer = Consumer( self._conn, destination, message_handler, stream_filter_options, credit ) + self._consumers.append(consumer) return consumer + + def _on_disconnection(self) -> None: + + logger.debug("_on_disconnection: disconnection detected") + if self in self._connections: + self._connections.remove(self) + + base_delay = self._recovery_configuration.back_off_reconnect_interval + max_delay = timedelta(minutes=1) + + for attempt in range(self._recovery_configuration.MaxReconnectAttempts): + + logger.debug("attempting a reconnection") + jitter = timedelta(milliseconds=random.randint(0, 500)) + delay = base_delay + jitter + + if delay > max_delay: + delay = max_delay + + time.sleep(delay.total_seconds()) + + try: + + self._open_connections(reconnect_handlers=True) + + self._connections.append(self) + + except ConnectionException as e: + base_delay *= 2 + logger.debug( + "Reconnection attempt failed", + "attempt", + attempt, + "error", + str(e), + ) + # maximum attempts reached without establishing a connection + if attempt == self._recovery_configuration.MaxReconnectAttempts - 1: + logger.debug("Not able to reconnect") + raise ConnectionException + else: + continue + + # connection established + else: + logger.debug("reconnected successful") + return + + @property + def active_producers(self) -> int: + """Returns the number of active publishers""" + return len(self._publishers) + + @property + def active_consumers(self) -> int: + """Returns the number of active consumers""" + return len(self._consumers) diff --git a/rabbitmq_amqp_python_client/consumer.py b/rabbitmq_amqp_python_client/consumer.py index 3da4dec..15ef4c9 100644 --- a/rabbitmq_amqp_python_client/consumer.py +++ b/rabbitmq_amqp_python_client/consumer.py @@ -1,12 +1,12 @@ import logging from typing import Literal, Optional, Union, cast +from .amqp_consumer_handler import AMQPMessagingHandler from .entities import StreamOptions from .options import ( ReceiverOptionUnsettled, ReceiverOptionUnsettledWithFilters, ) -from .qpid.proton._handlers import MessagingHandler from .qpid.proton._message import Message from .qpid.proton.utils import ( BlockingConnection, @@ -37,7 +37,7 @@ def __init__( self, conn: BlockingConnection, addr: str, - handler: Optional[MessagingHandler] = None, + handler: Optional[AMQPMessagingHandler] = None, stream_options: Optional[StreamOptions] = None, credit: Optional[int] = None, ): @@ -57,6 +57,7 @@ def __init__( self._handler = handler self._stream_options = stream_options self._credit = credit + self._consumers: list[Consumer] = [] self._open() def _open(self) -> None: @@ -64,6 +65,29 @@ def _open(self) -> None: logger.debug("Creating Sender") self._receiver = self._create_receiver(self._addr) + def _update_connection(self, conn: BlockingConnection) -> None: + self._conn = conn + if self._stream_options is None: + logger.debug("creating new receiver without stream") + self._receiver = self._conn.create_receiver( + self._addr, + options=ReceiverOptionUnsettled(self._addr), + handler=self._handler, + ) + else: + logger.debug("creating new stream receiver") + self._stream_options.offset(self._handler.offset - 1) # type: ignore + self._receiver = self._conn.create_receiver( + self._addr, + options=ReceiverOptionUnsettledWithFilters( + self._addr, self._stream_options + ), + handler=self._handler, + ) + + def _set_consumers_list(self, consumers: []) -> None: # type: ignore + self._consumers = consumers + def consume(self, timeout: Union[None, Literal[False], float] = False) -> Message: """ Consume a message from the queue. @@ -93,6 +117,8 @@ def close(self) -> None: logger.debug("Closing the receiver") if self._receiver is not None: self._receiver.close() + if self in self._consumers: + self._consumers.remove(self) def run(self) -> None: """ @@ -134,3 +160,12 @@ def _create_receiver(self, addr: str) -> BlockingReceiver: receiver.credit = self._credit return receiver + + @property + def address(self) -> str: + """Get the current publisher address.""" + return self._addr + + @property + def handler(self) -> Optional[AMQPMessagingHandler]: + return self._handler diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index 8e1d04a..9878a6a 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -1,4 +1,5 @@ from dataclasses import dataclass, field +from datetime import timedelta from enum import Enum from typing import Any, Dict, Optional, Union @@ -208,3 +209,27 @@ def filter_set(self) -> Dict[symbol, Described]: Dict[symbol, Described]: The current filter set configuration """ return self._filter_set + + +@dataclass +class RecoveryConfiguration: + """ + Configuration options for automatic reconnection. + + This dataclass contains parameters to manage automatic reconnection + + Attributes: + active_recovery: Define if the recovery is activated. If is not activated the connection will not try to reconnect + back_off_reconnect_interval: the time to wait before trying to createSender after a connection is closed. + time will be increased exponentially with each attempt. + Default is 5 seconds, each attempt will double the time. + The minimum value is 1 second. Avoid setting a value low values since it can cause a high + number of reconnection attempts. + MaxReconnectAttempts: MaxReconnectAttempts The maximum number of reconnection attempts. + Default is 5. + The minimum value is 1. + """ + + active_recovery: bool = True + back_off_reconnect_interval: timedelta = timedelta(seconds=5) + MaxReconnectAttempts: int = 5 diff --git a/rabbitmq_amqp_python_client/environment.py b/rabbitmq_amqp_python_client/environment.py index ad7be8b..c2adef9 100644 --- a/rabbitmq_amqp_python_client/environment.py +++ b/rabbitmq_amqp_python_client/environment.py @@ -9,6 +9,7 @@ ) from .connection import Connection +from .entities import RecoveryConfiguration from .ssl_configuration import ( PosixSslConfigurationContext, WinSslConfigurationContext, @@ -40,7 +41,7 @@ def __init__( ssl_context: Union[ PosixSslConfigurationContext, WinSslConfigurationContext, None ] = None, - on_disconnection_handler: Optional[CB] = None, # type: ignore + recovery_configuration: RecoveryConfiguration = RecoveryConfiguration(), ): """ Initialize a new Environment instance. @@ -63,7 +64,7 @@ def __init__( self._uri = uri self._uris = uris self._ssl_context = ssl_context - self._on_disconnection_handler = on_disconnection_handler + self._recovery_configuration = recovery_configuration self._connections: list[Connection] = [] def connection( @@ -85,7 +86,7 @@ def connection( uri=self._uri, uris=self._uris, ssl_context=self._ssl_context, - on_disconnection_handler=self._on_disconnection_handler, + recovery_configuration=self._recovery_configuration, ) logger.debug("Environment: Creating and returning a new connection") self._connections.append(connection) diff --git a/rabbitmq_amqp_python_client/management.py b/rabbitmq_amqp_python_client/management.py index 0ea7a83..1c6550b 100644 --- a/rabbitmq_amqp_python_client/management.py +++ b/rabbitmq_amqp_python_client/management.py @@ -53,6 +53,13 @@ def __init__(self, conn: BlockingConnection): self._receiver: Optional[BlockingReceiver] = None self._conn = conn + def _update_connection(self, conn: BlockingConnection) -> None: + self._conn = conn + self._sender = self._create_sender(CommonValues.management_node_address.value) + self._receiver = self._create_receiver( + CommonValues.management_node_address.value, + ) + def open(self) -> None: """ Open the management connection by creating sender and receiver. diff --git a/rabbitmq_amqp_python_client/publisher.py b/rabbitmq_amqp_python_client/publisher.py index 5c057df..dc63d5d 100644 --- a/rabbitmq_amqp_python_client/publisher.py +++ b/rabbitmq_amqp_python_client/publisher.py @@ -43,13 +43,21 @@ def __init__(self, conn: BlockingConnection, addr: str = ""): self._sender: Optional[BlockingSender] = None self._conn = conn self._addr = addr + self._publishers: list[Publisher] = [] self._open() + def _update_connection(self, conn: BlockingConnection) -> None: + self._conn = conn + self._sender = self._create_sender(self._addr) + def _open(self) -> None: if self._sender is None: logger.debug("Creating Sender") self._sender = self._create_sender(self._addr) + def _set_publishers_list(self, publishers: []) -> None: # type: ignore + self._publishers = publishers + def publish(self, message: Message) -> Delivery: """ Publish a message to RabbitMQ. @@ -94,6 +102,8 @@ def close(self) -> None: logger.debug("Closing Sender") if self.is_open: self._sender.close() # type: ignore + if self in self._publishers: + self._publishers.remove(self) def _create_sender(self, addr: str) -> BlockingSender: return self._conn.create_sender(addr, options=SenderOptionUnseattle(addr)) diff --git a/tests/conftest.py b/tests/conftest.py index 4df3ff6..859340c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -12,6 +12,7 @@ PKCS12Store, PosixClientCert, PosixSslConfigurationContext, + RecoveryConfiguration, WinClientCert, WinSslConfigurationContext, symbol, @@ -20,6 +21,8 @@ FriendlyName, ) +from .http_requests import delete_all_connections + os.chdir(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) @@ -45,6 +48,21 @@ def connection(pytestconfig): environment.close() +@pytest.fixture() +def connection_with_reconnect(pytestconfig): + environment = Environment( + uri="amqp://guest:guest@localhost:5672/", + recovery_configuration=RecoveryConfiguration(active_recovery=True), + ) + connection = environment.connection() + connection.dial() + try: + yield connection + + finally: + environment.close() + + @pytest.fixture() def ssl_context(pytestconfig): if sys.platform == "win32": @@ -150,6 +168,22 @@ def on_message(self, event: Event): raise ConsumerTestException("consumed") +class MyMessageHandlerAcceptStreamOffsetReconnect(AMQPMessagingHandler): + + def __init__(self, starting_offset: Optional[int] = None): + super().__init__() + self._received = 0 + self._starting_offset = starting_offset + + def on_message(self, event: Event): + if self._received == 5: + delete_all_connections() + self.delivery_context.accept(event) + self._received = self._received + 1 + if self._received == 10: + raise ConsumerTestException("consumed") + + class MyMessageHandlerNoack(AMQPMessagingHandler): def __init__(self): diff --git a/tests/test_connection.py b/tests/test_connection.py index e006fd7..0f18410 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -1,9 +1,12 @@ import time +from datetime import timedelta from rabbitmq_amqp_python_client import ( ConnectionClosed, Environment, + RecoveryConfiguration, StreamSpecification, + ValidationCodeException, ) from .http_requests import delete_all_connections @@ -11,7 +14,6 @@ def on_disconnected(): - print("disconnected") global disconnected disconnected = True @@ -71,24 +73,11 @@ def test_environment_connections_management() -> None: def test_connection_reconnection() -> None: - reconnected = False - connection = None disconnected = False - def on_disconnected(): - - nonlocal connection - - # reconnect - if connection is not None: - connection = environment.connection() - connection.dial() - - nonlocal reconnected - reconnected = True - environment = Environment( - "amqp://guest:guest@localhost:5672/", on_disconnection_handler=on_disconnected + "amqp://guest:guest@localhost:5672/", + recovery_configuration=RecoveryConfiguration(active_recovery=True), ) connection = environment.connection() @@ -97,9 +86,11 @@ def on_disconnected(): # delay time.sleep(5) # simulate a disconnection - delete_all_connections() # raise a reconnection management = connection.management() + + delete_all_connections() + stream_name = "test_stream_info_with_validation" queue_specification = StreamSpecification( name=stream_name, @@ -111,11 +102,29 @@ def on_disconnected(): disconnected = True # check that we reconnected - management = connection.management() management.declare_queue(queue_specification) management.delete_queue(stream_name) - environment.close() management.close() + environment.close() assert disconnected is True - assert reconnected is True + + +def test_reconnection_parameters() -> None: + + exception = False + + environment = Environment( + "amqp://guest:guest@localhost:5672/", + recovery_configuration=RecoveryConfiguration( + active_recovery=True, + back_off_reconnect_interval=timedelta(milliseconds=100), + ), + ) + + try: + environment.connection() + except ValidationCodeException: + exception = True + + assert exception is True diff --git a/tests/test_publisher.py b/tests/test_publisher.py index 20abe10..4d21c7c 100644 --- a/tests/test_publisher.py +++ b/tests/test_publisher.py @@ -9,6 +9,7 @@ Message, OutcomeState, QuorumQueueSpecification, + RecoveryConfiguration, StreamSpecification, ValidationCodeException, ) @@ -260,35 +261,11 @@ def test_publish_purge(connection: Connection) -> None: def test_disconnection_reconnection() -> None: disconnected = False - reconnected = False generic_exception_raised = False - publisher = None - queue_name = "test-queue" - connection_test = None - environment = None - - def on_disconnected(): - - nonlocal publisher - nonlocal queue_name - nonlocal connection_test - nonlocal environment - - # reconnect - if connection_test is not None: - connection_test = environment.connection() - connection_test.dial() - - if publisher is not None: - publisher = connection_test.publisher( - destination=AddressHelper.queue_address(queue_name) - ) - - nonlocal reconnected - reconnected = True environment = Environment( - "amqp://guest:guest@localhost:5672/", on_disconnection_handler=on_disconnected + "amqp://guest:guest@localhost:5672/", + recovery_configuration=RecoveryConfiguration(active_recovery=True), ) connection_test = environment.connection() @@ -297,13 +274,11 @@ def on_disconnected(): # delay time.sleep(5) messages_to_publish = 10000 - queue_name = "test-queue" + queue_name = "test-queue-reconnection" management = connection_test.management() management.declare_queue(QuorumQueueSpecification(name=queue_name)) - management.close() - publisher = connection_test.publisher( destination=AddressHelper.queue_address(queue_name) ) @@ -327,14 +302,6 @@ def on_disconnected(): publisher.close() - # cleanup, we need to create a new connection as the previous one - # was closed by the test - - connection_test = environment.connection() - connection_test.dial() - - management = connection_test.management() - # purge the queue and check number of published messages message_purged = management.purge_queue(queue_name) @@ -345,7 +312,6 @@ def on_disconnected(): assert generic_exception_raised is False assert disconnected is True - assert reconnected is True assert message_purged == messages_to_publish - 1 @@ -360,14 +326,10 @@ def test_queue_info_for_stream_with_validations(connection: Connection) -> None: management = connection.management() management.declare_queue(queue_specification) - print("before creating publisher") - publisher = connection.publisher( destination=AddressHelper.queue_address(stream_name) ) - print("after creating publisher") - for i in range(messages_to_send): publisher.publish(Message(body="test")) @@ -417,3 +379,55 @@ def test_publish_per_message_exchange(connection: Connection) -> None: assert accepted_2 is True assert purged_messages_queue == 2 assert raised is False + + +def test_multiple_publishers(environment: Environment) -> None: + + stream_name = "test_multiple_publisher_1" + stream_name_2 = "test_multiple_publisher_2" + connection = environment.connection() + connection.dial() + + stream_specification = StreamSpecification( + name=stream_name, + ) + management = connection.management() + management.declare_queue(stream_specification) + + stream_specification = StreamSpecification( + name=stream_name_2, + ) + management.declare_queue(stream_specification) + + destination = AddressHelper.queue_address(stream_name) + destination_2 = AddressHelper.queue_address(stream_name_2) + connection.publisher(destination) + + assert connection.active_producers == 1 + + publisher_2 = connection.publisher(destination_2) + + assert connection.active_producers == 2 + + publisher_2.close() + + assert connection.active_producers == 1 + + connection.publisher(destination_2) + + assert connection.active_producers == 2 + + connection.close() + + assert connection.active_producers == 0 + + # cleanup + connection = environment.connection() + connection.dial() + management = connection.management() + + management.delete_queue(stream_name) + + management.delete_queue(stream_name_2) + + management.close() diff --git a/tests/test_streams.py b/tests/test_streams.py index 535874a..890ab94 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -10,6 +10,7 @@ from .conftest import ( ConsumerTestException, MyMessageHandlerAcceptStreamOffset, + MyMessageHandlerAcceptStreamOffsetReconnect, ) from .utils import publish_messages @@ -370,3 +371,44 @@ def test_stream_match_unfiltered( consumer.close() management.delete_queue(stream_name) + + +def test_stream_reconnection( + connection_with_reconnect: Connection, environment: Environment +) -> None: + + consumer = None + stream_name = "test_stream_info_with_filtering" + messages_to_send = 10 + + queue_specification = StreamSpecification( + name=stream_name, + ) + management = connection_with_reconnect.management() + management.declare_queue(queue_specification) + + addr_queue = AddressHelper.queue_address(stream_name) + + # consume and then publish + try: + stream_filter_options = StreamOptions() + stream_filter_options.filter_values(["banana"]) + stream_filter_options.filter_match_unfiltered(True) + connection_consumer = environment.connection() + connection_consumer.dial() + consumer = connection_consumer.consumer( + addr_queue, + # disconnection and check happens here + message_handler=MyMessageHandlerAcceptStreamOffsetReconnect(), + stream_filter_options=stream_filter_options, + ) + # send with annotations filter banana + publish_messages(connection_with_reconnect, messages_to_send, stream_name) + consumer.run() + # ack to terminate the consumer + except ConsumerTestException: + pass + + consumer.close() + + management.delete_queue(stream_name)