@@ -261,6 +261,50 @@ def test_stream_filtering(connection: Connection, environment: Environment) -> N
261
261
management .delete_queue (stream_name )
262
262
263
263
264
+ def test_stream_filtering_mixed (
265
+ connection : Connection , environment : Environment
266
+ ) -> None :
267
+
268
+ consumer = None
269
+ stream_name = "test_stream_info_with_filtering"
270
+ messages_to_send = 10
271
+
272
+ queue_specification = StreamSpecification (
273
+ name = stream_name ,
274
+ )
275
+ management = connection .management ()
276
+ management .declare_queue (queue_specification )
277
+
278
+ addr_queue = AddressHelper .queue_address (stream_name )
279
+
280
+ # consume and then publish
281
+ try :
282
+ stream_filter_options = StreamOptions ()
283
+ stream_filter_options .filter_values (["banana" ])
284
+ connection_consumer = environment .connection (
285
+ "amqp://guest:guest@localhost:5672/"
286
+ )
287
+ connection_consumer .dial ()
288
+ consumer = connection_consumer .consumer (
289
+ addr_queue ,
290
+ # check we are reading just from offset 10 as just banana filtering applies
291
+ message_handler = MyMessageHandlerAcceptStreamOffset (10 ),
292
+ stream_filter_options = stream_filter_options ,
293
+ )
294
+ # send with annotations filter apple and then banana
295
+ # consumer will read just from offset 10
296
+ publish_messages (connection , messages_to_send , stream_name , ["apple" ])
297
+ publish_messages (connection , messages_to_send , stream_name , ["banana" ])
298
+ consumer .run ()
299
+ # ack to terminate the consumer
300
+ except ConsumerTestException :
301
+ pass
302
+
303
+ consumer .close ()
304
+
305
+ management .delete_queue (stream_name )
306
+
307
+
264
308
def test_stream_filtering_not_present (
265
309
connection : Connection , environment : Environment
266
310
) -> None :
0 commit comments