Skip to content

Commit 06daba6

Browse files
author
DanielePalaia
committed
stream offset implementation
1 parent 78a49b6 commit 06daba6

File tree

11 files changed

+168
-31
lines changed

11 files changed

+168
-31
lines changed

examples/getting_started/main.py

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
ExchangeSpecification,
1111
Message,
1212
QuorumQueueSpecification,
13+
StreamSpecification,
14+
StreamFilterOptions
1315
)
1416

1517

@@ -81,59 +83,63 @@ def main() -> None:
8183
exchange_name = "test-exchange"
8284
queue_name = "example-queue"
8385
routing_key = "routing-key"
84-
messages_to_publish = 100000
86+
messages_to_publish = 1000
8587

8688
print("connection to amqp server")
8789
connection = create_connection()
8890

8991
management = connection.management()
9092

9193
print("declaring exchange and queue")
92-
management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))
94+
#management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))
9395

9496
management.declare_queue(
95-
QuorumQueueSpecification(name=queue_name)
97+
StreamSpecification(name=queue_name)
9698
# QuorumQueueSpecification(name=queue_name, dead_letter_exchange="dead-letter")
9799
)
98100

99101
print("binding queue to exchange")
100-
bind_name = management.bind(
101-
BindingSpecification(
102-
source_exchange=exchange_name,
103-
destination_queue=queue_name,
104-
binding_key=routing_key,
105-
)
106-
)
102+
#bind_name = management.bind(
103+
# BindingSpecification(
104+
# source_exchange=exchange_name,
105+
# destination_queue=queue_name,
106+
# binding_key=routing_key,
107+
# )
108+
#)
107109

108-
addr = AddressHelper.exchange_address(exchange_name, routing_key)
110+
#addr = AddressHelper.exchange_address(exchange_name, routing_key)
109111

110112
addr_queue = AddressHelper.queue_address(queue_name)
111113

112114
print("create a publisher and publish a test message")
113-
publisher = connection.publisher(addr)
115+
publisher = connection.publisher(addr_queue)
114116

115117
print("purging the queue")
116-
messages_purged = management.purge_queue(queue_name)
118+
#messages_purged = management.purge_queue(queue_name)
117119

118-
print("messages purged: " + str(messages_purged))
120+
#print("messages purged: " + str(messages_purged))
119121
# management.close()
120122

121123
# publish 10 messages
122124
for i in range(messages_to_publish):
123125
status = publisher.publish(Message(body="test"))
124-
if status.ACCEPTED:
125-
print("message accepted")
126-
elif status.RELEASED:
127-
print("message not routed")
128-
elif status.REJECTED:
129-
print("message not rejected")
126+
#if status.ACCEPTED:
127+
# print("message accepted")
128+
#elif status.RELEASED:
129+
# print("message not routed")
130+
#elif status.REJECTED:
131+
# print("message not rejected")
130132

131133
publisher.close()
132134

133135
print(
134136
"create a consumer and consume the test message - press control + c to terminate to consume"
135137
)
136-
consumer = connection.consumer(addr_queue, handler=MyMessageHandler())
138+
139+
stream_filter_options = StreamFilterOptions()
140+
stream_filter_options.offset(0)
141+
142+
consumer = connection.consumer(addr_queue, handler=MyMessageHandler(), stream_filter_options=stream_filter_options)
137143

138144
try:
139145
consumer.run()
@@ -147,10 +153,10 @@ def main() -> None:
147153
# management = connection.management()
148154

149155
print("unbind")
150-
management.unbind(bind_name)
156+
#management.unbind(bind_name)
151157

152158
print("delete queue")
153-
management.delete_queue(queue_name)
159+
#management.delete_queue(queue_name)
154160

155161
print("delete exchange")
156162
management.delete_exchange(exchange_name)

rabbitmq_amqp_python_client/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
SslConfigurationContext,
2929
)
3030

31+
from .entities import StreamFilterOptions
32+
3133
try:
3234
__version__ = metadata.version(__package__)
3335
__license__ = metadata.metadata(__package__)["license"]
@@ -61,4 +63,5 @@
6163
"ClientCert",
6264
"Delivery",
6365
"ConnectionClosed",
66+
"StreamFilterOptions",
6467
]

rabbitmq_amqp_python_client/connection.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
from .qpid.proton.utils import BlockingConnection
1212
from .ssl_configuration import SslConfigurationContext
1313

