From b117790a501098cde0d4c15fe3fe962328209173 Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Wed, 12 Feb 2025 11:23:46 +0100 Subject: [PATCH] adding filtering test --- tests/test_streams.py | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/tests/test_streams.py b/tests/test_streams.py index af9a714..07c0e76 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -231,6 +231,45 @@ def test_stream_filtering(connection: Connection) -> None: management.delete_queue(stream_name) +def test_stream_filtering_mixed(connection: Connection) -> None: + + consumer = None + stream_name = "test_stream_info_with_filtering" + messages_to_send = 10 + + queue_specification = StreamSpecification( + name=stream_name, + ) + management = connection.management() + management.declare_queue(queue_specification) + + addr_queue = AddressHelper.queue_address(stream_name) + + # consume and then publish + try: + stream_filter_options = StreamOptions() + stream_filter_options.filter_values(["banana"]) + connection_consumer = create_connection() + consumer = connection_consumer.consumer( + addr_queue, + # check we are reading just from offset 10 as just banana filtering applies + message_handler=MyMessageHandlerAcceptStreamOffset(10), + stream_filter_options=stream_filter_options, + ) + # send with annotations filter apple and then banana + # consumer will read just from offset 10 + publish_messages(connection, messages_to_send, stream_name, ["apple"]) + publish_messages(connection, messages_to_send, stream_name, ["banana"]) + consumer.run() + # ack to terminate the consumer + except ConsumerTestException: + pass + + consumer.close() + + management.delete_queue(stream_name) + + def test_stream_filtering_not_present(connection: Connection) -> None: raised = False