Skip to content

Commit f4e0e62

Browse files
author
DanielePalaia
committed
implementing and testing filterings
1 parent 7a04eaf commit f4e0e62

File tree

3 files changed

+90
-5
lines changed

3 files changed

+90
-5
lines changed

rabbitmq_amqp_python_client/consumer.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import logging
2-
from typing import Optional
2+
from typing import Literal, Optional, Union
33

44
from .entities import StreamFilterOptions
55
from .options import (
@@ -36,9 +36,9 @@ def _open(self) -> None:
3636
logger.debug("Creating Sender")
3737
self._receiver = self._create_receiver(self._addr)
3838

39-
def consume(self) -> Message:
39+
def consume(self, timeout: Union[None, Literal[False], float] = False) -> Message:
4040
if self._receiver is not None:
41-
return self._receiver.receive()
41+
return self._receiver.receive(timeout=timeout)
4242

4343
def close(self) -> None:
4444
logger.debug("Closing the receiver")

tests/test_streams.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,3 +193,76 @@ def test_stream_read_from_offset_ten(connection: Connection) -> None:
193193
consumer.close()
194194

195195
management.delete_queue(stream_name)
196+
197+
198+
def test_stream_filtering(connection: Connection) -> None:
199+
200+
consumer = None
201+
stream_name = "test_stream_info_with_filtering"
202+
messages_to_send = 10
203+
204+
queue_specification = StreamSpecification(
205+
name=stream_name,
206+
)
207+
management = connection.management()
208+
management.declare_queue(queue_specification)
209+
210+
addr_queue = AddressHelper.queue_address(stream_name)
211+
212+
# consume and then publish
213+
try:
214+
stream_filter_options = StreamFilterOptions()
215+
stream_filter_options.apply_filters(["banana"])
216+
connection_consumer = create_connection()
217+
consumer = connection_consumer.consumer(
218+
addr_queue,
219+
handler=MyMessageHandlerAcceptStreamOffset(),
220+
stream_filter_options=stream_filter_options,
221+
)
222+
# send with annotations filter banana
223+
publish_messages(connection, messages_to_send, stream_name, ["banana"])
224+
consumer.run()
225+
# ack to terminate the consumer
226+
except ConsumerTestException:
227+
pass
228+
229+
consumer.close()
230+
231+
management.delete_queue(stream_name)
232+
233+
234+
def test_stream_filtering_not_present(connection: Connection) -> None:
235+
236+
raised = False
237+
stream_name = "test_stream_info_with_filtering"
238+
messages_to_send = 10
239+
240+
queue_specification = StreamSpecification(
241+
name=stream_name,
242+
)
243+
management = connection.management()
244+
management.declare_queue(queue_specification)
245+
246+
addr_queue = AddressHelper.queue_address(stream_name)
247+
248+
# consume and then publish
249+
stream_filter_options = StreamFilterOptions()
250+
stream_filter_options.apply_filters(["apple"])
251+
connection_consumer = create_connection()
252+
consumer = connection_consumer.consumer(
253+
addr_queue, stream_filter_options=stream_filter_options
254+
)
255+
# send with annotations filter banana
256+
publish_messages(connection, messages_to_send, stream_name, ["banana"])
257+
258+
try:
259+
consumer.consume(timeout=1)
260+
except Exception:
261+
# valid no message should arrive with filter banana so a timeout exception is raised
262+
raised = True
263+
264+
consumer.close()
265+
266+
management.delete_queue(stream_name)
267+
268+
assert raised is True

tests/utils.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from typing import Optional
2+
13
from rabbitmq_amqp_python_client import (
24
BindingSpecification,
35
Connection,
@@ -16,11 +18,21 @@ def create_connection() -> Connection:
1618
return connection_consumer
1719

1820

19-
def publish_messages(connection: Connection, messages_to_send: int, queue_name) -> None:
21+
def publish_messages(
22+
connection: Connection,
23+
messages_to_send: int,
24+
queue_name,
25+
filters: Optional[list[str]] = None,
26+
) -> None:
27+
annotations = {}
28+
if filters is not None:
29+
for filter in filters:
30+
annotations = {"x-stream-filter-value": filter}
31+
2032
publisher = connection.publisher("/queues/" + queue_name)
2133
# publish messages_to_send messages
2234
for i in range(messages_to_send):
23-
publisher.publish(Message(body="test" + str(i)))
35+
publisher.publish(Message(body="test" + str(i), annotations=annotations))
2436
publisher.close()
2537

2638

0 commit comments

Comments
 (0)