@@ -231,6 +231,45 @@ def test_stream_filtering(connection: Connection) -> None:
231
231
management .delete_queue (stream_name )
232
232
233
233
234
+ def test_stream_filtering_mixed (connection : Connection ) -> None :
235
+
236
+ consumer = None
237
+ stream_name = "test_stream_info_with_filtering"
238
+ messages_to_send = 10
239
+
240
+ queue_specification = StreamSpecification (
241
+ name = stream_name ,
242
+ )
243
+ management = connection .management ()
244
+ management .declare_queue (queue_specification )
245
+
246
+ addr_queue = AddressHelper .queue_address (stream_name )
247
+
248
+ # consume and then publish
249
+ try :
250
+ stream_filter_options = StreamOptions ()
251
+ stream_filter_options .filter_values (["banana" ])
252
+ connection_consumer = create_connection ()
253
+ consumer = connection_consumer .consumer (
254
+ addr_queue ,
255
+ # check we are reading just from offset 10 as just banana filtering applies
256
+ message_handler = MyMessageHandlerAcceptStreamOffset (10 ),
257
+ stream_filter_options = stream_filter_options ,
258
+ )
259
+ # send with annotations filter apple and then banana
260
+ # consumer will read just from offset 10
261
+ publish_messages (connection , messages_to_send , stream_name , ["apple" ])
262
+ publish_messages (connection , messages_to_send , stream_name , ["banana" ])
263
+ consumer .run ()
264
+ # ack to terminate the consumer
265
+ except ConsumerTestException :
266
+ pass
267
+
268
+ consumer .close ()
269
+
270
+ management .delete_queue (stream_name )
271
+
272
+
234
273
def test_stream_filtering_not_present (connection : Connection ) -> None :
235
274
236
275
raised = False
0 commit comments