Skip to content

Commit d08139f

Browse files
DanielePalaiaDanielePalaiaGsantomaggio
authored
Streams support and management of disconnections (#28)
* managing disconnections * adding tests * adding a test * supporting multinode in Connection * multinode implementation * stream offset implementation * add credits (#27) Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com> * test updated * add filters (#29) Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com> * refactoring and adding tests * implementing and testing filterings * adding match unfiltered test * fix options parameters * naming conventions * improving examples --------- Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com> Co-authored-by: DanielePalaia <daniele985@@gmail.com> Co-authored-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent 1068c05 commit d08139f

File tree

17 files changed

+1263
-72
lines changed

17 files changed

+1263
-72
lines changed

examples/getting_started/main.py renamed to examples/getting_started/basic_example.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
QuorumQueueSpecification,
1313
)
1414

15+
messages_to_publish = 100
16+
1517

1618
class MyMessageHandler(AMQPMessagingHandler):
1719

@@ -43,7 +45,7 @@ def on_message(self, event: Event):
4345

4446
self._count = self._count + 1
4547

46-
if self._count == 100:
48+
if self._count == messages_to_publish:
4749
print("closing receiver")
4850
# if you want you can add cleanup operations here
4951
# event.receiver.close()
@@ -59,7 +61,7 @@ def on_link_closed(self, event: Event) -> None:
5961

6062

6163
def create_connection() -> Connection:
62-
connection = Connection("amqps://guest:guest@localhost:5672/")
64+
connection = Connection("amqp://guest:guest@localhost:5672/")
6365
# in case of SSL enablement
6466
# ca_cert_file = ".ci/certs/ca_certificate.pem"
6567
# client_cert = ".ci/certs/client_certificate.pem"
@@ -81,7 +83,6 @@ def main() -> None:
8183
exchange_name = "test-exchange"
8284
queue_name = "example-queue"
8385
routing_key = "routing-key"
84-
messages_to_publish = 100000
8586

8687
print("connection to amqp server")
8788
connection = create_connection()
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
# type: ignore
2+
3+
from rabbitmq_amqp_python_client import ( # SSlConfigurationContext,; SslConfigurationContext,; ClientCert,
4+
AddressHelper,
5+
AMQPMessagingHandler,
6+
Connection,
7+
Event,
8+
Message,
9+
OffsetSpecification,
10+
StreamOptions,
11+
StreamSpecification,
12+
)
13+
14+
15+
class MyMessageHandler(AMQPMessagingHandler):
16+
17+
def __init__(self):
18+
super().__init__()
19+
self._count = 0
20+
21+
def on_message(self, event: Event):
22+
print(
23+
"received message from stream: "
24+
+ str(event.message.body)
25+
+ " with offset: "
26+
+ str(event.message.annotations["x-stream-offset"])
27+
)
28+
29+
# accepting
30+
self.delivery_context.accept(event)
31+
32+
# in case of rejection (+eventually deadlettering)
33+
# self.delivery_context.discard(event)
34+
35+
# in case of requeuing
36+
# self.delivery_context.requeue(event)
37+
38+
# annotations = {}
39+
# annotations[symbol('x-opt-string')] = 'x-test1'
40+
# in case of requeuing with annotations added
41+
# self.delivery_context.requeue_with_annotations(event, annotations)
42+
43+
# in case of rejection with annotations added
44+
# self.delivery_context.discard_with_annotations(event)
45+
46+
print("count " + str(self._count))
47+
48+
self._count = self._count + 1
49+
50+
if self._count == 100:
51+
print("closing receiver")
52+
# if you want you can add cleanup operations here
53+
# event.receiver.close()
54+
# event.connection.close()
55+
56+
def on_connection_closed(self, event: Event):
57+
# if you want you can add cleanup operations here
58+
print("connection closed")
59+
60+
def on_link_closed(self, event: Event) -> None:
61+
# if you want you can add cleanup operations here
62+
print("link closed")
63+
64+
65+
def create_connection() -> Connection:
66+
connection = Connection("amqp://guest:guest@localhost:5672/")
67+
# in case of SSL enablement
68+
# ca_cert_file = ".ci/certs/ca_certificate.pem"
69+
# client_cert = ".ci/certs/client_certificate.pem"
70+
# client_key = ".ci/certs/client_key.pem"
71+
# connection = Connection(
72+
# "amqps://guest:guest@localhost:5671/",
73+
# ssl_context=SslConfigurationContext(
74+
# ca_cert=ca_cert_file,
75+
# client_cert=ClientCert(client_cert=client_cert, client_key=client_key),
76+
# ),
77+
# )
78+
connection.dial()
79+
80+
return connection
81+
82+
83+
def main() -> None:
84+
queue_name = "example-queue"
85+
messages_to_publish = 100
86+
87+
print("connection to amqp server")
88+
connection = create_connection()
89+
90+
management = connection.management()
91+
92+
management.declare_queue(StreamSpecification(name=queue_name))
93+
94+
addr_queue = AddressHelper.queue_address(queue_name)
95+
96+
consumer_connection = create_connection()
97+
98+
stream_filter_options = StreamOptions()
99+
# can be first, last, next or an offset long
100+
# you can also specify stream filters with methods: apply_filters and filter_match_unfiltered
101+
stream_filter_options.offset(OffsetSpecification.first)
102+
103+
consumer = consumer_connection.consumer(
104+
addr_queue,
105+
handler=MyMessageHandler(),
106+
stream_filter_options=stream_filter_options,
107+
)
108+
print(
109+
"create a consumer and consume the test message - press control + c to terminate to consume"
110+
)
111+
112+
# print("create a publisher and publish a test message")
113+
publisher = connection.publisher(addr_queue)
114+
115+
for i in range(messages_to_publish):
116+
publisher.publish(Message(body="test: " + str(i)))
117+
118+
publisher.close()
119+
120+
try:
121+
consumer.run()
122+
except KeyboardInterrupt:
123+
pass
124+
125+
#
126+
print("delete queue")
127+
management.delete_queue(queue_name)
128+
129+
print("closing connections")
130+
management.close()
131+
print("after management closing")
132+
connection.close()
133+
print("after connection closing")
134+
135+
136+
if __name__ == "__main__":
137+
main()

0 commit comments

Comments
 (0)