From e05f872b687906c6e097dece837b5261fc0e8559 Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Tue, 28 Jan 2025 09:55:37 +0100 Subject: [PATCH 01/15] managing disconnections --- .../getting_started/reconnection_example.py | 214 ++++++++++++++++++ poetry.lock | 196 +++++++++++++--- pyproject.toml | 1 + rabbitmq_amqp_python_client/__init__.py | 2 + rabbitmq_amqp_python_client/connection.py | 17 +- .../qpid/proton/_utils.py | 9 + tests/http_requests.py | 31 +++ tests/test_connection.py | 7 + tests/test_publisher.py | 43 ++++ 9 files changed, 488 insertions(+), 32 deletions(-) create mode 100644 examples/getting_started/reconnection_example.py create mode 100644 tests/http_requests.py diff --git a/examples/getting_started/reconnection_example.py b/examples/getting_started/reconnection_example.py new file mode 100644 index 0000000..11dbe51 --- /dev/null +++ b/examples/getting_started/reconnection_example.py @@ -0,0 +1,214 @@ +# type: ignore + + +import time + +from rabbitmq_amqp_python_client import ( + AddressHelper, + AMQPMessagingHandler, + BindingSpecification, + Connection, + ConnectionClosed, + Event, + ExchangeSpecification, + Message, + QuorumQueueSpecification, +) + +connection = None +management = None +publisher = None +consumer = None + + +def on_disconnected(): + + print("disconnected") + exchange_name = "test-exchange" + queue_name = "example-queue" + routing_key = "routing-key" + + global connection + global management + global publisher + global consumer + + addr = AddressHelper.exchange_address(exchange_name, routing_key) + addr_queue = AddressHelper.queue_address(queue_name) + + connection = create_connection() + if management is not None: + management = connection.management() + if publisher is not None: + publisher = connection.publisher(addr) + if consumer is not None: + consumer = connection.consumer(addr_queue, handler=MyMessageHandler()) + + +class MyMessageHandler(AMQPMessagingHandler): + + def __init__(self): + super().__init__() + self._count = 0 + + def on_message(self, event: Event): + print("received message: " + str(event.message.annotations)) + + # accepting + self.delivery_context.accept(event) + + # in case of rejection (+eventually deadlettering) + # self.delivery_context.discard(event) + + # in case of requeuing + # self.delivery_context.requeue(event) + + # annotations = {} + # annotations[symbol('x-opt-string')] = 'x-test1' + # in case of requeuing with annotations added + # self.delivery_context.requeue_with_annotations(event, annotations) + + # in case of rejection with annotations added + # self.delivery_context.discard_with_annotations(event) + + print("count " + str(self._count)) + + self._count = self._count + 1 + + if self._count == 100: + print("closing receiver") + # if you want you can add cleanup operations here + # event.receiver.close() + # event.connection.close() + + def on_connection_closed(self, event: Event): + # if you want you can add cleanup operations here + print("connection closed") + + def on_link_closed(self, event: Event) -> None: + # if you want you can add cleanup operations here + print("link closed") + + +def create_connection() -> Connection: + connection = Connection( + "amqp://guest:guest@localhost:5672/", on_disconnection_handler=on_disconnected + ) + connection.dial() + + return connection + + +def main() -> None: + + exchange_name = "test-exchange" + queue_name = "example-queue" + routing_key = "routing-key" + messages_to_publish = 50000 + + global connection + global management + global publisher + global consumer + + 
print("connection to amqp server") + if connection is None: + connection = create_connection() + + if management is None: + management = connection.management() + + print("declaring exchange and queue") + management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={})) + + management.declare_queue( + QuorumQueueSpecification(name=queue_name) + # QuorumQueueSpecification(name=queue_name, dead_letter_exchange="dead-letter") + ) + + print("binding queue to exchange") + bind_name = management.bind( + BindingSpecification( + source_exchange=exchange_name, + destination_queue=queue_name, + binding_key=routing_key, + ) + ) + + addr = AddressHelper.exchange_address(exchange_name, routing_key) + + addr_queue = AddressHelper.queue_address(queue_name) + + print("create a publisher and publish a test message") + if publisher is None: + publisher = connection.publisher(addr) + + print("purging the queue") + messages_purged = management.purge_queue(queue_name) + + print("messages purged: " + str(messages_purged)) + # management.close() + + # publish 10 messages + while True: + for i in range(messages_to_publish): + + if i % 1000 == 0: + print("publishing") + try: + publisher.publish(Message(body="test")) + except ConnectionClosed: + print("publisher closing exception, resubmitting") + continue + + print("closing") + try: + publisher.close() + except ConnectionClosed: + print("publisher closing exception, resubmitting") + continue + break + + print( + "create a consumer and consume the test message - press control + c to terminate to consume" + ) + if consumer is None: + consumer = connection.consumer(addr_queue, handler=MyMessageHandler()) + + while True: + try: + consumer.run() + except KeyboardInterrupt: + pass + except ConnectionClosed: + time.sleep(1) + continue + except Exception as e: + print("consumer exited for exception " + str(e)) + + break + + print("cleanup") + consumer.close() + # once we finish consuming if we close the connection we need to create a new one + # connection = create_connection() + # management = connection.management() + + print("unbind") + management.unbind(bind_name) + + print("delete queue") + management.delete_queue(queue_name) + + print("delete exchange") + management.delete_exchange(exchange_name) + + print("closing connections") + management.close() + print("after management closing") + connection.close() + print("after connection closing") + + +if __name__ == "__main__": + main() diff --git a/poetry.lock b/poetry.lock index 954a78c..7e24b1a 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.0.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. [[package]] name = "black" @@ -6,7 +6,6 @@ version = "24.10.0" description = "The uncompromising code formatter." optional = false python-versions = ">=3.9" -groups = ["dev"] files = [ {file = "black-24.10.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:e6668650ea4b685440857138e5fe40cde4d652633b1bdffc62933d0db4ed9812"}, {file = "black-24.10.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:1c536fcf674217e87b8cc3657b81809d3c085d7bf3ef262ead700da345bfa6ea"}, @@ -47,13 +46,23 @@ d = ["aiohttp (>=3.10)"] jupyter = ["ipython (>=7.8.0)", "tokenize-rt (>=3.2.0)"] uvloop = ["uvloop (>=0.15.2)"] +[[package]] +name = "certifi" +version = "2024.12.14" +description = "Python package for providing Mozilla's CA Bundle." 
+optional = false +python-versions = ">=3.6" +files = [ + {file = "certifi-2024.12.14-py3-none-any.whl", hash = "sha256:1275f7a45be9464efc1173084eaa30f866fe2e47d389406136d332ed4967ec56"}, + {file = "certifi-2024.12.14.tar.gz", hash = "sha256:b650d30f370c2b724812bee08008be0c4163b163ddaec3f2546c1caf65f191db"}, +] + [[package]] name = "cffi" version = "1.17.1" description = "Foreign Function Interface for Python calling C code." optional = false python-versions = ">=3.8" -groups = ["main", "dev"] files = [ {file = "cffi-1.17.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:df8b1c11f177bc2313ec4b2d46baec87a5f3e71fc8b45dab2ee7cae86d9aba14"}, {file = "cffi-1.17.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:8f2cdc858323644ab277e9bb925ad72ae0e67f69e804f4898c070998d50b1a67"}, @@ -127,13 +136,113 @@ files = [ [package.dependencies] pycparser = "*" +[[package]] +name = "charset-normalizer" +version = "3.4.1" +description = "The Real First Universal Charset Detector. Open, modern and actively maintained alternative to Chardet." +optional = false +python-versions = ">=3.7" +files = [ + {file = "charset_normalizer-3.4.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:91b36a978b5ae0ee86c394f5a54d6ef44db1de0815eb43de826d41d21e4af3de"}, + {file = "charset_normalizer-3.4.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:7461baadb4dc00fd9e0acbe254e3d7d2112e7f92ced2adc96e54ef6501c5f176"}, + {file = "charset_normalizer-3.4.1-cp310-cp310-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:e218488cd232553829be0664c2292d3af2eeeb94b32bea483cf79ac6a694e037"}, + {file = "charset_normalizer-3.4.1-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:80ed5e856eb7f30115aaf94e4a08114ccc8813e6ed1b5efa74f9f82e8509858f"}, + {file = "charset_normalizer-3.4.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b010a7a4fd316c3c484d482922d13044979e78d1861f0e0650423144c616a46a"}, + {file = "charset_normalizer-3.4.1-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:4532bff1b8421fd0a320463030c7520f56a79c9024a4e88f01c537316019005a"}, + {file = "charset_normalizer-3.4.1-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:d973f03c0cb71c5ed99037b870f2be986c3c05e63622c017ea9816881d2dd247"}, + {file = "charset_normalizer-3.4.1-cp310-cp310-musllinux_1_2_i686.whl", hash = "sha256:3a3bd0dcd373514dcec91c411ddb9632c0d7d92aed7093b8c3bbb6d69ca74408"}, + {file = "charset_normalizer-3.4.1-cp310-cp310-musllinux_1_2_ppc64le.whl", hash = "sha256:d9c3cdf5390dcd29aa8056d13e8e99526cda0305acc038b96b30352aff5ff2bb"}, + {file = "charset_normalizer-3.4.1-cp310-cp310-musllinux_1_2_s390x.whl", hash = "sha256:2bdfe3ac2e1bbe5b59a1a63721eb3b95fc9b6817ae4a46debbb4e11f6232428d"}, + {file = "charset_normalizer-3.4.1-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:eab677309cdb30d047996b36d34caeda1dc91149e4fdca0b1a039b3f79d9a807"}, + {file = "charset_normalizer-3.4.1-cp310-cp310-win32.whl", hash = "sha256:c0429126cf75e16c4f0ad00ee0eae4242dc652290f940152ca8c75c3a4b6ee8f"}, + {file = "charset_normalizer-3.4.1-cp310-cp310-win_amd64.whl", hash = "sha256:9f0b8b1c6d84c8034a44893aba5e767bf9c7a211e313a9605d9c617d7083829f"}, + {file = "charset_normalizer-3.4.1-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:8bfa33f4f2672964266e940dd22a195989ba31669bd84629f05fab3ef4e2d125"}, + {file = "charset_normalizer-3.4.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = 
"sha256:28bf57629c75e810b6ae989f03c0828d64d6b26a5e205535585f96093e405ed1"}, + {file = "charset_normalizer-3.4.1-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:f08ff5e948271dc7e18a35641d2f11a4cd8dfd5634f55228b691e62b37125eb3"}, + {file = "charset_normalizer-3.4.1-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:234ac59ea147c59ee4da87a0c0f098e9c8d169f4dc2a159ef720f1a61bbe27cd"}, + {file = "charset_normalizer-3.4.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:fd4ec41f914fa74ad1b8304bbc634b3de73d2a0889bd32076342a573e0779e00"}, + {file = "charset_normalizer-3.4.1-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:eea6ee1db730b3483adf394ea72f808b6e18cf3cb6454b4d86e04fa8c4327a12"}, + {file = "charset_normalizer-3.4.1-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:c96836c97b1238e9c9e3fe90844c947d5afbf4f4c92762679acfe19927d81d77"}, + {file = "charset_normalizer-3.4.1-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:4d86f7aff21ee58f26dcf5ae81a9addbd914115cdebcbb2217e4f0ed8982e146"}, + {file = "charset_normalizer-3.4.1-cp311-cp311-musllinux_1_2_ppc64le.whl", hash = "sha256:09b5e6733cbd160dcc09589227187e242a30a49ca5cefa5a7edd3f9d19ed53fd"}, + {file = "charset_normalizer-3.4.1-cp311-cp311-musllinux_1_2_s390x.whl", hash = "sha256:5777ee0881f9499ed0f71cc82cf873d9a0ca8af166dfa0af8ec4e675b7df48e6"}, + {file = "charset_normalizer-3.4.1-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:237bdbe6159cff53b4f24f397d43c6336c6b0b42affbe857970cefbb620911c8"}, + {file = "charset_normalizer-3.4.1-cp311-cp311-win32.whl", hash = "sha256:8417cb1f36cc0bc7eaba8ccb0e04d55f0ee52df06df3ad55259b9a323555fc8b"}, + {file = "charset_normalizer-3.4.1-cp311-cp311-win_amd64.whl", hash = "sha256:d7f50a1f8c450f3925cb367d011448c39239bb3eb4117c36a6d354794de4ce76"}, + {file = "charset_normalizer-3.4.1-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:73d94b58ec7fecbc7366247d3b0b10a21681004153238750bb67bd9012414545"}, + {file = "charset_normalizer-3.4.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:dad3e487649f498dd991eeb901125411559b22e8d7ab25d3aeb1af367df5efd7"}, + {file = "charset_normalizer-3.4.1-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:c30197aa96e8eed02200a83fba2657b4c3acd0f0aa4bdc9f6c1af8e8962e0757"}, + {file = "charset_normalizer-3.4.1-cp312-cp312-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:2369eea1ee4a7610a860d88f268eb39b95cb588acd7235e02fd5a5601773d4fa"}, + {file = "charset_normalizer-3.4.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bc2722592d8998c870fa4e290c2eec2c1569b87fe58618e67d38b4665dfa680d"}, + {file = "charset_normalizer-3.4.1-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ffc9202a29ab3920fa812879e95a9e78b2465fd10be7fcbd042899695d75e616"}, + {file = "charset_normalizer-3.4.1-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:804a4d582ba6e5b747c625bf1255e6b1507465494a40a2130978bda7b932c90b"}, + {file = "charset_normalizer-3.4.1-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:0f55e69f030f7163dffe9fd0752b32f070566451afe180f99dbeeb81f511ad8d"}, + {file = "charset_normalizer-3.4.1-cp312-cp312-musllinux_1_2_ppc64le.whl", hash = "sha256:c4c3e6da02df6fa1410a7680bd3f63d4f710232d3139089536310d027950696a"}, + {file = "charset_normalizer-3.4.1-cp312-cp312-musllinux_1_2_s390x.whl", hash = 
"sha256:5df196eb874dae23dcfb968c83d4f8fdccb333330fe1fc278ac5ceeb101003a9"}, + {file = "charset_normalizer-3.4.1-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:e358e64305fe12299a08e08978f51fc21fac060dcfcddd95453eabe5b93ed0e1"}, + {file = "charset_normalizer-3.4.1-cp312-cp312-win32.whl", hash = "sha256:9b23ca7ef998bc739bf6ffc077c2116917eabcc901f88da1b9856b210ef63f35"}, + {file = "charset_normalizer-3.4.1-cp312-cp312-win_amd64.whl", hash = "sha256:6ff8a4a60c227ad87030d76e99cd1698345d4491638dfa6673027c48b3cd395f"}, + {file = "charset_normalizer-3.4.1-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:aabfa34badd18f1da5ec1bc2715cadc8dca465868a4e73a0173466b688f29dda"}, + {file = "charset_normalizer-3.4.1-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:22e14b5d70560b8dd51ec22863f370d1e595ac3d024cb8ad7d308b4cd95f8313"}, + {file = "charset_normalizer-3.4.1-cp313-cp313-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:8436c508b408b82d87dc5f62496973a1805cd46727c34440b0d29d8a2f50a6c9"}, + {file = "charset_normalizer-3.4.1-cp313-cp313-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:2d074908e1aecee37a7635990b2c6d504cd4766c7bc9fc86d63f9c09af3fa11b"}, + {file = "charset_normalizer-3.4.1-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:955f8851919303c92343d2f66165294848d57e9bba6cf6e3625485a70a038d11"}, + {file = "charset_normalizer-3.4.1-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:44ecbf16649486d4aebafeaa7ec4c9fed8b88101f4dd612dcaf65d5e815f837f"}, + {file = "charset_normalizer-3.4.1-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:0924e81d3d5e70f8126529951dac65c1010cdf117bb75eb02dd12339b57749dd"}, + {file = "charset_normalizer-3.4.1-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:2967f74ad52c3b98de4c3b32e1a44e32975e008a9cd2a8cc8966d6a5218c5cb2"}, + {file = "charset_normalizer-3.4.1-cp313-cp313-musllinux_1_2_ppc64le.whl", hash = "sha256:c75cb2a3e389853835e84a2d8fb2b81a10645b503eca9bcb98df6b5a43eb8886"}, + {file = "charset_normalizer-3.4.1-cp313-cp313-musllinux_1_2_s390x.whl", hash = "sha256:09b26ae6b1abf0d27570633b2b078a2a20419c99d66fb2823173d73f188ce601"}, + {file = "charset_normalizer-3.4.1-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:fa88b843d6e211393a37219e6a1c1df99d35e8fd90446f1118f4216e307e48cd"}, + {file = "charset_normalizer-3.4.1-cp313-cp313-win32.whl", hash = "sha256:eb8178fe3dba6450a3e024e95ac49ed3400e506fd4e9e5c32d30adda88cbd407"}, + {file = "charset_normalizer-3.4.1-cp313-cp313-win_amd64.whl", hash = "sha256:b1ac5992a838106edb89654e0aebfc24f5848ae2547d22c2c3f66454daa11971"}, + {file = "charset_normalizer-3.4.1-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f30bf9fd9be89ecb2360c7d94a711f00c09b976258846efe40db3d05828e8089"}, + {file = "charset_normalizer-3.4.1-cp37-cp37m-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:97f68b8d6831127e4787ad15e6757232e14e12060bec17091b85eb1486b91d8d"}, + {file = "charset_normalizer-3.4.1-cp37-cp37m-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:7974a0b5ecd505609e3b19742b60cee7aa2aa2fb3151bc917e6e2646d7667dcf"}, + {file = "charset_normalizer-3.4.1-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:fc54db6c8593ef7d4b2a331b58653356cf04f67c960f584edb7c3d8c97e8f39e"}, + {file = "charset_normalizer-3.4.1-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = 
"sha256:311f30128d7d333eebd7896965bfcfbd0065f1716ec92bd5638d7748eb6f936a"}, + {file = "charset_normalizer-3.4.1-cp37-cp37m-musllinux_1_2_aarch64.whl", hash = "sha256:7d053096f67cd1241601111b698f5cad775f97ab25d81567d3f59219b5f1adbd"}, + {file = "charset_normalizer-3.4.1-cp37-cp37m-musllinux_1_2_i686.whl", hash = "sha256:807f52c1f798eef6cf26beb819eeb8819b1622ddfeef9d0977a8502d4db6d534"}, + {file = "charset_normalizer-3.4.1-cp37-cp37m-musllinux_1_2_ppc64le.whl", hash = "sha256:dccbe65bd2f7f7ec22c4ff99ed56faa1e9f785482b9bbd7c717e26fd723a1d1e"}, + {file = "charset_normalizer-3.4.1-cp37-cp37m-musllinux_1_2_s390x.whl", hash = "sha256:2fb9bd477fdea8684f78791a6de97a953c51831ee2981f8e4f583ff3b9d9687e"}, + {file = "charset_normalizer-3.4.1-cp37-cp37m-musllinux_1_2_x86_64.whl", hash = "sha256:01732659ba9b5b873fc117534143e4feefecf3b2078b0a6a2e925271bb6f4cfa"}, + {file = "charset_normalizer-3.4.1-cp37-cp37m-win32.whl", hash = "sha256:7a4f97a081603d2050bfaffdefa5b02a9ec823f8348a572e39032caa8404a487"}, + {file = "charset_normalizer-3.4.1-cp37-cp37m-win_amd64.whl", hash = "sha256:7b1bef6280950ee6c177b326508f86cad7ad4dff12454483b51d8b7d673a2c5d"}, + {file = "charset_normalizer-3.4.1-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:ecddf25bee22fe4fe3737a399d0d177d72bc22be6913acfab364b40bce1ba83c"}, + {file = "charset_normalizer-3.4.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8c60ca7339acd497a55b0ea5d506b2a2612afb2826560416f6894e8b5770d4a9"}, + {file = "charset_normalizer-3.4.1-cp38-cp38-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:b7b2d86dd06bfc2ade3312a83a5c364c7ec2e3498f8734282c6c3d4b07b346b8"}, + {file = "charset_normalizer-3.4.1-cp38-cp38-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:dd78cfcda14a1ef52584dbb008f7ac81c1328c0f58184bf9a84c49c605002da6"}, + {file = "charset_normalizer-3.4.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6e27f48bcd0957c6d4cb9d6fa6b61d192d0b13d5ef563e5f2ae35feafc0d179c"}, + {file = "charset_normalizer-3.4.1-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:01ad647cdd609225c5350561d084b42ddf732f4eeefe6e678765636791e78b9a"}, + {file = "charset_normalizer-3.4.1-cp38-cp38-musllinux_1_2_aarch64.whl", hash = "sha256:619a609aa74ae43d90ed2e89bdd784765de0a25ca761b93e196d938b8fd1dbbd"}, + {file = "charset_normalizer-3.4.1-cp38-cp38-musllinux_1_2_i686.whl", hash = "sha256:89149166622f4db9b4b6a449256291dc87a99ee53151c74cbd82a53c8c2f6ccd"}, + {file = "charset_normalizer-3.4.1-cp38-cp38-musllinux_1_2_ppc64le.whl", hash = "sha256:7709f51f5f7c853f0fb938bcd3bc59cdfdc5203635ffd18bf354f6967ea0f824"}, + {file = "charset_normalizer-3.4.1-cp38-cp38-musllinux_1_2_s390x.whl", hash = "sha256:345b0426edd4e18138d6528aed636de7a9ed169b4aaf9d61a8c19e39d26838ca"}, + {file = "charset_normalizer-3.4.1-cp38-cp38-musllinux_1_2_x86_64.whl", hash = "sha256:0907f11d019260cdc3f94fbdb23ff9125f6b5d1039b76003b5b0ac9d6a6c9d5b"}, + {file = "charset_normalizer-3.4.1-cp38-cp38-win32.whl", hash = "sha256:ea0d8d539afa5eb2728aa1932a988a9a7af94f18582ffae4bc10b3fbdad0626e"}, + {file = "charset_normalizer-3.4.1-cp38-cp38-win_amd64.whl", hash = "sha256:329ce159e82018d646c7ac45b01a430369d526569ec08516081727a20e9e4af4"}, + {file = "charset_normalizer-3.4.1-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:b97e690a2118911e39b4042088092771b4ae3fc3aa86518f84b8cf6888dbdb41"}, + {file = "charset_normalizer-3.4.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash 
= "sha256:78baa6d91634dfb69ec52a463534bc0df05dbd546209b79a3880a34487f4b84f"}, + {file = "charset_normalizer-3.4.1-cp39-cp39-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:1a2bc9f351a75ef49d664206d51f8e5ede9da246602dc2d2726837620ea034b2"}, + {file = "charset_normalizer-3.4.1-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:75832c08354f595c760a804588b9357d34ec00ba1c940c15e31e96d902093770"}, + {file = "charset_normalizer-3.4.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0af291f4fe114be0280cdd29d533696a77b5b49cfde5467176ecab32353395c4"}, + {file = "charset_normalizer-3.4.1-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:0167ddc8ab6508fe81860a57dd472b2ef4060e8d378f0cc555707126830f2537"}, + {file = "charset_normalizer-3.4.1-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:2a75d49014d118e4198bcee5ee0a6f25856b29b12dbf7cd012791f8a6cc5c496"}, + {file = "charset_normalizer-3.4.1-cp39-cp39-musllinux_1_2_i686.whl", hash = "sha256:363e2f92b0f0174b2f8238240a1a30142e3db7b957a5dd5689b0e75fb717cc78"}, + {file = "charset_normalizer-3.4.1-cp39-cp39-musllinux_1_2_ppc64le.whl", hash = "sha256:ab36c8eb7e454e34e60eb55ca5d241a5d18b2c6244f6827a30e451c42410b5f7"}, + {file = "charset_normalizer-3.4.1-cp39-cp39-musllinux_1_2_s390x.whl", hash = "sha256:4c0907b1928a36d5a998d72d64d8eaa7244989f7aaaf947500d3a800c83a3fd6"}, + {file = "charset_normalizer-3.4.1-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:04432ad9479fa40ec0f387795ddad4437a2b50417c69fa275e212933519ff294"}, + {file = "charset_normalizer-3.4.1-cp39-cp39-win32.whl", hash = "sha256:3bed14e9c89dcb10e8f3a29f9ccac4955aebe93c71ae803af79265c9ca5644c5"}, + {file = "charset_normalizer-3.4.1-cp39-cp39-win_amd64.whl", hash = "sha256:49402233c892a461407c512a19435d1ce275543138294f7ef013f0b63d5d3765"}, + {file = "charset_normalizer-3.4.1-py3-none-any.whl", hash = "sha256:d98b1668f06378c6dbefec3b92299716b931cd4e6061f3c875a71ced1780ab85"}, + {file = "charset_normalizer-3.4.1.tar.gz", hash = "sha256:44251f18cd68a75b56585dd00dae26183e102cd5e0f9f1466e6df5da2ed64ea3"}, +] + [[package]] name = "click" version = "8.1.8" description = "Composable command line interface toolkit" optional = false python-versions = ">=3.7" -groups = ["dev"] files = [ {file = "click-8.1.8-py3-none-any.whl", hash = "sha256:63c132bbbed01578a06712a2d1f497bb62d9c1c0d329b7903a866228027263b2"}, {file = "click-8.1.8.tar.gz", hash = "sha256:ed53c9d8990d83c2a27deae68e4ee337473f6330c040a31d4225c9574d16096a"}, @@ -148,8 +257,6 @@ version = "0.4.6" description = "Cross-platform colored terminal text." 
optional = false python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7" -groups = ["dev"] -markers = "sys_platform == \"win32\" or platform_system == \"Windows\"" files = [ {file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"}, {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"}, @@ -161,8 +268,6 @@ version = "1.2.2" description = "Backport of PEP 654 (exception groups)" optional = false python-versions = ">=3.7" -groups = ["dev"] -markers = "python_version < \"3.11\"" files = [ {file = "exceptiongroup-1.2.2-py3-none-any.whl", hash = "sha256:3111b9d131c238bec2f8f516e123e14ba243563fb135d3fe885990585aa7795b"}, {file = "exceptiongroup-1.2.2.tar.gz", hash = "sha256:47c2edf7c6738fafb49fd34290706d1a1a2f4d1c6df275526b62cbb4aa5393cc"}, @@ -177,7 +282,6 @@ version = "7.1.1" description = "the modular source code checker: pep8 pyflakes and co" optional = false python-versions = ">=3.8.1" -groups = ["dev"] files = [ {file = "flake8-7.1.1-py2.py3-none-any.whl", hash = "sha256:597477df7860daa5aa0fdd84bf5208a043ab96b8e96ab708770ae0364dd03213"}, {file = "flake8-7.1.1.tar.gz", hash = "sha256:049d058491e228e03e67b390f311bbf88fce2dbaa8fa673e7aea87b7198b8d38"}, @@ -188,13 +292,26 @@ mccabe = ">=0.7.0,<0.8.0" pycodestyle = ">=2.12.0,<2.13.0" pyflakes = ">=3.2.0,<3.3.0" +[[package]] +name = "idna" +version = "3.10" +description = "Internationalized Domain Names in Applications (IDNA)" +optional = false +python-versions = ">=3.6" +files = [ + {file = "idna-3.10-py3-none-any.whl", hash = "sha256:946d195a0d259cbba61165e88e65941f16e9b36ea6ddb97f00452bae8b1287d3"}, + {file = "idna-3.10.tar.gz", hash = "sha256:12f65c9b470abda6dc35cf8e63cc574b1c52b11df2c86030af0ac09b01b13ea9"}, +] + +[package.extras] +all = ["flake8 (>=7.1.1)", "mypy (>=1.11.2)", "pytest (>=8.3.2)", "ruff (>=0.6.2)"] + [[package]] name = "iniconfig" version = "2.0.0" description = "brain-dead simple config-ini parsing" optional = false python-versions = ">=3.7" -groups = ["dev"] files = [ {file = "iniconfig-2.0.0-py3-none-any.whl", hash = "sha256:b6a85871a79d2e3b22d2d1b94ac2824226a63c6b741c88f7ae975f18b6778374"}, {file = "iniconfig-2.0.0.tar.gz", hash = "sha256:2d91e135bf72d31a410b17c16da610a82cb55f6b0477d1a902134b24a455b8b3"}, @@ -206,7 +323,6 @@ version = "5.13.2" description = "A Python utility / library to sort Python imports." 
optional = false python-versions = ">=3.8.0" -groups = ["dev"] files = [ {file = "isort-5.13.2-py3-none-any.whl", hash = "sha256:8ca5e72a8d85860d5a3fa69b8745237f2939afe12dbf656afbcb47fe72d947a6"}, {file = "isort-5.13.2.tar.gz", hash = "sha256:48fdfcb9face5d58a4f6dde2e72a1fb8dcaf8ab26f95ab49fab84c2ddefb0109"}, @@ -221,7 +337,6 @@ version = "0.7.0" description = "McCabe checker, plugin for flake8" optional = false python-versions = ">=3.6" -groups = ["dev"] files = [ {file = "mccabe-0.7.0-py2.py3-none-any.whl", hash = "sha256:6c2d30ab6be0e4a46919781807b4f0d834ebdd6c6e3dca0bda5a15f863427b6e"}, {file = "mccabe-0.7.0.tar.gz", hash = "sha256:348e0240c33b60bbdf4e523192ef919f28cb2c3d7d5c7794f74009290f236325"}, @@ -233,7 +348,6 @@ version = "0.910" description = "Optional static typing for Python" optional = false python-versions = ">=3.5" -groups = ["dev"] files = [ {file = "mypy-0.910-cp35-cp35m-macosx_10_9_x86_64.whl", hash = "sha256:a155d80ea6cee511a3694b108c4494a39f42de11ee4e61e72bc424c490e46457"}, {file = "mypy-0.910-cp35-cp35m-manylinux1_x86_64.whl", hash = "sha256:b94e4b785e304a04ea0828759172a15add27088520dc7e49ceade7834275bedb"}, @@ -275,7 +389,6 @@ version = "0.4.4" description = "Experimental type system extensions for programs checked with the mypy typechecker." optional = false python-versions = ">=2.7" -groups = ["dev"] files = [ {file = "mypy_extensions-0.4.4.tar.gz", hash = "sha256:c8b707883a96efe9b4bb3aaf0dcc07e7e217d7d8368eec4db4049ee9e142f4fd"}, ] @@ -286,7 +399,6 @@ version = "24.2" description = "Core utilities for Python packages" optional = false python-versions = ">=3.8" -groups = ["dev"] files = [ {file = "packaging-24.2-py3-none-any.whl", hash = "sha256:09abb1bccd265c01f4a3aa3f7a7db064b36514d2cba19a2f694fe6150451a759"}, {file = "packaging-24.2.tar.gz", hash = "sha256:c228a6dc5e932d346bc5739379109d49e8853dd8223571c7c5b55260edc0b97f"}, @@ -298,7 +410,6 @@ version = "0.12.1" description = "Utility library for gitignore style pattern matching of file paths." optional = false python-versions = ">=3.8" -groups = ["dev"] files = [ {file = "pathspec-0.12.1-py3-none-any.whl", hash = "sha256:a0d503e138a4c123b27490a4f7beda6a01c6f288df0e4a8b79c7eb0dc7b4cc08"}, {file = "pathspec-0.12.1.tar.gz", hash = "sha256:a482d51503a1ab33b1c67a6c3813a26953dbdc71c31dacaef9a838c4e29f5712"}, @@ -310,7 +421,6 @@ version = "4.3.6" description = "A small Python package for determining appropriate platform-specific dirs, e.g. a `user data dir`." 
optional = false python-versions = ">=3.8" -groups = ["dev"] files = [ {file = "platformdirs-4.3.6-py3-none-any.whl", hash = "sha256:73e575e1408ab8103900836b97580d5307456908a03e92031bab39e4554cc3fb"}, {file = "platformdirs-4.3.6.tar.gz", hash = "sha256:357fb2acbc885b0419afd3ce3ed34564c13c9b95c89360cd9563f73aa5e2b907"}, @@ -327,7 +437,6 @@ version = "1.5.0" description = "plugin and hook calling mechanisms for python" optional = false python-versions = ">=3.8" -groups = ["dev"] files = [ {file = "pluggy-1.5.0-py3-none-any.whl", hash = "sha256:44e1ad92c8ca002de6377e165f3e0f1be63266ab4d554740532335b9d75ea669"}, {file = "pluggy-1.5.0.tar.gz", hash = "sha256:2cffa88e94fdc978c4c574f15f9e59b7f4201d439195c3715ca9e2486f1d0cf1"}, @@ -343,7 +452,6 @@ version = "2.12.1" description = "Python style guide checker" optional = false python-versions = ">=3.8" -groups = ["dev"] files = [ {file = "pycodestyle-2.12.1-py2.py3-none-any.whl", hash = "sha256:46f0fb92069a7c28ab7bb558f05bfc0110dac69a0cd23c61ea0040283a9d78b3"}, {file = "pycodestyle-2.12.1.tar.gz", hash = "sha256:6838eae08bbce4f6accd5d5572075c63626a15ee3e6f842df996bf62f6d73521"}, @@ -355,7 +463,6 @@ version = "2.22" description = "C parser in Python" optional = false python-versions = ">=3.8" -groups = ["main", "dev"] files = [ {file = "pycparser-2.22-py3-none-any.whl", hash = "sha256:c3702b6d3dd8c7abc1afa565d7e63d53a1d0bd86cdc24edd75470f4de499cfcc"}, {file = "pycparser-2.22.tar.gz", hash = "sha256:491c8be9c040f5390f5bf44a5b07752bd07f56edf992381b05c701439eec10f6"}, @@ -367,7 +474,6 @@ version = "3.2.0" description = "passive checker of Python programs" optional = false python-versions = ">=3.8" -groups = ["dev"] files = [ {file = "pyflakes-3.2.0-py2.py3-none-any.whl", hash = "sha256:84b5be138a2dfbb40689ca07e2152deb896a65c3a3e24c251c5c62489568074a"}, {file = "pyflakes-3.2.0.tar.gz", hash = "sha256:1c61603ff154621fb2a9172037d84dca3500def8c8b630657d1701f026f8af3f"}, @@ -379,7 +485,6 @@ version = "7.4.4" description = "pytest: simple powerful testing with Python" optional = false python-versions = ">=3.7" -groups = ["dev"] files = [ {file = "pytest-7.4.4-py3-none-any.whl", hash = "sha256:b090cdf5ed60bf4c45261be03239c2c1c22df034fbffe691abe93cd80cea01d8"}, {file = "pytest-7.4.4.tar.gz", hash = "sha256:2cf0005922c6ace4a3e2ec8b4080eb0d9753fdc93107415332f50ce9e7994280"}, @@ -402,7 +507,6 @@ version = "0.39.0" description = "An AMQP based messaging library." optional = false python-versions = "*" -groups = ["main", "dev"] files = [ {file = "python-qpid-proton-0.39.0.tar.gz", hash = "sha256:362055ae6ab4c7f1437247c602757f30328d55c0a6986d5b68ca9798de9fce02"}, {file = "python_qpid_proton-0.39.0-cp38-abi3-macosx_11_0_x86_64.whl", hash = "sha256:f69da296ffa9e3b22f88a53fe9e27c4f4844e088a9f041061bd4f75f74f2a0af"}, @@ -415,13 +519,33 @@ cffi = ">=1.0.0" [package.extras] opentracing = ["jaeger-client", "opentracing"] +[[package]] +name = "requests" +version = "2.32.3" +description = "Python HTTP for Humans." 
+optional = false +python-versions = ">=3.8" +files = [ + {file = "requests-2.32.3-py3-none-any.whl", hash = "sha256:70761cfe03c773ceb22aa2f671b4757976145175cdfca038c02654d061d6dcc6"}, + {file = "requests-2.32.3.tar.gz", hash = "sha256:55365417734eb18255590a9ff9eb97e9e1da868d4ccd6402399eaf68af20a760"}, +] + +[package.dependencies] +certifi = ">=2017.4.17" +charset-normalizer = ">=2,<4" +idna = ">=2.5,<4" +urllib3 = ">=1.21.1,<3" + +[package.extras] +socks = ["PySocks (>=1.5.6,!=1.5.7)"] +use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"] + [[package]] name = "toml" version = "0.10.2" description = "Python Library for Tom's Obvious, Minimal Language" optional = false python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*" -groups = ["dev"] files = [ {file = "toml-0.10.2-py2.py3-none-any.whl", hash = "sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b"}, {file = "toml-0.10.2.tar.gz", hash = "sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f"}, @@ -433,8 +557,6 @@ version = "2.2.1" description = "A lil' TOML parser" optional = false python-versions = ">=3.8" -groups = ["dev"] -markers = "python_version < \"3.11\"" files = [ {file = "tomli-2.2.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:678e4fa69e4575eb77d103de3df8a895e1591b48e740211bd1067378c69e8249"}, {file = "tomli-2.2.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:023aa114dd824ade0100497eb2318602af309e5a55595f76b626d6d9f3b7b0a6"}, @@ -476,13 +598,29 @@ version = "4.12.2" description = "Backported and Experimental Type Hints for Python 3.8+" optional = false python-versions = ">=3.8" -groups = ["dev"] files = [ {file = "typing_extensions-4.12.2-py3-none-any.whl", hash = "sha256:04e5ca0351e0f3f85c6853954072df659d0d13fac324d0072316b67d7794700d"}, {file = "typing_extensions-4.12.2.tar.gz", hash = "sha256:1a7ead55c7e559dd4dee8856e3a88b41225abfe1ce8df57b7c13915fe121ffb8"}, ] +[[package]] +name = "urllib3" +version = "2.3.0" +description = "HTTP library with thread-safe connection pooling, file post, and more." 
+optional = false +python-versions = ">=3.9" +files = [ + {file = "urllib3-2.3.0-py3-none-any.whl", hash = "sha256:1cee9ad369867bfdbbb48b7dd50374c0967a0bb7710050facf0dd6911440e3df"}, + {file = "urllib3-2.3.0.tar.gz", hash = "sha256:f8c5449b3cf0861679ce7e0503c7b44b5ec981bec0d1d3795a07f1ba96f0204d"}, +] + +[package.extras] +brotli = ["brotli (>=1.0.9)", "brotlicffi (>=0.8.0)"] +h2 = ["h2 (>=4,<5)"] +socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"] +zstd = ["zstandard (>=0.18.0)"] + [metadata] -lock-version = "2.1" +lock-version = "2.0" python-versions = "^3.9" -content-hash = "73e039485fd994b8b5d294a5eb41ddd9fa3b52f525e84ad7ccd1cfa8b55a1406" +content-hash = "69fd7879f7457a4f02975ea1a6c1ebdc16e2c8e68bf542e0d7ff3efecf45cd1a" diff --git a/pyproject.toml b/pyproject.toml index fe050c8..34a3e5b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,6 +17,7 @@ mypy = "^0.910" pytest = "^7.4.0" black = "^24.3.0" python-qpid-proton = "^0.39.0" +requests = "^2.31.0" [build-system] requires = ["poetry-core"] diff --git a/rabbitmq_amqp_python_client/__init__.py b/rabbitmq_amqp_python_client/__init__.py index 8bcda4f..417f401 100644 --- a/rabbitmq_amqp_python_client/__init__.py +++ b/rabbitmq_amqp_python_client/__init__.py @@ -16,6 +16,7 @@ from .qpid.proton._delivery import Delivery from .qpid.proton._events import Event from .qpid.proton._message import Message +from .qpid.proton._utils import ConnectionClosed from .qpid.proton.handlers import MessagingHandler from .queues import ( ClassicQueueSpecification, @@ -59,4 +60,5 @@ "SslConfigurationContext", "ClientCert", "Delivery", + "ConnectionClosed", ] diff --git a/rabbitmq_amqp_python_client/connection.py b/rabbitmq_amqp_python_client/connection.py index bd2a11a..6667a03 100644 --- a/rabbitmq_amqp_python_client/connection.py +++ b/rabbitmq_amqp_python_client/connection.py @@ -1,5 +1,5 @@ import logging -from typing import Optional +from typing import Annotated, Callable, Optional, TypeVar from .address_helper import validate_address from .consumer import Consumer @@ -13,14 +13,21 @@ logger = logging.getLogger(__name__) +MT = TypeVar("MT") +CB = Annotated[Callable[[MT], None], "Message callback type"] + class Connection: def __init__( - self, addr: str, ssl_context: Optional[SslConfigurationContext] = None + self, + addr: str, + ssl_context: Optional[SslConfigurationContext] = None, + on_disconnection_handler: Optional[CB] = None, # type: ignore ): self._addr: str = addr self._conn: BlockingConnection self._management: Management + self._on_disconnection_handler = on_disconnection_handler self._conf_ssl_context: Optional[SslConfigurationContext] = ssl_context self._ssl_domain = None @@ -41,7 +48,11 @@ def dial(self) -> None: self._conf_ssl_context.client_cert.client_key, self._conf_ssl_context.client_cert.password, ) - self._conn = BlockingConnection(self._addr, ssl_domain=self._ssl_domain) + self._conn = BlockingConnection( + self._addr, + ssl_domain=self._ssl_domain, + on_disconnection_handler=self._on_disconnection_handler, + ) self._open() logger.debug("Connection to the server established") diff --git a/rabbitmq_amqp_python_client/qpid/proton/_utils.py b/rabbitmq_amqp_python_client/qpid/proton/_utils.py index 29d0464..77b7a48 100644 --- a/rabbitmq_amqp_python_client/qpid/proton/_utils.py +++ b/rabbitmq_amqp_python_client/qpid/proton/_utils.py @@ -71,6 +71,11 @@ class Literal(metaclass=GenericMeta): ) from ._transport import SSLDomain +from typing import Annotated, TypeVar + +MT = TypeVar("MT") +CB = Annotated[Callable[[MT], None], "Message callback 
type"] + class BlockingLink: def __init__( @@ -423,6 +428,7 @@ def __init__( heartbeat: Optional[float] = None, urls: Optional[List[str]] = None, reconnect: Union[None, Literal[False], "Backoff"] = None, + on_disconnection_handler: Optional[CB] = None, **kwargs ) -> None: self.disconnected = False @@ -432,6 +438,7 @@ def __init__( self.container.start() self.conn = None self.closing = False + self._on_disconnection_handler = on_disconnection_handler # Preserve previous behaviour if neither reconnect nor urls are supplied if url is not None and urls is None and reconnect is None: reconnect = False @@ -632,6 +639,8 @@ def on_connection_remote_close(self, event: "Event") -> None: Event callback for when the link peer closes the connection. """ if event.connection.state & Endpoint.LOCAL_ACTIVE: + if self._on_disconnection_handler is not None: + event.container.schedule(0, self._on_disconnection_handler()) event.connection.close() if not self.closing: raise ConnectionClosed(event.connection) diff --git a/tests/http_requests.py b/tests/http_requests.py new file mode 100644 index 0000000..90d7202 --- /dev/null +++ b/tests/http_requests.py @@ -0,0 +1,31 @@ +import urllib.parse + +import requests +from requests.auth import HTTPBasicAuth + + +def get_connections_names() -> list: + request = "http://localhost:15672/api/connections" + responses = requests.get(request, auth=HTTPBasicAuth("guest", "guest")) + responses.raise_for_status() + connections = responses.json() + connection_names = [] + for connection in connections: + connection_names.append(connection["name"]) + return connection_names + + +def delete_connections(connection_names: []) -> int: + for connection_name in connection_names: + request = ( + "http://guest:guest@localhost:15672/api/connections/" + + urllib.parse.quote(connection_name) + ) + response = requests.delete(request, auth=HTTPBasicAuth("guest", "guest")) + print("response code" + str(response)) + return response.status_code + + +def delete_all_connections() -> None: + connection_names = get_connections_names() + delete_connections(connection_names) diff --git a/tests/test_connection.py b/tests/test_connection.py index b6a7676..88933c2 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -5,6 +5,13 @@ ) +def on_disconnected(): + + print("disconnected") + global disconnected + disconnected = True + + def test_connection() -> None: connection = Connection("amqp://guest:guest@localhost:5672/") connection.dial() diff --git a/tests/test_publisher.py b/tests/test_publisher.py index 6391c22..225745f 100644 --- a/tests/test_publisher.py +++ b/tests/test_publisher.py @@ -1,3 +1,5 @@ +import time + from rabbitmq_amqp_python_client import ( AddressHelper, ArgumentOutOfRangeException, @@ -8,6 +10,17 @@ QuorumQueueSpecification, ) +from .http_requests import delete_all_connections + +disconnected = False + + +def on_disconnected(): + + print("disconnected") + global disconnected + disconnected = True + def test_publish_queue(connection: Connection) -> None: @@ -151,3 +164,33 @@ def test_publish_purge(connection: Connection) -> None: assert raised is False assert message_purged == 20 + + +def test_disconnection() -> None: + connection = Connection( + "amqp://guest:guest@localhost:5672/", on_disconnection_handler=on_disconnected + ) + connection.dial() + # delay + time.sleep(10) + messages_to_publish = 20 + queue_name = "test-queue" + management = connection.management() + + management.declare_queue(QuorumQueueSpecification(name=queue_name)) + + try: + publisher = 
connection.publisher("/queues/" + queue_name) + for i in range(messages_to_publish): + if i == 5: + # simulate a disconnection + delete_all_connections() + publisher.publish(Message(body="test")) + + except Exception: + pass + + connection.close() + + global disconnected + assert disconnected is True From 29607e53f4a30e752bd3e9cd7e5c55d27292a665 Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Tue, 28 Jan 2025 15:28:03 +0100 Subject: [PATCH 02/15] adding tests --- tests/http_requests.py | 6 +-- tests/test_publisher.py | 90 +++++++++++++++++++++++++++++++---------- 2 files changed, 70 insertions(+), 26 deletions(-) diff --git a/tests/http_requests.py b/tests/http_requests.py index 90d7202..ed0445c 100644 --- a/tests/http_requests.py +++ b/tests/http_requests.py @@ -15,15 +15,13 @@ def get_connections_names() -> list: return connection_names -def delete_connections(connection_names: []) -> int: +def delete_connections(connection_names: []) -> None: for connection_name in connection_names: request = ( "http://guest:guest@localhost:15672/api/connections/" + urllib.parse.quote(connection_name) ) - response = requests.delete(request, auth=HTTPBasicAuth("guest", "guest")) - print("response code" + str(response)) - return response.status_code + requests.delete(request, auth=HTTPBasicAuth("guest", "guest")) def delete_all_connections() -> None: diff --git a/tests/test_publisher.py b/tests/test_publisher.py index 225745f..03b4fed 100644 --- a/tests/test_publisher.py +++ b/tests/test_publisher.py @@ -5,6 +5,7 @@ ArgumentOutOfRangeException, BindingSpecification, Connection, + ConnectionClosed, ExchangeSpecification, Message, QuorumQueueSpecification, @@ -12,15 +13,6 @@ from .http_requests import delete_all_connections -disconnected = False - - -def on_disconnected(): - - print("disconnected") - global disconnected - disconnected = True - def test_publish_queue(connection: Connection) -> None: @@ -166,31 +158,85 @@ def test_publish_purge(connection: Connection) -> None: assert message_purged == 20 -def test_disconnection() -> None: - connection = Connection( +def test_disconnection_reconnection() -> None: + disconnected = False + reconnected = False + generic_exception_raised = False + publisher = None + queue_name = "test-queue" + connection_test = None + + time.sleep(60) + + def on_disconnected(): + + nonlocal publisher + nonlocal queue_name + nonlocal connection_test + + # reconnect + if connection_test is not None: + connection_test = Connection("amqp://guest:guest@localhost:5672/") + connection_test.dial() + + if publisher is not None: + publisher = connection_test.publisher("/queues/" + queue_name) + + nonlocal reconnected + reconnected = True + + connection_test = Connection( "amqp://guest:guest@localhost:5672/", on_disconnection_handler=on_disconnected ) - connection.dial() + connection_test.dial() # delay - time.sleep(10) - messages_to_publish = 20 + time.sleep(5) + messages_to_publish = 10000 queue_name = "test-queue" - management = connection.management() + management = connection_test.management() management.declare_queue(QuorumQueueSpecification(name=queue_name)) - try: - publisher = connection.publisher("/queues/" + queue_name) + management.close() + + publisher = connection_test.publisher("/queues/" + queue_name) + while True: + for i in range(messages_to_publish): if i == 5: # simulate a disconnection delete_all_connections() - publisher.publish(Message(body="test")) + try: + publisher.publish(Message(body="test")) - except Exception: - pass + except ConnectionClosed: + disconnected 
= True + continue + + except Exception: + generic_exception_raised = True + + break + + publisher.close() + + # cleanup, we need to create a new connection as the previous one + # was closed by the test + + connection_test = Connection("amqp://guest:guest@localhost:5672/") + connection_test.dial() + + management = connection_test.management() + + # purge the queue and check number of published messages + message_purged = management.purge_queue(queue_name) + + management.delete_queue(queue_name) + management.close() - connection.close() + connection_test.close() - global disconnected + assert generic_exception_raised is False assert disconnected is True + assert reconnected is True + assert message_purged == messages_to_publish - 1 From 26f15a0186529c09d55d567e81c42c5453d9a918 Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Wed, 29 Jan 2025 11:26:58 +0100 Subject: [PATCH 03/15] adding a test --- tests/test_connection.py | 56 ++++++++++++++++++++++++++++++++++++++++ tests/test_publisher.py | 2 -- 2 files changed, 56 insertions(+), 2 deletions(-) diff --git a/tests/test_connection.py b/tests/test_connection.py index 88933c2..2c8270c 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -1,9 +1,15 @@ +import time + from rabbitmq_amqp_python_client import ( ClientCert, Connection, + ConnectionClosed, SslConfigurationContext, + StreamSpecification, ) +from .http_requests import delete_all_connections + def on_disconnected(): @@ -30,3 +36,53 @@ def test_connection_ssl() -> None: ), ) connection.dial() + + +def test_connection_reconnection() -> None: + + reconnected = False + connection = None + disconnected = False + + def on_disconnected(): + + nonlocal connection + + # reconnect + if connection is not None: + connection = Connection("amqp://guest:guest@localhost:5672/") + connection.dial() + + nonlocal reconnected + reconnected = True + + connection = Connection( + "amqp://guest:guest@localhost:5672/", on_disconnection_handler=on_disconnected + ) + connection.dial() + + # delay + time.sleep(5) + # simulate a disconnection + delete_all_connections() + # raise a reconnection + management = connection.management() + stream_name = "test_stream_info_with_validation" + queue_specification = StreamSpecification( + name=stream_name, + ) + + try: + management.declare_queue(queue_specification) + except ConnectionClosed: + disconnected = True + + # check that we reconnected + management = connection.management() + management.declare_queue(queue_specification) + management.delete_queue(stream_name) + management.close() + connection.close() + + assert disconnected is True + assert reconnected is True diff --git a/tests/test_publisher.py b/tests/test_publisher.py index 03b4fed..9cc208e 100644 --- a/tests/test_publisher.py +++ b/tests/test_publisher.py @@ -166,8 +166,6 @@ def test_disconnection_reconnection() -> None: queue_name = "test-queue" connection_test = None - time.sleep(60) - def on_disconnected(): nonlocal publisher From 1eebfadf8763fd0aee2ccc905e9822dc01121cf2 Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Wed, 29 Jan 2025 13:51:08 +0100 Subject: [PATCH 04/15] supporting multinode in Connection --- examples/getting_started/reconnection_example.py | 6 ++++++ rabbitmq_amqp_python_client/connection.py | 13 ++++++++++--- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/examples/getting_started/reconnection_example.py b/examples/getting_started/reconnection_example.py index 11dbe51..f0dc50a 100644 --- a/examples/getting_started/reconnection_example.py +++ 
b/examples/getting_started/reconnection_example.py @@ -91,6 +91,12 @@ def on_link_closed(self, event: Event) -> None: def create_connection() -> Connection: + # for multinode specify a list of urls and fill the field urls of Connection instead of url + # urls = [ + # "amqp://ha_tls-rabbit_node0-1:5602/", + # "amqp://ha_tls-rabbit_node0-2:5602/", + # "amqp://ha_tls-rabbit_node0-3:5602/", + # ] connection = Connection( "amqp://guest:guest@localhost:5672/", on_disconnection_handler=on_disconnected ) diff --git a/rabbitmq_amqp_python_client/connection.py b/rabbitmq_amqp_python_client/connection.py index 6667a03..79bdc3f 100644 --- a/rabbitmq_amqp_python_client/connection.py +++ b/rabbitmq_amqp_python_client/connection.py @@ -20,11 +20,17 @@ class Connection: def __init__( self, - addr: str, + # single-node mode + url: Optional[str] = None, + # multi-node mode + urls: Optional[list[str]] = None, ssl_context: Optional[SslConfigurationContext] = None, on_disconnection_handler: Optional[CB] = None, # type: ignore ): - self._addr: str = addr + if url is None and urls is None: + raise ValueError("You need to specify at least an addr or a list of addr") + self._addr: Optional[str] = url + self._addrs: Optional[list[str]] = urls self._conn: BlockingConnection self._management: Management self._on_disconnection_handler = on_disconnection_handler @@ -49,7 +55,8 @@ def dial(self) -> None: self._conf_ssl_context.client_cert.password, ) self._conn = BlockingConnection( - self._addr, + url=self._addr, + urls=self._addrs, ssl_domain=self._ssl_domain, on_disconnection_handler=self._on_disconnection_handler, ) From 78a49b6acb007b94392d552d204cd98d34557851 Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Thu, 30 Jan 2025 13:39:21 +0100 Subject: [PATCH 05/15] multinode implementation --- examples/getting_started/main.py | 2 +- .../getting_started/reconnection_example.py | 17 ++++-- .../qpid/proton/_utils.py | 57 +++++++++++-------- 3 files changed, 45 insertions(+), 31 deletions(-) diff --git a/examples/getting_started/main.py b/examples/getting_started/main.py index fcbb76c..8e69dee 100644 --- a/examples/getting_started/main.py +++ b/examples/getting_started/main.py @@ -59,7 +59,7 @@ def on_link_closed(self, event: Event) -> None: def create_connection() -> Connection: - connection = Connection("amqps://guest:guest@localhost:5672/") + connection = Connection("amqp://guest:guest@localhost:5672/") # in case of SSL enablement # ca_cert_file = ".ci/certs/ca_certificate.pem" # client_cert = ".ci/certs/client_certificate.pem" diff --git a/examples/getting_started/reconnection_example.py b/examples/getting_started/reconnection_example.py index f0dc50a..a6aca92 100644 --- a/examples/getting_started/reconnection_example.py +++ b/examples/getting_started/reconnection_example.py @@ -21,7 +21,9 @@ consumer = None -def on_disconnected(): +# disconnection callback +# here you can cleanup or reconnect +def on_disconnection(): print("disconnected") exchange_name = "test-exchange" @@ -36,7 +38,8 @@ def on_disconnected(): addr = AddressHelper.exchange_address(exchange_name, routing_key) addr_queue = AddressHelper.queue_address(queue_name) - connection = create_connection() + if connection is not None: + connection = create_connection() if management is not None: management = connection.management() if publisher is not None: @@ -93,12 +96,14 @@ def on_link_closed(self, event: Event) -> None: def create_connection() -> Connection: # for multinode specify a list of urls and fill the field urls of Connection instead of url # urls 
= [ - # "amqp://ha_tls-rabbit_node0-1:5602/", - # "amqp://ha_tls-rabbit_node0-2:5602/", - # "amqp://ha_tls-rabbit_node0-3:5602/", + # "amqp://ha_tls-rabbit_node0-1:5682/", + # "amqp://ha_tls-rabbit_node1-1:5692/", + # "amqp://ha_tls-rabbit_node2-1:5602/", # ] + # connection = Connection(urls=urls, on_disconnection_handler=on_disconnected) connection = Connection( - "amqp://guest:guest@localhost:5672/", on_disconnection_handler=on_disconnected + url="amqp://guest:guest@localhost:5672/", + on_disconnection_handler=on_disconnection, ) connection.dial() diff --git a/rabbitmq_amqp_python_client/qpid/proton/_utils.py b/rabbitmq_amqp_python_client/qpid/proton/_utils.py index 77b7a48..ff0b504 100644 --- a/rabbitmq_amqp_python_client/qpid/proton/_utils.py +++ b/rabbitmq_amqp_python_client/qpid/proton/_utils.py @@ -431,37 +431,46 @@ def __init__( on_disconnection_handler: Optional[CB] = None, **kwargs ) -> None: - self.disconnected = False - self.timeout = timeout or 60 - self.container = container or Container() - self.container.timeout = self.timeout - self.container.start() - self.conn = None - self.closing = False - self._on_disconnection_handler = on_disconnection_handler + # Preserve previous behaviour if neither reconnect nor urls are supplied - if url is not None and urls is None and reconnect is None: - reconnect = False - url = Url(url).defaults() - failed = True - try: + if urls is None: + urls = [] + urls.append(url) + + # multinode reimplementation (default one wasn't working properly) + attempt = 0 + for i in range(len(urls)): + attempt = attempt + 1 + self.disconnected = False + self.timeout = timeout or 60 + self.container = container or Container() + self.container.timeout = self.timeout + self.container.start() + self.conn = None + self.closing = False + self._on_disconnection_handler = on_disconnection_handler + + url_it = urls[i] self.conn = self.container.connect( - url=url, + url=Url(url_it).defaults(), handler=self, ssl_domain=ssl_domain, - reconnect=reconnect, + reconnect=False, heartbeat=heartbeat, - urls=urls, + urls=None, **kwargs ) - self.wait( - lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT), - msg="Opening connection", - ) - failed = False - finally: - if failed and self.conn: - self.close() + try: + self.wait( + lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT), + msg="Opening connection", + ) + + except ConnectionException: + self.conn.close() + if attempt == len(urls): + raise + continue def create_sender( self, From 06daba6b907eb48df245916f7c4c9e5faaa0b3d8 Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Mon, 3 Feb 2025 14:30:33 +0100 Subject: [PATCH 06/15] stream offset implementation --- examples/getting_started/main.py | 52 +++++++++++-------- rabbitmq_amqp_python_client/__init__.py | 3 ++ rabbitmq_amqp_python_client/connection.py | 6 ++- rabbitmq_amqp_python_client/consumer.py | 19 +++++-- rabbitmq_amqp_python_client/entities.py | 19 ++++++- rabbitmq_amqp_python_client/options.py | 20 ++++++- .../qpid/proton/_reactor.py | 1 + tests/conftest.py | 3 +- tests/test_publisher.py | 26 ++++++++++ tests/test_publisher_streams.py | 47 +++++++++++++++++ tests/utils.py | 3 ++ 11 files changed, 168 insertions(+), 31 deletions(-) create mode 100644 tests/test_publisher_streams.py diff --git a/examples/getting_started/main.py b/examples/getting_started/main.py index 8e69dee..5187b75 100644 --- a/examples/getting_started/main.py +++ b/examples/getting_started/main.py @@ -10,6 +10,8 @@ ExchangeSpecification, Message, QuorumQueueSpecification, + 
StreamSpecification, + StreamFilterOptions ) @@ -81,7 +83,7 @@ def main() -> None: exchange_name = "test-exchange" queue_name = "example-queue" routing_key = "routing-key" - messages_to_publish = 100000 + messages_to_publish = 1000 print("connection to amqp server") connection = create_connection() @@ -89,51 +91,55 @@ def main() -> None: management = connection.management() print("declaring exchange and queue") - management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={})) + #management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={})) management.declare_queue( - QuorumQueueSpecification(name=queue_name) + StreamSpecification(name=queue_name) # QuorumQueueSpecification(name=queue_name, dead_letter_exchange="dead-letter") ) print("binding queue to exchange") - bind_name = management.bind( - BindingSpecification( - source_exchange=exchange_name, - destination_queue=queue_name, - binding_key=routing_key, - ) - ) + #bind_name = management.bind( + # BindingSpecification( + # source_exchange=exchange_name, + # destination_queue=queue_name, + # binding_key=routing_key, + # ) + #) - addr = AddressHelper.exchange_address(exchange_name, routing_key) + #addr = AddressHelper.exchange_address(exchange_name, routing_key) addr_queue = AddressHelper.queue_address(queue_name) print("create a publisher and publish a test message") - publisher = connection.publisher(addr) + publisher = connection.publisher(addr_queue) print("purging the queue") - messages_purged = management.purge_queue(queue_name) + #messages_purged = management.purge_queue(queue_name) - print("messages purged: " + str(messages_purged)) + #print("messages purged: " + str(messages_purged)) # management.close() # publish 10 messages for i in range(messages_to_publish): status = publisher.publish(Message(body="test")) - if status.ACCEPTED: - print("message accepted") - elif status.RELEASED: - print("message not routed") - elif status.REJECTED: - print("message not rejected") + #if status.ACCEPTED: + # print("message accepted") + #elif status.RELEASED: + # print("message not routed") + #elif status.REJECTED: + # print("message not rejected") publisher.close() print( "create a consumer and consume the test message - press control + c to terminate to consume" ) - consumer = connection.consumer(addr_queue, handler=MyMessageHandler()) + + stream_filter_options = StreamFilterOptions() + stream_filter_options.offset(0) + + consumer = connection.consumer(addr_queue, handler=MyMessageHandler(), stream_filter_options=stream_filter_options) try: consumer.run() @@ -147,10 +153,10 @@ def main() -> None: # management = connection.management() print("unbind") - management.unbind(bind_name) + #management.unbind(bind_name) print("delete queue") - management.delete_queue(queue_name) + #management.delete_queue(queue_name) print("delete exchange") management.delete_exchange(exchange_name) diff --git a/rabbitmq_amqp_python_client/__init__.py b/rabbitmq_amqp_python_client/__init__.py index 417f401..fbdf143 100644 --- a/rabbitmq_amqp_python_client/__init__.py +++ b/rabbitmq_amqp_python_client/__init__.py @@ -28,6 +28,8 @@ SslConfigurationContext, ) +from .entities import StreamFilterOptions + try: __version__ = metadata.version(__package__) __license__ = metadata.metadata(__package__)["license"] @@ -61,4 +63,5 @@ "ClientCert", "Delivery", "ConnectionClosed", + "StreamFilterOptions", ] diff --git a/rabbitmq_amqp_python_client/connection.py b/rabbitmq_amqp_python_client/connection.py index 79bdc3f..77186ae 100644 --- 
a/rabbitmq_amqp_python_client/connection.py +++ b/rabbitmq_amqp_python_client/connection.py @@ -11,6 +11,8 @@ from .qpid.proton.utils import BlockingConnection from .ssl_configuration import SslConfigurationContext +from.entities import StreamFilterOptions + logger = logging.getLogger(__name__) MT = TypeVar("MT") @@ -84,11 +86,11 @@ def publisher(self, destination: str) -> Publisher: return publisher def consumer( - self, destination: str, handler: Optional[MessagingHandler] = None + self, destination: str, handler: Optional[MessagingHandler] = None, stream_filter_options: Optional[StreamFilterOptions] = None ) -> Consumer: if validate_address(destination) is False: raise ArgumentOutOfRangeException( "destination address must start with /queues or /exchanges" ) - consumer = Consumer(self._conn, destination, handler) + consumer = Consumer(self._conn, destination, handler, stream_filter_options) return consumer diff --git a/rabbitmq_amqp_python_client/consumer.py b/rabbitmq_amqp_python_client/consumer.py index 18b2d71..60a7e8b 100644 --- a/rabbitmq_amqp_python_client/consumer.py +++ b/rabbitmq_amqp_python_client/consumer.py @@ -1,7 +1,7 @@ import logging from typing import Optional -from .options import ReceiverOptionUnsettled +from .options import ReceiverOptionUnsettled, ReceiverOptionUnsettledWithFilters from .qpid.proton._handlers import MessagingHandler from .qpid.proton._message import Message from .qpid.proton.utils import ( @@ -9,6 +9,8 @@ BlockingReceiver, ) +from.entities import StreamFilterOptions + logger = logging.getLogger(__name__) @@ -18,11 +20,13 @@ def __init__( conn: BlockingConnection, addr: str, handler: Optional[MessagingHandler] = None, + stream_options: Optional[StreamFilterOptions] = None ): self._receiver: Optional[BlockingReceiver] = None self._conn = conn self._addr = addr self._handler = handler + self._stream_options = stream_options self._open() def _open(self) -> None: @@ -52,6 +56,15 @@ def stop(self) -> None: def _create_receiver(self, addr: str) -> BlockingReceiver: logger.debug("Creating the receiver") - return self._conn.create_receiver( + if self._stream_options is None: + receiver = self._conn.create_receiver( addr, options=ReceiverOptionUnsettled(addr), handler=self._handler - ) + ) + else: + print("stream option is not None") + receiver = self._conn.create_receiver( + addr, options=ReceiverOptionUnsettledWithFilters(addr, self._stream_options), handler=self._handler + ) + + return receiver + diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index bed44b1..73b46a3 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -1,8 +1,12 @@ from dataclasses import dataclass -from typing import Any, Optional +from typing import Any, Optional, Dict +from .qpid.proton._data import symbol, Described from .common import ExchangeType, QueueType +STREAM_FILTER_SPEC = "rabbitmq:stream-filter" +STREAM_OFFSET_SPEC = "rabbitmq:stream-offset-spec" +STREAM_FILTER_MATCH_UNFILTERED = "rabbitmq:stream-match-unfiltered" @dataclass class ExchangeSpecification: @@ -33,3 +37,16 @@ class BindingSpecification: source_exchange: str destination_queue: str binding_key: str + + +class StreamFilterOptions: + + def __init__(self): + self._filter_set: Dict[symbol, Described] = {} + + def offset(self, offset: int): + #self._filter_set[symbol(STREAM_FILTER_SPEC)] = Described(symbol(STREAM_FILTER_SPEC), "first") + self._filter_set[symbol('rabbitmq:stream-offset-spec')] = 
Described(symbol('rabbitmq:stream-offset-spec'), 0) + + def filters(self) -> Dict[symbol, Described]: + return self._filter_set diff --git a/rabbitmq_amqp_python_client/options.py b/rabbitmq_amqp_python_client/options.py index a2cd9a2..1e9e194 100644 --- a/rabbitmq_amqp_python_client/options.py +++ b/rabbitmq_amqp_python_client/options.py @@ -3,7 +3,8 @@ symbol, ) from .qpid.proton._endpoints import Link # noqa: E402 -from .qpid.proton.reactor import LinkOption # noqa: E402 +from .qpid.proton.reactor import LinkOption, Filter # noqa: E402 +from .entities import StreamFilterOptions class SenderOption(LinkOption): # type: ignore @@ -52,6 +53,23 @@ class ReceiverOptionUnsettled(LinkOption): # type: ignore def __init__(self, addr: str): self._addr = addr + + def apply(self, link: Link) -> None: + link.target.address = self._addr + link.snd_settle_mode = Link.SND_UNSETTLED + link.rcv_settle_mode = Link.RCV_FIRST + link.properties = PropertyDict({symbol("paired"): True}) + link.source.dynamic = False + + def test(self, link: Link) -> bool: + return bool(link.is_receiver) + +class ReceiverOptionUnsettledWithFilters(Filter): # type: ignore + def __init__(self, addr: str, filter_options: StreamFilterOptions): + super().__init__(filter_options.filters()) + self._addr = addr + + def apply(self, link: Link) -> None: link.target.address = self._addr link.snd_settle_mode = Link.SND_UNSETTLED diff --git a/rabbitmq_amqp_python_client/qpid/proton/_reactor.py b/rabbitmq_amqp_python_client/qpid/proton/_reactor.py index 89b2e7c..8065618 100644 --- a/rabbitmq_amqp_python_client/qpid/proton/_reactor.py +++ b/rabbitmq_amqp_python_client/qpid/proton/_reactor.py @@ -814,6 +814,7 @@ class Filter(ReceiverOption): """ def __init__(self, filter_set: Dict[symbol, Described] = {}) -> None: + print("filterset: " + str(filter_set)) self.filter_set = filter_set def apply(self, receiver: "Receiver") -> None: diff --git a/tests/conftest.py b/tests/conftest.py index 005bf6d..34eb718 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -86,9 +86,10 @@ def __init__(self): self._received = 0 def on_message(self, event: Event): + print("received message: " + str(event.message.body)) self.delivery_context.accept(event) self._received = self._received + 1 - if self._received == 1000: + if self._received == 10: event.connection.close() raise ConsumerTestException("consumed") diff --git a/tests/test_publisher.py b/tests/test_publisher.py index 9cc208e..c4f1e86 100644 --- a/tests/test_publisher.py +++ b/tests/test_publisher.py @@ -9,6 +9,7 @@ ExchangeSpecification, Message, QuorumQueueSpecification, + StreamSpecification, ) from .http_requests import delete_all_connections @@ -238,3 +239,28 @@ def on_disconnected(): assert disconnected is True assert reconnected is True assert message_purged == messages_to_publish - 1 + + +def test_queue_info_for_stream_with_validations(connection: Connection) -> None: + + stream_name = "test_stream_info_with_validation" + messages_to_send = 200 + + queue_specification = StreamSpecification( + name=stream_name, + ) + management = connection.management() + management.declare_queue(queue_specification) + + print("before creating publisher") + + publisher = connection.publisher("/queues/" + stream_name) + + print("after creating publisher") + + for i in range(messages_to_send): + + publisher.publish(Message(body="test")) + + + diff --git a/tests/test_publisher_streams.py b/tests/test_publisher_streams.py new file mode 100644 index 0000000..586e363 --- /dev/null +++ b/tests/test_publisher_streams.py 
@@ -0,0 +1,47 @@ +from rabbitmq_amqp_python_client import ( + StreamSpecification, + QueueType, + Management, + AddressHelper, + StreamFilterOptions, + Connection, +) + +from .utils import publish_messages +from .conftest import MyMessageHandlerAccept, ConsumerTestException + + +def test_queue_info_for_stream_with_validations(connection: Connection) -> None: + + stream_name = "test_stream_info_with_validation" + messages_to_send = 200 + + queue_specification = StreamSpecification( + name=stream_name, + ) + management = connection.management() + management.declare_queue(queue_specification) + + publish_messages(connection, messages_to_send, stream_name) + + addr_queue = AddressHelper.queue_address(stream_name) + + stream_filter_options = StreamFilterOptions() + stream_filter_options.offset(0) + + consumer = connection.consumer(addr_queue, handler=MyMessageHandlerAccept()) + + try: + print("running") + consumer.run() + # ack to terminate the consumer + except ConsumerTestException: + pass + + consumer.close() + + management.delete_queue(stream_name) + + #assert stream_info.name == stream_name + #assert stream_info.queue_type == QueueType.stream + #assert stream_info.message_count == 0 \ No newline at end of file diff --git a/tests/utils.py b/tests/utils.py index b9b3b29..ec20f41 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -17,9 +17,12 @@ def create_connection() -> Connection: def publish_messages(connection: Connection, messages_to_send: int, queue_name) -> None: + print("before creating publisher") publisher = connection.publisher("/queues/" + queue_name) + print("after creating publisher") # publish messages_to_send messages for i in range(messages_to_send): + print("sending message") publisher.publish(Message(body="test" + str(i))) publisher.close() From ae8dfef7db2d8e4c53dd1636435776b5fd749b8b Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 3 Feb 2025 15:13:44 +0100 Subject: [PATCH 07/15] add credits (#27) Signed-off-by: Gabriele Santomaggio --- examples/getting_started/main.py | 71 +++++++++++++------------ rabbitmq_amqp_python_client/consumer.py | 2 + 2 files changed, 39 insertions(+), 34 deletions(-) diff --git a/examples/getting_started/main.py b/examples/getting_started/main.py index 5187b75..9706446 100644 --- a/examples/getting_started/main.py +++ b/examples/getting_started/main.py @@ -1,5 +1,5 @@ # type: ignore - +import threading from rabbitmq_amqp_python_client import ( # SSlConfigurationContext,; SslConfigurationContext,; ClientCert, AddressHelper, @@ -78,8 +78,16 @@ def create_connection() -> Connection: return connection -def main() -> None: +def threaded_function(addr_queue): + connection = create_connection() + consumer = connection.consumer(addr_queue, handler=MyMessageHandler()) + try: + consumer.run() + except KeyboardInterrupt: + pass + +def main() -> None: exchange_name = "test-exchange" queue_name = "example-queue" routing_key = "routing-key" @@ -91,7 +99,7 @@ def main() -> None: management = connection.management() print("declaring exchange and queue") - #management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={})) + # management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={})) management.declare_queue( StreamSpecification(name=queue_name) @@ -99,64 +107,59 @@ def main() -> None: ) print("binding queue to exchange") - #bind_name = management.bind( + # bind_name = management.bind( # BindingSpecification( # source_exchange=exchange_name, # destination_queue=queue_name, # binding_key=routing_key, # 
) - #) + # ) - #addr = AddressHelper.exchange_address(exchange_name, routing_key) + # addr = AddressHelper.exchange_address(exchange_name, routing_key) addr_queue = AddressHelper.queue_address(queue_name) - print("create a publisher and publish a test message") + thread = threading.Thread(target=threaded_function, args=(addr_queue,)) + thread.start() + ## press control + c to terminate the consumer + + # print("create a publisher and publish a test message") publisher = connection.publisher(addr_queue) - print("purging the queue") - #messages_purged = management.purge_queue(queue_name) + # print("purging the queue") + # messages_purged = management.purge_queue(queue_name) - #print("messages purged: " + str(messages_purged)) + # print("messages purged: " + str(messages_purged)) # management.close() # publish 10 messages for i in range(messages_to_publish): status = publisher.publish(Message(body="test")) - #if status.ACCEPTED: - # print("message accepted") - #elif status.RELEASED: - # print("message not routed") - #elif status.REJECTED: - # print("message not rejected") - + # # if status.ACCEPTED: + # # print("message accepted") + # # elif status.RELEASED: + # # print("message not routed") + # # elif status.REJECTED: + # # print("message not rejected") + # publisher.close() + # + # print( + # "create a consumer and consume the test message - press control + c to terminate to consume" + # ) - print( - "create a consumer and consume the test message - press control + c to terminate to consume" - ) - - stream_filter_options = StreamFilterOptions() - stream_filter_options.offset(0) - - consumer = connection.consumer(addr_queue, handler=MyMessageHandler(), stream_filter_options=stream_filter_options) - - try: - consumer.run() - except KeyboardInterrupt: - pass - + input("Press Enter to continue...") print("cleanup") - consumer.close() + # consumer.close() # once we finish consuming if we close the connection we need to create a new one # connection = create_connection() # management = connection.management() print("unbind") - #management.unbind(bind_name) + # management.unbind(bind_name) print("delete queue") - #management.delete_queue(queue_name) + # management.delete_queue(queue_name) print("delete exchange") management.delete_exchange(exchange_name) diff --git a/rabbitmq_amqp_python_client/consumer.py b/rabbitmq_amqp_python_client/consumer.py index 60a7e8b..18ede92 100644 --- a/rabbitmq_amqp_python_client/consumer.py +++ b/rabbitmq_amqp_python_client/consumer.py @@ -60,11 +60,13 @@ def _create_receiver(self, addr: str) -> BlockingReceiver: receiver = self._conn.create_receiver( addr, options=ReceiverOptionUnsettled(addr), handler=self._handler ) + receiver.credit = 1 else: print("stream option is not None") receiver = self._conn.create_receiver( addr, options=ReceiverOptionUnsettledWithFilters(addr, self._stream_options), handler=self._handler ) + receiver.credit = 1 return receiver From 11a49ca175976c706dec9baa7e6de01ed12fbbdf Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Tue, 4 Feb 2025 09:31:38 +0100 Subject: [PATCH 08/15] test updated --- examples/getting_started/main.py | 16 +++++++++------- rabbitmq_amqp_python_client/entities.py | 3 ++- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/examples/getting_started/main.py b/examples/getting_started/main.py index 9706446..3d22fc0 100644 --- a/examples/getting_started/main.py +++ b/examples/getting_started/main.py @@ -80,6 +80,8 @@ def create_connection() -> Connection: def threaded_function(addr_queue): connection = 
create_connection() + offset_specification = StreamFilterOptions() + offset_specification.offset(10) consumer = connection.consumer(addr_queue, handler=MyMessageHandler()) try: consumer.run() @@ -101,10 +103,10 @@ def main() -> None: print("declaring exchange and queue") # management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={})) - management.declare_queue( - StreamSpecification(name=queue_name) + #management.declare_queue( + # StreamSpecification(name=queue_name) # QuorumQueueSpecification(name=queue_name, dead_letter_exchange="dead-letter") - ) + #) print("binding queue to exchange") # bind_name = management.bind( @@ -124,7 +126,7 @@ def main() -> None: ## press control + c to terminate the consumer # print("create a publisher and publish a test message") - publisher = connection.publisher(addr_queue) + #publisher = connection.publisher(addr_queue) # print("purging the queue") # messages_purged = management.purge_queue(queue_name) @@ -133,8 +135,8 @@ def main() -> None: # management.close() # publish 10 messages - for i in range(messages_to_publish): - status = publisher.publish(Message(body="test")) + #for i in range(messages_to_publish): + # status = publisher.publish(Message(body="test")) # # if status.ACCEPTED: # # print("message accepted") # # elif status.RELEASED: @@ -142,7 +144,7 @@ def main() -> None: # # elif status.REJECTED: # # print("message not rejected") # - publisher.close() + #publisher.close() # # print( # "create a consumer and consume the test message - press control + c to terminate to consume" diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index 73b46a3..37b084f 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -46,7 +46,8 @@ def __init__(self): def offset(self, offset: int): #self._filter_set[symbol(STREAM_FILTER_SPEC)] = Described(symbol(STREAM_FILTER_SPEC), "first") - self._filter_set[symbol('rabbitmq:stream-offset-spec')] = Described(symbol('rabbitmq:stream-offset-spec'), 0) + print("im here") + self._filter_set[symbol('rabbitmq:stream-offset-spec')] = Described(symbol('rabbitmq:stream-offset-spec'), "first") def filters(self) -> Dict[symbol, Described]: return self._filter_set From 8931dd50b9009de29fe32cc16688c118beb6f890 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 4 Feb 2025 11:53:46 +0100 Subject: [PATCH 09/15] add filters (#29) Signed-off-by: Gabriele Santomaggio --- examples/getting_started/main.py | 2 +- rabbitmq_amqp_python_client/entities.py | 2 -- rabbitmq_amqp_python_client/options.py | 8 ++------ 3 files changed, 3 insertions(+), 9 deletions(-) diff --git a/examples/getting_started/main.py b/examples/getting_started/main.py index 3d22fc0..7cd5033 100644 --- a/examples/getting_started/main.py +++ b/examples/getting_started/main.py @@ -82,7 +82,7 @@ def threaded_function(addr_queue): connection = create_connection() offset_specification = StreamFilterOptions() offset_specification.offset(10) - consumer = connection.consumer(addr_queue, handler=MyMessageHandler()) + consumer = connection.consumer(addr_queue, handler=MyMessageHandler(), stream_filter_options=offset_specification) try: consumer.run() except KeyboardInterrupt: diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index 37b084f..815c9fd 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -45,8 +45,6 @@ def __init__(self): self._filter_set: Dict[symbol, 
Described] = {} def offset(self, offset: int): - #self._filter_set[symbol(STREAM_FILTER_SPEC)] = Described(symbol(STREAM_FILTER_SPEC), "first") - print("im here") self._filter_set[symbol('rabbitmq:stream-offset-spec')] = Described(symbol('rabbitmq:stream-offset-spec'), "first") def filters(self) -> Dict[symbol, Described]: diff --git a/rabbitmq_amqp_python_client/options.py b/rabbitmq_amqp_python_client/options.py index 1e9e194..c05f85a 100644 --- a/rabbitmq_amqp_python_client/options.py +++ b/rabbitmq_amqp_python_client/options.py @@ -1,6 +1,6 @@ from .qpid.proton._data import ( # noqa: E402 PropertyDict, - symbol, + symbol, Described, ) from .qpid.proton._endpoints import Link # noqa: E402 from .qpid.proton.reactor import LinkOption, Filter # noqa: E402 @@ -71,11 +71,7 @@ def __init__(self, addr: str, filter_options: StreamFilterOptions): def apply(self, link: Link) -> None: - link.target.address = self._addr - link.snd_settle_mode = Link.SND_UNSETTLED - link.rcv_settle_mode = Link.RCV_FIRST - link.properties = PropertyDict({symbol("paired"): True}) - link.source.dynamic = False + link.source.filter.put_dict(self.filter_set) def test(self, link: Link) -> bool: return bool(link.is_receiver) From 7a04eaf2ebeab2f66472ab1134d41d010428ec89 Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Tue, 4 Feb 2025 14:05:02 +0100 Subject: [PATCH 10/15] refactoring and adding tests --- .../getting_started/example_with_streams.py | 131 ++++++++++++ examples/getting_started/main.py | 97 ++++----- rabbitmq_amqp_python_client/__init__.py | 5 +- rabbitmq_amqp_python_client/connection.py | 8 +- rabbitmq_amqp_python_client/consumer.py | 18 +- rabbitmq_amqp_python_client/entities.py | 37 +++- rabbitmq_amqp_python_client/options.py | 14 +- .../qpid/proton/_reactor.py | 1 - tests/conftest.py | 22 +- tests/test_publisher.py | 3 - tests/test_publisher_streams.py | 47 ----- tests/test_streams.py | 195 ++++++++++++++++++ tests/utils.py | 3 - 13 files changed, 447 insertions(+), 134 deletions(-) create mode 100644 examples/getting_started/example_with_streams.py delete mode 100644 tests/test_publisher_streams.py create mode 100644 tests/test_streams.py diff --git a/examples/getting_started/example_with_streams.py b/examples/getting_started/example_with_streams.py new file mode 100644 index 0000000..4a850cc --- /dev/null +++ b/examples/getting_started/example_with_streams.py @@ -0,0 +1,131 @@ +# type: ignore + +from rabbitmq_amqp_python_client import ( # SSlConfigurationContext,; SslConfigurationContext,; ClientCert, + AddressHelper, + AMQPMessagingHandler, + Connection, + Event, + Message, + OffsetSpecification, + StreamFilterOptions, + StreamSpecification, +) + + +class MyMessageHandler(AMQPMessagingHandler): + + def __init__(self): + super().__init__() + self._count = 0 + + def on_message(self, event: Event): + print("received message: " + str(event.message.body)) + + # accepting + self.delivery_context.accept(event) + + # in case of rejection (+eventually deadlettering) + # self.delivery_context.discard(event) + + # in case of requeuing + # self.delivery_context.requeue(event) + + # annotations = {} + # annotations[symbol('x-opt-string')] = 'x-test1' + # in case of requeuing with annotations added + # self.delivery_context.requeue_with_annotations(event, annotations) + + # in case of rejection with annotations added + # self.delivery_context.discard_with_annotations(event) + + print("count " + str(self._count)) + + self._count = self._count + 1 + + if self._count == 100: + print("closing receiver") + # if you want 
you can add cleanup operations here + # event.receiver.close() + # event.connection.close() + + def on_connection_closed(self, event: Event): + # if you want you can add cleanup operations here + print("connection closed") + + def on_link_closed(self, event: Event) -> None: + # if you want you can add cleanup operations here + print("link closed") + + +def create_connection() -> Connection: + connection = Connection("amqp://guest:guest@localhost:5672/") + # in case of SSL enablement + # ca_cert_file = ".ci/certs/ca_certificate.pem" + # client_cert = ".ci/certs/client_certificate.pem" + # client_key = ".ci/certs/client_key.pem" + # connection = Connection( + # "amqps://guest:guest@localhost:5671/", + # ssl_context=SslConfigurationContext( + # ca_cert=ca_cert_file, + # client_cert=ClientCert(client_cert=client_cert, client_key=client_key), + # ), + # ) + connection.dial() + + return connection + + +def main() -> None: + queue_name = "example-queue" + messages_to_publish = 100 + + print("connection to amqp server") + connection = create_connection() + + management = connection.management() + + management.declare_queue(StreamSpecification(name=queue_name)) + + addr_queue = AddressHelper.queue_address(queue_name) + + consumer_connection = create_connection() + + stream_filter_options = StreamFilterOptions() + # can be first, last, next or an offset long + stream_filter_options.offset(OffsetSpecification.first) + + consumer = consumer_connection.consumer( + addr_queue, + handler=MyMessageHandler(), + stream_filter_options=stream_filter_options, + ) + print( + "create a consumer and consume the test message - press control + c to terminate to consume" + ) + + # print("create a publisher and publish a test message") + publisher = connection.publisher(addr_queue) + + for i in range(messages_to_publish): + publisher.publish(Message(body="test: " + str(i))) + + publisher.close() + + try: + consumer.run() + except KeyboardInterrupt: + pass + + # + print("delete queue") + # management.delete_queue(queue_name) + + print("closing connections") + management.close() + print("after management closing") + connection.close() + print("after connection closing") + + +if __name__ == "__main__": + main() diff --git a/examples/getting_started/main.py b/examples/getting_started/main.py index 7cd5033..8e69dee 100644 --- a/examples/getting_started/main.py +++ b/examples/getting_started/main.py @@ -1,5 +1,5 @@ # type: ignore -import threading + from rabbitmq_amqp_python_client import ( # SSlConfigurationContext,; SslConfigurationContext,; ClientCert, AddressHelper, @@ -10,8 +10,6 @@ ExchangeSpecification, Message, QuorumQueueSpecification, - StreamSpecification, - StreamFilterOptions ) @@ -78,22 +76,12 @@ def create_connection() -> Connection: return connection -def threaded_function(addr_queue): - connection = create_connection() - offset_specification = StreamFilterOptions() - offset_specification.offset(10) - consumer = connection.consumer(addr_queue, handler=MyMessageHandler(), stream_filter_options=offset_specification) - try: - consumer.run() - except KeyboardInterrupt: - pass - - def main() -> None: + exchange_name = "test-exchange" queue_name = "example-queue" routing_key = "routing-key" - messages_to_publish = 1000 + messages_to_publish = 100000 print("connection to amqp server") connection = create_connection() @@ -101,67 +89,68 @@ def main() -> None: management = connection.management() print("declaring exchange and queue") - # management.declare_exchange(ExchangeSpecification(name=exchange_name, 
arguments={})) + management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={})) - #management.declare_queue( - # StreamSpecification(name=queue_name) + management.declare_queue( + QuorumQueueSpecification(name=queue_name) # QuorumQueueSpecification(name=queue_name, dead_letter_exchange="dead-letter") - #) + ) print("binding queue to exchange") - # bind_name = management.bind( - # BindingSpecification( - # source_exchange=exchange_name, - # destination_queue=queue_name, - # binding_key=routing_key, - # ) - # ) + bind_name = management.bind( + BindingSpecification( + source_exchange=exchange_name, + destination_queue=queue_name, + binding_key=routing_key, + ) + ) - # addr = AddressHelper.exchange_address(exchange_name, routing_key) + addr = AddressHelper.exchange_address(exchange_name, routing_key) addr_queue = AddressHelper.queue_address(queue_name) - thread = threading.Thread(target=threaded_function, args=(addr_queue,)) - thread.start() - ## press control + c to terminate the consumer - - # print("create a publisher and publish a test message") - #publisher = connection.publisher(addr_queue) + print("create a publisher and publish a test message") + publisher = connection.publisher(addr) - # print("purging the queue") - # messages_purged = management.purge_queue(queue_name) + print("purging the queue") + messages_purged = management.purge_queue(queue_name) - # print("messages purged: " + str(messages_purged)) + print("messages purged: " + str(messages_purged)) # management.close() # publish 10 messages - #for i in range(messages_to_publish): - # status = publisher.publish(Message(body="test")) - # # if status.ACCEPTED: - # # print("message accepted") - # # elif status.RELEASED: - # # print("message not routed") - # # elif status.REJECTED: - # # print("message not rejected") - # - #publisher.close() - # - # print( - # "create a consumer and consume the test message - press control + c to terminate to consume" - # ) + for i in range(messages_to_publish): + status = publisher.publish(Message(body="test")) + if status.ACCEPTED: + print("message accepted") + elif status.RELEASED: + print("message not routed") + elif status.REJECTED: + print("message not rejected") + + publisher.close() + + print( + "create a consumer and consume the test message - press control + c to terminate to consume" + ) + consumer = connection.consumer(addr_queue, handler=MyMessageHandler()) + + try: + consumer.run() + except KeyboardInterrupt: + pass - input("Press Enter to continue...") print("cleanup") - # consumer.close() + consumer.close() # once we finish consuming if we close the connection we need to create a new one # connection = create_connection() # management = connection.management() print("unbind") - # management.unbind(bind_name) + management.unbind(bind_name) print("delete queue") - # management.delete_queue(queue_name) + management.delete_queue(queue_name) print("delete exchange") management.delete_exchange(exchange_name) diff --git a/rabbitmq_amqp_python_client/__init__.py b/rabbitmq_amqp_python_client/__init__.py index fbdf143..260f380 100644 --- a/rabbitmq_amqp_python_client/__init__.py +++ b/rabbitmq_amqp_python_client/__init__.py @@ -8,6 +8,8 @@ from .entities import ( BindingSpecification, ExchangeSpecification, + OffsetSpecification, + StreamFilterOptions, ) from .exceptions import ArgumentOutOfRangeException from .management import Management @@ -28,8 +30,6 @@ SslConfigurationContext, ) -from .entities import StreamFilterOptions - try: __version__ = 
metadata.version(__package__) __license__ = metadata.metadata(__package__)["license"] @@ -64,4 +64,5 @@ "Delivery", "ConnectionClosed", "StreamFilterOptions", + "OffsetSpecification", ] diff --git a/rabbitmq_amqp_python_client/connection.py b/rabbitmq_amqp_python_client/connection.py index 77186ae..f3a6c87 100644 --- a/rabbitmq_amqp_python_client/connection.py +++ b/rabbitmq_amqp_python_client/connection.py @@ -3,6 +3,7 @@ from .address_helper import validate_address from .consumer import Consumer +from .entities import StreamFilterOptions from .exceptions import ArgumentOutOfRangeException from .management import Management from .publisher import Publisher @@ -11,8 +12,6 @@ from .qpid.proton.utils import BlockingConnection from .ssl_configuration import SslConfigurationContext -from.entities import StreamFilterOptions - logger = logging.getLogger(__name__) MT = TypeVar("MT") @@ -86,7 +85,10 @@ def publisher(self, destination: str) -> Publisher: return publisher def consumer( - self, destination: str, handler: Optional[MessagingHandler] = None, stream_filter_options: Optional[StreamFilterOptions] = None + self, + destination: str, + handler: Optional[MessagingHandler] = None, + stream_filter_options: Optional[StreamFilterOptions] = None, ) -> Consumer: if validate_address(destination) is False: raise ArgumentOutOfRangeException( diff --git a/rabbitmq_amqp_python_client/consumer.py b/rabbitmq_amqp_python_client/consumer.py index 18ede92..bb96660 100644 --- a/rabbitmq_amqp_python_client/consumer.py +++ b/rabbitmq_amqp_python_client/consumer.py @@ -1,7 +1,11 @@ import logging from typing import Optional -from .options import ReceiverOptionUnsettled, ReceiverOptionUnsettledWithFilters +from .entities import StreamFilterOptions +from .options import ( + ReceiverOptionUnsettled, + ReceiverOptionUnsettledWithFilters, +) from .qpid.proton._handlers import MessagingHandler from .qpid.proton._message import Message from .qpid.proton.utils import ( @@ -9,8 +13,6 @@ BlockingReceiver, ) -from.entities import StreamFilterOptions - logger = logging.getLogger(__name__) @@ -20,7 +22,7 @@ def __init__( conn: BlockingConnection, addr: str, handler: Optional[MessagingHandler] = None, - stream_options: Optional[StreamFilterOptions] = None + stream_options: Optional[StreamFilterOptions] = None, ): self._receiver: Optional[BlockingReceiver] = None self._conn = conn @@ -58,15 +60,15 @@ def _create_receiver(self, addr: str) -> BlockingReceiver: logger.debug("Creating the receiver") if self._stream_options is None: receiver = self._conn.create_receiver( - addr, options=ReceiverOptionUnsettled(addr), handler=self._handler + addr, options=ReceiverOptionUnsettled(addr), handler=self._handler ) receiver.credit = 1 else: - print("stream option is not None") receiver = self._conn.create_receiver( - addr, options=ReceiverOptionUnsettledWithFilters(addr, self._stream_options), handler=self._handler + addr, + options=ReceiverOptionUnsettledWithFilters(addr, self._stream_options), + handler=self._handler, ) receiver.credit = 1 return receiver - diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index 815c9fd..59954d7 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -1,13 +1,15 @@ from dataclasses import dataclass -from typing import Any, Optional, Dict +from enum import Enum +from typing import Any, Dict, Optional, Union -from .qpid.proton._data import symbol, Described from .common import ExchangeType, QueueType +from 
.qpid.proton._data import Described, symbol STREAM_FILTER_SPEC = "rabbitmq:stream-filter" STREAM_OFFSET_SPEC = "rabbitmq:stream-offset-spec" STREAM_FILTER_MATCH_UNFILTERED = "rabbitmq:stream-match-unfiltered" + @dataclass class ExchangeSpecification: name: str @@ -32,6 +34,12 @@ class QueueInfo: consumer_count: int = 0 +class OffsetSpecification(Enum): + first = ("first",) + next = ("next",) + last = ("last",) + + @dataclass class BindingSpecification: source_exchange: str @@ -41,11 +49,28 @@ class BindingSpecification: class StreamFilterOptions: - def __init__(self): + def __init__(self): # type: ignore self._filter_set: Dict[symbol, Described] = {} - def offset(self, offset: int): - self._filter_set[symbol('rabbitmq:stream-offset-spec')] = Described(symbol('rabbitmq:stream-offset-spec'), "first") + def offset(self, offset_spefication: Union[OffsetSpecification, int]) -> None: + if isinstance(offset_spefication, int): + self._filter_set[symbol(STREAM_OFFSET_SPEC)] = Described( + symbol(STREAM_OFFSET_SPEC), offset_spefication + ) + else: + self._filter_set[symbol(STREAM_OFFSET_SPEC)] = Described( + symbol(STREAM_OFFSET_SPEC), offset_spefication.name + ) + + def apply_filters(self, filters: list[str]) -> None: + self._filter_set[symbol(STREAM_FILTER_SPEC)] = Described( + symbol(STREAM_FILTER_SPEC), filters + ) + + def filter_match_unfiltered(self, filter_match_unfiltered: bool) -> None: + self._filter_set[symbol(STREAM_FILTER_MATCH_UNFILTERED)] = Described( + symbol(STREAM_FILTER_MATCH_UNFILTERED), filter_match_unfiltered + ) - def filters(self) -> Dict[symbol, Described]: + def filter_set(self) -> Dict[symbol, Described]: return self._filter_set diff --git a/rabbitmq_amqp_python_client/options.py b/rabbitmq_amqp_python_client/options.py index c05f85a..5c62ed8 100644 --- a/rabbitmq_amqp_python_client/options.py +++ b/rabbitmq_amqp_python_client/options.py @@ -1,10 +1,13 @@ +from .entities import StreamFilterOptions from .qpid.proton._data import ( # noqa: E402 PropertyDict, - symbol, Described, + symbol, ) from .qpid.proton._endpoints import Link # noqa: E402 -from .qpid.proton.reactor import LinkOption, Filter # noqa: E402 -from .entities import StreamFilterOptions +from .qpid.proton.reactor import ( # noqa: E402 + Filter, + LinkOption, +) class SenderOption(LinkOption): # type: ignore @@ -53,7 +56,6 @@ class ReceiverOptionUnsettled(LinkOption): # type: ignore def __init__(self, addr: str): self._addr = addr - def apply(self, link: Link) -> None: link.target.address = self._addr link.snd_settle_mode = Link.SND_UNSETTLED @@ -64,12 +66,12 @@ def apply(self, link: Link) -> None: def test(self, link: Link) -> bool: return bool(link.is_receiver) + class ReceiverOptionUnsettledWithFilters(Filter): # type: ignore def __init__(self, addr: str, filter_options: StreamFilterOptions): - super().__init__(filter_options.filters()) + super().__init__(filter_options.filter_set()) self._addr = addr - def apply(self, link: Link) -> None: link.source.filter.put_dict(self.filter_set) diff --git a/rabbitmq_amqp_python_client/qpid/proton/_reactor.py b/rabbitmq_amqp_python_client/qpid/proton/_reactor.py index 8065618..89b2e7c 100644 --- a/rabbitmq_amqp_python_client/qpid/proton/_reactor.py +++ b/rabbitmq_amqp_python_client/qpid/proton/_reactor.py @@ -814,7 +814,6 @@ class Filter(ReceiverOption): """ def __init__(self, filter_set: Dict[symbol, Described] = {}) -> None: - print("filterset: " + str(filter_set)) self.filter_set = filter_set def apply(self, receiver: "Receiver") -> None: diff --git 
a/tests/conftest.py b/tests/conftest.py index 34eb718..edb33fd 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,3 +1,5 @@ +from typing import Optional + import pytest from rabbitmq_amqp_python_client import ( @@ -86,7 +88,25 @@ def __init__(self): self._received = 0 def on_message(self, event: Event): - print("received message: " + str(event.message.body)) + # print("received message: " + str(event.message.body)) + self.delivery_context.accept(event) + self._received = self._received + 1 + if self._received == 1000: + event.connection.close() + raise ConsumerTestException("consumed") + + +class MyMessageHandlerAcceptStreamOffset(AMQPMessagingHandler): + + def __init__(self, starting_offset: Optional[int] = None): + super().__init__() + self._received = 0 + self._starting_offset = starting_offset + + def on_message(self, event: Event): + if self._starting_offset is not None: + assert event.message.annotations["x-stream-offset"] == self._starting_offset + self._starting_offset = self._starting_offset + 1 self.delivery_context.accept(event) self._received = self._received + 1 if self._received == 10: diff --git a/tests/test_publisher.py b/tests/test_publisher.py index c4f1e86..f707b66 100644 --- a/tests/test_publisher.py +++ b/tests/test_publisher.py @@ -261,6 +261,3 @@ def test_queue_info_for_stream_with_validations(connection: Connection) -> None: for i in range(messages_to_send): publisher.publish(Message(body="test")) - - - diff --git a/tests/test_publisher_streams.py b/tests/test_publisher_streams.py deleted file mode 100644 index 586e363..0000000 --- a/tests/test_publisher_streams.py +++ /dev/null @@ -1,47 +0,0 @@ -from rabbitmq_amqp_python_client import ( - StreamSpecification, - QueueType, - Management, - AddressHelper, - StreamFilterOptions, - Connection, -) - -from .utils import publish_messages -from .conftest import MyMessageHandlerAccept, ConsumerTestException - - -def test_queue_info_for_stream_with_validations(connection: Connection) -> None: - - stream_name = "test_stream_info_with_validation" - messages_to_send = 200 - - queue_specification = StreamSpecification( - name=stream_name, - ) - management = connection.management() - management.declare_queue(queue_specification) - - publish_messages(connection, messages_to_send, stream_name) - - addr_queue = AddressHelper.queue_address(stream_name) - - stream_filter_options = StreamFilterOptions() - stream_filter_options.offset(0) - - consumer = connection.consumer(addr_queue, handler=MyMessageHandlerAccept()) - - try: - print("running") - consumer.run() - # ack to terminate the consumer - except ConsumerTestException: - pass - - consumer.close() - - management.delete_queue(stream_name) - - #assert stream_info.name == stream_name - #assert stream_info.queue_type == QueueType.stream - #assert stream_info.message_count == 0 \ No newline at end of file diff --git a/tests/test_streams.py b/tests/test_streams.py new file mode 100644 index 0000000..e1cd586 --- /dev/null +++ b/tests/test_streams.py @@ -0,0 +1,195 @@ +from rabbitmq_amqp_python_client import ( + AddressHelper, + Connection, + OffsetSpecification, + StreamFilterOptions, + StreamSpecification, +) + +from .conftest import ( + ConsumerTestException, + MyMessageHandlerAcceptStreamOffset, +) +from .utils import create_connection, publish_messages + + +def test_stream_read_from_last_default(connection: Connection) -> None: + + consumer = None + stream_name = "test_stream_info_with_validation" + messages_to_send = 10 + + queue_specification = StreamSpecification( + 
name=stream_name, + ) + management = connection.management() + management.declare_queue(queue_specification) + + addr_queue = AddressHelper.queue_address(stream_name) + + # consume and then publish + try: + connection_consumer = create_connection() + consumer = connection_consumer.consumer( + addr_queue, handler=MyMessageHandlerAcceptStreamOffset() + ) + publish_messages(connection, messages_to_send, stream_name) + consumer.run() + # ack to terminate the consumer + except ConsumerTestException: + pass + + consumer.close() + + management.delete_queue(stream_name) + + +def test_stream_read_from_last(connection: Connection) -> None: + + consumer = None + stream_name = "test_stream_info_with_validation" + messages_to_send = 10 + + queue_specification = StreamSpecification( + name=stream_name, + ) + management = connection.management() + management.declare_queue(queue_specification) + + addr_queue = AddressHelper.queue_address(stream_name) + + stream_filter_options = StreamFilterOptions() + stream_filter_options.offset(OffsetSpecification.last) + + # consume and then publish + try: + connection_consumer = create_connection() + consumer = connection_consumer.consumer( + addr_queue, + handler=MyMessageHandlerAcceptStreamOffset(), + stream_filter_options=stream_filter_options, + ) + publish_messages(connection, messages_to_send, stream_name) + consumer.run() + # ack to terminate the consumer + except ConsumerTestException: + pass + + consumer.close() + + management.delete_queue(stream_name) + + +def test_stream_read_from_offset_zero(connection: Connection) -> None: + + consumer = None + stream_name = "test_stream_info_with_validation" + messages_to_send = 10 + + queue_specification = StreamSpecification( + name=stream_name, + ) + management = connection.management() + management.declare_queue(queue_specification) + + addr_queue = AddressHelper.queue_address(stream_name) + + # publish and then consume + publish_messages(connection, messages_to_send, stream_name) + + stream_filter_options = StreamFilterOptions() + stream_filter_options.offset(0) + + try: + connection_consumer = create_connection() + consumer = connection_consumer.consumer( + addr_queue, + handler=MyMessageHandlerAcceptStreamOffset(0), + stream_filter_options=stream_filter_options, + ) + + consumer.run() + # ack to terminate the consumer + except ConsumerTestException: + pass + + consumer.close() + + management.delete_queue(stream_name) + + +def test_stream_read_from_offset_first(connection: Connection) -> None: + + consumer = None + stream_name = "test_stream_info_with_validation" + messages_to_send = 10 + + queue_specification = StreamSpecification( + name=stream_name, + ) + management = connection.management() + management.declare_queue(queue_specification) + + addr_queue = AddressHelper.queue_address(stream_name) + + # publish and then consume + publish_messages(connection, messages_to_send, stream_name) + + stream_filter_options = StreamFilterOptions() + stream_filter_options.offset(OffsetSpecification.first) + + try: + connection_consumer = create_connection() + consumer = connection_consumer.consumer( + addr_queue, + handler=MyMessageHandlerAcceptStreamOffset(0), + stream_filter_options=stream_filter_options, + ) + + consumer.run() + # ack to terminate the consumer + except ConsumerTestException: + pass + + consumer.close() + + management.delete_queue(stream_name) + + +def test_stream_read_from_offset_ten(connection: Connection) -> None: + + consumer = None + stream_name = "test_stream_info_with_validation" + messages_to_send 
= 20 + + queue_specification = StreamSpecification( + name=stream_name, + ) + management = connection.management() + management.declare_queue(queue_specification) + + addr_queue = AddressHelper.queue_address(stream_name) + + # publish and then consume + publish_messages(connection, messages_to_send, stream_name) + + stream_filter_options = StreamFilterOptions() + stream_filter_options.offset(10) + + try: + connection_consumer = create_connection() + consumer = connection_consumer.consumer( + addr_queue, + handler=MyMessageHandlerAcceptStreamOffset(10), + stream_filter_options=stream_filter_options, + ) + + consumer.run() + # ack to terminate the consumer + # this will finish after 10 messages read + except ConsumerTestException: + pass + + consumer.close() + + management.delete_queue(stream_name) diff --git a/tests/utils.py b/tests/utils.py index ec20f41..b9b3b29 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -17,12 +17,9 @@ def create_connection() -> Connection: def publish_messages(connection: Connection, messages_to_send: int, queue_name) -> None: - print("before creating publisher") publisher = connection.publisher("/queues/" + queue_name) - print("after creating publisher") # publish messages_to_send messages for i in range(messages_to_send): - print("sending message") publisher.publish(Message(body="test" + str(i))) publisher.close() From f4e0e626cfc7a11ebf9cf5f33dee0c0e89547587 Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Tue, 4 Feb 2025 16:28:58 +0100 Subject: [PATCH 11/15] implementing and testing filterings --- rabbitmq_amqp_python_client/consumer.py | 6 +- tests/test_streams.py | 73 +++++++++++++++++++++++++ tests/utils.py | 16 +++++- 3 files changed, 90 insertions(+), 5 deletions(-) diff --git a/rabbitmq_amqp_python_client/consumer.py b/rabbitmq_amqp_python_client/consumer.py index bb96660..dce7a1b 100644 --- a/rabbitmq_amqp_python_client/consumer.py +++ b/rabbitmq_amqp_python_client/consumer.py @@ -1,5 +1,5 @@ import logging -from typing import Optional +from typing import Literal, Optional, Union from .entities import StreamFilterOptions from .options import ( @@ -36,9 +36,9 @@ def _open(self) -> None: logger.debug("Creating Sender") self._receiver = self._create_receiver(self._addr) - def consume(self) -> Message: + def consume(self, timeout: Union[None, Literal[False], float] = False) -> Message: if self._receiver is not None: - return self._receiver.receive() + return self._receiver.receive(timeout=timeout) def close(self) -> None: logger.debug("Closing the receiver") diff --git a/tests/test_streams.py b/tests/test_streams.py index e1cd586..0e4e369 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -193,3 +193,76 @@ def test_stream_read_from_offset_ten(connection: Connection) -> None: consumer.close() management.delete_queue(stream_name) + + +def test_stream_filtering(connection: Connection) -> None: + + consumer = None + stream_name = "test_stream_info_with_filtering" + messages_to_send = 10 + + queue_specification = StreamSpecification( + name=stream_name, + ) + management = connection.management() + management.declare_queue(queue_specification) + + addr_queue = AddressHelper.queue_address(stream_name) + + # consume and then publish + try: + stream_filter_options = StreamFilterOptions() + stream_filter_options.apply_filters(["banana"]) + connection_consumer = create_connection() + consumer = connection_consumer.consumer( + addr_queue, + handler=MyMessageHandlerAcceptStreamOffset(), + stream_filter_options=stream_filter_options, + ) + # send with 
annotations filter banana + publish_messages(connection, messages_to_send, stream_name, ["banana"]) + consumer.run() + # ack to terminate the consumer + except ConsumerTestException: + pass + + consumer.close() + + management.delete_queue(stream_name) + + +def test_stream_filtering_not_present(connection: Connection) -> None: + + raised = False + stream_name = "test_stream_info_with_filtering" + messages_to_send = 10 + + queue_specification = StreamSpecification( + name=stream_name, + ) + management = connection.management() + management.declare_queue(queue_specification) + + addr_queue = AddressHelper.queue_address(stream_name) + + # consume and then publish + stream_filter_options = StreamFilterOptions() + stream_filter_options.apply_filters(["apple"]) + connection_consumer = create_connection() + consumer = connection_consumer.consumer( + addr_queue, stream_filter_options=stream_filter_options + ) + # send with annotations filter banana + publish_messages(connection, messages_to_send, stream_name, ["banana"]) + + try: + consumer.consume(timeout=1) + except Exception: + # valid no message should arrive with filter banana so a timeout exception is raised + raised = True + + consumer.close() + + management.delete_queue(stream_name) + + assert raised is True diff --git a/tests/utils.py b/tests/utils.py index b9b3b29..46c8de2 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -1,3 +1,5 @@ +from typing import Optional + from rabbitmq_amqp_python_client import ( BindingSpecification, Connection, @@ -16,11 +18,21 @@ def create_connection() -> Connection: return connection_consumer -def publish_messages(connection: Connection, messages_to_send: int, queue_name) -> None: +def publish_messages( + connection: Connection, + messages_to_send: int, + queue_name, + filters: Optional[list[str]] = None, +) -> None: + annotations = {} + if filters is not None: + for filter in filters: + annotations = {"x-stream-filter-value": filter} + publisher = connection.publisher("/queues/" + queue_name) # publish messages_to_send messages for i in range(messages_to_send): - publisher.publish(Message(body="test" + str(i))) + publisher.publish(Message(body="test" + str(i), annotations=annotations)) publisher.close() From 756f534f88dcf81aafdfffee67d51c5ba364d54c Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Wed, 5 Feb 2025 09:24:04 +0100 Subject: [PATCH 12/15] adding match unfiltered test --- .../{main.py => basic-example.py} | 0 .../getting_started/example_with_streams.py | 10 ++++- tests/test_streams.py | 37 +++++++++++++++++++ 3 files changed, 45 insertions(+), 2 deletions(-) rename examples/getting_started/{main.py => basic-example.py} (100%) diff --git a/examples/getting_started/main.py b/examples/getting_started/basic-example.py similarity index 100% rename from examples/getting_started/main.py rename to examples/getting_started/basic-example.py diff --git a/examples/getting_started/example_with_streams.py b/examples/getting_started/example_with_streams.py index 4a850cc..03a0e30 100644 --- a/examples/getting_started/example_with_streams.py +++ b/examples/getting_started/example_with_streams.py @@ -19,7 +19,12 @@ def __init__(self): self._count = 0 def on_message(self, event: Event): - print("received message: " + str(event.message.body)) + print( + "received message from stream: " + + str(event.message.body) + + " with offset: " + + str(event.message.annotations["x-stream-offset"]) + ) # accepting self.delivery_context.accept(event) @@ -92,6 +97,7 @@ def main() -> None: stream_filter_options = 
StreamFilterOptions() # can be first, last, next or an offset long + # you can also specify stream filters stream_filter_options.offset(OffsetSpecification.first) consumer = consumer_connection.consumer( @@ -118,7 +124,7 @@ def main() -> None: # print("delete queue") - # management.delete_queue(queue_name) + management.delete_queue(queue_name) print("closing connections") management.close() diff --git a/tests/test_streams.py b/tests/test_streams.py index 0e4e369..6676a83 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -266,3 +266,40 @@ def test_stream_filtering_not_present(connection: Connection) -> None: management.delete_queue(stream_name) assert raised is True + + +def test_stream_match_unfiltered(connection: Connection) -> None: + + consumer = None + stream_name = "test_stream_info_with_filtering" + messages_to_send = 10 + + queue_specification = StreamSpecification( + name=stream_name, + ) + management = connection.management() + management.declare_queue(queue_specification) + + addr_queue = AddressHelper.queue_address(stream_name) + + # consume and then publish + try: + stream_filter_options = StreamFilterOptions() + stream_filter_options.apply_filters(["banana"]) + stream_filter_options.filter_match_unfiltered(True) + connection_consumer = create_connection() + consumer = connection_consumer.consumer( + addr_queue, + handler=MyMessageHandlerAcceptStreamOffset(), + stream_filter_options=stream_filter_options, + ) + # send with annotations filter banana + publish_messages(connection, messages_to_send, stream_name) + consumer.run() + # ack to terminate the consumer + except ConsumerTestException: + pass + + consumer.close() + + management.delete_queue(stream_name) From 6af9bd004f716ef6c34123b6f248ff1b552e7585 Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Wed, 5 Feb 2025 09:49:56 +0100 Subject: [PATCH 13/15] fix options parameters --- rabbitmq_amqp_python_client/consumer.py | 3 +-- rabbitmq_amqp_python_client/options.py | 5 +++++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/rabbitmq_amqp_python_client/consumer.py b/rabbitmq_amqp_python_client/consumer.py index dce7a1b..cdc953d 100644 --- a/rabbitmq_amqp_python_client/consumer.py +++ b/rabbitmq_amqp_python_client/consumer.py @@ -62,13 +62,12 @@ def _create_receiver(self, addr: str) -> BlockingReceiver: receiver = self._conn.create_receiver( addr, options=ReceiverOptionUnsettled(addr), handler=self._handler ) - receiver.credit = 1 + else: receiver = self._conn.create_receiver( addr, options=ReceiverOptionUnsettledWithFilters(addr, self._stream_options), handler=self._handler, ) - receiver.credit = 1 return receiver diff --git a/rabbitmq_amqp_python_client/options.py b/rabbitmq_amqp_python_client/options.py index 5c62ed8..96e39ed 100644 --- a/rabbitmq_amqp_python_client/options.py +++ b/rabbitmq_amqp_python_client/options.py @@ -73,6 +73,11 @@ def __init__(self, addr: str, filter_options: StreamFilterOptions): self._addr = addr def apply(self, link: Link) -> None: + link.target.address = self._addr + link.snd_settle_mode = Link.SND_UNSETTLED + link.rcv_settle_mode = Link.RCV_FIRST + link.properties = PropertyDict({symbol("paired"): True}) + link.source.dynamic = False link.source.filter.put_dict(self.filter_set) def test(self, link: Link) -> bool: From d243a8ef25c0726bac1940e3b4ea8d856a8f8d3b Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Wed, 5 Feb 2025 15:10:59 +0100 Subject: [PATCH 14/15] naming conventions --- rabbitmq_amqp_python_client/__init__.py | 4 ++-- 
rabbitmq_amqp_python_client/connection.py | 4 ++-- rabbitmq_amqp_python_client/consumer.py | 4 ++-- rabbitmq_amqp_python_client/entities.py | 2 +- rabbitmq_amqp_python_client/options.py | 4 ++-- tests/test_streams.py | 16 ++++++++-------- 6 files changed, 17 insertions(+), 17 deletions(-) diff --git a/rabbitmq_amqp_python_client/__init__.py b/rabbitmq_amqp_python_client/__init__.py index 260f380..1a3e997 100644 --- a/rabbitmq_amqp_python_client/__init__.py +++ b/rabbitmq_amqp_python_client/__init__.py @@ -9,7 +9,7 @@ BindingSpecification, ExchangeSpecification, OffsetSpecification, - StreamFilterOptions, + StreamOptions, ) from .exceptions import ArgumentOutOfRangeException from .management import Management @@ -63,6 +63,6 @@ "ClientCert", "Delivery", "ConnectionClosed", - "StreamFilterOptions", + "StreamOptions", "OffsetSpecification", ] diff --git a/rabbitmq_amqp_python_client/connection.py b/rabbitmq_amqp_python_client/connection.py index f3a6c87..c8480b7 100644 --- a/rabbitmq_amqp_python_client/connection.py +++ b/rabbitmq_amqp_python_client/connection.py @@ -3,7 +3,7 @@ from .address_helper import validate_address from .consumer import Consumer -from .entities import StreamFilterOptions +from .entities import StreamOptions from .exceptions import ArgumentOutOfRangeException from .management import Management from .publisher import Publisher @@ -88,7 +88,7 @@ def consumer( self, destination: str, handler: Optional[MessagingHandler] = None, - stream_filter_options: Optional[StreamFilterOptions] = None, + stream_filter_options: Optional[StreamOptions] = None, ) -> Consumer: if validate_address(destination) is False: raise ArgumentOutOfRangeException( diff --git a/rabbitmq_amqp_python_client/consumer.py b/rabbitmq_amqp_python_client/consumer.py index cdc953d..729f247 100644 --- a/rabbitmq_amqp_python_client/consumer.py +++ b/rabbitmq_amqp_python_client/consumer.py @@ -1,7 +1,7 @@ import logging from typing import Literal, Optional, Union -from .entities import StreamFilterOptions +from .entities import StreamOptions from .options import ( ReceiverOptionUnsettled, ReceiverOptionUnsettledWithFilters, @@ -22,7 +22,7 @@ def __init__( conn: BlockingConnection, addr: str, handler: Optional[MessagingHandler] = None, - stream_options: Optional[StreamFilterOptions] = None, + stream_options: Optional[StreamOptions] = None, ): self._receiver: Optional[BlockingReceiver] = None self._conn = conn diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index 59954d7..20b74ad 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -47,7 +47,7 @@ class BindingSpecification: binding_key: str -class StreamFilterOptions: +class StreamOptions: def __init__(self): # type: ignore self._filter_set: Dict[symbol, Described] = {} diff --git a/rabbitmq_amqp_python_client/options.py b/rabbitmq_amqp_python_client/options.py index 96e39ed..2617dff 100644 --- a/rabbitmq_amqp_python_client/options.py +++ b/rabbitmq_amqp_python_client/options.py @@ -1,4 +1,4 @@ -from .entities import StreamFilterOptions +from .entities import StreamOptions from .qpid.proton._data import ( # noqa: E402 PropertyDict, symbol, @@ -68,7 +68,7 @@ def test(self, link: Link) -> bool: class ReceiverOptionUnsettledWithFilters(Filter): # type: ignore - def __init__(self, addr: str, filter_options: StreamFilterOptions): + def __init__(self, addr: str, filter_options: StreamOptions): super().__init__(filter_options.filter_set()) self._addr = addr diff --git 
a/tests/test_streams.py b/tests/test_streams.py index 6676a83..7e7e557 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -2,7 +2,7 @@ AddressHelper, Connection, OffsetSpecification, - StreamFilterOptions, + StreamOptions, StreamSpecification, ) @@ -58,7 +58,7 @@ def test_stream_read_from_last(connection: Connection) -> None: addr_queue = AddressHelper.queue_address(stream_name) - stream_filter_options = StreamFilterOptions() + stream_filter_options = StreamOptions() stream_filter_options.offset(OffsetSpecification.last) # consume and then publish @@ -97,7 +97,7 @@ def test_stream_read_from_offset_zero(connection: Connection) -> None: # publish and then consume publish_messages(connection, messages_to_send, stream_name) - stream_filter_options = StreamFilterOptions() + stream_filter_options = StreamOptions() stream_filter_options.offset(0) try: @@ -135,7 +135,7 @@ def test_stream_read_from_offset_first(connection: Connection) -> None: # publish and then consume publish_messages(connection, messages_to_send, stream_name) - stream_filter_options = StreamFilterOptions() + stream_filter_options = StreamOptions() stream_filter_options.offset(OffsetSpecification.first) try: @@ -173,7 +173,7 @@ def test_stream_read_from_offset_ten(connection: Connection) -> None: # publish and then consume publish_messages(connection, messages_to_send, stream_name) - stream_filter_options = StreamFilterOptions() + stream_filter_options = StreamOptions() stream_filter_options.offset(10) try: @@ -211,7 +211,7 @@ def test_stream_filtering(connection: Connection) -> None: # consume and then publish try: - stream_filter_options = StreamFilterOptions() + stream_filter_options = StreamOptions() stream_filter_options.apply_filters(["banana"]) connection_consumer = create_connection() consumer = connection_consumer.consumer( @@ -246,7 +246,7 @@ def test_stream_filtering_not_present(connection: Connection) -> None: addr_queue = AddressHelper.queue_address(stream_name) # consume and then publish - stream_filter_options = StreamFilterOptions() + stream_filter_options = StreamOptions() stream_filter_options.apply_filters(["apple"]) connection_consumer = create_connection() consumer = connection_consumer.consumer( @@ -284,7 +284,7 @@ def test_stream_match_unfiltered(connection: Connection) -> None: # consume and then publish try: - stream_filter_options = StreamFilterOptions() + stream_filter_options = StreamOptions() stream_filter_options.apply_filters(["banana"]) stream_filter_options.filter_match_unfiltered(True) connection_consumer = create_connection() From 1281089f5a63f800bf42550a561d61c2612ac834 Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Wed, 5 Feb 2025 15:56:55 +0100 Subject: [PATCH 15/15] improving examples --- .../{basic-example.py => basic_example.py} | 5 +- .../getting_started/example_with_streams.py | 6 +- .../getting_started/reconnection_example.py | 123 +++++++++++------- 3 files changed, 80 insertions(+), 54 deletions(-) rename examples/getting_started/{basic-example.py => basic_example.py} (98%) diff --git a/examples/getting_started/basic-example.py b/examples/getting_started/basic_example.py similarity index 98% rename from examples/getting_started/basic-example.py rename to examples/getting_started/basic_example.py index 8e69dee..baa253d 100644 --- a/examples/getting_started/basic-example.py +++ b/examples/getting_started/basic_example.py @@ -12,6 +12,8 @@ QuorumQueueSpecification, ) +messages_to_publish = 100 + class MyMessageHandler(AMQPMessagingHandler): @@ -43,7 +45,7 @@ def 
on_message(self, event: Event): self._count = self._count + 1 - if self._count == 100: + if self._count == messages_to_publish: print("closing receiver") # if you want you can add cleanup operations here # event.receiver.close() @@ -81,7 +83,6 @@ def main() -> None: exchange_name = "test-exchange" queue_name = "example-queue" routing_key = "routing-key" - messages_to_publish = 100000 print("connection to amqp server") connection = create_connection() diff --git a/examples/getting_started/example_with_streams.py b/examples/getting_started/example_with_streams.py index 03a0e30..0ff9df0 100644 --- a/examples/getting_started/example_with_streams.py +++ b/examples/getting_started/example_with_streams.py @@ -7,7 +7,7 @@ Event, Message, OffsetSpecification, - StreamFilterOptions, + StreamOptions, StreamSpecification, ) @@ -95,9 +95,9 @@ def main() -> None: consumer_connection = create_connection() - stream_filter_options = StreamFilterOptions() + stream_filter_options = StreamOptions() # can be first, last, next or an offset long - # you can also specify stream filters + # you can also specify stream filters with methods: apply_filters and filter_match_unfiltered stream_filter_options.offset(OffsetSpecification.first) consumer = consumer_connection.consumer( diff --git a/examples/getting_started/reconnection_example.py b/examples/getting_started/reconnection_example.py index a6aca92..0003eb5 100644 --- a/examples/getting_started/reconnection_example.py +++ b/examples/getting_started/reconnection_example.py @@ -2,6 +2,8 @@ import time +from dataclasses import dataclass +from typing import Optional from rabbitmq_amqp_python_client import ( AddressHelper, @@ -9,16 +11,27 @@ BindingSpecification, Connection, ConnectionClosed, + Consumer, Event, ExchangeSpecification, + Management, Message, + Publisher, QuorumQueueSpecification, ) -connection = None -management = None -publisher = None -consumer = None + +# here we keep track of the objects we need to reconnect +@dataclass +class ConnectionConfiguration: + connection: Optional[Connection] = None + management: Optional[Management] = None + publisher: Optional[Publisher] = None + consumer: Optional[Consumer] = None + + +connection_configuration = ConnectionConfiguration() +messages_to_publish = 50000 # disconnection callback @@ -30,22 +43,27 @@ def on_disconnection(): queue_name = "example-queue" routing_key = "routing-key" - global connection - global management - global publisher - global consumer + global connection_configuration addr = AddressHelper.exchange_address(exchange_name, routing_key) addr_queue = AddressHelper.queue_address(queue_name) - if connection is not None: - connection = create_connection() - if management is not None: - management = connection.management() - if publisher is not None: - publisher = connection.publisher(addr) - if consumer is not None: - consumer = connection.consumer(addr_queue, handler=MyMessageHandler()) + if connection_configuration.connection is not None: + connection_configuration.connection = create_connection() + if connection_configuration.management is not None: + connection_configuration.management = ( + connection_configuration.connection.management() + ) + if connection_configuration.publisher is not None: + connection_configuration.publisher = ( + connection_configuration.connection.publisher(addr) + ) + if connection_configuration.consumer is not None: + connection_configuration.consumer = ( + connection_configuration.connection.consumer( + addr_queue, handler=MyMessageHandler() + ) + ) class 
MyMessageHandler(AMQPMessagingHandler):
@@ -55,7 +73,8 @@ def __init__(self):
         self._count = 0

     def on_message(self, event: Event):
-        print("received message: " + str(event.message.annotations))
+        if self._count % 1000 == 0:
+            print("received 1000 messages: " + str(event.message.body))

         # accepting
         self.delivery_context.accept(event)
@@ -74,11 +93,9 @@ def on_message(self, event: Event):
         # in case of rejection with annotations added
         # self.delivery_context.discard_with_annotations(event)

-        print("count " + str(self._count))
-
         self._count = self._count + 1

-        if self._count == 100:
+        if self._count == messages_to_publish:
             print("closing receiver")
             # if you want you can add cleanup operations here
             # event.receiver.close()
@@ -115,30 +132,30 @@ def main() -> None:
     exchange_name = "test-exchange"
     queue_name = "example-queue"
     routing_key = "routing-key"
-    messages_to_publish = 50000

-    global connection
-    global management
-    global publisher
-    global consumer
+    global connection_configuration

     print("connection to amqp server")
-    if connection is None:
-        connection = create_connection()
+    if connection_configuration.connection is None:
+        connection_configuration.connection = create_connection()

-    if management is None:
-        management = connection.management()
+    if connection_configuration.management is None:
+        connection_configuration.management = (
+            connection_configuration.connection.management()
+        )

     print("declaring exchange and queue")
-    management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))
+    connection_configuration.management.declare_exchange(
+        ExchangeSpecification(name=exchange_name, arguments={})
+    )

-    management.declare_queue(
+    connection_configuration.management.declare_queue(
         QuorumQueueSpecification(name=queue_name)
         # QuorumQueueSpecification(name=queue_name, dead_letter_exchange="dead-letter")
     )

     print("binding queue to exchange")
-    bind_name = management.bind(
+    bind_name = connection_configuration.management.bind(
         BindingSpecification(
             source_exchange=exchange_name,
             destination_queue=queue_name,
@@ -151,30 +168,34 @@ def main() -> None:
     addr_queue = AddressHelper.queue_address(queue_name)

     print("create a publisher and publish a test message")
-    if publisher is None:
-        publisher = connection.publisher(addr)
+    if connection_configuration.publisher is None:
+        connection_configuration.publisher = (
+            connection_configuration.connection.publisher(addr)
+        )

     print("purging the queue")
-    messages_purged = management.purge_queue(queue_name)
+    messages_purged = connection_configuration.management.purge_queue(queue_name)

     print("messages purged: " + str(messages_purged))
     # management.close()

-    # publish 10 messages
+    # publishing messages
     while True:
         for i in range(messages_to_publish):
             if i % 1000 == 0:
-                print("publishing")
+                print("published " + str(i) + " messages...")
             try:
-                publisher.publish(Message(body="test"))
+                if connection_configuration.publisher is not None:
+                    connection_configuration.publisher.publish(Message(body="test"))
             except ConnectionClosed:
                 print("publisher closing exception, resubmitting")
                 continue

-        print("closing")
+        print("closing publisher")
         try:
-            publisher.close()
+            if connection_configuration.publisher is not None:
+                connection_configuration.publisher.close()
         except ConnectionClosed:
             print("publisher closing exception, resubmitting")
             continue
@@ -183,12 +204,16 @@ def main() -> None:
     print(
         "create a consumer and consume the test message - press control + c to terminate to consume"
     )
-    if consumer is None:
-        consumer = 
connection.consumer(addr_queue, handler=MyMessageHandler()) + if connection_configuration.consumer is None: + connection_configuration.consumer = ( + connection_configuration.connection.consumer( + addr_queue, handler=MyMessageHandler() + ) + ) while True: try: - consumer.run() + connection_configuration.consumer.run() except KeyboardInterrupt: pass except ConnectionClosed: @@ -200,24 +225,24 @@ def main() -> None: break print("cleanup") - consumer.close() + connection_configuration.consumer.close() # once we finish consuming if we close the connection we need to create a new one # connection = create_connection() # management = connection.management() print("unbind") - management.unbind(bind_name) + connection_configuration.management.unbind(bind_name) print("delete queue") - management.delete_queue(queue_name) + connection_configuration.management.delete_queue(queue_name) print("delete exchange") - management.delete_exchange(exchange_name) + connection_configuration.management.delete_exchange(exchange_name) print("closing connections") - management.close() + connection_configuration.management.close() print("after management closing") - connection.close() + connection_configuration.connection.close() print("after connection closing")
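
For reference, the snippet below is a minimal consumer sketch, not part of the patch series, showing how the renamed StreamOptions API from the diffs above can be used together with AddressHelper and an AMQPMessagingHandler. The broker URI and the "my-stream" stream name are placeholders chosen for illustration.

# Minimal sketch (placeholder broker URI and stream name) of a stream consumer
# built on the StreamOptions API shown in the diffs above.

from rabbitmq_amqp_python_client import (
    AddressHelper,
    AMQPMessagingHandler,
    Connection,
    Event,
    OffsetSpecification,
    StreamOptions,
)


class PrintingHandler(AMQPMessagingHandler):
    def on_message(self, event: Event) -> None:
        # print the body and settle the delivery
        print("received: " + str(event.message.body))
        self.delivery_context.accept(event)


connection = Connection("amqp://guest:guest@localhost:5672/")
connection.dial()

# StreamOptions replaces the former StreamFilterOptions
stream_options = StreamOptions()
stream_options.offset(OffsetSpecification.first)  # first, last, next or a numeric offset
stream_options.apply_filters(["banana"])  # optional stream filter values
stream_options.filter_match_unfiltered(True)  # also deliver unfiltered messages

consumer = connection.consumer(
    AddressHelper.queue_address("my-stream"),  # placeholder stream name
    handler=PrintingHandler(),
    stream_filter_options=stream_options,
)
consumer.run()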