From 62949a3c1ad05f1359951b0858a717538f7d40ea Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Mon, 10 Feb 2025 16:56:57 +0100 Subject: [PATCH 1/8] Introducing Environment --- README.md | 5 +- examples/getting_started/basic_example.py | 21 +++++- .../getting_started/example_with_streams.py | 23 ++++-- .../getting_started/reconnection_example.py | 10 ++- examples/getting_started/tls_example.py | 10 ++- rabbitmq_amqp_python_client/__init__.py | 2 + rabbitmq_amqp_python_client/connection.py | 2 +- rabbitmq_amqp_python_client/environment.py | 37 ++++++++++ tests/conftest.py | 33 ++++++--- tests/test_connection.py | 17 +++-- tests/test_consumer.py | 45 ++++++++---- tests/test_publisher.py | 10 ++- tests/test_streams.py | 73 ++++++++++++++----- tests/utils.py | 7 -- 14 files changed, 219 insertions(+), 76 deletions(-) create mode 100644 rabbitmq_amqp_python_client/environment.py diff --git a/README.md b/README.md index eb18249..8dd1621 100644 --- a/README.md +++ b/README.md @@ -38,12 +38,13 @@ poetry run python ./examples/getting_started/main.py ### Creating a connection -A connection to the RabbitMQ AMQP 1.0 server can be established using the Connection object. +A connection to the RabbitMQ AMQP 1.0 server can be established using the Environment object. For example: ```python - connection = Connection("amqp://guest:guest@localhost:5672/") + environment = Environment() + connection = environment.connection("amqp://guest:guest@localhost:5672/") connection.dial() ``` diff --git a/examples/getting_started/basic_example.py b/examples/getting_started/basic_example.py index 038f934..932500d 100644 --- a/examples/getting_started/basic_example.py +++ b/examples/getting_started/basic_example.py @@ -11,6 +11,7 @@ ExchangeSpecification, Message, QuorumQueueSpecification, + Environment, ) MESSAGES_TO_PUBLISH = 100 @@ -61,8 +62,19 @@ def on_link_closed(self, event: Event) -> None: print("link closed") -def create_connection() -> Connection: - connection = Connection("amqp://guest:guest@localhost:5672/") +def create_connection(environment: Environment) -> Connection: + connection = environment.connection("amqp://guest:guest@localhost:5672/") + # in case of SSL enablement + # ca_cert_file = ".ci/certs/ca_certificate.pem" + # client_cert = ".ci/certs/client_certificate.pem" + # client_key = ".ci/certs/client_key.pem" + # connection = Connection( + # "amqps://guest:guest@localhost:5671/", + # ssl_context=SslConfigurationContext( + # ca_cert=ca_cert_file, + # client_cert=ClientCert(client_cert=client_cert, client_key=client_key), + # ), + # ) connection.dial() return connection @@ -75,7 +87,8 @@ def main() -> None: routing_key = "routing-key" print("connection to amqp server") - connection = create_connection() + environment = Environment() + connection = create_connection(environment) management = connection.management() @@ -150,7 +163,7 @@ def main() -> None: print("closing connections") management.close() print("after management closing") - connection.close() + environment.close() print("after connection closing") diff --git a/examples/getting_started/example_with_streams.py b/examples/getting_started/example_with_streams.py index 4448ec8..9fd0b7a 100644 --- a/examples/getting_started/example_with_streams.py +++ b/examples/getting_started/example_with_streams.py @@ -9,6 +9,7 @@ OffsetSpecification, StreamOptions, StreamSpecification, + Environment, ) MESSAGES_TO_PUBLISH = 100 @@ -65,8 +66,19 @@ def on_link_closed(self, event: Event) -> None: print("link closed") -def create_connection() -> Connection: - connection = Connection("amqp://guest:guest@localhost:5672/") +def create_connection(environment: Environment) -> Connection: + connection = environment.connection("amqp://guest:guest@localhost:5672/") + # in case of SSL enablement + # ca_cert_file = ".ci/certs/ca_certificate.pem" + # client_cert = ".ci/certs/client_certificate.pem" + # client_key = ".ci/certs/client_key.pem" + # connection = Connection( + # "amqps://guest:guest@localhost:5671/", + # ssl_context=SslConfigurationContext( + # ca_cert=ca_cert_file, + # client_cert=ClientCert(client_cert=client_cert, client_key=client_key), + # ), + # ) connection.dial() return connection @@ -76,7 +88,8 @@ def main() -> None: queue_name = "example-queue" print("connection to amqp server") - connection = create_connection() + environment = Environment() + connection = create_connection(environment) management = connection.management() @@ -84,7 +97,7 @@ def main() -> None: addr_queue = AddressHelper.queue_address(queue_name) - consumer_connection = create_connection() + consumer_connection = create_connection(environment) stream_filter_options = StreamOptions() # can be first, last, next or an offset long @@ -135,7 +148,7 @@ def main() -> None: print("closing connections") management.close() print("after management closing") - connection.close() + environment.close() print("after connection closing") diff --git a/examples/getting_started/reconnection_example.py b/examples/getting_started/reconnection_example.py index 9396d88..ff9079b 100644 --- a/examples/getting_started/reconnection_example.py +++ b/examples/getting_started/reconnection_example.py @@ -18,9 +18,10 @@ Message, Publisher, QuorumQueueSpecification, + Environment, ) - +environment = Environment() # here we keep track of the objects we need to reconnect @dataclass class ConnectionConfiguration: @@ -118,8 +119,9 @@ def create_connection() -> Connection: # "amqp://ha_tls-rabbit_node2-1:5602/", # ] # connection = Connection(uris=uris, on_disconnection_handler=on_disconnected) - connection = Connection( - uri="amqp://guest:guest@localhost:5672/", + + connection = environment.connection( + url="amqp://guest:guest@localhost:5672/", on_disconnection_handler=on_disconnection, ) connection.dial() @@ -242,7 +244,7 @@ def main() -> None: print("closing connections") connection_configuration.management.close() print("after management closing") - connection_configuration.connection.close() + environment.close() print("after connection closing") diff --git a/examples/getting_started/tls_example.py b/examples/getting_started/tls_example.py index f358d64..c18f7a6 100644 --- a/examples/getting_started/tls_example.py +++ b/examples/getting_started/tls_example.py @@ -12,6 +12,7 @@ Message, QuorumQueueSpecification, SslConfigurationContext, + Environment, ) messages_to_publish = 100 @@ -62,12 +63,12 @@ def on_link_closed(self, event: Event) -> None: print("link closed") -def create_connection() -> Connection: +def create_connection(environment: Environment) -> Connection: # in case of SSL enablement ca_cert_file = ".ci/certs/ca_certificate.pem" client_cert = ".ci/certs/client_certificate.pem" client_key = ".ci/certs/client_key.pem" - connection = Connection( + connection = environment.connection( "amqps://guest:guest@localhost:5671/", ssl_context=SslConfigurationContext( ca_cert=ca_cert_file, @@ -84,9 +85,10 @@ def main() -> None: exchange_name = "test-exchange" queue_name = "example-queue" routing_key = "routing-key" + environment = Environment() print("connection to amqp server") - connection = create_connection() + connection = create_connection(environment) management = connection.management() @@ -160,7 +162,7 @@ def main() -> None: print("closing connections") management.close() print("after management closing") - connection.close() + environment.close() print("after connection closing") diff --git a/rabbitmq_amqp_python_client/__init__.py b/rabbitmq_amqp_python_client/__init__.py index 3e8f86c..cc0f828 100644 --- a/rabbitmq_amqp_python_client/__init__.py +++ b/rabbitmq_amqp_python_client/__init__.py @@ -11,6 +11,7 @@ OffsetSpecification, StreamOptions, ) +from .environment import Environment from .exceptions import ArgumentOutOfRangeException from .management import Management from .publisher import Publisher @@ -66,4 +67,5 @@ "StreamOptions", "OffsetSpecification", "Disposition", + "Environment", ] diff --git a/rabbitmq_amqp_python_client/connection.py b/rabbitmq_amqp_python_client/connection.py index c2fb7de..b991f34 100644 --- a/rabbitmq_amqp_python_client/connection.py +++ b/rabbitmq_amqp_python_client/connection.py @@ -72,7 +72,7 @@ def management(self) -> Management: return self._management # closes the connection to the AMQP 1.0 server. - def close(self) -> None: + def _close(self) -> None: logger.debug("Closing connection") self._conn.close() diff --git a/rabbitmq_amqp_python_client/environment.py b/rabbitmq_amqp_python_client/environment.py new file mode 100644 index 0000000..14c3390 --- /dev/null +++ b/rabbitmq_amqp_python_client/environment.py @@ -0,0 +1,37 @@ +from typing import Annotated, Callable, Optional, TypeVar + +from .connection import Connection +from .ssl_configuration import SslConfigurationContext + +MT = TypeVar("MT") +CB = Annotated[Callable[[MT], None], "Message callback type"] + + +class Environment: + + def __init__(self): # type: ignore + + self._connections = [] + + def connection( + self, + # single-node mode + url: Optional[str] = None, + # multi-node mode + urls: Optional[list[str]] = None, + ssl_context: Optional[SslConfigurationContext] = None, + on_disconnection_handler: Optional[CB] = None, # type: ignore + ) -> Connection: + connection = Connection( + url=url, + urls=urls, + ssl_context=ssl_context, + on_disconnection_handler=on_disconnection_handler, + ) + + self._connections.append(connection) + return connection + + def close(self) -> None: + for connection in self._connections: + connection._close() diff --git a/tests/conftest.py b/tests/conftest.py index edb33fd..1211d0d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -6,30 +6,42 @@ AddressHelper, AMQPMessagingHandler, ClientCert, - Connection, + Environment, Event, SslConfigurationContext, symbol, ) +@pytest.fixture() +def environment(pytestconfig): + environment = Environment() + try: + yield environment + + finally: + environment.close() + + @pytest.fixture() def connection(pytestconfig): - connection = Connection("amqp://guest:guest@localhost:5672/") + environment = Environment() + connection = environment.connection("amqp://guest:guest@localhost:5672/") connection.dial() try: yield connection finally: - connection.close() + environment.close() @pytest.fixture() def connection_ssl(pytestconfig): + environment = Environment() ca_cert_file = ".ci/certs/ca_certificate.pem" client_cert = ".ci/certs/client_certificate.pem" client_key = ".ci/certs/client_key.pem" - connection = Connection( + connection = environment.connection( "amqps://guest:guest@localhost:5671/", ssl_context=SslConfigurationContext( ca_cert=ca_cert_file, @@ -41,25 +53,26 @@ def connection_ssl(pytestconfig): yield connection finally: - connection.close() + environment.close() @pytest.fixture() def management(pytestconfig): - connection = Connection("amqp://guest:guest@localhost:5672/") + environment = Environment() + connection = environment.connection("amqp://guest:guest@localhost:5672/") connection.dial() try: management = connection.management() yield management finally: - management.close() - connection.close() + environment.close() @pytest.fixture() def consumer(pytestconfig): - connection = Connection("amqp://guest:guest@localhost:5672/") + environment = Environment() + connection = environment.connection("amqp://guest:guest@localhost:5672/") connection.dial() try: queue_name = "test-queue" @@ -69,7 +82,7 @@ def consumer(pytestconfig): finally: consumer.close() - connection.close() + environment.close() class ConsumerTestException(BaseException): diff --git a/tests/test_connection.py b/tests/test_connection.py index 2c8270c..92969b0 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -4,6 +4,7 @@ ClientCert, Connection, ConnectionClosed, + Environment, SslConfigurationContext, StreamSpecification, ) @@ -19,16 +20,18 @@ def on_disconnected(): def test_connection() -> None: - connection = Connection("amqp://guest:guest@localhost:5672/") + environment = Environment() + connection = environment.connection("amqp://guest:guest@localhost:5672/") connection.dial() - connection.close() + environment.close() def test_connection_ssl() -> None: + environment = Environment() ca_cert_file = ".ci/certs/ca_certificate.pem" client_cert = ".ci/certs/client_certificate.pem" client_key = ".ci/certs/client_key.pem" - connection = Connection( + connection = environment.connection( "amqps://guest:guest@localhost:5671/", ssl_context=SslConfigurationContext( ca_cert=ca_cert_file, @@ -37,6 +40,8 @@ def test_connection_ssl() -> None: ) connection.dial() + environment.close() + def test_connection_reconnection() -> None: @@ -56,7 +61,9 @@ def on_disconnected(): nonlocal reconnected reconnected = True - connection = Connection( + environment = Environment() + + connection = environment.connection( "amqp://guest:guest@localhost:5672/", on_disconnection_handler=on_disconnected ) connection.dial() @@ -81,8 +88,8 @@ def on_disconnected(): management = connection.management() management.declare_queue(queue_specification) management.delete_queue(stream_name) + environment.close() management.close() - connection.close() assert disconnected is True assert reconnected is True diff --git a/tests/test_consumer.py b/tests/test_consumer.py index 05ef5f1..c02991e 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -2,6 +2,7 @@ AddressHelper, ArgumentOutOfRangeException, Connection, + Environment, QuorumQueueSpecification, ) @@ -17,7 +18,6 @@ ) from .utils import ( cleanup_dead_lettering, - create_connection, publish_messages, setup_dead_lettering, ) @@ -71,7 +71,9 @@ def test_consumer_invalid_destination(connection: Connection) -> None: assert raised is True -def test_consumer_async_queue_accept(connection: Connection) -> None: +def test_consumer_async_queue_accept( + connection: Connection, environment: Environment +) -> None: messages_to_send = 1000 @@ -86,7 +88,8 @@ def test_consumer_async_queue_accept(connection: Connection) -> None: publish_messages(connection, messages_to_send, queue_name) # we closed the connection so we need to open a new one - connection_consumer = create_connection() + connection_consumer = environment.connection("amqp://guest:guest@localhost:5672/") + connection_consumer.dial() consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerAccept() ) @@ -108,7 +111,9 @@ def test_consumer_async_queue_accept(connection: Connection) -> None: assert message_count == 0 -def test_consumer_async_queue_no_ack(connection: Connection) -> None: +def test_consumer_async_queue_no_ack( + connection: Connection, environment: Environment +) -> None: messages_to_send = 1000 @@ -123,7 +128,8 @@ def test_consumer_async_queue_no_ack(connection: Connection) -> None: publish_messages(connection, messages_to_send, queue_name) # we closed the connection so we need to open a new one - connection_consumer = create_connection() + connection_consumer = environment.connection("amqp://guest:guest@localhost:5672/") + connection_consumer.dial() consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerNoack() @@ -146,7 +152,9 @@ def test_consumer_async_queue_no_ack(connection: Connection) -> None: assert message_count > 0 -def test_consumer_async_queue_with_discard(connection: Connection) -> None: +def test_consumer_async_queue_with_discard( + connection: Connection, environment: Environment +) -> None: messages_to_send = 1000 queue_dead_lettering = "queue-dead-letter" @@ -171,7 +179,8 @@ def test_consumer_async_queue_with_discard(connection: Connection) -> None: publish_messages(connection, messages_to_send, queue_name) # we closed the connection so we need to open a new one - connection_consumer = create_connection() + connection_consumer = environment.connection("amqp://guest:guest@localhost:5672/") + connection_consumer.dial() consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerDiscard() @@ -201,7 +210,7 @@ def test_consumer_async_queue_with_discard(connection: Connection) -> None: def test_consumer_async_queue_with_discard_with_annotations( - connection: Connection, + connection: Connection, environment: Environment ) -> None: messages_to_send = 1000 @@ -227,7 +236,8 @@ def test_consumer_async_queue_with_discard_with_annotations( addr_queue_dl = AddressHelper.queue_address(queue_dead_lettering) # we closed the connection so we need to open a new one - connection_consumer = create_connection() + connection_consumer = environment.connection("amqp://guest:guest@localhost:5672/") + connection_consumer.dial() consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerDiscardWithAnnotations() @@ -263,9 +273,12 @@ def test_consumer_async_queue_with_discard_with_annotations( assert message_count_dead_lettering == messages_to_send -def test_consumer_async_queue_with_requeue(connection: Connection) -> None: +def test_consumer_async_queue_with_requeue( + connection: Connection, environment: Environment +) -> None: messages_to_send = 1000 + environment = Environment() queue_name = "test-queue-async-requeue" management = connection.management() @@ -277,7 +290,8 @@ def test_consumer_async_queue_with_requeue(connection: Connection) -> None: publish_messages(connection, messages_to_send, queue_name) # we closed the connection so we need to open a new one - connection_consumer = create_connection() + connection_consumer = environment.connection("amqp://guest:guest@localhost:5672/") + connection_consumer.dial() consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerRequeue() @@ -300,7 +314,7 @@ def test_consumer_async_queue_with_requeue(connection: Connection) -> None: def test_consumer_async_queue_with_requeue_with_annotations( - connection: Connection, + connection: Connection, environment: Environment ) -> None: messages_to_send = 1000 @@ -315,7 +329,8 @@ def test_consumer_async_queue_with_requeue_with_annotations( publish_messages(connection, messages_to_send, queue_name) # we closed the connection so we need to open a new one - connection_consumer = create_connection() + connection_consumer = environment.connection("amqp://guest:guest@localhost:5672/") + connection_consumer.dial() consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerRequeueWithAnnotations() @@ -346,6 +361,7 @@ def test_consumer_async_queue_with_requeue_with_annotations( def test_consumer_async_queue_with_requeue_with_invalid_annotations( connection: Connection, + environment: Environment, ) -> None: messages_to_send = 1000 test_failure = True @@ -361,7 +377,8 @@ def test_consumer_async_queue_with_requeue_with_invalid_annotations( publish_messages(connection, messages_to_send, queue_name) # we closed the connection so we need to open a new one - connection_consumer = create_connection() + connection_consumer = environment.connection("amqp://guest:guest@localhost:5672/") + connection_consumer.dial() try: consumer = connection_consumer.consumer( diff --git a/tests/test_publisher.py b/tests/test_publisher.py index f707b66..a539686 100644 --- a/tests/test_publisher.py +++ b/tests/test_publisher.py @@ -6,6 +6,7 @@ BindingSpecification, Connection, ConnectionClosed, + Environment, ExchangeSpecification, Message, QuorumQueueSpecification, @@ -166,6 +167,7 @@ def test_disconnection_reconnection() -> None: publisher = None queue_name = "test-queue" connection_test = None + environment = Environment() def on_disconnected(): @@ -175,7 +177,9 @@ def on_disconnected(): # reconnect if connection_test is not None: - connection_test = Connection("amqp://guest:guest@localhost:5672/") + connection_test = environment.connection( + "amqp://guest:guest@localhost:5672/" + ) connection_test.dial() if publisher is not None: @@ -184,7 +188,7 @@ def on_disconnected(): nonlocal reconnected reconnected = True - connection_test = Connection( + connection_test = environment.connection( "amqp://guest:guest@localhost:5672/", on_disconnection_handler=on_disconnected ) connection_test.dial() @@ -233,7 +237,7 @@ def on_disconnected(): management.delete_queue(queue_name) management.close() - connection_test.close() + environment.close() assert generic_exception_raised is False assert disconnected is True diff --git a/tests/test_streams.py b/tests/test_streams.py index af9a714..6c3cc73 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -1,6 +1,7 @@ from rabbitmq_amqp_python_client import ( AddressHelper, Connection, + Environment, OffsetSpecification, StreamOptions, StreamSpecification, @@ -10,10 +11,12 @@ ConsumerTestException, MyMessageHandlerAcceptStreamOffset, ) -from .utils import create_connection, publish_messages +from .utils import publish_messages -def test_stream_read_from_last_default(connection: Connection) -> None: +def test_stream_read_from_last_default( + connection: Connection, environment: Environment +) -> None: consumer = None stream_name = "test_stream_info_with_validation" @@ -29,7 +32,10 @@ def test_stream_read_from_last_default(connection: Connection) -> None: # consume and then publish try: - connection_consumer = create_connection() + connection_consumer = environment.connection( + "amqp://guest:guest@localhost:5672/" + ) + connection_consumer.dial() consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerAcceptStreamOffset() ) @@ -44,7 +50,9 @@ def test_stream_read_from_last_default(connection: Connection) -> None: management.delete_queue(stream_name) -def test_stream_read_from_last(connection: Connection) -> None: +def test_stream_read_from_last( + connection: Connection, environment: Environment +) -> None: consumer = None stream_name = "test_stream_info_with_validation" @@ -63,7 +71,10 @@ def test_stream_read_from_last(connection: Connection) -> None: # consume and then publish try: - connection_consumer = create_connection() + connection_consumer = environment.connection( + "amqp://guest:guest@localhost:5672/" + ) + connection_consumer.dial() consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerAcceptStreamOffset(), @@ -80,7 +91,9 @@ def test_stream_read_from_last(connection: Connection) -> None: management.delete_queue(stream_name) -def test_stream_read_from_offset_zero(connection: Connection) -> None: +def test_stream_read_from_offset_zero( + connection: Connection, environment: Environment +) -> None: consumer = None stream_name = "test_stream_info_with_validation" @@ -101,7 +114,10 @@ def test_stream_read_from_offset_zero(connection: Connection) -> None: stream_filter_options.offset(0) try: - connection_consumer = create_connection() + connection_consumer = environment.connection( + "amqp://guest:guest@localhost:5672/" + ) + connection_consumer.dial() consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerAcceptStreamOffset(0), @@ -118,7 +134,9 @@ def test_stream_read_from_offset_zero(connection: Connection) -> None: management.delete_queue(stream_name) -def test_stream_read_from_offset_first(connection: Connection) -> None: +def test_stream_read_from_offset_first( + connection: Connection, environment: Environment +) -> None: consumer = None stream_name = "test_stream_info_with_validation" @@ -139,7 +157,10 @@ def test_stream_read_from_offset_first(connection: Connection) -> None: stream_filter_options.offset(OffsetSpecification.first) try: - connection_consumer = create_connection() + connection_consumer = environment.connection( + "amqp://guest:guest@localhost:5672/" + ) + connection_consumer.dial() consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerAcceptStreamOffset(0), @@ -156,7 +177,9 @@ def test_stream_read_from_offset_first(connection: Connection) -> None: management.delete_queue(stream_name) -def test_stream_read_from_offset_ten(connection: Connection) -> None: +def test_stream_read_from_offset_ten( + connection: Connection, environment: Environment +) -> None: consumer = None stream_name = "test_stream_info_with_validation" @@ -177,7 +200,10 @@ def test_stream_read_from_offset_ten(connection: Connection) -> None: stream_filter_options.offset(10) try: - connection_consumer = create_connection() + connection_consumer = environment.connection( + "amqp://guest:guest@localhost:5672/" + ) + connection_consumer.dial() consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerAcceptStreamOffset(10), @@ -195,7 +221,7 @@ def test_stream_read_from_offset_ten(connection: Connection) -> None: management.delete_queue(stream_name) -def test_stream_filtering(connection: Connection) -> None: +def test_stream_filtering(connection: Connection, environment: Environment) -> None: consumer = None stream_name = "test_stream_info_with_filtering" @@ -213,7 +239,11 @@ def test_stream_filtering(connection: Connection) -> None: try: stream_filter_options = StreamOptions() stream_filter_options.filter_values(["banana"]) - connection_consumer = create_connection() + connection_consumer = environment.connection( + "amqp://guest:guest@localhost:5672/" + ) + connection_consumer.dial() + consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerAcceptStreamOffset(), @@ -231,7 +261,9 @@ def test_stream_filtering(connection: Connection) -> None: management.delete_queue(stream_name) -def test_stream_filtering_not_present(connection: Connection) -> None: +def test_stream_filtering_not_present( + connection: Connection, environment: Environment +) -> None: raised = False stream_name = "test_stream_info_with_filtering" @@ -248,7 +280,9 @@ def test_stream_filtering_not_present(connection: Connection) -> None: # consume and then publish stream_filter_options = StreamOptions() stream_filter_options.filter_values(["apple"]) - connection_consumer = create_connection() + connection_consumer = environment.connection("amqp://guest:guest@localhost:5672/") + connection_consumer.dial() + consumer = connection_consumer.consumer( addr_queue, stream_filter_options=stream_filter_options ) @@ -268,7 +302,9 @@ def test_stream_filtering_not_present(connection: Connection) -> None: assert raised is True -def test_stream_match_unfiltered(connection: Connection) -> None: +def test_stream_match_unfiltered( + connection: Connection, environment: Environment +) -> None: consumer = None stream_name = "test_stream_info_with_filtering" @@ -287,7 +323,10 @@ def test_stream_match_unfiltered(connection: Connection) -> None: stream_filter_options = StreamOptions() stream_filter_options.filter_values(["banana"]) stream_filter_options.filter_match_unfiltered(True) - connection_consumer = create_connection() + connection_consumer = environment.connection( + "amqp://guest:guest@localhost:5672/" + ) + connection_consumer.dial() consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerAcceptStreamOffset(), diff --git a/tests/utils.py b/tests/utils.py index 46c8de2..fc6db91 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -11,13 +11,6 @@ ) -def create_connection() -> Connection: - connection_consumer = Connection("amqp://guest:guest@localhost:5672/") - connection_consumer.dial() - - return connection_consumer - - def publish_messages( connection: Connection, messages_to_send: int, From f2807b30d0c945fb868e29bebce34a2b23764191 Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Mon, 10 Feb 2025 17:07:04 +0100 Subject: [PATCH 2/8] introducing environment class --- examples/getting_started/basic_example.py | 2 +- examples/getting_started/example_with_streams.py | 2 +- examples/getting_started/reconnection_example.py | 4 +++- examples/getting_started/tls_example.py | 2 +- rabbitmq_amqp_python_client/environment.py | 8 ++++---- 5 files changed, 10 insertions(+), 8 deletions(-) diff --git a/examples/getting_started/basic_example.py b/examples/getting_started/basic_example.py index 932500d..f39501b 100644 --- a/examples/getting_started/basic_example.py +++ b/examples/getting_started/basic_example.py @@ -7,11 +7,11 @@ BindingSpecification, Connection, Disposition, + Environment, Event, ExchangeSpecification, Message, QuorumQueueSpecification, - Environment, ) MESSAGES_TO_PUBLISH = 100 diff --git a/examples/getting_started/example_with_streams.py b/examples/getting_started/example_with_streams.py index 9fd0b7a..49994f5 100644 --- a/examples/getting_started/example_with_streams.py +++ b/examples/getting_started/example_with_streams.py @@ -4,12 +4,12 @@ AddressHelper, AMQPMessagingHandler, Connection, + Environment, Event, Message, OffsetSpecification, StreamOptions, StreamSpecification, - Environment, ) MESSAGES_TO_PUBLISH = 100 diff --git a/examples/getting_started/reconnection_example.py b/examples/getting_started/reconnection_example.py index ff9079b..9c55c7d 100644 --- a/examples/getting_started/reconnection_example.py +++ b/examples/getting_started/reconnection_example.py @@ -12,16 +12,18 @@ Connection, ConnectionClosed, Consumer, + Environment, Event, ExchangeSpecification, Management, Message, Publisher, QuorumQueueSpecification, - Environment, ) environment = Environment() + + # here we keep track of the objects we need to reconnect @dataclass class ConnectionConfiguration: diff --git a/examples/getting_started/tls_example.py b/examples/getting_started/tls_example.py index c18f7a6..3985945 100644 --- a/examples/getting_started/tls_example.py +++ b/examples/getting_started/tls_example.py @@ -7,12 +7,12 @@ BindingSpecification, ClientCert, Connection, + Environment, Event, ExchangeSpecification, Message, QuorumQueueSpecification, SslConfigurationContext, - Environment, ) messages_to_publish = 100 diff --git a/rabbitmq_amqp_python_client/environment.py b/rabbitmq_amqp_python_client/environment.py index 14c3390..6b20c17 100644 --- a/rabbitmq_amqp_python_client/environment.py +++ b/rabbitmq_amqp_python_client/environment.py @@ -16,15 +16,15 @@ def __init__(self): # type: ignore def connection( self, # single-node mode - url: Optional[str] = None, + uri: Optional[str] = None, # multi-node mode - urls: Optional[list[str]] = None, + uris: Optional[list[str]] = None, ssl_context: Optional[SslConfigurationContext] = None, on_disconnection_handler: Optional[CB] = None, # type: ignore ) -> Connection: connection = Connection( - url=url, - urls=urls, + uri=uri, + uris=uris, ssl_context=ssl_context, on_disconnection_handler=on_disconnection_handler, ) From 42e7dbd13090d9079c60d25d1de2f58fc6e4a445 Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Tue, 11 Feb 2025 14:35:12 +0100 Subject: [PATCH 3/8] adding closing connections logic --- rabbitmq_amqp_python_client/connection.py | 7 ++++++ rabbitmq_amqp_python_client/environment.py | 13 ++++++++-- tests/test_connection.py | 28 ++++++++++++++++++++++ 3 files changed, 46 insertions(+), 2 deletions(-) diff --git a/rabbitmq_amqp_python_client/connection.py b/rabbitmq_amqp_python_client/connection.py index b991f34..b5e81f0 100644 --- a/rabbitmq_amqp_python_client/connection.py +++ b/rabbitmq_amqp_python_client/connection.py @@ -37,6 +37,11 @@ def __init__( self._on_disconnection_handler = on_disconnection_handler self._conf_ssl_context: Optional[SslConfigurationContext] = ssl_context self._ssl_domain = None + self._connections = [] # type: ignore + self._index: int = -1 + + def _set_environment_connection_list(self, connections: []): # type: ignore + self._connections = connections def dial(self) -> None: logger.debug("Establishing a connection to the amqp server") @@ -72,9 +77,11 @@ def management(self) -> Management: return self._management # closes the connection to the AMQP 1.0 server. + # This method should be called just from Environment and not from the user def _close(self) -> None: logger.debug("Closing connection") self._conn.close() + self._connections.remove(self) def publisher(self, destination: str) -> Publisher: if validate_address(destination) is False: diff --git a/rabbitmq_amqp_python_client/environment.py b/rabbitmq_amqp_python_client/environment.py index 6b20c17..5e7165e 100644 --- a/rabbitmq_amqp_python_client/environment.py +++ b/rabbitmq_amqp_python_client/environment.py @@ -1,8 +1,12 @@ +# For the moment this is just a Connection pooler to keep compatibility with other clients +import logging from typing import Annotated, Callable, Optional, TypeVar from .connection import Connection from .ssl_configuration import SslConfigurationContext +logger = logging.getLogger(__name__) + MT = TypeVar("MT") CB = Annotated[Callable[[MT], None], "Message callback type"] @@ -11,7 +15,7 @@ class Environment: def __init__(self): # type: ignore - self._connections = [] + self._connections: list[Connection] = [] def connection( self, @@ -28,10 +32,15 @@ def connection( ssl_context=ssl_context, on_disconnection_handler=on_disconnection_handler, ) - + logger.debug("Environment: Creating and returning a new connection") self._connections.append(connection) + connection._set_environment_connection_list(self._connections) return connection def close(self) -> None: + logger.debug("Environment: Closing all pending connections") for connection in self._connections: connection._close() + + def connections(self) -> list[Connection]: + return self._connections diff --git a/tests/test_connection.py b/tests/test_connection.py index 92969b0..0ff71a7 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -43,6 +43,34 @@ def test_connection_ssl() -> None: environment.close() +def test_environment_connections_management() -> None: + + environment = Environment() + connection = environment.connection("amqp://guest:guest@localhost:5672/") + connection.dial() + connection2 = environment.connection("amqp://guest:guest@localhost:5672/") + connection2.dial() + connection3 = environment.connection("amqp://guest:guest@localhost:5672/") + connection3.dial() + + assert len(environment.connections()) == 3 + + # this shouldn't happen but we test it anyway + connection._close() + + assert len(environment.connections()) == 2 + + connection2._close() + + assert len(environment.connections()) == 1 + + connection3._close() + + assert len(environment.connections()) == 0 + + environment.close() + + def test_connection_reconnection() -> None: reconnected = False From 8553249d0e885c924f522a2548f7bb977c5c8916 Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Wed, 12 Feb 2025 09:40:54 +0100 Subject: [PATCH 4/8] replace Disposition to OutcomeState --- examples/getting_started/basic_example.py | 8 ++++---- rabbitmq_amqp_python_client/__init__.py | 5 +++-- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/examples/getting_started/basic_example.py b/examples/getting_started/basic_example.py index f39501b..992b823 100644 --- a/examples/getting_started/basic_example.py +++ b/examples/getting_started/basic_example.py @@ -6,11 +6,11 @@ AMQPMessagingHandler, BindingSpecification, Connection, - Disposition, Environment, Event, ExchangeSpecification, Message, + OutcomeState, QuorumQueueSpecification, ) @@ -126,11 +126,11 @@ def main() -> None: for i in range(MESSAGES_TO_PUBLISH): print("publishing") status = publisher.publish(Message(body="test")) - if status.remote_state == Disposition.ACCEPTED: + if status.remote_state == OutcomeState.ACCEPTED: print("message accepted") - elif status.remote_state == Disposition.RELEASED: + elif status.remote_state == OutcomeState.RELEASED: print("message not routed") - elif status.remote_state == Disposition.REJECTED: + elif status.remote_state == OutcomeState.REJECTED: print("message not rejected") publisher.close() diff --git a/rabbitmq_amqp_python_client/__init__.py b/rabbitmq_amqp_python_client/__init__.py index cc0f828..a7e2df1 100644 --- a/rabbitmq_amqp_python_client/__init__.py +++ b/rabbitmq_amqp_python_client/__init__.py @@ -40,6 +40,8 @@ del metadata +OutcomeState = Disposition + __all__ = [ "Connection", "Management", @@ -62,10 +64,9 @@ "ArgumentOutOfRangeException", "SslConfigurationContext", "ClientCert", - "Delivery", "ConnectionClosed", "StreamOptions", "OffsetSpecification", - "Disposition", + "OutcomeState", "Environment", ] From e348ffc46d630a62cfe95bf123ea5174bdacdebd Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Wed, 12 Feb 2025 14:13:14 +0100 Subject: [PATCH 5/8] making exchange arguments optional --- examples/getting_started/basic_example.py | 2 +- examples/getting_started/reconnection_example.py | 2 +- examples/getting_started/tls_example.py | 2 +- rabbitmq_amqp_python_client/entities.py | 6 +++--- tests/test_management.py | 4 ++-- tests/test_publisher.py | 2 +- 6 files changed, 9 insertions(+), 9 deletions(-) diff --git a/examples/getting_started/basic_example.py b/examples/getting_started/basic_example.py index 992b823..ecbbc4a 100644 --- a/examples/getting_started/basic_example.py +++ b/examples/getting_started/basic_example.py @@ -93,7 +93,7 @@ def main() -> None: management = connection.management() print("declaring exchange and queue") - management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={})) + management.declare_exchange(ExchangeSpecification(name=exchange_name)) management.declare_queue( QuorumQueueSpecification(name=queue_name) diff --git a/examples/getting_started/reconnection_example.py b/examples/getting_started/reconnection_example.py index 9c55c7d..751fd82 100644 --- a/examples/getting_started/reconnection_example.py +++ b/examples/getting_started/reconnection_example.py @@ -150,7 +150,7 @@ def main() -> None: print("declaring exchange and queue") connection_configuration.management.declare_exchange( - ExchangeSpecification(name=exchange_name, arguments={}) + ExchangeSpecification(name=exchange_name) ) connection_configuration.management.declare_queue( diff --git a/examples/getting_started/tls_example.py b/examples/getting_started/tls_example.py index 3985945..48ec488 100644 --- a/examples/getting_started/tls_example.py +++ b/examples/getting_started/tls_example.py @@ -93,7 +93,7 @@ def main() -> None: management = connection.management() print("declaring exchange and queue") - management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={})) + management.declare_exchange(ExchangeSpecification(name=exchange_name)) management.declare_queue( QuorumQueueSpecification(name=queue_name) diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index da85283..54c3e4a 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -1,4 +1,4 @@ -from dataclasses import dataclass +from dataclasses import dataclass, field from enum import Enum from typing import Any, Dict, Optional, Union @@ -13,7 +13,7 @@ @dataclass class ExchangeSpecification: name: str - arguments: dict[str, str] + arguments: dict[str, str] = field(default_factory=dict) exchange_type: ExchangeType = ExchangeType.direct is_auto_delete: bool = False is_internal: bool = False @@ -24,7 +24,7 @@ class ExchangeSpecification: class QueueInfo: name: str arguments: dict[str, Any] - queue_type: QueueType = QueueType.quorum + queue_type: QueueType = QueueType.classic is_exclusive: Optional[bool] = None is_auto_delete: bool = False is_durable: bool = True diff --git a/tests/test_management.py b/tests/test_management.py index f25b5f7..f3aba80 100644 --- a/tests/test_management.py +++ b/tests/test_management.py @@ -17,7 +17,7 @@ def test_declare_delete_exchange(management: Management) -> None: exchange_name = "test-exchange" exchange_info = management.declare_exchange( - ExchangeSpecification(name=exchange_name, arguments={}) + ExchangeSpecification(name=exchange_name) ) assert exchange_info.name == exchange_name @@ -43,7 +43,7 @@ def test_bind_exchange_to_queue(management: Management) -> None: queue_name = "test-bind-exchange-to-queue-queue" routing_key = "routing-key" - management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={})) + management.declare_exchange(ExchangeSpecification(name=exchange_name)) management.declare_queue(QuorumQueueSpecification(name=queue_name)) diff --git a/tests/test_publisher.py b/tests/test_publisher.py index a539686..6b8e512 100644 --- a/tests/test_publisher.py +++ b/tests/test_publisher.py @@ -97,7 +97,7 @@ def test_publish_exchange(connection: Connection) -> None: management = connection.management() routing_key = "routing-key" - management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={})) + management.declare_exchange(ExchangeSpecification(name=exchange_name)) management.declare_queue(QuorumQueueSpecification(name=queue_name)) From ec7b46d5eeb05ff083ad2d0f5233a41010884e5d Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Wed, 12 Feb 2025 14:38:43 +0100 Subject: [PATCH 6/8] adding message/consumer count in QueueInfo --- rabbitmq_amqp_python_client/management.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rabbitmq_amqp_python_client/management.py b/rabbitmq_amqp_python_client/management.py index fce5123..75edee1 100644 --- a/rabbitmq_amqp_python_client/management.py +++ b/rabbitmq_amqp_python_client/management.py @@ -369,4 +369,6 @@ def queue_info(self, name: str) -> QueueInfo: leader=queue_info["leader"], members=queue_info["replicas"], arguments=queue_info["arguments"], + message_count=queue_info["message_count"], + consumer_count=queue_info["consumer_count"], ) From e49cd9cd0be81363215c1fa7300c125ca4075d66 Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Wed, 12 Feb 2025 17:33:58 +0100 Subject: [PATCH 7/8] moving examples and making close() method of connection public again --- README.md | 10 +++++----- examples/README.md | 6 ++++++ .../{basic_example.py => getting_started.py} | 0 .../reconnection_example.py | 0 .../example_with_streams.py | 0 examples/{getting_started => tls}/tls_example.py | 0 rabbitmq_amqp_python_client/connection.py | 3 +-- rabbitmq_amqp_python_client/environment.py | 3 ++- tests/test_connection.py | 6 +++--- 9 files changed, 17 insertions(+), 11 deletions(-) create mode 100644 examples/README.md rename examples/getting_started/{basic_example.py => getting_started.py} (100%) rename examples/{getting_started => reconnection}/reconnection_example.py (100%) rename examples/{getting_started => streams}/example_with_streams.py (100%) rename examples/{getting_started => tls}/tls_example.py (100%) diff --git a/README.md b/README.md index 8dd1621..837dad0 100644 --- a/README.md +++ b/README.md @@ -32,9 +32,9 @@ The client is distributed via [`PIP`](https://pypi.org/project/rabbitmq-amqp-pyt ## Getting Started -An example is provided [`here`](./examples/getting_started/basic_example.py) you can run it after starting a RabbitMQ 4.0 broker with: +An example is provided [`here`](./examples/getting_started/getting_started.py) you can run it after starting a RabbitMQ 4.0 broker with: -poetry run python ./examples/getting_started/main.py +poetry run python ./examples/getting_started/getting_started.py ### Creating a connection @@ -132,13 +132,13 @@ You can consume from a given offset or specify a default starting point (FIRST, Streams filtering is also supported: https://www.rabbitmq.com/blog/2023/10/16/stream-filtering -You can check the [`stream example`](./examples/getting_started/example_with_streams.py) to see how to work with RabbitMQ streams. +You can check the [`stream example`](./examples/streams/example_with_streams.py) to see how to work with RabbitMQ streams. ### SSL connections The client supports TLS/SSL connections. -You can check the [`ssl example`](./examples/getting_started/tls_example.py) to see how to establish a secured connection +You can check the [`ssl example`](./examples/tls/tls_example.py) to see how to establish a secured connection ### Managing disconnections @@ -146,7 +146,7 @@ You can check the [`ssl example`](./examples/getting_started/tls_example.py) to At this stage the client doesn't support auto-reconnect but a callback is invoked everytime a remote disconnection is detected. You can use this callback to implement your own logic and eventually attempt a reconnection. -You can check the [`reconnection example`](./examples/getting_started/reconnection_example.py) to see how to manage disconnections and +You can check the [`reconnection example`](./examples/reconnection/reconnection_example.py) to see how to manage disconnections and eventually attempt a reconnection diff --git a/examples/README.md b/examples/README.md new file mode 100644 index 0000000..4c12d6b --- /dev/null +++ b/examples/README.md @@ -0,0 +1,6 @@ +Client examples +=== + - [Getting started](./getting_started/getting_started.py) - Producer and Consumer example without reconnection + - [Reconnection](./reconnection/reconnection_example.py) - Producer and Consumer example with reconnection + - [TLS](./tls/tls_example.py) - Producer and Consumer using a TLS connection + - [Streams](./streams/example_with_streams.py) - Example supporting stream capabilities \ No newline at end of file diff --git a/examples/getting_started/basic_example.py b/examples/getting_started/getting_started.py similarity index 100% rename from examples/getting_started/basic_example.py rename to examples/getting_started/getting_started.py diff --git a/examples/getting_started/reconnection_example.py b/examples/reconnection/reconnection_example.py similarity index 100% rename from examples/getting_started/reconnection_example.py rename to examples/reconnection/reconnection_example.py diff --git a/examples/getting_started/example_with_streams.py b/examples/streams/example_with_streams.py similarity index 100% rename from examples/getting_started/example_with_streams.py rename to examples/streams/example_with_streams.py diff --git a/examples/getting_started/tls_example.py b/examples/tls/tls_example.py similarity index 100% rename from examples/getting_started/tls_example.py rename to examples/tls/tls_example.py diff --git a/rabbitmq_amqp_python_client/connection.py b/rabbitmq_amqp_python_client/connection.py index b5e81f0..468d79a 100644 --- a/rabbitmq_amqp_python_client/connection.py +++ b/rabbitmq_amqp_python_client/connection.py @@ -77,8 +77,7 @@ def management(self) -> Management: return self._management # closes the connection to the AMQP 1.0 server. - # This method should be called just from Environment and not from the user - def _close(self) -> None: + def close(self) -> None: logger.debug("Closing connection") self._conn.close() self._connections.remove(self) diff --git a/rabbitmq_amqp_python_client/environment.py b/rabbitmq_amqp_python_client/environment.py index 5e7165e..4e1a515 100644 --- a/rabbitmq_amqp_python_client/environment.py +++ b/rabbitmq_amqp_python_client/environment.py @@ -37,10 +37,11 @@ def connection( connection._set_environment_connection_list(self._connections) return connection + # closes all active connections def close(self) -> None: logger.debug("Environment: Closing all pending connections") for connection in self._connections: - connection._close() + connection.close() def connections(self) -> list[Connection]: return self._connections diff --git a/tests/test_connection.py b/tests/test_connection.py index 0ff71a7..763a01f 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -56,15 +56,15 @@ def test_environment_connections_management() -> None: assert len(environment.connections()) == 3 # this shouldn't happen but we test it anyway - connection._close() + connection.close() assert len(environment.connections()) == 2 - connection2._close() + connection2.close() assert len(environment.connections()) == 1 - connection3._close() + connection3.close() assert len(environment.connections()) == 0 From c10c248ce9d90860c0d4ae3aad3c523fa7c79c7f Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Thu, 13 Feb 2025 09:19:31 +0100 Subject: [PATCH 8/8] bumping version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 4d743ef..6638a19 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "rabbitmq-amqp-python-client" -version = "0.1.0-alpha.2" +version = "0.1.0-alpha.3" description = "Python RabbitMQ client for AMQP 1.0 protocol" authors = ["RabbitMQ team"] license = "Apache-2.0 license"