Skip to content

Commit 60d4763

Browse files
author
DanielePalaia
committed
supporting reconnection of streams by last consumed offset
1 parent c09162d commit 60d4763

File tree

6 files changed

+107
-11
lines changed

6 files changed

+107
-11
lines changed

examples/streams/example_with_streams.py

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
AddressHelper,
55
AMQPMessagingHandler,
66
Connection,
7+
ConnectionClosed,
78
Environment,
89
Event,
910
Message,
@@ -12,7 +13,7 @@
1213
StreamSpecification,
1314
)
1415

15-
MESSAGES_TO_PUBLISH = 100
16+
MESSAGES_TO_PUBLISH = 1
1617

1718

1819
class MyMessageHandler(AMQPMessagingHandler):
@@ -21,7 +22,7 @@ def __init__(self):
2122
super().__init__()
2223
self._count = 0
2324

24-
def on_message(self, event: Event):
25+
def on_amqp_message(self, event: Event):
2526
# just messages with banana filters get received
2627
print(
2728
"received message from stream: "
@@ -86,7 +87,7 @@ def main() -> None:
8687
queue_name = "example-queue"
8788

8889
print("connection to amqp server")
89-
environment = Environment("amqp://guest:guest@localhost:5672/")
90+
environment = Environment("amqp://guest:guest@localhost:5672/", reconnect=True)
9091
connection = create_connection(environment)
9192

9293
management = connection.management()
@@ -134,14 +135,22 @@ def main() -> None:
134135

135136
publisher.close()
136137

137-
try:
138-
consumer.run()
139-
except KeyboardInterrupt:
140-
pass
138+
while True:
139+
try:
140+
consumer.run()
141+
except KeyboardInterrupt:
142+
pass
143+
except ConnectionClosed:
144+
print("connection closed")
145+
continue
146+
except Exception as e:
147+
print("consumer exited for exception " + str(e))
148+
149+
break
141150

142151
#
143152
print("delete queue")
144-
management.delete_queue(queue_name)
153+
# management.delete_queue(queue_name)
145154

146155
print("closing connections")
147156
management.close()

rabbitmq_amqp_python_client/amqp_consumer_handler.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,18 @@ def __init__(self, auto_accept: bool = False, auto_settle: bool = True):
2121
"""
2222
super().__init__(auto_accept=auto_accept, auto_settle=auto_settle)
2323
self.delivery_context: DeliveryContext = DeliveryContext()
24+
self._offset = 0
2425

2526
def on_amqp_message(self, event: Event) -> None:
2627
pass
2728

2829
def on_message(self, event: Event) -> None:
2930
print("first level callback")
31+
if "x-stream-offset" in event.message.annotations:
32+
print("setting offset")
33+
self._offset = int(event.message.annotations["x-stream-offset"])
3034
self.on_amqp_message(event)
35+
36+
@property
37+
def offset(self) -> int:
38+
return self._offset

rabbitmq_amqp_python_client/connection.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,7 @@ def _on_disconnection(self) -> None:
292292
self._publishers[i]._update_connection(self._conn)
293293

294294
for i, consumer in enumerate(self._consumers):
295+
print("reconnecting consumer")
295296
# Update the broken connection and sender in the consumer
296297
self._consumers[i]._update_connection(self._conn)
297298

rabbitmq_amqp_python_client/consumer.py

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
import logging
22
from typing import Literal, Optional, Union, cast
33

4+
from .amqp_consumer_handler import AMQPMessagingHandler
45
from .entities import StreamOptions
56
from .options import (
67
ReceiverOptionUnsettled,
78
ReceiverOptionUnsettledWithFilters,
89
)
9-
from .qpid.proton._handlers import MessagingHandler
1010
from .qpid.proton._message import Message
1111
from .qpid.proton.utils import (
1212
BlockingConnection,
@@ -37,7 +37,7 @@ def __init__(
3737
self,
3838
conn: BlockingConnection,
3939
addr: str,
40-
handler: Optional[MessagingHandler] = None,
40+
handler: Optional[AMQPMessagingHandler] = None,
4141
stream_options: Optional[StreamOptions] = None,
4242
credit: Optional[int] = None,
4343
):
@@ -67,7 +67,23 @@ def _open(self) -> None:
6767

6868
def _update_connection(self, conn: BlockingConnection) -> None:
6969
self._conn = conn
70-
self._receiver = self._create_receiver(self._addr)
70+
if self._stream_options is None:
71+
print("creating new receiver without stream")
72+
self._receiver = self._conn.create_receiver(
73+
self._addr,
74+
options=ReceiverOptionUnsettled(self._addr),
75+
handler=self._handler,
76+
)
77+
else:
78+
print("creating new stream receiver")
79+
self._stream_options.offset(self._handler.offset - 1) # type: ignore
80+
self._receiver = self._conn.create_receiver(
81+
self._addr,
82+
options=ReceiverOptionUnsettledWithFilters(
83+
self._addr, self._stream_options
84+
),
85+
handler=self._handler,
86+
)
7187

7288
def _set_consumers_list(self, consumers: []) -> None: # type: ignore
7389
self._consumers = consumers
@@ -149,3 +165,7 @@ def _create_receiver(self, addr: str) -> BlockingReceiver:
149165
def address(self) -> str:
150166
"""Get the current publisher address."""
151167
return self._addr
168+
169+
@property
170+
def handler(self) -> Optional[AMQPMessagingHandler]:
171+
return self._handler

tests/conftest.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
FriendlyName,
2121
)
2222

23+
from .http_requests import delete_all_connections
24+
2325
os.chdir(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
2426

2527

@@ -150,6 +152,22 @@ def on_message(self, event: Event):
150152
raise ConsumerTestException("consumed")
151153

152154

155+
class MyMessageHandlerAcceptStreamOffsetReconnect(AMQPMessagingHandler):
156+
157+
def __init__(self, starting_offset: Optional[int] = None):
158+
super().__init__()
159+
self._received = 0
160+
self._starting_offset = starting_offset
161+
162+
def on_message(self, event: Event):
163+
if self._received == 5:
164+
delete_all_connections()
165+
self.delivery_context.accept(event)
166+
self._received = self._received + 1
167+
if self._received == 10:
168+
raise ConsumerTestException("consumed")
169+
170+
153171
class MyMessageHandlerNoack(AMQPMessagingHandler):
154172

155173
def __init__(self):

tests/test_streams.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from .conftest import (
1111
ConsumerTestException,
1212
MyMessageHandlerAcceptStreamOffset,
13+
MyMessageHandlerAcceptStreamOffsetReconnect,
1314
)
1415
from .utils import publish_messages
1516

@@ -370,3 +371,42 @@ def test_stream_match_unfiltered(
370371
consumer.close()
371372

372373
management.delete_queue(stream_name)
374+
375+
376+
def test_stream_reconnection(connection: Connection, environment: Environment) -> None:
377+
378+
consumer = None
379+
stream_name = "test_stream_info_with_filtering"
380+
messages_to_send = 10
381+
382+
queue_specification = StreamSpecification(
383+
name=stream_name,
384+
)
385+
management = connection.management()
386+
management.declare_queue(queue_specification)
387+
388+
addr_queue = AddressHelper.queue_address(stream_name)
389+
390+
# consume and then publish
391+
try:
392+
stream_filter_options = StreamOptions()
393+
stream_filter_options.filter_values(["banana"])
394+
stream_filter_options.filter_match_unfiltered(True)
395+
connection_consumer = environment.connection()
396+
connection_consumer.dial()
397+
consumer = connection_consumer.consumer(
398+
addr_queue,
399+
# disconnection and check happens here
400+
message_handler=MyMessageHandlerAcceptStreamOffsetReconnect(),
401+
stream_filter_options=stream_filter_options,
402+
)
403+
# send with annotations filter banana
404+
publish_messages(connection, messages_to_send, stream_name)
405+
consumer.run()
406+
# ack to terminate the consumer
407+
except ConsumerTestException:
408+
pass
409+
410+
consumer.close()
411+
412+
management.delete_queue(stream_name)

0 commit comments

Comments
 (0)