14+
from.entities import StreamFilterOptions
15+
1416
logger = logging.getLogger(__name__)
1517

1618
MT = TypeVar("MT")
@@ -84,11 +86,11 @@ def publisher(self, destination: str) -> Publisher:
8486
return publisher
8587

8688
def consumer(
87-
self, destination: str, handler: Optional[MessagingHandler] = None
89+
self, destination: str, handler: Optional[MessagingHandler] = None, stream_filter_options: Optional[StreamFilterOptions] = None
8890
) -> Consumer:
8991
if validate_address(destination) is False:
9092
raise ArgumentOutOfRangeException(
9193
"destination address must start with /queues or /exchanges"
9294
)
93-
consumer = Consumer(self._conn, destination, handler)
95+
consumer = Consumer(self._conn, destination, handler, stream_filter_options)
9496
return consumer

rabbitmq_amqp_python_client/consumer.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
import logging
22
from typing import Optional
33

4-
from .options import ReceiverOptionUnsettled
4+
from .options import ReceiverOptionUnsettled, ReceiverOptionUnsettledWithFilters
55
from .qpid.proton._handlers import MessagingHandler
66
from .qpid.proton._message import Message
77
from .qpid.proton.utils import (
88
BlockingConnection,
99
BlockingReceiver,
1010
)
1111

12+
from.entities import StreamFilterOptions
13+
1214
logger = logging.getLogger(__name__)
1315

1416

@@ -18,11 +20,13 @@ def __init__(
1820
conn: BlockingConnection,
1921
addr: str,
2022
handler: Optional[MessagingHandler] = None,
23+
stream_options: Optional[StreamFilterOptions] = None
2124
):
2225
self._receiver: Optional[BlockingReceiver] = None
2326
self._conn = conn
2427
self._addr = addr
2528
self._handler = handler
29+
self._stream_options = stream_options
2630
self._open()
2731

2832
def _open(self) -> None:
@@ -52,6 +56,15 @@ def stop(self) -> None:
5256

5357
def _create_receiver(self, addr: str) -> BlockingReceiver:
5458
logger.debug("Creating the receiver")
55-
return self._conn.create_receiver(
59+
if self._stream_options is None:
60+
receiver = self._conn.create_receiver(
5661
addr, options=ReceiverOptionUnsettled(addr), handler=self._handler
57-
)
62+
)
63+
else:
64+
print("stream option is not None")
65+
receiver = self._conn.create_receiver(
66+
addr, options=ReceiverOptionUnsettledWithFilters(addr, self._stream_options), handler=self._handler
67+
)
68+
69+
return receiver
70+

rabbitmq_amqp_python_client/entities.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
from dataclasses import dataclass
2-
from typing import Any, Optional
2+
from typing import Any, Optional, Dict
33

4+
from .qpid.proton._data import symbol, Described
45
from .common import ExchangeType, QueueType
56

7+
STREAM_FILTER_SPEC = "rabbitmq:stream-filter"
8+
STREAM_OFFSET_SPEC = "rabbitmq:stream-offset-spec"
9+
STREAM_FILTER_MATCH_UNFILTERED = "rabbitmq:stream-match-unfiltered"
610

711
@dataclass
812
class ExchangeSpecification:
@@ -33,3 +37,16 @@ class BindingSpecification:
3337
source_exchange: str
3438
destination_queue: str
3539
binding_key: str
40+
41+
42+
class StreamFilterOptions:
43+
44+
def __init__(self):
45+
self._filter_set: Dict[symbol, Described] = {}
46+
47+
def offset(self, offset: int):
48+
#self._filter_set[symbol(STREAM_FILTER_SPEC)] = Described(symbol(STREAM_FILTER_SPEC), "first")
49+
self._filter_set[symbol('rabbitmq:stream-offset-spec')] = Described(symbol('rabbitmq:stream-offset-spec'), 0)
50+
51+
def filters(self) -> Dict[symbol, Described]:
52+
return self._filter_set

rabbitmq_amqp_python_client/options.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
symbol,
44
)
55
from .qpid.proton._endpoints import Link # noqa: E402
6-
from .qpid.proton.reactor import LinkOption # noqa: E402
6+
from .qpid.proton.reactor import LinkOption, Filter # noqa: E402
7+
from .entities import StreamFilterOptions
78

89

