File tree Expand file tree Collapse file tree 3 files changed +39
-1
lines changed Expand file tree Collapse file tree 3 files changed +39
-1
lines changed File renamed without changes.
Original file line number Diff line number Diff line change @@ -92,6 +92,7 @@ def main() -> None:
92
92
93
93
stream_filter_options = StreamFilterOptions ()
94
94
# can be first, last, next or an offset long
95
+ # you can also specify stream filters
95
96
stream_filter_options .offset (OffsetSpecification .first )
96
97
97
98
consumer = consumer_connection .consumer (
@@ -118,7 +119,7 @@ def main() -> None:
118
119
119
120
#
120
121
print ("delete queue" )
121
- # management.delete_queue(queue_name)
122
+ management .delete_queue (queue_name )
122
123
123
124
print ("closing connections" )
124
125
management .close ()
Original file line number Diff line number Diff line change @@ -266,3 +266,40 @@ def test_stream_filtering_not_present(connection: Connection) -> None:
266
266
management .delete_queue (stream_name )
267
267
268
268
assert raised is True
269
+
270
+
271
+ def test_stream_match_unfiltered (connection : Connection ) -> None :
272
+
273
+ consumer = None
274
+ stream_name = "test_stream_info_with_filtering"
275
+ messages_to_send = 10
276
+
277
+ queue_specification = StreamSpecification (
278
+ name = stream_name ,
279
+ )
280
+ management = connection .management ()
281
+ management .declare_queue (queue_specification )
282
+
283
+ addr_queue = AddressHelper .queue_address (stream_name )
284
+
285
+ # consume and then publish
286
+ try :
287
+ stream_filter_options = StreamFilterOptions ()
288
+ stream_filter_options .apply_filters (["banana" ])
289
+ stream_filter_options .filter_match_unfiltered (True )
290
+ connection_consumer = create_connection ()
291
+ consumer = connection_consumer .consumer (
292
+ addr_queue ,
293
+ handler = MyMessageHandlerAcceptStreamOffset (),
294
+ stream_filter_options = stream_filter_options ,
295
+ )
296
+ # send with annotations filter banana
297
+ publish_messages (connection , messages_to_send , stream_name )
298
+ consumer .run ()
299
+ # ack to terminate the consumer
300
+ except ConsumerTestException :
301
+ pass
302
+
303
+ consumer .close ()
304
+
305
+ management .delete_queue (stream_name )
You can’t perform that action at this time.
0 commit comments