Skip to content

Commit baa2372

Browse files
author
DanielePalaia
committed
improve examples / updating docs / adding credits to consumer
1 parent d08139f commit baa2372

File tree

8 files changed

+253
-38
lines changed

8 files changed

+253
-38
lines changed

README.md

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,20 @@
22

33
This library is in early stages of development. It is meant to be used with RabbitMQ 4.0.
44

5+
# Table of Contents
6+
7+
- [How to Build the project and run the tests](#How-to-Build-the-project-and-run-the-tests)
8+
- [Installation](#Installation)
9+
- [Getting started](#Getting-Started)
10+
* [Creating a connection](#Creating-a-connection)
11+
* [Managing resources](#Managing-resources)
12+
* [Publishing messages](#Publishing-messages)
13+
* [Consuming messages](#Consuming-messages)
14+
* [Support for streams](#support-for-streams)
15+
* [SSL connection](#ssl-connections)
16+
* [Managing disconnections](#Managing-disconnections)
17+
18+
519
## How to Build the project and run the tests
620

721
- Start a RabbitMQ 4.x broker
@@ -18,7 +32,7 @@ The client is distributed via [`PIP`](https://pypi.org/project/rabbitmq-amqp-pyt
1832

1933
## Getting Started
2034

21-
An example is provided in ./getting_started_main.py you can run it after starting a RabbitMQ 4.0 broker with:
35+
An example is provided [`here`](./examples/getting_started/basic_example.py) you can run it after starting a RabbitMQ 4.0 broker with:
2236

2337
poetry run python ./examples/getting_started/main.py
2438

@@ -109,6 +123,33 @@ Then from connection get a consumer object:
109123

110124
The consumer will run indefinitively waiting for messages to arrive.
111125

126+
### Support for streams
127+
128+
The client natively supports streams: https://www.rabbitmq.com/blog/2021/07/13/rabbitmq-streams-overview
129+
130+
You can consume from a given offset or specify a default starting point (FIRST, NEXT, LAST).
131+
132+
Streams filtering is also supported: https://www.rabbitmq.com/blog/2023/10/16/stream-filtering
133+
134+
You can check the [`stream example`](./examples/getting_started/example_with_streams.py) to see how to work with RabbitMQ streams.
135+
136+
### SSL connections
137+
138+
The client supports TLS/SSL connections.
139+
140+
You can check the [`ssl example`](./examples/getting_started/tls_example.py) to see how to establish a secured connection
141+
142+
143+
### Managing disconnections
144+
145+
At this stage the client doesn't support auto-reconnect but a callback is invoked everytime a remote disconnection is detected.
146+
You can use this callback to implement your own logic and eventually attempt a reconnection.
147+
148+
You can check the [`reconnection example`](./examples/getting_started/reconnection_example.py) to see how to manage disconnections and
149+
eventually attempt a reconnection
150+
151+
152+
112153

113154

114155

examples/getting_started/basic_example.py

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,14 @@
66
AMQPMessagingHandler,
77
BindingSpecification,
88
Connection,
9+
Disposition,
910
Event,
1011
ExchangeSpecification,
1112
Message,
1213
QuorumQueueSpecification,
1314
)
1415

15-
messages_to_publish = 100
16+
MESSAGES_TO_PUBLISH = 100
1617

1718

1819
class MyMessageHandler(AMQPMessagingHandler):
@@ -45,7 +46,7 @@ def on_message(self, event: Event):
4546

4647
self._count = self._count + 1
4748

48-
if self._count == messages_to_publish:
49+
if self._count == MESSAGES_TO_PUBLISH:
4950
print("closing receiver")
5051
# if you want you can add cleanup operations here
5152
# event.receiver.close()
@@ -62,17 +63,6 @@ def on_link_closed(self, event: Event) -> None:
6263

6364
def create_connection() -> Connection:
6465
connection = Connection("amqp://guest:guest@localhost:5672/")
65-
# in case of SSL enablement
66-
# ca_cert_file = ".ci/certs/ca_certificate.pem"
67-
# client_cert = ".ci/certs/client_certificate.pem"
68-
# client_key = ".ci/certs/client_key.pem"
69-
# connection = Connection(
70-
# "amqps://guest:guest@localhost:5671/",
71-
# ssl_context=SslConfigurationContext(
72-
# ca_cert=ca_cert_file,
73-
# client_cert=ClientCert(client_cert=client_cert, client_key=client_key),
74-
# ),
75-
# )
7666
connection.dial()
7767

7868
return connection
@@ -120,13 +110,14 @@ def main() -> None:
120110
# management.close()
121111

122112
# publish 10 messages
123-
for i in range(messages_to_publish):
113+
for i in range(MESSAGES_TO_PUBLISH):
114+
print("publishing")
124115
status = publisher.publish(Message(body="test"))
125-
if status.ACCEPTED:
116+
if status.remote_state == Disposition.ACCEPTED:
126117
print("message accepted")
127-
elif status.RELEASED:
118+
elif status.remote_state == Disposition.RELEASED:
128119
print("message not routed")
129-
elif status.REJECTED:
120+
elif status.remote_state == Disposition.REJECTED:
130121
print("message not rejected")
131122

132123
publisher.close()

examples/getting_started/example_with_streams.py

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
StreamSpecification,
1212
)
1313

14+
MESSAGES_TO_PUBLISH = 100
15+
1416

1517
class MyMessageHandler(AMQPMessagingHandler):
1618

@@ -19,6 +21,7 @@ def __init__(self):
1921
self._count = 0
2022

2123
def on_message(self, event: Event):
24+
# just messages with banana filters get received
2225
print(
2326
"received message from stream: "
2427
+ str(event.message.body)
@@ -47,7 +50,7 @@ def on_message(self, event: Event):
4750

4851
self._count = self._count + 1
4952

50-
if self._count == 100:
53+
if self._count == MESSAGES_TO_PUBLISH:
5154
print("closing receiver")
5255
# if you want you can add cleanup operations here
5356
# event.receiver.close()
@@ -64,25 +67,13 @@ def on_link_closed(self, event: Event) -> None:
6467

6568
def create_connection() -> Connection:
6669
connection = Connection("amqp://guest:guest@localhost:5672/")
67-
# in case of SSL enablement
68-
# ca_cert_file = ".ci/certs/ca_certificate.pem"
69-
# client_cert = ".ci/certs/client_certificate.pem"
70-
# client_key = ".ci/certs/client_key.pem"
71-
# connection = Connection(
72-
# "amqps://guest:guest@localhost:5671/",
73-
# ssl_context=SslConfigurationContext(
74-
# ca_cert=ca_cert_file,
75-
# client_cert=ClientCert(client_cert=client_cert, client_key=client_key),
76-
# ),
77-
# )
7870
connection.dial()
7971

8072
return connection
8173

8274

8375
def main() -> None:
8476
queue_name = "example-queue"
85-
messages_to_publish = 100
8677

8778
print("connection to amqp server")
8879
connection = create_connection()
@@ -99,6 +90,7 @@ def main() -> None:
9990
# can be first, last, next or an offset long
10091
# you can also specify stream filters with methods: apply_filters and filter_match_unfiltered
10192
stream_filter_options.offset(OffsetSpecification.first)
93+
stream_filter_options.apply_filters(["banana"])
10294

10395
consumer = consumer_connection.consumer(
10496
addr_queue,
@@ -112,8 +104,22 @@ def main() -> None:
112104
# print("create a publisher and publish a test message")
113105
publisher = connection.publisher(addr_queue)
114106

115-
for i in range(messages_to_publish):
116-
publisher.publish(Message(body="test: " + str(i)))
107+
# publish with a filter of apple
108+
for i in range(MESSAGES_TO_PUBLISH):
109+
publisher.publish(
110+
Message(
111+
body="apple: " + str(i), annotations={"x-stream-filter-value": "apple"}
112+
)
113+
)
114+
115+
# publish with a filter of banana
116+
for i in range(MESSAGES_TO_PUBLISH):
117+
publisher.publish(
118+
Message(
119+
body="banana: " + str(i),
120+
annotations={"x-stream-filter-value": "banana"},
121+
)
122+
)
117123

118124
publisher.close()
119125

examples/getting_started/reconnection_example.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ class ConnectionConfiguration:
3131

3232

3333
connection_configuration = ConnectionConfiguration()
34-
messages_to_publish = 50000
34+
MESSAGES_TO_PUBLSH = 50000
3535

3636

3737
# disconnection callback
@@ -95,7 +95,7 @@ def on_message(self, event: Event):
9595

9696
self._count = self._count + 1
9797

98-
if self._count == messages_to_publish:
98+
if self._count == MESSAGES_TO_PUBLSH:
9999
print("closing receiver")
100100
# if you want you can add cleanup operations here
101101
# event.receiver.close()
@@ -181,7 +181,7 @@ def main() -> None:
181181

182182
# publishing messages
183183
while True:
184-
for i in range(messages_to_publish):
184+
for i in range(MESSAGES_TO_PUBLSH):
185185

186186
if i % 1000 == 0:
187187
print("published 1000 messages...")

0 commit comments

Comments
 (0)