910
class SenderOption(LinkOption): # type: ignore
@@ -52,6 +53,23 @@ class ReceiverOptionUnsettled(LinkOption): # type: ignore
5253
def __init__(self, addr: str):
5354
self._addr = addr
5455

56+
57+
def apply(self, link: Link) -> None:
58+
link.target.address = self._addr
59+
link.snd_settle_mode = Link.SND_UNSETTLED
60+
link.rcv_settle_mode = Link.RCV_FIRST
61+
link.properties = PropertyDict({symbol("paired"): True})
62+
link.source.dynamic = False
63+
64+
def test(self, link: Link) -> bool:
65+
return bool(link.is_receiver)
66+
67+
class ReceiverOptionUnsettledWithFilters(Filter): # type: ignore
68+
def __init__(self, addr: str, filter_options: StreamFilterOptions):
69+
super().__init__(filter_options.filters())
70+
self._addr = addr
71+
72+
5573
def apply(self, link: Link) -> None:
5674
link.target.address = self._addr
5775
link.snd_settle_mode = Link.SND_UNSETTLED

rabbitmq_amqp_python_client/qpid/proton/_reactor.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -814,6 +814,7 @@ class Filter(ReceiverOption):
814814
"""
815815

816816
def __init__(self, filter_set: Dict[symbol, Described] = {}) -> None:
817+
print("filterset: " + str(filter_set))
817818
self.filter_set = filter_set
818819

819820
def apply(self, receiver: "Receiver") -> None:

tests/conftest.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,10 @@ def __init__(self):
8686
self._received = 0
8787

8888
def on_message(self, event: Event):
89+
print("received message: " + str(event.message.body))
8990
self.delivery_context.accept(event)
9091
self._received = self._received + 1
91-
if self._received == 1000:
92+
if self._received == 10:
9293
event.connection.close()
9394
raise ConsumerTestException("consumed")
9495

tests/test_publisher.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
ExchangeSpecification,
1010
Message,
1111
QuorumQueueSpecification,
12+
StreamSpecification,
1213
)
1314

1415
from .http_requests import delete_all_connections
@@ -238,3 +239,28 @@ def on_disconnected():
238239
assert disconnected is True
239240
assert reconnected is True
240241
assert message_purged == messages_to_publish - 1
242+
243+
244+
def test_queue_info_for_stream_with_validations(connection: Connection) -> None:
245+
246+
stream_name = "test_stream_info_with_validation"
247+
messages_to_send = 200
248+
249+
queue_specification = StreamSpecification(
250+
name=stream_name,
251+
)
252+
management = connection.management()
253+
management.declare_queue(queue_specification)
254+
255+
print("before creating publisher")
256+
257+
publisher = connection.publisher("/queues/" + stream_name)
258+
259+
print("after creating publisher")
260+
261+
for i in range(messages_to_send):
262+
263+
publisher.publish(Message(body="test"))
264+
265+
266+

tests/test_publisher_streams.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
from rabbitmq_amqp_python_client import (
2+
StreamSpecification,
3+
QueueType,
4+
Management,
5+
AddressHelper,
6+
StreamFilterOptions,
7+
Connection,
8+
)
9+
10+
from .utils import publish_messages
11+
from .conftest import MyMessageHandlerAccept, ConsumerTestException
12+
13+
14+
def test_queue_info_for_stream_with_validations(connection: Connection) -> None:
15+
16+
stream_name = "test_stream_info_with_validation"
17+
messages_to_send = 200
18+
19+
queue_specification = StreamSpecification(
20+
name=stream_name,
21+
)
22+
management = connection.management()
23+
management.declare_queue(queue_specification)
24+
25+
publish_messages(connection, messages_to_send, stream_name)
26+
27+
addr_queue = AddressHelper.queue_address(stream_name)
28+
29+
stream_filter_options = StreamFilterOptions()
30+
stream_filter_options.offset(0)
31+
32+
consumer = connection.consumer(addr_queue, handler=MyMessageHandlerAccept())
33+
34+
try:
35+
print("running")
36+
consumer.run()
37+
# ack to terminate the consumer
38+
except ConsumerTestException:
39+
pass
40+
41+
consumer.close()
42+
43+
management.delete_queue(stream_name)
44+
45+
#assert stream_info.name == stream_name
46+
#assert stream_info.queue_type == QueueType.stream
47+
#assert stream_info.message_count == 0

0 commit comments

Comments
 (0)