From a2e59932a39e0f9a186cbf04f07d1c12d7c08214 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 23 Apr 2025 12:43:30 +0200 Subject: [PATCH 01/20] use application data Signed-off-by: Gabriele Santomaggio --- examples/getting_started/getting_started.py | 5 +- .../qpid/proton/_exceptions.py | 7 ++ .../qpid/proton/_message.py | 67 +++++++++++-------- .../qpid/proton/_utils.py | 1 + 4 files changed, 49 insertions(+), 31 deletions(-) diff --git a/examples/getting_started/getting_started.py b/examples/getting_started/getting_started.py index bbd6f60..02757ab 100644 --- a/examples/getting_started/getting_started.py +++ b/examples/getting_started/getting_started.py @@ -24,7 +24,7 @@ def __init__(self): self._count = 0 def on_amqp_message(self, event: Event): - print("received message: " + str(event.message.body)) + print("received message: {} ".format(''.join(map(chr, event.message.body)))) # accepting self.delivery_context.accept(event) @@ -79,7 +79,6 @@ def create_connection(environment: Environment) -> Connection: def main() -> None: - exchange_name = "test-exchange" queue_name = "example-queue" routing_key = "routing-key" @@ -123,7 +122,7 @@ def main() -> None: # publish 10 messages for i in range(MESSAGES_TO_PUBLISH): print("publishing") - status = publisher.publish(Message(body="test")) + status = publisher.publish(Message(body=str.encode("test message {} ".format(i)))) if status.remote_state == OutcomeState.ACCEPTED: print("message accepted") elif status.remote_state == OutcomeState.RELEASED: diff --git a/rabbitmq_amqp_python_client/qpid/proton/_exceptions.py b/rabbitmq_amqp_python_client/qpid/proton/_exceptions.py index 806961a..fee9e1f 100644 --- a/rabbitmq_amqp_python_client/qpid/proton/_exceptions.py +++ b/rabbitmq_amqp_python_client/qpid/proton/_exceptions.py @@ -58,6 +58,13 @@ class MessageException(ProtonException): pass +class ArgumentOutOfRangeException(MessageException): + """ + An exception class raised when an argument is out of range. + """ + + pass + class DataException(ProtonException): """ diff --git a/rabbitmq_amqp_python_client/qpid/proton/_message.py b/rabbitmq_amqp_python_client/qpid/proton/_message.py index 9c0b655..3911bf8 100644 --- a/rabbitmq_amqp_python_client/qpid/proton/_message.py +++ b/rabbitmq_amqp_python_client/qpid/proton/_message.py @@ -84,7 +84,7 @@ from ._common import millis2secs, secs2millis from ._data import AnnotationDict, Data, char, symbol, ulong from ._endpoints import Link -from ._exceptions import EXCEPTIONS, MessageException +from ._exceptions import EXCEPTIONS, MessageException, ArgumentOutOfRangeException if TYPE_CHECKING: from proton._data import Described, PythonAMQPData @@ -110,17 +110,26 @@ class Message(object): """ Default AMQP message priority""" def __init__( - self, - body: Union[ - bytes, str, dict, list, int, float, "UUID", "Described", None - ] = None, - **kwargs + self, + body: Union[ + bytes, dict, None + ] = None, + **kwargs ) -> None: + # validate the types + + if not isinstance(body, (bytes, dict, type(None))): + raise ArgumentOutOfRangeException( + "Message body must be of type bytes, dict or None" + ) + self._msg = pn_message() self.instructions = None self.annotations = None self.properties = None self.body = body + self.inferred = True + for k, v in kwargs.items(): getattr(self, k) # Raise exception if it's not a valid attribute. setattr(self, k, v) @@ -236,7 +245,8 @@ def inferred(self) -> bool: :raise: :exc:`MessageException` if there is any Proton error when using the setter. """ - return pn_message_is_inferred(self._msg) + x = pn_message_is_inferred(self._msg) + return x @inferred.setter def inferred(self, value: bool) -> None: @@ -503,7 +513,7 @@ def instructions(self) -> Optional[AnnotationDict]: @instructions.setter def instructions( - self, instructions: Optional[Dict[Union[str, int], "PythonAMQPData"]] + self, instructions: Optional[Dict[Union[str, int], "PythonAMQPData"]] ) -> None: if isinstance(instructions, dict): self.instruction_dict = AnnotationDict(instructions, raise_on_error=False) @@ -526,7 +536,7 @@ def annotations(self) -> Optional[AnnotationDict]: @annotations.setter def annotations( - self, annotations: Optional[Dict[Union[str, int], "PythonAMQPData"]] + self, annotations: Optional[Dict[Union[str, int], "PythonAMQPData"]] ) -> None: if isinstance(annotations, dict): self.annotation_dict = AnnotationDict(annotations, raise_on_error=False) @@ -593,7 +603,8 @@ def send(self, sender: "Sender", tag: Optional[str] = None) -> "Delivery": return dlv @overload - def recv(self, link: "Sender") -> None: ... + def recv(self, link: "Sender") -> None: + ... def recv(self, link: "Receiver") -> Optional["Delivery"]: """ @@ -624,24 +635,24 @@ def recv(self, link: "Receiver") -> Optional["Delivery"]: def __repr__(self) -> str: props = [] for attr in ( - "inferred", - "address", - "reply_to", - "durable", - "ttl", - "priority", - "first_acquirer", - "delivery_count", - "id", - "correlation_id", - "user_id", - "group_id", - "group_sequence", - "reply_to_group_id", - "instructions", - "annotations", - "properties", - "body", + "inferred", + "address", + "reply_to", + "durable", + "ttl", + "priority", + "first_acquirer", + "delivery_count", + "id", + "correlation_id", + "user_id", + "group_id", + "group_sequence", + "reply_to_group_id", + "instructions", + "annotations", + "properties", + "body", ): value = getattr(self, attr) if value: diff --git a/rabbitmq_amqp_python_client/qpid/proton/_utils.py b/rabbitmq_amqp_python_client/qpid/proton/_utils.py index 4be2ff2..8889ea7 100644 --- a/rabbitmq_amqp_python_client/qpid/proton/_utils.py +++ b/rabbitmq_amqp_python_client/qpid/proton/_utils.py @@ -176,6 +176,7 @@ def send( :return: Delivery object for this message. """ + delivery = self.link.send(msg) self.connection.wait( lambda: _is_settled(delivery), From 62fe4461763a7ab7b9b32c966fd161d601d4194d Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 23 Apr 2025 12:48:13 +0200 Subject: [PATCH 02/20] format Signed-off-by: Gabriele Santomaggio --- examples/getting_started/getting_started.py | 6 ++- .../qpid/proton/_exceptions.py | 1 + .../qpid/proton/_message.py | 51 ++++++++----------- .../qpid/proton/_utils.py | 1 - 4 files changed, 27 insertions(+), 32 deletions(-) diff --git a/examples/getting_started/getting_started.py b/examples/getting_started/getting_started.py index 02757ab..bfd468f 100644 --- a/examples/getting_started/getting_started.py +++ b/examples/getting_started/getting_started.py @@ -24,7 +24,7 @@ def __init__(self): self._count = 0 def on_amqp_message(self, event: Event): - print("received message: {} ".format(''.join(map(chr, event.message.body)))) + print("received message: {} ".format("".join(map(chr, event.message.body)))) # accepting self.delivery_context.accept(event) @@ -122,7 +122,9 @@ def main() -> None: # publish 10 messages for i in range(MESSAGES_TO_PUBLISH): print("publishing") - status = publisher.publish(Message(body=str.encode("test message {} ".format(i)))) + status = publisher.publish( + Message(body=str.encode("test message {} ".format(i))) + ) if status.remote_state == OutcomeState.ACCEPTED: print("message accepted") elif status.remote_state == OutcomeState.RELEASED: diff --git a/rabbitmq_amqp_python_client/qpid/proton/_exceptions.py b/rabbitmq_amqp_python_client/qpid/proton/_exceptions.py index fee9e1f..473faa2 100644 --- a/rabbitmq_amqp_python_client/qpid/proton/_exceptions.py +++ b/rabbitmq_amqp_python_client/qpid/proton/_exceptions.py @@ -58,6 +58,7 @@ class MessageException(ProtonException): pass + class ArgumentOutOfRangeException(MessageException): """ An exception class raised when an argument is out of range. diff --git a/rabbitmq_amqp_python_client/qpid/proton/_message.py b/rabbitmq_amqp_python_client/qpid/proton/_message.py index 3911bf8..b3d3766 100644 --- a/rabbitmq_amqp_python_client/qpid/proton/_message.py +++ b/rabbitmq_amqp_python_client/qpid/proton/_message.py @@ -109,13 +109,7 @@ class Message(object): DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY """ Default AMQP message priority""" - def __init__( - self, - body: Union[ - bytes, dict, None - ] = None, - **kwargs - ) -> None: + def __init__(self, body: Union[bytes, dict, None] = None, **kwargs) -> None: # validate the types if not isinstance(body, (bytes, dict, type(None))): @@ -513,7 +507,7 @@ def instructions(self) -> Optional[AnnotationDict]: @instructions.setter def instructions( - self, instructions: Optional[Dict[Union[str, int], "PythonAMQPData"]] + self, instructions: Optional[Dict[Union[str, int], "PythonAMQPData"]] ) -> None: if isinstance(instructions, dict): self.instruction_dict = AnnotationDict(instructions, raise_on_error=False) @@ -536,7 +530,7 @@ def annotations(self) -> Optional[AnnotationDict]: @annotations.setter def annotations( - self, annotations: Optional[Dict[Union[str, int], "PythonAMQPData"]] + self, annotations: Optional[Dict[Union[str, int], "PythonAMQPData"]] ) -> None: if isinstance(annotations, dict): self.annotation_dict = AnnotationDict(annotations, raise_on_error=False) @@ -603,8 +597,7 @@ def send(self, sender: "Sender", tag: Optional[str] = None) -> "Delivery": return dlv @overload - def recv(self, link: "Sender") -> None: - ... + def recv(self, link: "Sender") -> None: ... def recv(self, link: "Receiver") -> Optional["Delivery"]: """ @@ -635,24 +628,24 @@ def recv(self, link: "Receiver") -> Optional["Delivery"]: def __repr__(self) -> str: props = [] for attr in ( - "inferred", - "address", - "reply_to", - "durable", - "ttl", - "priority", - "first_acquirer", - "delivery_count", - "id", - "correlation_id", - "user_id", - "group_id", - "group_sequence", - "reply_to_group_id", - "instructions", - "annotations", - "properties", - "body", + "inferred", + "address", + "reply_to", + "durable", + "ttl", + "priority", + "first_acquirer", + "delivery_count", + "id", + "correlation_id", + "user_id", + "group_id", + "group_sequence", + "reply_to_group_id", + "instructions", + "annotations", + "properties", + "body", ): value = getattr(self, attr) if value: diff --git a/rabbitmq_amqp_python_client/qpid/proton/_utils.py b/rabbitmq_amqp_python_client/qpid/proton/_utils.py index 8889ea7..4be2ff2 100644 --- a/rabbitmq_amqp_python_client/qpid/proton/_utils.py +++ b/rabbitmq_amqp_python_client/qpid/proton/_utils.py @@ -176,7 +176,6 @@ def send( :return: Delivery object for this message. """ - delivery = self.link.send(msg) self.connection.wait( lambda: _is_settled(delivery), From 26bb1b4864d63579718cf7aed2e4f0645a4e778d Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 23 Apr 2025 14:38:57 +0200 Subject: [PATCH 03/20] change tests Signed-off-by: Gabriele Santomaggio --- .ci/ubuntu/gha-setup.sh | 2 +- .gitignore | 1 + examples/getting_started/getting_started.py | 5 ++-- rabbitmq_amqp_python_client/utils.py | 26 +++++++++++++++++++++ tests/conftest.py | 3 +-- tests/test_consumer.py | 3 ++- tests/test_publisher.py | 18 +++++++------- tests/utils.py | 14 +++++------ 8 files changed, 50 insertions(+), 22 deletions(-) diff --git a/.ci/ubuntu/gha-setup.sh b/.ci/ubuntu/gha-setup.sh index 6eb91c8..09dc291 100755 --- a/.ci/ubuntu/gha-setup.sh +++ b/.ci/ubuntu/gha-setup.sh @@ -7,7 +7,7 @@ set -o xtrace script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" readonly script_dir echo "[INFO] script_dir: '$script_dir'" -readonly rabbitmq_image=rabbitmq:4.1.0-beta.4-management-alpine +readonly rabbitmq_image=rabbitmq:4.1.0-alpine readonly docker_name_prefix='rabbitmq-amqp-python-client' diff --git a/.gitignore b/.gitignore index bbc20c7..d884fbe 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,4 @@ __pycache__/ local* .githooks/ .venv/ +.ci/ubuntu/log/* diff --git a/examples/getting_started/getting_started.py b/examples/getting_started/getting_started.py index bfd468f..0f7b331 100644 --- a/examples/getting_started/getting_started.py +++ b/examples/getting_started/getting_started.py @@ -13,6 +13,7 @@ OutcomeState, QuorumQueueSpecification, ) +from rabbitmq_amqp_python_client.utils import string_to_bytes, bytes_to_string MESSAGES_TO_PUBLISH = 100 @@ -24,7 +25,7 @@ def __init__(self): self._count = 0 def on_amqp_message(self, event: Event): - print("received message: {} ".format("".join(map(chr, event.message.body)))) + print("received message: {} ".format(bytes_to_string(event.message.body))) # accepting self.delivery_context.accept(event) @@ -123,7 +124,7 @@ def main() -> None: for i in range(MESSAGES_TO_PUBLISH): print("publishing") status = publisher.publish( - Message(body=str.encode("test message {} ".format(i))) + Message(body=string_to_bytes("test message {} ".format(i))) ) if status.remote_state == OutcomeState.ACCEPTED: print("message accepted") diff --git a/rabbitmq_amqp_python_client/utils.py b/rabbitmq_amqp_python_client/utils.py index cbc1a35..54fbcb6 100644 --- a/rabbitmq_amqp_python_client/utils.py +++ b/rabbitmq_amqp_python_client/utils.py @@ -7,3 +7,29 @@ def validate_annotations(annotations: []) -> bool: # type: ignore validated = False return validated return validated + + +def bytes_to_string(body: bytes) -> str: + """ + Convert the body of a message to a string. + + Args: + body: The body of the message + + Returns: + str: The string representation of the body + """ + return "".join(map(chr, body)) + + +def string_to_bytes(body: str) -> bytes: + """ + Convert a string to the body of a message. + + Args: + body: The string to convert + + Returns: + bytes: The byte representation of the string + """ + return str.encode(body) diff --git a/tests/conftest.py b/tests/conftest.py index 2552463..1ed6742 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -263,8 +263,7 @@ def __init__(self): self._received = 0 def on_message(self, event: Event): - annotations = {} - annotations[symbol("x-opt-string")] = "x-test1" + annotations = {symbol("x-opt-string"): "x-test1"} self.delivery_context.requeue_with_annotations(event, annotations) self._received = self._received + 1 if self._received == 1000: diff --git a/tests/test_consumer.py b/tests/test_consumer.py index db9bc6a..34d00e9 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -5,6 +5,7 @@ Environment, QuorumQueueSpecification, ) +from rabbitmq_amqp_python_client.utils import bytes_to_string from .conftest import ( ConsumerTestException, @@ -42,7 +43,7 @@ def test_consumer_sync_queue_accept(connection: Connection) -> None: # consumer synchronously without handler for i in range(messages_to_send): message = consumer.consume() - if message.body == "test" + str(i): + if bytes_to_string(message.body) == "test{}".format(i): consumed = consumed + 1 consumer.close() diff --git a/tests/test_publisher.py b/tests/test_publisher.py index 4d21c7c..f0038f9 100644 --- a/tests/test_publisher.py +++ b/tests/test_publisher.py @@ -13,6 +13,7 @@ StreamSpecification, ValidationCodeException, ) +from rabbitmq_amqp_python_client.utils import string_to_bytes from .http_requests import delete_all_connections from .utils import create_binding, publish_per_message @@ -34,7 +35,7 @@ def test_publish_queue(connection: Connection) -> None: publisher = connection.publisher( destination=AddressHelper.queue_address(queue_name) ) - status = publisher.publish(Message(body="test")) + status = publisher.publish(Message(body=string_to_bytes("test"))) if status.remote_state == OutcomeState.ACCEPTED: accepted = True except Exception: @@ -130,7 +131,7 @@ def test_publish_to_invalid_destination(connection: Connection) -> None: publisher = None try: publisher = connection.publisher("/invalid-destination/" + queue_name) - publisher.publish(Message(body="test")) + publisher.publish(Message(body=string_to_bytes("test"))) except ArgumentOutOfRangeException: raised = True except Exception: @@ -147,7 +148,7 @@ def test_publish_per_message_to_invalid_destination(connection: Connection) -> N queue_name = "test-queue-1" raised = False - message = Message(body="test") + message = Message(body=string_to_bytes("test")) message = AddressHelper.message_to_address_helper( message, "/invalid_destination/" + queue_name ) @@ -179,7 +180,7 @@ def test_publish_per_message_both_address(connection: Connection) -> None: ) try: - message = Message(body="test") + message = Message(body=string_to_bytes("test")) message = AddressHelper.message_to_address_helper( message, AddressHelper.queue_address(queue_name) ) @@ -212,7 +213,7 @@ def test_publish_exchange(connection: Connection) -> None: try: publisher = connection.publisher(addr) - status = publisher.publish(Message(body="test")) + status = publisher.publish(Message(body=string_to_bytes("test"))) if status.ACCEPTED: accepted = True except Exception: @@ -244,7 +245,7 @@ def test_publish_purge(connection: Connection) -> None: destination=AddressHelper.queue_address(queue_name) ) for i in range(messages_to_publish): - publisher.publish(Message(body="test")) + publisher.publish(Message(body=string_to_bytes("test"))) except Exception: raised = True @@ -289,7 +290,7 @@ def test_disconnection_reconnection() -> None: # simulate a disconnection delete_all_connections() try: - publisher.publish(Message(body="test")) + publisher.publish(Message(body=string_to_bytes("test"))) except ConnectionClosed: disconnected = True @@ -331,8 +332,7 @@ def test_queue_info_for_stream_with_validations(connection: Connection) -> None: ) for i in range(messages_to_send): - - publisher.publish(Message(body="test")) + publisher.publish(Message(body=string_to_bytes("test"))) def test_publish_per_message_exchange(connection: Connection) -> None: diff --git a/tests/utils.py b/tests/utils.py index 2b0a7f0..7fd3186 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -16,6 +16,7 @@ Publisher, QuorumQueueSpecification, ) +from rabbitmq_amqp_python_client.utils import string_to_bytes def publish_messages( @@ -26,25 +27,26 @@ def publish_messages( ) -> None: annotations = {} if filters is not None: - for filter in filters: - annotations = {"x-stream-filter-value": filter} + for filterItem in filters: + annotations = {"x-stream-filter-value": filterItem} publisher = connection.publisher("/queues/" + queue_name) # publish messages_to_send messages for i in range(messages_to_send): - publisher.publish(Message(body="test" + str(i), annotations=annotations)) + publisher.publish( + Message(body=string_to_bytes("test{}".format(i)), annotations=annotations) + ) publisher.close() def publish_per_message(publisher: Publisher, addr: str) -> Delivery: - message = Message(body="test") + message = Message(body=string_to_bytes("test")) message = AddressHelper.message_to_address_helper(message, addr) status = publisher.publish(message) return status def setup_dead_lettering(management: Management) -> str: - exchange_dead_lettering = "exchange-dead-letter" queue_dead_lettering = "queue-dead-letter" binding_key = "key_dead_letter" @@ -72,7 +74,6 @@ def setup_dead_lettering(management: Management) -> str: def create_binding( management: Management, exchange_name: str, queue_name: str, routing_key: str ) -> str: - management.declare_exchange(ExchangeSpecification(name=exchange_name)) management.declare_queue(QuorumQueueSpecification(name=queue_name)) @@ -89,7 +90,6 @@ def create_binding( def cleanup_dead_lettering(management: Management, bind_path: str) -> None: - exchange_dead_lettering = "exchange-dead-letter" queue_dead_lettering = "queue-dead-letter" From ac150eb19fb056007a0c00ae0672284bac768bd3 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 23 Apr 2025 14:42:33 +0200 Subject: [PATCH 04/20] format Signed-off-by: Gabriele Santomaggio --- examples/getting_started/getting_started.py | 5 ++++- rabbitmq_amqp_python_client/qpid/proton/_message.py | 6 +++++- rabbitmq_amqp_python_client/qpid/proton/_tracing.py | 6 ++++-- rabbitmq_amqp_python_client/qpid/proton/_transport.py | 3 ++- tests/test_consumer.py | 4 +++- tests/test_publisher.py | 4 +++- tests/utils.py | 4 +++- 7 files changed, 24 insertions(+), 8 deletions(-) diff --git a/examples/getting_started/getting_started.py b/examples/getting_started/getting_started.py index 0f7b331..6393966 100644 --- a/examples/getting_started/getting_started.py +++ b/examples/getting_started/getting_started.py @@ -13,7 +13,10 @@ OutcomeState, QuorumQueueSpecification, ) -from rabbitmq_amqp_python_client.utils import string_to_bytes, bytes_to_string +from rabbitmq_amqp_python_client.utils import ( + bytes_to_string, + string_to_bytes, +) MESSAGES_TO_PUBLISH = 100 diff --git a/rabbitmq_amqp_python_client/qpid/proton/_message.py b/rabbitmq_amqp_python_client/qpid/proton/_message.py index b3d3766..656f9ec 100644 --- a/rabbitmq_amqp_python_client/qpid/proton/_message.py +++ b/rabbitmq_amqp_python_client/qpid/proton/_message.py @@ -84,7 +84,11 @@ from ._common import millis2secs, secs2millis from ._data import AnnotationDict, Data, char, symbol, ulong from ._endpoints import Link -from ._exceptions import EXCEPTIONS, MessageException, ArgumentOutOfRangeException +from ._exceptions import ( + EXCEPTIONS, + ArgumentOutOfRangeException, + MessageException, +) if TYPE_CHECKING: from proton._data import Described, PythonAMQPData diff --git a/rabbitmq_amqp_python_client/qpid/proton/_tracing.py b/rabbitmq_amqp_python_client/qpid/proton/_tracing.py index 0c02d12..ec1649b 100644 --- a/rabbitmq_amqp_python_client/qpid/proton/_tracing.py +++ b/rabbitmq_amqp_python_client/qpid/proton/_tracing.py @@ -32,8 +32,10 @@ import proton from proton import Sender as ProtonSender -from proton.handlers import IncomingMessageHandler as ProtonIncomingMessageHandler -from proton.handlers import OutgoingMessageHandler as ProtonOutgoingMessageHandler +from proton.handlers import \ + IncomingMessageHandler as ProtonIncomingMessageHandler +from proton.handlers import \ + OutgoingMessageHandler as ProtonOutgoingMessageHandler _tracer = None _trace_key = proton.symbol("x-opt-qpid-tracestate") diff --git a/rabbitmq_amqp_python_client/qpid/proton/_transport.py b/rabbitmq_amqp_python_client/qpid/proton/_transport.py index d4a1702..1e5a31e 100644 --- a/rabbitmq_amqp_python_client/qpid/proton/_transport.py +++ b/rabbitmq_amqp_python_client/qpid/proton/_transport.py @@ -139,7 +139,8 @@ if TYPE_CHECKING: from ._condition import Condition - from ._endpoints import Connection # would produce circular import + from ._endpoints import \ + Connection # would produce circular import class TraceAdapter: diff --git a/tests/test_consumer.py b/tests/test_consumer.py index 34d00e9..1f38dee 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -5,7 +5,9 @@ Environment, QuorumQueueSpecification, ) -from rabbitmq_amqp_python_client.utils import bytes_to_string +from rabbitmq_amqp_python_client.utils import ( + bytes_to_string, +) from .conftest import ( ConsumerTestException, diff --git a/tests/test_publisher.py b/tests/test_publisher.py index f0038f9..fcffe35 100644 --- a/tests/test_publisher.py +++ b/tests/test_publisher.py @@ -13,7 +13,9 @@ StreamSpecification, ValidationCodeException, ) -from rabbitmq_amqp_python_client.utils import string_to_bytes +from rabbitmq_amqp_python_client.utils import ( + string_to_bytes, +) from .http_requests import delete_all_connections from .utils import create_binding, publish_per_message diff --git a/tests/utils.py b/tests/utils.py index 7fd3186..bdc7ad2 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -16,7 +16,9 @@ Publisher, QuorumQueueSpecification, ) -from rabbitmq_amqp_python_client.utils import string_to_bytes +from rabbitmq_amqp_python_client.utils import ( + string_to_bytes, +) def publish_messages( From e73a936cba89c1597d7b9f1b13529a003da7eef9 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 23 Apr 2025 15:27:04 +0200 Subject: [PATCH 05/20] format Signed-off-by: Gabriele Santomaggio --- rabbitmq_amqp_python_client/qpid/proton/_tracing.py | 6 ++---- rabbitmq_amqp_python_client/qpid/proton/_transport.py | 3 +-- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/rabbitmq_amqp_python_client/qpid/proton/_tracing.py b/rabbitmq_amqp_python_client/qpid/proton/_tracing.py index ec1649b..0c02d12 100644 --- a/rabbitmq_amqp_python_client/qpid/proton/_tracing.py +++ b/rabbitmq_amqp_python_client/qpid/proton/_tracing.py @@ -32,10 +32,8 @@ import proton from proton import Sender as ProtonSender -from proton.handlers import \ - IncomingMessageHandler as ProtonIncomingMessageHandler -from proton.handlers import \ - OutgoingMessageHandler as ProtonOutgoingMessageHandler +from proton.handlers import IncomingMessageHandler as ProtonIncomingMessageHandler +from proton.handlers import OutgoingMessageHandler as ProtonOutgoingMessageHandler _tracer = None _trace_key = proton.symbol("x-opt-qpid-tracestate") diff --git a/rabbitmq_amqp_python_client/qpid/proton/_transport.py b/rabbitmq_amqp_python_client/qpid/proton/_transport.py index 1e5a31e..d4a1702 100644 --- a/rabbitmq_amqp_python_client/qpid/proton/_transport.py +++ b/rabbitmq_amqp_python_client/qpid/proton/_transport.py @@ -139,8 +139,7 @@ if TYPE_CHECKING: from ._condition import Condition - from ._endpoints import \ - Connection # would produce circular import + from ._endpoints import Connection # would produce circular import class TraceAdapter: From 847c08c1702619ccd89b2421a2b1e4267add9dce Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 23 Apr 2025 20:18:55 +0200 Subject: [PATCH 06/20] converter Signed-off-by: Gabriele Santomaggio --- .ci/ubuntu/gha-setup.sh | 2 +- examples/tls/tls_example.py | 11 ++++++---- rabbitmq_amqp_python_client/__init__.py | 2 ++ rabbitmq_amqp_python_client/converter.py | 27 ++++++++++++++++++++++++ rabbitmq_amqp_python_client/utils.py | 26 ----------------------- tests/test_consumer.py | 7 ++---- tests/test_publisher.py | 21 ++++++++---------- tests/utils.py | 18 +++++++--------- 8 files changed, 56 insertions(+), 58 deletions(-) create mode 100644 rabbitmq_amqp_python_client/converter.py diff --git a/.ci/ubuntu/gha-setup.sh b/.ci/ubuntu/gha-setup.sh index 09dc291..6eb91c8 100755 --- a/.ci/ubuntu/gha-setup.sh +++ b/.ci/ubuntu/gha-setup.sh @@ -7,7 +7,7 @@ set -o xtrace script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" readonly script_dir echo "[INFO] script_dir: '$script_dir'" -readonly rabbitmq_image=rabbitmq:4.1.0-alpine +readonly rabbitmq_image=rabbitmq:4.1.0-beta.4-management-alpine readonly docker_name_prefix='rabbitmq-amqp-python-client' diff --git a/examples/tls/tls_example.py b/examples/tls/tls_example.py index c6eb0af..68ee25b 100644 --- a/examples/tls/tls_example.py +++ b/examples/tls/tls_example.py @@ -1,4 +1,5 @@ # type: ignore +import os import sys from traceback import print_exception @@ -79,15 +80,14 @@ def create_connection(environment: Environment) -> Connection: def main() -> None: - exchange_name = "test-exchange" queue_name = "example-queue" routing_key = "routing-key" ca_p12_store = ".ci/certs/ca.p12" ca_cert_file = ".ci/certs/ca_certificate.pem" - client_cert = ".ci/certs/client_certificate.pem" - client_key = ".ci/certs/client_key.pem" - client_p12_store = ".ci/certs/client.p12" + client_cert = ".ci/certs/client_localhost_certificate.pem" + client_key = ".ci/certs/client_localhost_key.pem" + client_p12_store = ".ci/certs/client_localhost.p12" uri = "amqps://guest:guest@localhost:5671/" if sys.platform == "win32": @@ -138,6 +138,9 @@ def main() -> None: "connection failed. working directory should be project root" ) else: + print(" ca_cert_file {}".format(os.path.isfile(ca_cert_file))) + print(" client_cert {}".format(os.path.isfile(client_cert))) + print(" client_key {}".format(os.path.isfile(client_key))) environment = Environment( uri, ssl_context=PosixSslConfigurationContext( diff --git a/rabbitmq_amqp_python_client/__init__.py b/rabbitmq_amqp_python_client/__init__.py index cdb623a..55f3c5c 100644 --- a/rabbitmq_amqp_python_client/__init__.py +++ b/rabbitmq_amqp_python_client/__init__.py @@ -5,6 +5,7 @@ from .common import ExchangeType, QueueType from .connection import Connection from .consumer import Consumer +from .converter import Converter from .entities import ( ExchangeCustomSpecification, ExchangeSpecification, @@ -91,4 +92,5 @@ "ExchangeCustomSpecification", "RecoveryConfiguration", "OAuth2Options", + "Converter", ] diff --git a/rabbitmq_amqp_python_client/converter.py b/rabbitmq_amqp_python_client/converter.py new file mode 100644 index 0000000..e813716 --- /dev/null +++ b/rabbitmq_amqp_python_client/converter.py @@ -0,0 +1,27 @@ +class Converter: + + @staticmethod + def bytes_to_string(body: bytes) -> str: + """ + Convert the body of a message to a string. + + Args: + body: The body of the message + + Returns: + str: The string representation of the body + """ + return "".join(map(chr, body)) + + @staticmethod + def string_to_bytes(body: str) -> bytes: + """ + Convert a string to the body of a message. + + Args: + body: The string to convert + + Returns: + bytes: The byte representation of the string + """ + return str.encode(body) diff --git a/rabbitmq_amqp_python_client/utils.py b/rabbitmq_amqp_python_client/utils.py index 54fbcb6..cbc1a35 100644 --- a/rabbitmq_amqp_python_client/utils.py +++ b/rabbitmq_amqp_python_client/utils.py @@ -7,29 +7,3 @@ def validate_annotations(annotations: []) -> bool: # type: ignore validated = False return validated return validated - - -def bytes_to_string(body: bytes) -> str: - """ - Convert the body of a message to a string. - - Args: - body: The body of the message - - Returns: - str: The string representation of the body - """ - return "".join(map(chr, body)) - - -def string_to_bytes(body: str) -> bytes: - """ - Convert a string to the body of a message. - - Args: - body: The string to convert - - Returns: - bytes: The byte representation of the string - """ - return str.encode(body) diff --git a/tests/test_consumer.py b/tests/test_consumer.py index 1f38dee..d4217d3 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -3,10 +3,7 @@ ArgumentOutOfRangeException, Connection, Environment, - QuorumQueueSpecification, -) -from rabbitmq_amqp_python_client.utils import ( - bytes_to_string, + QuorumQueueSpecification, Converter, ) from .conftest import ( @@ -45,7 +42,7 @@ def test_consumer_sync_queue_accept(connection: Connection) -> None: # consumer synchronously without handler for i in range(messages_to_send): message = consumer.consume() - if bytes_to_string(message.body) == "test{}".format(i): + if Converter.bytes_to_string(message.body) == "test{}".format(i): consumed = consumed + 1 consumer.close() diff --git a/tests/test_publisher.py b/tests/test_publisher.py index fcffe35..7f5687e 100644 --- a/tests/test_publisher.py +++ b/tests/test_publisher.py @@ -11,10 +11,7 @@ QuorumQueueSpecification, RecoveryConfiguration, StreamSpecification, - ValidationCodeException, -) -from rabbitmq_amqp_python_client.utils import ( - string_to_bytes, + ValidationCodeException, Converter, ) from .http_requests import delete_all_connections @@ -37,7 +34,7 @@ def test_publish_queue(connection: Connection) -> None: publisher = connection.publisher( destination=AddressHelper.queue_address(queue_name) ) - status = publisher.publish(Message(body=string_to_bytes("test"))) + status = publisher.publish(Message(body=Converter.string_to_bytes("test"))) if status.remote_state == OutcomeState.ACCEPTED: accepted = True except Exception: @@ -112,7 +109,7 @@ def test_publish_ssl(connection_ssl: Connection) -> None: publisher = connection_ssl.publisher( destination=AddressHelper.queue_address(queue_name) ) - publisher.publish(Message(body="test")) + publisher.publish(Message(body=Converter.string_to_bytes("test"))) except Exception: raised = True @@ -150,7 +147,7 @@ def test_publish_per_message_to_invalid_destination(connection: Connection) -> N queue_name = "test-queue-1" raised = False - message = Message(body=string_to_bytes("test")) + message = Message(body=Converter.string_to_bytes("test")) message = AddressHelper.message_to_address_helper( message, "/invalid_destination/" + queue_name ) @@ -182,7 +179,7 @@ def test_publish_per_message_both_address(connection: Connection) -> None: ) try: - message = Message(body=string_to_bytes("test")) + message = Message(body=Converter.string_to_bytes("test")) message = AddressHelper.message_to_address_helper( message, AddressHelper.queue_address(queue_name) ) @@ -215,7 +212,7 @@ def test_publish_exchange(connection: Connection) -> None: try: publisher = connection.publisher(addr) - status = publisher.publish(Message(body=string_to_bytes("test"))) + status = publisher.publish(Message(body=Converter.string_to_bytes("test"))) if status.ACCEPTED: accepted = True except Exception: @@ -247,7 +244,7 @@ def test_publish_purge(connection: Connection) -> None: destination=AddressHelper.queue_address(queue_name) ) for i in range(messages_to_publish): - publisher.publish(Message(body=string_to_bytes("test"))) + publisher.publish(Message(body=Converter.string_to_bytes("test"))) except Exception: raised = True @@ -292,7 +289,7 @@ def test_disconnection_reconnection() -> None: # simulate a disconnection delete_all_connections() try: - publisher.publish(Message(body=string_to_bytes("test"))) + publisher.publish(Message(body=Converter.string_to_bytes("test"))) except ConnectionClosed: disconnected = True @@ -334,7 +331,7 @@ def test_queue_info_for_stream_with_validations(connection: Connection) -> None: ) for i in range(messages_to_send): - publisher.publish(Message(body=string_to_bytes("test"))) + publisher.publish(Message(body=Converter.string_to_bytes("test"))) def test_publish_per_message_exchange(connection: Connection) -> None: diff --git a/tests/utils.py b/tests/utils.py index bdc7ad2..1b3f062 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -15,17 +15,15 @@ Message, Publisher, QuorumQueueSpecification, -) -from rabbitmq_amqp_python_client.utils import ( - string_to_bytes, + Converter, ) def publish_messages( - connection: Connection, - messages_to_send: int, - queue_name, - filters: Optional[list[str]] = None, + connection: Connection, + messages_to_send: int, + queue_name, + filters: Optional[list[str]] = None, ) -> None: annotations = {} if filters is not None: @@ -36,13 +34,13 @@ def publish_messages( # publish messages_to_send messages for i in range(messages_to_send): publisher.publish( - Message(body=string_to_bytes("test{}".format(i)), annotations=annotations) + Message(body=Converter.string_to_bytes("test{}".format(i)), annotations=annotations) ) publisher.close() def publish_per_message(publisher: Publisher, addr: str) -> Delivery: - message = Message(body=string_to_bytes("test")) + message = Message(body=Converter.string_to_bytes("test")) message = AddressHelper.message_to_address_helper(message, addr) status = publisher.publish(message) return status @@ -74,7 +72,7 @@ def setup_dead_lettering(management: Management) -> str: def create_binding( - management: Management, exchange_name: str, queue_name: str, routing_key: str + management: Management, exchange_name: str, queue_name: str, routing_key: str ) -> str: management.declare_exchange(ExchangeSpecification(name=exchange_name)) From 9bae5ca6f831a8c5eb5bd1b3e51594c899bf0300 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 2 May 2025 15:13:06 +0200 Subject: [PATCH 07/20] converter Signed-off-by: Gabriele Santomaggio --- .ci/ubuntu/gha-setup.sh | 2 +- examples/getting_started/getting_started.py | 17 +++----- rabbitmq_amqp_python_client/connection.py | 1 + .../qpid/proton/_message.py | 43 ++++++++++--------- tests/test_connection.py | 11 +++++ 5 files changed, 40 insertions(+), 34 deletions(-) diff --git a/.ci/ubuntu/gha-setup.sh b/.ci/ubuntu/gha-setup.sh index 6eb91c8..5e5944d 100755 --- a/.ci/ubuntu/gha-setup.sh +++ b/.ci/ubuntu/gha-setup.sh @@ -7,7 +7,7 @@ set -o xtrace script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" readonly script_dir echo "[INFO] script_dir: '$script_dir'" -readonly rabbitmq_image=rabbitmq:4.1.0-beta.4-management-alpine +readonly rabbitmq_image=rabbitmq:4.1.0-management readonly docker_name_prefix='rabbitmq-amqp-python-client' diff --git a/examples/getting_started/getting_started.py b/examples/getting_started/getting_started.py index 6393966..e71b00a 100644 --- a/examples/getting_started/getting_started.py +++ b/examples/getting_started/getting_started.py @@ -11,11 +11,7 @@ ExchangeToQueueBindingSpecification, Message, OutcomeState, - QuorumQueueSpecification, -) -from rabbitmq_amqp_python_client.utils import ( - bytes_to_string, - string_to_bytes, + QuorumQueueSpecification, Converter, ) MESSAGES_TO_PUBLISH = 100 @@ -28,7 +24,7 @@ def __init__(self): self._count = 0 def on_amqp_message(self, event: Event): - print("received message: {} ".format(bytes_to_string(event.message.body))) + print("received message: {} ".format(Converter.bytes_to_string(event.message.body))) # accepting self.delivery_context.accept(event) @@ -47,13 +43,11 @@ def on_amqp_message(self, event: Event): # in case of rejection with annotations added # self.delivery_context.discard_with_annotations(event) - print("count " + str(self._count)) - self._count = self._count + 1 + print("count " + str(self._count)) if self._count == MESSAGES_TO_PUBLISH: - print("closing receiver") - # if you want you can add cleanup operations here + print("received all messages") def on_connection_closed(self, event: Event): # if you want you can add cleanup operations here @@ -125,9 +119,8 @@ def main() -> None: # publish 10 messages for i in range(MESSAGES_TO_PUBLISH): - print("publishing") status = publisher.publish( - Message(body=string_to_bytes("test message {} ".format(i))) + Message(body=Converter.string_to_bytes("test message {} ".format(i))) ) if status.remote_state == OutcomeState.ACCEPTED: print("message accepted") diff --git a/rabbitmq_amqp_python_client/connection.py b/rabbitmq_amqp_python_client/connection.py index 00e96c7..9a0fda0 100644 --- a/rabbitmq_amqp_python_client/connection.py +++ b/rabbitmq_amqp_python_client/connection.py @@ -180,6 +180,7 @@ def dial(self) -> None: self._ssl_domain = SSLDomain(SSLDomain.MODE_CLIENT) assert self._ssl_domain + if isinstance(self._conf_ssl_context, PosixSslConfigurationContext): ca_cert = self._conf_ssl_context.ca_cert elif isinstance(self._conf_ssl_context, WinSslConfigurationContext): diff --git a/rabbitmq_amqp_python_client/qpid/proton/_message.py b/rabbitmq_amqp_python_client/qpid/proton/_message.py index 656f9ec..9f1fdaa 100644 --- a/rabbitmq_amqp_python_client/qpid/proton/_message.py +++ b/rabbitmq_amqp_python_client/qpid/proton/_message.py @@ -511,7 +511,7 @@ def instructions(self) -> Optional[AnnotationDict]: @instructions.setter def instructions( - self, instructions: Optional[Dict[Union[str, int], "PythonAMQPData"]] + self, instructions: Optional[Dict[Union[str, int], "PythonAMQPData"]] ) -> None: if isinstance(instructions, dict): self.instruction_dict = AnnotationDict(instructions, raise_on_error=False) @@ -534,7 +534,7 @@ def annotations(self) -> Optional[AnnotationDict]: @annotations.setter def annotations( - self, annotations: Optional[Dict[Union[str, int], "PythonAMQPData"]] + self, annotations: Optional[Dict[Union[str, int], "PythonAMQPData"]] ) -> None: if isinstance(annotations, dict): self.annotation_dict = AnnotationDict(annotations, raise_on_error=False) @@ -601,7 +601,8 @@ def send(self, sender: "Sender", tag: Optional[str] = None) -> "Delivery": return dlv @overload - def recv(self, link: "Sender") -> None: ... + def recv(self, link: "Sender") -> None: + ... def recv(self, link: "Receiver") -> Optional["Delivery"]: """ @@ -632,24 +633,24 @@ def recv(self, link: "Receiver") -> Optional["Delivery"]: def __repr__(self) -> str: props = [] for attr in ( - "inferred", - "address", - "reply_to", - "durable", - "ttl", - "priority", - "first_acquirer", - "delivery_count", - "id", - "correlation_id", - "user_id", - "group_id", - "group_sequence", - "reply_to_group_id", - "instructions", - "annotations", - "properties", - "body", + "inferred", + "address", + "reply_to", + "durable", + "ttl", + "priority", + "first_acquirer", + "delivery_count", + "id", + "correlation_id", + "user_id", + "group_id", + "group_sequence", + "reply_to_group_id", + "instructions", + "annotations", + "properties", + "body", ): value = getattr(self, attr) if value: diff --git a/tests/test_connection.py b/tests/test_connection.py index 6190761..3e42e68 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -1,5 +1,6 @@ import time from datetime import datetime, timedelta +from pathlib import Path from rabbitmq_amqp_python_client import ( ConnectionClosed, @@ -37,7 +38,17 @@ def test_connection_ssl(ssl_context) -> None: environment = Environment( "amqps://guest:guest@localhost:5671/", ssl_context=ssl_context, + ) + path = Path(ssl_context.ca_cert) + assert path.is_file() is True + assert path.exists() is True + + path = Path(ssl_context.client_cert.client_cert) + assert path.is_file() is True + + path = Path(ssl_context.client_cert.client_key) + assert path.is_file() is True connection = environment.connection() connection.dial() From 58e1ae2aa561ba3cc9e2ee43607fdb5198e6e17e Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 2 May 2025 15:16:42 +0200 Subject: [PATCH 08/20] TLS example Signed-off-by: Gabriele Santomaggio --- examples/tls/tls_example.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/examples/tls/tls_example.py b/examples/tls/tls_example.py index 68ee25b..1a2136e 100644 --- a/examples/tls/tls_example.py +++ b/examples/tls/tls_example.py @@ -138,9 +138,9 @@ def main() -> None: "connection failed. working directory should be project root" ) else: - print(" ca_cert_file {}".format(os.path.isfile(ca_cert_file))) - print(" client_cert {}".format(os.path.isfile(client_cert))) - print(" client_key {}".format(os.path.isfile(client_key))) + print(" ca_cert_file exists: {}".format(os.path.isfile(ca_cert_file))) + print(" client_cert exists: {}".format(os.path.isfile(client_cert))) + print(" client_key exists: {}".format(os.path.isfile(client_key))) environment = Environment( uri, ssl_context=PosixSslConfigurationContext( @@ -190,7 +190,7 @@ def main() -> None: # publish 10 messages for i in range(messages_to_publish): - status = publisher.publish(Message(body="test")) + status = publisher.publish(Message(body=Converter.string_to_bytes("test"))) if status.ACCEPTED: print("message accepted") elif status.RELEASED: From ce3d20e3a5116802d4c2840c46ae85f36aa2322a Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 2 May 2025 15:17:54 +0200 Subject: [PATCH 09/20] TLS example [skip ci] Signed-off-by: Gabriele Santomaggio --- examples/tls/tls_example.py | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/tls/tls_example.py b/examples/tls/tls_example.py index 1a2136e..8d4e6fe 100644 --- a/examples/tls/tls_example.py +++ b/examples/tls/tls_example.py @@ -20,6 +20,7 @@ QuorumQueueSpecification, WinClientCert, WinSslConfigurationContext, + Converter, ) from rabbitmq_amqp_python_client.ssl_configuration import ( FriendlyName, From c6329c392d2d07e6463415838d1c4722f725ec64 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 5 May 2025 14:15:44 +0200 Subject: [PATCH 10/20] refact --- examples/oauth/oaut.py | 13 ++-- examples/reconnection/reconnection_example.py | 8 +-- rabbitmq_amqp_python_client/management.py | 71 ++++++++++--------- rabbitmq_amqp_python_client/publisher.py | 10 +++ .../qpid/proton/_message.py | 10 +-- 5 files changed, 58 insertions(+), 54 deletions(-) diff --git a/examples/oauth/oaut.py b/examples/oauth/oaut.py index 56f16e8..15cd3cc 100644 --- a/examples/oauth/oaut.py +++ b/examples/oauth/oaut.py @@ -18,6 +18,7 @@ OAuth2Options, OutcomeState, QuorumQueueSpecification, + Converter ) MESSAGES_TO_PUBLISH = 100 @@ -30,7 +31,7 @@ def __init__(self): self._count = 0 def on_amqp_message(self, event: Event): - print("received message: " + str(event.message.body)) + print("received message: " + Converter.bytes_to_string(event.message.body)) # accepting self.delivery_context.accept(event) @@ -85,7 +86,6 @@ def create_connection(environment: Environment) -> Connection: def main() -> None: - exchange_name = "test-exchange" queue_name = "example-queue" routing_key = "routing-key" @@ -144,14 +144,13 @@ def main() -> None: # publish 10 messages for i in range(MESSAGES_TO_PUBLISH): - print("publishing") - status = publisher.publish(Message(body="test")) + status = publisher.publish(Message(body=Converter.string_to_bytes("test_{}".format(i)))) if status.remote_state == OutcomeState.ACCEPTED: - print("message accepted") + print("message: test_{} accepted".format(i)) elif status.remote_state == OutcomeState.RELEASED: - print("message not routed") + print("message: test_{} not routed".format(i)) elif status.remote_state == OutcomeState.REJECTED: - print("message not rejected") + print("message: test_{} rejected".format(i)) publisher.close() diff --git a/examples/reconnection/reconnection_example.py b/examples/reconnection/reconnection_example.py index 161b8d4..35a6b5e 100644 --- a/examples/reconnection/reconnection_example.py +++ b/examples/reconnection/reconnection_example.py @@ -9,13 +9,12 @@ ExchangeSpecification, ExchangeToQueueBindingSpecification, Message, - QuorumQueueSpecification, + QuorumQueueSpecification, Converter, ) # here we keep track of the objects we need to reconnect MESSAGES_TO_PUBLISH = 50000 - environment = Environment( uri="amqp://guest:guest@localhost:5672/", ) @@ -29,7 +28,7 @@ def __init__(self): def on_message(self, event: Event): if self._count % 1000 == 0: - print("received 100 message: " + str(event.message.body)) + print("received 100 message: " + Converter.bytes_to_string(event.message.body)) # accepting self.delivery_context.accept(event) @@ -79,7 +78,6 @@ def create_connection() -> Connection: def main() -> None: - exchange_name = "test-exchange" queue_name = "example-queue" routing_key = "routing-key" @@ -128,7 +126,7 @@ def main() -> None: print("published 1000 messages...") try: if publisher is not None: - publisher.publish(Message(body="test")) + publisher.publish(Message(body=Converter.string_to_bytes("test"))) except ConnectionClosed: print("publisher closing exception, resubmitting") # publisher = connection.publisher(addr) diff --git a/rabbitmq_amqp_python_client/management.py b/rabbitmq_amqp_python_client/management.py index 9ba75ac..9abd12e 100644 --- a/rabbitmq_amqp_python_client/management.py +++ b/rabbitmq_amqp_python_client/management.py @@ -98,11 +98,11 @@ def close(self) -> None: self._receiver.close() def request( - self, - body: Any, - path: str, - method: str, - expected_response_codes: list[int], + self, + body: Any, + path: str, + method: str, + expected_response_codes: list[int], ) -> Message: """ Send a management request with a new UUID. @@ -124,16 +124,17 @@ def request( ) def _request( - self, - id: str, - body: Any, - path: str, - method: str, - expected_response_codes: list[int], + self, + id: str, + body: Any, + path: str, + method: str, + expected_response_codes: list[int], ) -> Message: amq_message = Message( id=id, body=body, + inferred=False, reply_to="$me", address=path, subject=method, @@ -151,10 +152,10 @@ def _request( return msg def declare_exchange( - self, - exchange_specification: Union[ - ExchangeSpecification, ExchangeCustomSpecification - ], + self, + exchange_specification: Union[ + ExchangeSpecification, ExchangeCustomSpecification + ], ) -> Union[ExchangeSpecification, ExchangeCustomSpecification]: """ Declare a new exchange in RabbitMQ. @@ -195,10 +196,10 @@ def declare_exchange( return exchange_specification def declare_queue( - self, - queue_specification: Union[ - ClassicQueueSpecification, QuorumQueueSpecification, StreamSpecification - ], + self, + queue_specification: Union[ + ClassicQueueSpecification, QuorumQueueSpecification, StreamSpecification + ], ) -> Union[ ClassicQueueSpecification, QuorumQueueSpecification, StreamSpecification ]: @@ -219,7 +220,7 @@ def declare_queue( logger.debug("declare_queue operation called") if isinstance(queue_specification, ClassicQueueSpecification) or isinstance( - queue_specification, QuorumQueueSpecification + queue_specification, QuorumQueueSpecification ): body = self._declare_queue(queue_specification) @@ -242,8 +243,8 @@ def declare_queue( return queue_specification def _declare_queue( - self, - queue_specification: Union[ClassicQueueSpecification, QuorumQueueSpecification], + self, + queue_specification: Union[ClassicQueueSpecification, QuorumQueueSpecification], ) -> dict[str, Any]: body = {} @@ -311,7 +312,7 @@ def _declare_queue( return body def _declare_stream( - self, stream_specification: StreamSpecification + self, stream_specification: StreamSpecification ) -> dict[str, Any]: body = {} @@ -324,7 +325,7 @@ def _declare_stream( if stream_specification.max_age is not None: args["x-max-age"] = ( - str(int(stream_specification.max_age.total_seconds())) + "s" + str(int(stream_specification.max_age.total_seconds())) + "s" ) if stream_specification.stream_max_segment_size_bytes is not None: @@ -392,7 +393,7 @@ def delete_queue(self, name: str) -> None: ) def _validate_reponse_code( - self, response_code: int, expected_response_codes: list[int] + self, response_code: int, expected_response_codes: list[int] ) -> None: if response_code == CommonValues.response_code_409.value: raise ValidationCodeException("ErrPreconditionFailed") @@ -406,10 +407,10 @@ def _validate_reponse_code( ) def bind( - self, - bind_specification: Union[ - ExchangeToQueueBindingSpecification, ExchangeToExchangeBindingSpecification - ], + self, + bind_specification: Union[ + ExchangeToQueueBindingSpecification, ExchangeToExchangeBindingSpecification + ], ) -> str: """ Create a binding between exchanges or between an exchange and a queue. @@ -462,12 +463,12 @@ def bind( return binding_path def unbind( - self, - bind_specification: Union[ - str, - ExchangeToQueueBindingSpecification, - ExchangeToExchangeBindingSpecification, - ], + self, + bind_specification: Union[ + str, + ExchangeToQueueBindingSpecification, + ExchangeToExchangeBindingSpecification, + ], ) -> None: """ Remove a binding between exchanges or between an exchange and a queue. diff --git a/rabbitmq_amqp_python_client/publisher.py b/rabbitmq_amqp_python_client/publisher.py index dc63d5d..1eb0a3b 100644 --- a/rabbitmq_amqp_python_client/publisher.py +++ b/rabbitmq_amqp_python_client/publisher.py @@ -80,6 +80,16 @@ def publish(self, message: Message) -> Delivery: "address specified in both message and publisher" ) + if not isinstance(message.body, (bytes, type(None))): + raise ArgumentOutOfRangeException( + "Message body must be of type bytes or None" + ) + + if not message.inferred: + raise ArgumentOutOfRangeException( + "Message inferred must be True" + ) + if self._addr != "": if self._sender is not None: return self._sender.send(message) diff --git a/rabbitmq_amqp_python_client/qpid/proton/_message.py b/rabbitmq_amqp_python_client/qpid/proton/_message.py index 9f1fdaa..2388134 100644 --- a/rabbitmq_amqp_python_client/qpid/proton/_message.py +++ b/rabbitmq_amqp_python_client/qpid/proton/_message.py @@ -113,20 +113,16 @@ class Message(object): DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY """ Default AMQP message priority""" - def __init__(self, body: Union[bytes, dict, None] = None, **kwargs) -> None: + def __init__(self, body: Union[str, bytes, dict, None] = None, inferred = True, **kwargs) -> None: # validate the types - if not isinstance(body, (bytes, dict, type(None))): - raise ArgumentOutOfRangeException( - "Message body must be of type bytes, dict or None" - ) - self._msg = pn_message() self.instructions = None self.annotations = None self.properties = None self.body = body - self.inferred = True + self.inferred = inferred + for k, v in kwargs.items(): getattr(self, k) # Raise exception if it's not a valid attribute. From aaa7fbe0d2d029c41230ea5dbe0fc88008b5dfad Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 5 May 2025 14:23:54 +0200 Subject: [PATCH 11/20] refactoring --- examples/getting_started/getting_started.py | 3 ++- examples/oauth/oaut.py | 2 +- examples/reconnection/reconnection_example.py | 3 ++- examples/tls/tls_example.py | 2 +- tests/test_consumer.py | 3 ++- tests/test_publisher.py | 3 ++- tests/utils.py | 2 +- 7 files changed, 11 insertions(+), 7 deletions(-) diff --git a/examples/getting_started/getting_started.py b/examples/getting_started/getting_started.py index e71b00a..b86b86b 100644 --- a/examples/getting_started/getting_started.py +++ b/examples/getting_started/getting_started.py @@ -5,13 +5,14 @@ AddressHelper, AMQPMessagingHandler, Connection, + Converter, Environment, Event, ExchangeSpecification, ExchangeToQueueBindingSpecification, Message, OutcomeState, - QuorumQueueSpecification, Converter, + QuorumQueueSpecification, ) MESSAGES_TO_PUBLISH = 100 diff --git a/examples/oauth/oaut.py b/examples/oauth/oaut.py index 15cd3cc..daf9cac 100644 --- a/examples/oauth/oaut.py +++ b/examples/oauth/oaut.py @@ -10,6 +10,7 @@ AddressHelper, AMQPMessagingHandler, Connection, + Converter, Environment, Event, ExchangeSpecification, @@ -18,7 +19,6 @@ OAuth2Options, OutcomeState, QuorumQueueSpecification, - Converter ) MESSAGES_TO_PUBLISH = 100 diff --git a/examples/reconnection/reconnection_example.py b/examples/reconnection/reconnection_example.py index 35a6b5e..e8e9945 100644 --- a/examples/reconnection/reconnection_example.py +++ b/examples/reconnection/reconnection_example.py @@ -4,12 +4,13 @@ AMQPMessagingHandler, Connection, ConnectionClosed, + Converter, Environment, Event, ExchangeSpecification, ExchangeToQueueBindingSpecification, Message, - QuorumQueueSpecification, Converter, + QuorumQueueSpecification, ) # here we keep track of the objects we need to reconnect diff --git a/examples/tls/tls_example.py b/examples/tls/tls_example.py index 8d4e6fe..3d7eaee 100644 --- a/examples/tls/tls_example.py +++ b/examples/tls/tls_example.py @@ -7,6 +7,7 @@ AddressHelper, AMQPMessagingHandler, Connection, + Converter, CurrentUserStore, Environment, Event, @@ -20,7 +21,6 @@ QuorumQueueSpecification, WinClientCert, WinSslConfigurationContext, - Converter, ) from rabbitmq_amqp_python_client.ssl_configuration import ( FriendlyName, diff --git a/tests/test_consumer.py b/tests/test_consumer.py index d4217d3..1026fc6 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -2,8 +2,9 @@ AddressHelper, ArgumentOutOfRangeException, Connection, + Converter, Environment, - QuorumQueueSpecification, Converter, + QuorumQueueSpecification, ) from .conftest import ( diff --git a/tests/test_publisher.py b/tests/test_publisher.py index 7f5687e..c4389ea 100644 --- a/tests/test_publisher.py +++ b/tests/test_publisher.py @@ -5,13 +5,14 @@ ArgumentOutOfRangeException, Connection, ConnectionClosed, + Converter, Environment, Message, OutcomeState, QuorumQueueSpecification, RecoveryConfiguration, StreamSpecification, - ValidationCodeException, Converter, + ValidationCodeException, ) from .http_requests import delete_all_connections diff --git a/tests/utils.py b/tests/utils.py index 1b3f062..f6ef067 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -7,6 +7,7 @@ from rabbitmq_amqp_python_client import ( AddressHelper, Connection, + Converter, Delivery, ExchangeSpecification, ExchangeToQueueBindingSpecification, @@ -15,7 +16,6 @@ Message, Publisher, QuorumQueueSpecification, - Converter, ) From a00336409414beeaa1fb7f7b654c88d66b095b91 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 5 May 2025 14:35:05 +0200 Subject: [PATCH 12/20] formatting --- examples/getting_started/getting_started.py | 6 +- examples/oauth/oaut.py | 4 +- examples/reconnection/reconnection_example.py | 4 +- rabbitmq_amqp_python_client/connection.py | 1 - rabbitmq_amqp_python_client/management.py | 70 +++++++++---------- rabbitmq_amqp_python_client/publisher.py | 4 +- .../qpid/proton/_message.py | 48 ++++++------- tests/test_connection.py | 1 - tests/utils.py | 15 ++-- 9 files changed, 80 insertions(+), 73 deletions(-) diff --git a/examples/getting_started/getting_started.py b/examples/getting_started/getting_started.py index b86b86b..6e5e018 100644 --- a/examples/getting_started/getting_started.py +++ b/examples/getting_started/getting_started.py @@ -25,7 +25,11 @@ def __init__(self): self._count = 0 def on_amqp_message(self, event: Event): - print("received message: {} ".format(Converter.bytes_to_string(event.message.body))) + print( + "received message: {} ".format( + Converter.bytes_to_string(event.message.body) + ) + ) # accepting self.delivery_context.accept(event) diff --git a/examples/oauth/oaut.py b/examples/oauth/oaut.py index daf9cac..574cc50 100644 --- a/examples/oauth/oaut.py +++ b/examples/oauth/oaut.py @@ -144,7 +144,9 @@ def main() -> None: # publish 10 messages for i in range(MESSAGES_TO_PUBLISH): - status = publisher.publish(Message(body=Converter.string_to_bytes("test_{}".format(i)))) + status = publisher.publish( + Message(body=Converter.string_to_bytes("test_{}".format(i))) + ) if status.remote_state == OutcomeState.ACCEPTED: print("message: test_{} accepted".format(i)) elif status.remote_state == OutcomeState.RELEASED: diff --git a/examples/reconnection/reconnection_example.py b/examples/reconnection/reconnection_example.py index e8e9945..bbd5f33 100644 --- a/examples/reconnection/reconnection_example.py +++ b/examples/reconnection/reconnection_example.py @@ -29,7 +29,9 @@ def __init__(self): def on_message(self, event: Event): if self._count % 1000 == 0: - print("received 100 message: " + Converter.bytes_to_string(event.message.body)) + print( + "received 100 message: " + Converter.bytes_to_string(event.message.body) + ) # accepting self.delivery_context.accept(event) diff --git a/rabbitmq_amqp_python_client/connection.py b/rabbitmq_amqp_python_client/connection.py index 9a0fda0..00e96c7 100644 --- a/rabbitmq_amqp_python_client/connection.py +++ b/rabbitmq_amqp_python_client/connection.py @@ -180,7 +180,6 @@ def dial(self) -> None: self._ssl_domain = SSLDomain(SSLDomain.MODE_CLIENT) assert self._ssl_domain - if isinstance(self._conf_ssl_context, PosixSslConfigurationContext): ca_cert = self._conf_ssl_context.ca_cert elif isinstance(self._conf_ssl_context, WinSslConfigurationContext): diff --git a/rabbitmq_amqp_python_client/management.py b/rabbitmq_amqp_python_client/management.py index 9abd12e..6ff7487 100644 --- a/rabbitmq_amqp_python_client/management.py +++ b/rabbitmq_amqp_python_client/management.py @@ -98,11 +98,11 @@ def close(self) -> None: self._receiver.close() def request( - self, - body: Any, - path: str, - method: str, - expected_response_codes: list[int], + self, + body: Any, + path: str, + method: str, + expected_response_codes: list[int], ) -> Message: """ Send a management request with a new UUID. @@ -124,12 +124,12 @@ def request( ) def _request( - self, - id: str, - body: Any, - path: str, - method: str, - expected_response_codes: list[int], + self, + id: str, + body: Any, + path: str, + method: str, + expected_response_codes: list[int], ) -> Message: amq_message = Message( id=id, @@ -152,10 +152,10 @@ def _request( return msg def declare_exchange( - self, - exchange_specification: Union[ - ExchangeSpecification, ExchangeCustomSpecification - ], + self, + exchange_specification: Union[ + ExchangeSpecification, ExchangeCustomSpecification + ], ) -> Union[ExchangeSpecification, ExchangeCustomSpecification]: """ Declare a new exchange in RabbitMQ. @@ -196,10 +196,10 @@ def declare_exchange( return exchange_specification def declare_queue( - self, - queue_specification: Union[ - ClassicQueueSpecification, QuorumQueueSpecification, StreamSpecification - ], + self, + queue_specification: Union[ + ClassicQueueSpecification, QuorumQueueSpecification, StreamSpecification + ], ) -> Union[ ClassicQueueSpecification, QuorumQueueSpecification, StreamSpecification ]: @@ -220,7 +220,7 @@ def declare_queue( logger.debug("declare_queue operation called") if isinstance(queue_specification, ClassicQueueSpecification) or isinstance( - queue_specification, QuorumQueueSpecification + queue_specification, QuorumQueueSpecification ): body = self._declare_queue(queue_specification) @@ -243,8 +243,8 @@ def declare_queue( return queue_specification def _declare_queue( - self, - queue_specification: Union[ClassicQueueSpecification, QuorumQueueSpecification], + self, + queue_specification: Union[ClassicQueueSpecification, QuorumQueueSpecification], ) -> dict[str, Any]: body = {} @@ -312,7 +312,7 @@ def _declare_queue( return body def _declare_stream( - self, stream_specification: StreamSpecification + self, stream_specification: StreamSpecification ) -> dict[str, Any]: body = {} @@ -325,7 +325,7 @@ def _declare_stream( if stream_specification.max_age is not None: args["x-max-age"] = ( - str(int(stream_specification.max_age.total_seconds())) + "s" + str(int(stream_specification.max_age.total_seconds())) + "s" ) if stream_specification.stream_max_segment_size_bytes is not None: @@ -393,7 +393,7 @@ def delete_queue(self, name: str) -> None: ) def _validate_reponse_code( - self, response_code: int, expected_response_codes: list[int] + self, response_code: int, expected_response_codes: list[int] ) -> None: if response_code == CommonValues.response_code_409.value: raise ValidationCodeException("ErrPreconditionFailed") @@ -407,10 +407,10 @@ def _validate_reponse_code( ) def bind( - self, - bind_specification: Union[ - ExchangeToQueueBindingSpecification, ExchangeToExchangeBindingSpecification - ], + self, + bind_specification: Union[ + ExchangeToQueueBindingSpecification, ExchangeToExchangeBindingSpecification + ], ) -> str: """ Create a binding between exchanges or between an exchange and a queue. @@ -463,12 +463,12 @@ def bind( return binding_path def unbind( - self, - bind_specification: Union[ - str, - ExchangeToQueueBindingSpecification, - ExchangeToExchangeBindingSpecification, - ], + self, + bind_specification: Union[ + str, + ExchangeToQueueBindingSpecification, + ExchangeToExchangeBindingSpecification, + ], ) -> None: """ Remove a binding between exchanges or between an exchange and a queue. diff --git a/rabbitmq_amqp_python_client/publisher.py b/rabbitmq_amqp_python_client/publisher.py index 1eb0a3b..824170c 100644 --- a/rabbitmq_amqp_python_client/publisher.py +++ b/rabbitmq_amqp_python_client/publisher.py @@ -86,9 +86,7 @@ def publish(self, message: Message) -> Delivery: ) if not message.inferred: - raise ArgumentOutOfRangeException( - "Message inferred must be True" - ) + raise ArgumentOutOfRangeException("Message inferred must be True") if self._addr != "": if self._sender is not None: diff --git a/rabbitmq_amqp_python_client/qpid/proton/_message.py b/rabbitmq_amqp_python_client/qpid/proton/_message.py index 2388134..a1a7a50 100644 --- a/rabbitmq_amqp_python_client/qpid/proton/_message.py +++ b/rabbitmq_amqp_python_client/qpid/proton/_message.py @@ -113,7 +113,9 @@ class Message(object): DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY """ Default AMQP message priority""" - def __init__(self, body: Union[str, bytes, dict, None] = None, inferred = True, **kwargs) -> None: + def __init__( + self, body: Union[str, bytes, dict, None] = None, inferred=True, **kwargs + ) -> None: # validate the types self._msg = pn_message() @@ -123,7 +125,6 @@ def __init__(self, body: Union[str, bytes, dict, None] = None, inferred = True, self.body = body self.inferred = inferred - for k, v in kwargs.items(): getattr(self, k) # Raise exception if it's not a valid attribute. setattr(self, k, v) @@ -507,7 +508,7 @@ def instructions(self) -> Optional[AnnotationDict]: @instructions.setter def instructions( - self, instructions: Optional[Dict[Union[str, int], "PythonAMQPData"]] + self, instructions: Optional[Dict[Union[str, int], "PythonAMQPData"]] ) -> None: if isinstance(instructions, dict): self.instruction_dict = AnnotationDict(instructions, raise_on_error=False) @@ -530,7 +531,7 @@ def annotations(self) -> Optional[AnnotationDict]: @annotations.setter def annotations( - self, annotations: Optional[Dict[Union[str, int], "PythonAMQPData"]] + self, annotations: Optional[Dict[Union[str, int], "PythonAMQPData"]] ) -> None: if isinstance(annotations, dict): self.annotation_dict = AnnotationDict(annotations, raise_on_error=False) @@ -597,8 +598,7 @@ def send(self, sender: "Sender", tag: Optional[str] = None) -> "Delivery": return dlv @overload - def recv(self, link: "Sender") -> None: - ... + def recv(self, link: "Sender") -> None: ... def recv(self, link: "Receiver") -> Optional["Delivery"]: """ @@ -629,24 +629,24 @@ def recv(self, link: "Receiver") -> Optional["Delivery"]: def __repr__(self) -> str: props = [] for attr in ( - "inferred", - "address", - "reply_to", - "durable", - "ttl", - "priority", - "first_acquirer", - "delivery_count", - "id", - "correlation_id", - "user_id", - "group_id", - "group_sequence", - "reply_to_group_id", - "instructions", - "annotations", - "properties", - "body", + "inferred", + "address", + "reply_to", + "durable", + "ttl", + "priority", + "first_acquirer", + "delivery_count", + "id", + "correlation_id", + "user_id", + "group_id", + "group_sequence", + "reply_to_group_id", + "instructions", + "annotations", + "properties", + "body", ): value = getattr(self, attr) if value: diff --git a/tests/test_connection.py b/tests/test_connection.py index 3e42e68..5636020 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -38,7 +38,6 @@ def test_connection_ssl(ssl_context) -> None: environment = Environment( "amqps://guest:guest@localhost:5671/", ssl_context=ssl_context, - ) path = Path(ssl_context.ca_cert) assert path.is_file() is True diff --git a/tests/utils.py b/tests/utils.py index f6ef067..644c182 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -20,10 +20,10 @@ def publish_messages( - connection: Connection, - messages_to_send: int, - queue_name, - filters: Optional[list[str]] = None, + connection: Connection, + messages_to_send: int, + queue_name, + filters: Optional[list[str]] = None, ) -> None: annotations = {} if filters is not None: @@ -34,7 +34,10 @@ def publish_messages( # publish messages_to_send messages for i in range(messages_to_send): publisher.publish( - Message(body=Converter.string_to_bytes("test{}".format(i)), annotations=annotations) + Message( + body=Converter.string_to_bytes("test{}".format(i)), + annotations=annotations, + ) ) publisher.close() @@ -72,7 +75,7 @@ def setup_dead_lettering(management: Management) -> str: def create_binding( - management: Management, exchange_name: str, queue_name: str, routing_key: str + management: Management, exchange_name: str, queue_name: str, routing_key: str ) -> str: management.declare_exchange(ExchangeSpecification(name=exchange_name)) From cf5ba703409bf716c3c2a196bb60c5aeb3ba483a Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 5 May 2025 15:00:51 +0200 Subject: [PATCH 13/20] formatting --- examples/tls/tls_example.py | 2 +- rabbitmq_amqp_python_client/converter.py | 27 ------------ .../qpid/proton/_message.py | 3 +- rabbitmq_amqp_python_client/utils.py | 42 +++++++++++++++++++ tests/test_publisher.py | 2 +- 5 files changed, 45 insertions(+), 31 deletions(-) delete mode 100644 rabbitmq_amqp_python_client/converter.py diff --git a/examples/tls/tls_example.py b/examples/tls/tls_example.py index 3d7eaee..6bf4c87 100644 --- a/examples/tls/tls_example.py +++ b/examples/tls/tls_example.py @@ -36,7 +36,7 @@ def __init__(self): self._count = 0 def on_message(self, event: Event): - print("received message: " + str(event.message.body)) + print("received message: " + Converter.bytes_to_string(event.message.body)) # accepting self.delivery_context.accept(event) diff --git a/rabbitmq_amqp_python_client/converter.py b/rabbitmq_amqp_python_client/converter.py deleted file mode 100644 index e813716..0000000 --- a/rabbitmq_amqp_python_client/converter.py +++ /dev/null @@ -1,27 +0,0 @@ -class Converter: - - @staticmethod - def bytes_to_string(body: bytes) -> str: - """ - Convert the body of a message to a string. - - Args: - body: The body of the message - - Returns: - str: The string representation of the body - """ - return "".join(map(chr, body)) - - @staticmethod - def string_to_bytes(body: str) -> bytes: - """ - Convert a string to the body of a message. - - Args: - body: The string to convert - - Returns: - bytes: The byte representation of the string - """ - return str.encode(body) diff --git a/rabbitmq_amqp_python_client/qpid/proton/_message.py b/rabbitmq_amqp_python_client/qpid/proton/_message.py index a1a7a50..b7387f5 100644 --- a/rabbitmq_amqp_python_client/qpid/proton/_message.py +++ b/rabbitmq_amqp_python_client/qpid/proton/_message.py @@ -86,12 +86,11 @@ from ._endpoints import Link from ._exceptions import ( EXCEPTIONS, - ArgumentOutOfRangeException, MessageException, ) if TYPE_CHECKING: - from proton._data import Described, PythonAMQPData + from proton._data import PythonAMQPData from proton._delivery import Delivery from proton._endpoints import Receiver, Sender diff --git a/rabbitmq_amqp_python_client/utils.py b/rabbitmq_amqp_python_client/utils.py index cbc1a35..7c67b08 100644 --- a/rabbitmq_amqp_python_client/utils.py +++ b/rabbitmq_amqp_python_client/utils.py @@ -7,3 +7,45 @@ def validate_annotations(annotations: []) -> bool: # type: ignore validated = False return validated return validated + + +def string_to_bytes(body: str) -> bytes: + """ + Convert a string to the body of a message. + + Args: + body: The string to convert + + Returns: + bytes: The byte representation of the string + """ + return str.encode(body) + + +class Converter: + + @staticmethod + def bytes_to_string(body: bytes) -> str: + """ + Convert the body of a message to a string. + + Args: + body: The body of the message + + Returns: + str: The string representation of the body + """ + return "".join(map(chr, body)) + + @staticmethod + def string_to_bytes(body: str) -> bytes: + """ + Convert a string to the body of a message. + + Args: + body: The string to convert + + Returns: + bytes: The byte representation of the string + """ + return str.encode(body) diff --git a/tests/test_publisher.py b/tests/test_publisher.py index c4389ea..a22d37c 100644 --- a/tests/test_publisher.py +++ b/tests/test_publisher.py @@ -131,7 +131,7 @@ def test_publish_to_invalid_destination(connection: Connection) -> None: publisher = None try: publisher = connection.publisher("/invalid-destination/" + queue_name) - publisher.publish(Message(body=string_to_bytes("test"))) + publisher.publish(Message(body=Converter.string_to_bytes("test"))) except ArgumentOutOfRangeException: raised = True except Exception: From 144c4f9c01061f33cca2ae79688989d4392e4731 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 5 May 2025 15:12:13 +0200 Subject: [PATCH 14/20] formatting --- rabbitmq_amqp_python_client/__init__.py | 2 -- tests/test_consumer.py | 2 +- tests/test_publisher.py | 2 +- tests/utils.py | 2 +- 4 files changed, 3 insertions(+), 5 deletions(-) diff --git a/rabbitmq_amqp_python_client/__init__.py b/rabbitmq_amqp_python_client/__init__.py index 55f3c5c..cdb623a 100644 --- a/rabbitmq_amqp_python_client/__init__.py +++ b/rabbitmq_amqp_python_client/__init__.py @@ -5,7 +5,6 @@ from .common import ExchangeType, QueueType from .connection import Connection from .consumer import Consumer -from .converter import Converter from .entities import ( ExchangeCustomSpecification, ExchangeSpecification, @@ -92,5 +91,4 @@ "ExchangeCustomSpecification", "RecoveryConfiguration", "OAuth2Options", - "Converter", ] diff --git a/tests/test_consumer.py b/tests/test_consumer.py index 1026fc6..87a6ef8 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -2,10 +2,10 @@ AddressHelper, ArgumentOutOfRangeException, Connection, - Converter, Environment, QuorumQueueSpecification, ) +from rabbitmq_amqp_python_client.utils import Converter from .conftest import ( ConsumerTestException, diff --git a/tests/test_publisher.py b/tests/test_publisher.py index a22d37c..51410ac 100644 --- a/tests/test_publisher.py +++ b/tests/test_publisher.py @@ -5,7 +5,6 @@ ArgumentOutOfRangeException, Connection, ConnectionClosed, - Converter, Environment, Message, OutcomeState, @@ -14,6 +13,7 @@ StreamSpecification, ValidationCodeException, ) +from rabbitmq_amqp_python_client.utils import Converter from .http_requests import delete_all_connections from .utils import create_binding, publish_per_message diff --git a/tests/utils.py b/tests/utils.py index 644c182..1dd5337 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -7,7 +7,6 @@ from rabbitmq_amqp_python_client import ( AddressHelper, Connection, - Converter, Delivery, ExchangeSpecification, ExchangeToQueueBindingSpecification, @@ -17,6 +16,7 @@ Publisher, QuorumQueueSpecification, ) +from rabbitmq_amqp_python_client.utils import Converter def publish_messages( From 366db97be828887f1546f7cc904be05df1f2de3c Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 5 May 2025 16:33:34 +0200 Subject: [PATCH 15/20] formatting --- README.md | 2 +- examples/README.md | 2 +- examples/getting_started/getting_started.py | 2 +- examples/oauth/{oaut.py => oAuth2.py} | 6 +++--- examples/streams/example_with_streams.py | 8 +++++--- examples/tls/tls_example.py | 6 +++--- rabbitmq_amqp_python_client/__init__.py | 3 +++ rabbitmq_amqp_python_client/utils.py | 13 ------------- 8 files changed, 17 insertions(+), 25 deletions(-) rename examples/oauth/{oaut.py => oAuth2.py} (98%) diff --git a/README.md b/README.md index ca395d6..46f483e 100644 --- a/README.md +++ b/README.md @@ -149,7 +149,7 @@ You can check the [`ssl example`](./examples/tls/tls_example.py) to see how to e The client supports oauth2 authentication. -You can check the [`oauth2 example`](./examples/oauth/oaut.py) to see how to establish and refresh a connection using an oauth2 token +You can check the [`oauth2 example`](examples/oauth/oAuth2.py) to see how to establish and refresh a connection using an oauth2 token ### Managing disconnections diff --git a/examples/README.md b/examples/README.md index 123539c..c9442f2 100644 --- a/examples/README.md +++ b/examples/README.md @@ -4,4 +4,4 @@ Client examples - [Reconnection](./reconnection/reconnection_example.py) - Producer and Consumer example with reconnection - [TLS](./tls/tls_example.py) - Producer and Consumer using a TLS connection - [Streams](./streams/example_with_streams.py) - Example supporting stream capabilities - - [Oauth](./oauth/oauth.py) - Connection through Oauth token \ No newline at end of file + - [Oauth](./oauth/oAuth2.py) - Connection through Oauth token \ No newline at end of file diff --git a/examples/getting_started/getting_started.py b/examples/getting_started/getting_started.py index 6e5e018..88e5c10 100644 --- a/examples/getting_started/getting_started.py +++ b/examples/getting_started/getting_started.py @@ -5,7 +5,6 @@ AddressHelper, AMQPMessagingHandler, Connection, - Converter, Environment, Event, ExchangeSpecification, @@ -13,6 +12,7 @@ Message, OutcomeState, QuorumQueueSpecification, + Converter, ) MESSAGES_TO_PUBLISH = 100 diff --git a/examples/oauth/oaut.py b/examples/oauth/oAuth2.py similarity index 98% rename from examples/oauth/oaut.py rename to examples/oauth/oAuth2.py index 574cc50..43d62b1 100644 --- a/examples/oauth/oaut.py +++ b/examples/oauth/oAuth2.py @@ -86,9 +86,9 @@ def create_connection(environment: Environment) -> Connection: def main() -> None: - exchange_name = "test-exchange" - queue_name = "example-queue" - routing_key = "routing-key" + exchange_name = "oAuth2-test-exchange" + queue_name = "oAuth2-example-queue" + routing_key = "oAuth2-routing-key" print("connection to amqp server") oaut_token = token( diff --git a/examples/streams/example_with_streams.py b/examples/streams/example_with_streams.py index cb53e73..09730b3 100644 --- a/examples/streams/example_with_streams.py +++ b/examples/streams/example_with_streams.py @@ -11,6 +11,7 @@ OffsetSpecification, StreamOptions, StreamSpecification, + Converter, ) MESSAGES_TO_PUBLISH = 100 @@ -26,7 +27,7 @@ def on_amqp_message(self, event: Event): # just messages with banana filters get received print( "received message from stream: " - + str(event.message.body) + + Converter.bytes_to_string(event.message.body) + " with offset: " + str(event.message.annotations["x-stream-offset"]) ) @@ -118,7 +119,8 @@ def main() -> None: for i in range(MESSAGES_TO_PUBLISH): publisher.publish( Message( - body="apple: " + str(i), annotations={"x-stream-filter-value": "apple"} + Converter.string_to_bytes(body="apple: " + str(i)), + annotations={"x-stream-filter-value": "apple"}, ) ) @@ -126,7 +128,7 @@ def main() -> None: for i in range(MESSAGES_TO_PUBLISH): publisher.publish( Message( - body="banana: " + str(i), + body=Converter.string_to_bytes("banana: " + str(i)), annotations={"x-stream-filter-value": "banana"}, ) ) diff --git a/examples/tls/tls_example.py b/examples/tls/tls_example.py index 6bf4c87..bd953fc 100644 --- a/examples/tls/tls_example.py +++ b/examples/tls/tls_example.py @@ -81,9 +81,9 @@ def create_connection(environment: Environment) -> Connection: def main() -> None: - exchange_name = "test-exchange" - queue_name = "example-queue" - routing_key = "routing-key" + exchange_name = "tls-test-exchange" + queue_name = "tls-example-queue" + routing_key = "tls-routing-key" ca_p12_store = ".ci/certs/ca.p12" ca_cert_file = ".ci/certs/ca_certificate.pem" client_cert = ".ci/certs/client_localhost_certificate.pem" diff --git a/rabbitmq_amqp_python_client/__init__.py b/rabbitmq_amqp_python_client/__init__.py index cdb623a..a8320a2 100644 --- a/rabbitmq_amqp_python_client/__init__.py +++ b/rabbitmq_amqp_python_client/__init__.py @@ -15,6 +15,8 @@ RecoveryConfiguration, StreamOptions, ) + +from .utils import Converter from .environment import Environment from .exceptions import ( ArgumentOutOfRangeException, @@ -91,4 +93,5 @@ "ExchangeCustomSpecification", "RecoveryConfiguration", "OAuth2Options", + "Converter", ] diff --git a/rabbitmq_amqp_python_client/utils.py b/rabbitmq_amqp_python_client/utils.py index 7c67b08..4c8b75d 100644 --- a/rabbitmq_amqp_python_client/utils.py +++ b/rabbitmq_amqp_python_client/utils.py @@ -9,19 +9,6 @@ def validate_annotations(annotations: []) -> bool: # type: ignore return validated -def string_to_bytes(body: str) -> bytes: - """ - Convert a string to the body of a message. - - Args: - body: The string to convert - - Returns: - bytes: The byte representation of the string - """ - return str.encode(body) - - class Converter: @staticmethod From 11ca5d9662e1ac35c372aa301a568a5c54ec9349 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 5 May 2025 16:35:30 +0200 Subject: [PATCH 16/20] formatting --- examples/getting_started/getting_started.py | 2 +- examples/streams/example_with_streams.py | 2 +- rabbitmq_amqp_python_client/__init__.py | 3 +-- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/examples/getting_started/getting_started.py b/examples/getting_started/getting_started.py index 88e5c10..6e5e018 100644 --- a/examples/getting_started/getting_started.py +++ b/examples/getting_started/getting_started.py @@ -5,6 +5,7 @@ AddressHelper, AMQPMessagingHandler, Connection, + Converter, Environment, Event, ExchangeSpecification, @@ -12,7 +13,6 @@ Message, OutcomeState, QuorumQueueSpecification, - Converter, ) MESSAGES_TO_PUBLISH = 100 diff --git a/examples/streams/example_with_streams.py b/examples/streams/example_with_streams.py index 09730b3..79776a6 100644 --- a/examples/streams/example_with_streams.py +++ b/examples/streams/example_with_streams.py @@ -5,13 +5,13 @@ AMQPMessagingHandler, Connection, ConnectionClosed, + Converter, Environment, Event, Message, OffsetSpecification, StreamOptions, StreamSpecification, - Converter, ) MESSAGES_TO_PUBLISH = 100 diff --git a/rabbitmq_amqp_python_client/__init__.py b/rabbitmq_amqp_python_client/__init__.py index a8320a2..487e878 100644 --- a/rabbitmq_amqp_python_client/__init__.py +++ b/rabbitmq_amqp_python_client/__init__.py @@ -15,8 +15,6 @@ RecoveryConfiguration, StreamOptions, ) - -from .utils import Converter from .environment import Environment from .exceptions import ( ArgumentOutOfRangeException, @@ -44,6 +42,7 @@ WinClientCert, WinSslConfigurationContext, ) +from .utils import Converter try: __version__ = metadata.version(__package__) From d84ad73d5d548e17f48d93bb38631ec0bd77e5be Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 6 May 2025 16:12:01 +0200 Subject: [PATCH 17/20] add amqp 091 test --- Makefile | 8 +++++ poetry.lock | 57 ++++++++++++++++++++++++++++++++--- pyproject.toml | 5 +++- tests/test_amqp_091.py | 68 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 133 insertions(+), 5 deletions(-) create mode 100644 tests/test_amqp_091.py diff --git a/Makefile b/Makefile index 616547b..05988c1 100644 --- a/Makefile +++ b/Makefile @@ -7,5 +7,13 @@ rabbitmq-server: rabbitmq-server-stop: ./.ci/ubuntu/gha-setup.sh stop +format: + poetry run isort --skip rabbitmq_amqp_python_client/qpid . + poetry run black rabbitmq_amqp_python_client/ + poetry run black tests/ + poetry run flake8 --exclude=venv,local_tests,docs/examples,rabbitmq_amqp_python_client/qpid --max-line-length=120 --ignore=E203,W503 + +test: format + poetry run pytest . help: cat Makefile diff --git a/poetry.lock b/poetry.lock index e9c8b6d..01df370 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand. [[package]] name = "black" @@ -6,6 +6,7 @@ version = "24.10.0" description = "The uncompromising code formatter." optional = false python-versions = ">=3.9" +groups = ["dev"] files = [ {file = "black-24.10.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:e6668650ea4b685440857138e5fe40cde4d652633b1bdffc62933d0db4ed9812"}, {file = "black-24.10.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:1c536fcf674217e87b8cc3657b81809d3c085d7bf3ef262ead700da345bfa6ea"}, @@ -52,6 +53,7 @@ version = "2025.1.31" description = "Python package for providing Mozilla's CA Bundle." optional = false python-versions = ">=3.6" +groups = ["dev"] files = [ {file = "certifi-2025.1.31-py3-none-any.whl", hash = "sha256:ca78db4565a652026a4db2bcdf68f2fb589ea80d0be70e03929ed730746b84fe"}, {file = "certifi-2025.1.31.tar.gz", hash = "sha256:3d5da6925056f6f18f119200434a4780a94263f10d1c21d032a6f6b2baa20651"}, @@ -63,6 +65,7 @@ version = "1.17.1" description = "Foreign Function Interface for Python calling C code." optional = false python-versions = ">=3.8" +groups = ["main", "dev"] files = [ {file = "cffi-1.17.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:df8b1c11f177bc2313ec4b2d46baec87a5f3e71fc8b45dab2ee7cae86d9aba14"}, {file = "cffi-1.17.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:8f2cdc858323644ab277e9bb925ad72ae0e67f69e804f4898c070998d50b1a67"}, @@ -142,6 +145,7 @@ version = "3.4.1" description = "The Real First Universal Charset Detector. Open, modern and actively maintained alternative to Chardet." optional = false python-versions = ">=3.7" +groups = ["dev"] files = [ {file = "charset_normalizer-3.4.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:91b36a978b5ae0ee86c394f5a54d6ef44db1de0815eb43de826d41d21e4af3de"}, {file = "charset_normalizer-3.4.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:7461baadb4dc00fd9e0acbe254e3d7d2112e7f92ced2adc96e54ef6501c5f176"}, @@ -243,6 +247,7 @@ version = "8.1.8" description = "Composable command line interface toolkit" optional = false python-versions = ">=3.7" +groups = ["dev"] files = [ {file = "click-8.1.8-py3-none-any.whl", hash = "sha256:63c132bbbed01578a06712a2d1f497bb62d9c1c0d329b7903a866228027263b2"}, {file = "click-8.1.8.tar.gz", hash = "sha256:ed53c9d8990d83c2a27deae68e4ee337473f6330c040a31d4225c9574d16096a"}, @@ -257,6 +262,8 @@ version = "0.4.6" description = "Cross-platform colored terminal text." optional = false python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7" +groups = ["dev"] +markers = "sys_platform == \"win32\" or platform_system == \"Windows\"" files = [ {file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"}, {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"}, @@ -268,6 +275,8 @@ version = "1.2.2" description = "Backport of PEP 654 (exception groups)" optional = false python-versions = ">=3.7" +groups = ["dev"] +markers = "python_version < \"3.11\"" files = [ {file = "exceptiongroup-1.2.2-py3-none-any.whl", hash = "sha256:3111b9d131c238bec2f8f516e123e14ba243563fb135d3fe885990585aa7795b"}, {file = "exceptiongroup-1.2.2.tar.gz", hash = "sha256:47c2edf7c6738fafb49fd34290706d1a1a2f4d1c6df275526b62cbb4aa5393cc"}, @@ -282,6 +291,7 @@ version = "7.1.2" description = "the modular source code checker: pep8 pyflakes and co" optional = false python-versions = ">=3.8.1" +groups = ["dev"] files = [ {file = "flake8-7.1.2-py2.py3-none-any.whl", hash = "sha256:1cbc62e65536f65e6d754dfe6f1bada7f5cf392d6f5db3c2b85892466c3e7c1a"}, {file = "flake8-7.1.2.tar.gz", hash = "sha256:c586ffd0b41540951ae41af572e6790dbd49fc12b3aa2541685d253d9bd504bd"}, @@ -298,6 +308,7 @@ version = "3.10" description = "Internationalized Domain Names in Applications (IDNA)" optional = false python-versions = ">=3.6" +groups = ["dev"] files = [ {file = "idna-3.10-py3-none-any.whl", hash = "sha256:946d195a0d259cbba61165e88e65941f16e9b36ea6ddb97f00452bae8b1287d3"}, {file = "idna-3.10.tar.gz", hash = "sha256:12f65c9b470abda6dc35cf8e63cc574b1c52b11df2c86030af0ac09b01b13ea9"}, @@ -312,6 +323,7 @@ version = "2.0.0" description = "brain-dead simple config-ini parsing" optional = false python-versions = ">=3.7" +groups = ["dev"] files = [ {file = "iniconfig-2.0.0-py3-none-any.whl", hash = "sha256:b6a85871a79d2e3b22d2d1b94ac2824226a63c6b741c88f7ae975f18b6778374"}, {file = "iniconfig-2.0.0.tar.gz", hash = "sha256:2d91e135bf72d31a410b17c16da610a82cb55f6b0477d1a902134b24a455b8b3"}, @@ -323,6 +335,7 @@ version = "5.13.2" description = "A Python utility / library to sort Python imports." optional = false python-versions = ">=3.8.0" +groups = ["dev"] files = [ {file = "isort-5.13.2-py3-none-any.whl", hash = "sha256:8ca5e72a8d85860d5a3fa69b8745237f2939afe12dbf656afbcb47fe72d947a6"}, {file = "isort-5.13.2.tar.gz", hash = "sha256:48fdfcb9face5d58a4f6dde2e72a1fb8dcaf8ab26f95ab49fab84c2ddefb0109"}, @@ -337,6 +350,7 @@ version = "0.7.0" description = "McCabe checker, plugin for flake8" optional = false python-versions = ">=3.6" +groups = ["dev"] files = [ {file = "mccabe-0.7.0-py2.py3-none-any.whl", hash = "sha256:6c2d30ab6be0e4a46919781807b4f0d834ebdd6c6e3dca0bda5a15f863427b6e"}, {file = "mccabe-0.7.0.tar.gz", hash = "sha256:348e0240c33b60bbdf4e523192ef919f28cb2c3d7d5c7794f74009290f236325"}, @@ -348,6 +362,7 @@ version = "0.910" description = "Optional static typing for Python" optional = false python-versions = ">=3.5" +groups = ["dev"] files = [ {file = "mypy-0.910-cp35-cp35m-macosx_10_9_x86_64.whl", hash = "sha256:a155d80ea6cee511a3694b108c4494a39f42de11ee4e61e72bc424c490e46457"}, {file = "mypy-0.910-cp35-cp35m-manylinux1_x86_64.whl", hash = "sha256:b94e4b785e304a04ea0828759172a15add27088520dc7e49ceade7834275bedb"}, @@ -389,6 +404,7 @@ version = "0.4.4" description = "Experimental type system extensions for programs checked with the mypy typechecker." optional = false python-versions = ">=2.7" +groups = ["dev"] files = [ {file = "mypy_extensions-0.4.4.tar.gz", hash = "sha256:c8b707883a96efe9b4bb3aaf0dcc07e7e217d7d8368eec4db4049ee9e142f4fd"}, ] @@ -399,6 +415,7 @@ version = "24.2" description = "Core utilities for Python packages" optional = false python-versions = ">=3.8" +groups = ["dev"] files = [ {file = "packaging-24.2-py3-none-any.whl", hash = "sha256:09abb1bccd265c01f4a3aa3f7a7db064b36514d2cba19a2f694fe6150451a759"}, {file = "packaging-24.2.tar.gz", hash = "sha256:c228a6dc5e932d346bc5739379109d49e8853dd8223571c7c5b55260edc0b97f"}, @@ -410,17 +427,36 @@ version = "0.12.1" description = "Utility library for gitignore style pattern matching of file paths." optional = false python-versions = ">=3.8" +groups = ["dev"] files = [ {file = "pathspec-0.12.1-py3-none-any.whl", hash = "sha256:a0d503e138a4c123b27490a4f7beda6a01c6f288df0e4a8b79c7eb0dc7b4cc08"}, {file = "pathspec-0.12.1.tar.gz", hash = "sha256:a482d51503a1ab33b1c67a6c3813a26953dbdc71c31dacaef9a838c4e29f5712"}, ] +[[package]] +name = "pika" +version = "1.3.2" +description = "Pika Python AMQP Client Library" +optional = false +python-versions = ">=3.7" +groups = ["test"] +files = [ + {file = "pika-1.3.2-py3-none-any.whl", hash = "sha256:0779a7c1fafd805672796085560d290213a465e4f6f76a6fb19e378d8041a14f"}, + {file = "pika-1.3.2.tar.gz", hash = "sha256:b2a327ddddf8570b4965b3576ac77091b850262d34ce8c1d8cb4e4146aa4145f"}, +] + +[package.extras] +gevent = ["gevent"] +tornado = ["tornado"] +twisted = ["twisted"] + [[package]] name = "platformdirs" version = "4.3.6" description = "A small Python package for determining appropriate platform-specific dirs, e.g. a `user data dir`." optional = false python-versions = ">=3.8" +groups = ["dev"] files = [ {file = "platformdirs-4.3.6-py3-none-any.whl", hash = "sha256:73e575e1408ab8103900836b97580d5307456908a03e92031bab39e4554cc3fb"}, {file = "platformdirs-4.3.6.tar.gz", hash = "sha256:357fb2acbc885b0419afd3ce3ed34564c13c9b95c89360cd9563f73aa5e2b907"}, @@ -437,6 +473,7 @@ version = "1.5.0" description = "plugin and hook calling mechanisms for python" optional = false python-versions = ">=3.8" +groups = ["dev"] files = [ {file = "pluggy-1.5.0-py3-none-any.whl", hash = "sha256:44e1ad92c8ca002de6377e165f3e0f1be63266ab4d554740532335b9d75ea669"}, {file = "pluggy-1.5.0.tar.gz", hash = "sha256:2cffa88e94fdc978c4c574f15f9e59b7f4201d439195c3715ca9e2486f1d0cf1"}, @@ -452,6 +489,7 @@ version = "2.12.1" description = "Python style guide checker" optional = false python-versions = ">=3.8" +groups = ["dev"] files = [ {file = "pycodestyle-2.12.1-py2.py3-none-any.whl", hash = "sha256:46f0fb92069a7c28ab7bb558f05bfc0110dac69a0cd23c61ea0040283a9d78b3"}, {file = "pycodestyle-2.12.1.tar.gz", hash = "sha256:6838eae08bbce4f6accd5d5572075c63626a15ee3e6f842df996bf62f6d73521"}, @@ -463,6 +501,7 @@ version = "2.22" description = "C parser in Python" optional = false python-versions = ">=3.8" +groups = ["main", "dev"] files = [ {file = "pycparser-2.22-py3-none-any.whl", hash = "sha256:c3702b6d3dd8c7abc1afa565d7e63d53a1d0bd86cdc24edd75470f4de499cfcc"}, {file = "pycparser-2.22.tar.gz", hash = "sha256:491c8be9c040f5390f5bf44a5b07752bd07f56edf992381b05c701439eec10f6"}, @@ -474,6 +513,7 @@ version = "3.2.0" description = "passive checker of Python programs" optional = false python-versions = ">=3.8" +groups = ["dev"] files = [ {file = "pyflakes-3.2.0-py2.py3-none-any.whl", hash = "sha256:84b5be138a2dfbb40689ca07e2152deb896a65c3a3e24c251c5c62489568074a"}, {file = "pyflakes-3.2.0.tar.gz", hash = "sha256:1c61603ff154621fb2a9172037d84dca3500def8c8b630657d1701f026f8af3f"}, @@ -485,6 +525,7 @@ version = "2.10.1" description = "JSON Web Token implementation in Python" optional = false python-versions = ">=3.9" +groups = ["test"] files = [ {file = "PyJWT-2.10.1-py3-none-any.whl", hash = "sha256:dcdd193e30abefd5debf142f9adfcdd2b58004e644f25406ffaebd50bd98dacb"}, {file = "pyjwt-2.10.1.tar.gz", hash = "sha256:3cc5772eb20009233caf06e9d8a0577824723b44e6648ee0a2aedb6cf9381953"}, @@ -502,6 +543,7 @@ version = "8.3.5" description = "pytest: simple powerful testing with Python" optional = false python-versions = ">=3.8" +groups = ["dev"] files = [ {file = "pytest-8.3.5-py3-none-any.whl", hash = "sha256:c69214aa47deac29fad6c2a4f590b9c4a9fdb16a403176fe154b79c0b4d4d820"}, {file = "pytest-8.3.5.tar.gz", hash = "sha256:f4efe70cc14e511565ac476b57c279e12a855b11f48f212af1080ef2263d3845"}, @@ -524,6 +566,7 @@ version = "0.39.0" description = "An AMQP based messaging library." optional = false python-versions = "*" +groups = ["main", "dev"] files = [ {file = "python-qpid-proton-0.39.0.tar.gz", hash = "sha256:362055ae6ab4c7f1437247c602757f30328d55c0a6986d5b68ca9798de9fce02"}, {file = "python_qpid_proton-0.39.0-cp38-abi3-macosx_11_0_x86_64.whl", hash = "sha256:f69da296ffa9e3b22f88a53fe9e27c4f4844e088a9f041061bd4f75f74f2a0af"}, @@ -542,6 +585,7 @@ version = "2.32.3" description = "Python HTTP for Humans." optional = false python-versions = ">=3.8" +groups = ["dev"] files = [ {file = "requests-2.32.3-py3-none-any.whl", hash = "sha256:70761cfe03c773ceb22aa2f671b4757976145175cdfca038c02654d061d6dcc6"}, {file = "requests-2.32.3.tar.gz", hash = "sha256:55365417734eb18255590a9ff9eb97e9e1da868d4ccd6402399eaf68af20a760"}, @@ -563,6 +607,7 @@ version = "0.10.2" description = "Python Library for Tom's Obvious, Minimal Language" optional = false python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*" +groups = ["dev"] files = [ {file = "toml-0.10.2-py2.py3-none-any.whl", hash = "sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b"}, {file = "toml-0.10.2.tar.gz", hash = "sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f"}, @@ -574,6 +619,8 @@ version = "2.2.1" description = "A lil' TOML parser" optional = false python-versions = ">=3.8" +groups = ["dev"] +markers = "python_version < \"3.11\"" files = [ {file = "tomli-2.2.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:678e4fa69e4575eb77d103de3df8a895e1591b48e740211bd1067378c69e8249"}, {file = "tomli-2.2.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:023aa114dd824ade0100497eb2318602af309e5a55595f76b626d6d9f3b7b0a6"}, @@ -615,6 +662,7 @@ version = "4.13.0" description = "Backported and Experimental Type Hints for Python 3.8+" optional = false python-versions = ">=3.8" +groups = ["main", "dev"] files = [ {file = "typing_extensions-4.13.0-py3-none-any.whl", hash = "sha256:c8dd92cc0d6425a97c18fbb9d1954e5ff92c1ca881a309c45f06ebc0b79058e5"}, {file = "typing_extensions-4.13.0.tar.gz", hash = "sha256:0a4ac55a5820789d87e297727d229866c9650f6521b64206413c4fbada24d95b"}, @@ -626,18 +674,19 @@ version = "2.3.0" description = "HTTP library with thread-safe connection pooling, file post, and more." optional = false python-versions = ">=3.9" +groups = ["dev"] files = [ {file = "urllib3-2.3.0-py3-none-any.whl", hash = "sha256:1cee9ad369867bfdbbb48b7dd50374c0967a0bb7710050facf0dd6911440e3df"}, {file = "urllib3-2.3.0.tar.gz", hash = "sha256:f8c5449b3cf0861679ce7e0503c7b44b5ec981bec0d1d3795a07f1ba96f0204d"}, ] [package.extras] -brotli = ["brotli (>=1.0.9)", "brotlicffi (>=0.8.0)"] +brotli = ["brotli (>=1.0.9) ; platform_python_implementation == \"CPython\"", "brotlicffi (>=0.8.0) ; platform_python_implementation != \"CPython\""] h2 = ["h2 (>=4,<5)"] socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"] zstd = ["zstandard (>=0.18.0)"] [metadata] -lock-version = "2.0" +lock-version = "2.1" python-versions = "^3.9" -content-hash = "ed04935820eda364360c0d157a4bd3d271f8a6f087145950202c5a57a1b113e0" +content-hash = "01dd978b9b94a970a759676e47739c4fe414021e2b9dc1b193750e6eb546c075" diff --git a/pyproject.toml b/pyproject.toml index 59b9df0..a7f7045 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,9 +9,12 @@ readme = "README.md" [tool.poetry.dependencies] python = "^3.9" python-qpid-proton = "^0.39.0" -pyjwt = "^2.10.1" typing-extensions = "^4.13.0" +[tool.poetry.group.test.dependencies] +pyjwt = "^2.10.1" +pika = "^1.3.2" + [tool.poetry.group.dev.dependencies] flake8 = "^7.1.1" isort = "^5.9.3" diff --git a/tests/test_amqp_091.py b/tests/test_amqp_091.py new file mode 100644 index 0000000..e0c494c --- /dev/null +++ b/tests/test_amqp_091.py @@ -0,0 +1,68 @@ +import functools + +import pika + +from rabbitmq_amqp_python_client import ( + AddressHelper, + Connection, + Converter, + OutcomeState, + QuorumQueueSpecification, +) +from rabbitmq_amqp_python_client.qpid.proton import Message + + +def test_publish_queue(connection: Connection) -> None: + queue_name = "amqp091-queue" + management = connection.management() + + management.declare_queue(QuorumQueueSpecification(name=queue_name)) + + raised = False + + publisher = None + accepted = False + + try: + publisher = connection.publisher( + destination=AddressHelper.queue_address(queue_name) + ) + status = publisher.publish( + Message(body=Converter.string_to_bytes("my_test_string_for_amqp")) + ) + if status.remote_state == OutcomeState.ACCEPTED: + accepted = True + except Exception: + raised = True + + if publisher is not None: + publisher.close() + + assert accepted is True + assert raised is False + + credentials = pika.PlainCredentials("guest", "guest") + parameters = pika.ConnectionParameters("localhost", credentials=credentials) + connection = pika.BlockingConnection(parameters) + channel = connection.channel() + + def on_message(chan, method_frame, header_frame, body, userdata=None): + """Called when a message is received. Log message and ack it.""" + chan.basic_ack(delivery_tag=method_frame.delivery_tag) + assert body is not None + body_text = Converter.bytes_to_string(body) + assert body_text is not None + assert body_text == "my_test_string_for_amqp" + channel.stop_consuming() + + on_message_callback = functools.partial(on_message, userdata="on_message_userdata") + channel.basic_qos( + prefetch_count=1, + ) + channel.basic_consume(queue_name, on_message_callback) + + channel.start_consuming() + connection.close() + + management.delete_queue(queue_name) + management.close() From 6bb4edaa64efbc326bf1b3f073abfe0e11f6192a Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 6 May 2025 16:19:14 +0200 Subject: [PATCH 18/20] documentation --- README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 46f483e..a41bac0 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,5 @@ +from rabbitmq_amqp_python_client import Converter + # RabbitMQ AMQP 1.0 Python Client This library is in early stages of development. It is meant to be used with RabbitMQ 4.0. @@ -83,7 +85,7 @@ For example: # publish messages for i in range(messages_to_publish): - publisher.publish(Message(body="test")) + publisher.publish(Message(body=Converter.string_to_bytes("test"))) publisher.close() ``` From 872dbb3a8f806d38fb680e44dceaa52e7e7e10b8 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 6 May 2025 16:42:06 +0200 Subject: [PATCH 19/20] validate message --- examples/streams/example_with_streams.py | 2 +- .../qpid/proton/_message.py | 2 +- tests/test_publisher.py | 25 +++++++++++++++++++ 3 files changed, 27 insertions(+), 2 deletions(-) diff --git a/examples/streams/example_with_streams.py b/examples/streams/example_with_streams.py index 79776a6..dc031ca 100644 --- a/examples/streams/example_with_streams.py +++ b/examples/streams/example_with_streams.py @@ -85,7 +85,7 @@ def create_connection(environment: Environment) -> Connection: def main() -> None: - queue_name = "example-queue" + queue_name = "stream-example-queue" print("connection to amqp server") environment = Environment("amqp://guest:guest@localhost:5672/") diff --git a/rabbitmq_amqp_python_client/qpid/proton/_message.py b/rabbitmq_amqp_python_client/qpid/proton/_message.py index b7387f5..ad11cd4 100644 --- a/rabbitmq_amqp_python_client/qpid/proton/_message.py +++ b/rabbitmq_amqp_python_client/qpid/proton/_message.py @@ -113,7 +113,7 @@ class Message(object): """ Default AMQP message priority""" def __init__( - self, body: Union[str, bytes, dict, None] = None, inferred=True, **kwargs + self, body: Union[bytes, None] = None, inferred=True, **kwargs ) -> None: # validate the types diff --git a/tests/test_publisher.py b/tests/test_publisher.py index 51410ac..0847717 100644 --- a/tests/test_publisher.py +++ b/tests/test_publisher.py @@ -19,6 +19,31 @@ from .utils import create_binding, publish_per_message +def test_validate_message_for_publishing(connection: Connection) -> None: + queue_name = "validate-publishing" + management = connection.management() + management.declare_queue(QuorumQueueSpecification(name=queue_name)) + publisher = connection.publisher( + destination=AddressHelper.queue_address(queue_name) + ) + try: + publisher.publish( + Message(body=Converter.string_to_bytes("test"), inferred=False) + ) + except ArgumentOutOfRangeException as e: + assert e.msg == "Message inferred must be True" + + try: + publisher.publish(Message(body="test")) + except ArgumentOutOfRangeException as e: + assert e.msg == "Message body must be of type bytes or None" + + try: + publisher.publish(Message(body={"key": "value"})) + except ArgumentOutOfRangeException as e: + assert e.msg == "Message body must be of type bytes or None" + + def test_publish_queue(connection: Connection) -> None: queue_name = "test-queue" From 73ee42ca21b27b08be44a5f5f2dcc33200067735 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 7 May 2025 09:34:19 +0200 Subject: [PATCH 20/20] dependecies --- poetry.lock | 6 +++--- pyproject.toml | 5 ++--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/poetry.lock b/poetry.lock index 01df370..2705966 100644 --- a/poetry.lock +++ b/poetry.lock @@ -439,7 +439,7 @@ version = "1.3.2" description = "Pika Python AMQP Client Library" optional = false python-versions = ">=3.7" -groups = ["test"] +groups = ["dev"] files = [ {file = "pika-1.3.2-py3-none-any.whl", hash = "sha256:0779a7c1fafd805672796085560d290213a465e4f6f76a6fb19e378d8041a14f"}, {file = "pika-1.3.2.tar.gz", hash = "sha256:b2a327ddddf8570b4965b3576ac77091b850262d34ce8c1d8cb4e4146aa4145f"}, @@ -525,7 +525,7 @@ version = "2.10.1" description = "JSON Web Token implementation in Python" optional = false python-versions = ">=3.9" -groups = ["test"] +groups = ["dev"] files = [ {file = "PyJWT-2.10.1-py3-none-any.whl", hash = "sha256:dcdd193e30abefd5debf142f9adfcdd2b58004e644f25406ffaebd50bd98dacb"}, {file = "pyjwt-2.10.1.tar.gz", hash = "sha256:3cc5772eb20009233caf06e9d8a0577824723b44e6648ee0a2aedb6cf9381953"}, @@ -689,4 +689,4 @@ zstd = ["zstandard (>=0.18.0)"] [metadata] lock-version = "2.1" python-versions = "^3.9" -content-hash = "01dd978b9b94a970a759676e47739c4fe414021e2b9dc1b193750e6eb546c075" +content-hash = "70ddd7eaf9b665c8bd6255196cb8f0d738ad8830a11d418490f76d99f627f34a" diff --git a/pyproject.toml b/pyproject.toml index a7f7045..9461e1e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,11 +11,10 @@ python = "^3.9" python-qpid-proton = "^0.39.0" typing-extensions = "^4.13.0" -[tool.poetry.group.test.dependencies] -pyjwt = "^2.10.1" -pika = "^1.3.2" [tool.poetry.group.dev.dependencies] +pyjwt = "^2.10.1" +pika = "^1.3.2" flake8 = "^7.1.1" isort = "^5.9.3" mypy = "^0.910"