Skip to content

Commit 683a29f

Browse files
DanielePalaiaDanielePalaiaGsantomaggio
authored
Consumer implementation (#15)
* connection layer implementation * WIP * implementing Declare Exchange/Queue * test for gabriele * body encodiding Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com> * adding basic tests * implementing bind * adding user-defined exception * Adding debugging info * publisher implementation * adding publisher basic test * improve help_address utility functions * modify example * integrate qpid-proton lib * implementing purge operation * improving url helper * cleaning up qpid unecessary folders and files * some improvements * implementing queue_info * fixing queue arguments management * better management of arguments * improved arguments management during declare_queue * adding purge test * adding fixtures in tests * adding a publisher test * removing useless queue_type parameter * removing receiver from publisher * consumer implementation * Implementing Connection, Management and Publisher modules (#10) * connection layer implementation * WIP * implementing Declare Exchange/Queue * test for gabriele * body encodiding Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com> * adding basic tests * implementing bind * adding user-defined exception * Adding debugging info * publisher implementation * adding publisher basic test * improve help_address utility functions * modify example * integrate qpid-proton lib * implementing purge operation * improving url helper * cleaning up qpid unecessary folders and files * some improvements * implementing queue_info * fixing queue arguments management * better management of arguments * improved arguments management during declare_queue * adding purge test * adding fixtures in tests * adding a publisher test * removing useless queue_type parameter * removing receiver from publisher --------- Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com> Co-authored-by: DanielePalaia <daniele985@@gmail.com> Co-authored-by: Gabriele Santomaggio <G.santomaggio@gmail.com> * consumer implementation * modification for testing purpose * change option Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com> * adding tests and improvementes * improving ack implementation * adding requeue with annotations test * adding dead lettering test * adding discard with annotations test * refactoring tests * improving AddressHelper utility functions * Change the class name for the consumer add documentation Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com> * adding validations for annotations --------- Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com> Co-authored-by: DanielePalaia <daniele985@@gmail.com> Co-authored-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent 848ced6 commit 683a29f

19 files changed

+977
-97
lines changed

examples/getting_started/main.py

Lines changed: 95 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,89 @@
1+
# type: ignore
2+
3+
14
from rabbitmq_amqp_python_client import (
5+
AddressHelper,
6+
AMQPMessagingHandler,
27
BindingSpecification,
38
Connection,
9+
Event,
410
ExchangeSpecification,
511
Message,
612
QuorumQueueSpecification,
7-
exchange_address,
813
)
914

1015

16+
class MyMessageHandler(AMQPMessagingHandler):
17+
18+
def __init__(self):
19+
super().__init__()
20+
self._count = 0
21+
22+
def on_message(self, event: Event):
23+
print("received message: " + str(event.message.annotations))
24+
25+
# accepting
26+
self.delivery_context.accept(event)
27+
28+
# in case of rejection (+eventually deadlettering)
29+
# self.delivery_context.discard(event)
30+
31+
# in case of requeuing
32+
# self.delivery_context.requeue(event)
33+
34+
# annotations = {}
35+
# annotations[symbol('x-opt-string')] = 'x-test1'
36+
# in case of requeuing with annotations added
37+
# self.delivery_context.requeue_with_annotations(event, annotations)
38+
39+
# in case of rejection with annotations added
40+
# self.delivery_context.discard_with_annotations(event)
41+
42+
print("count " + str(self._count))
43+
44+
self._count = self._count + 1
45+
46+
if self._count == 100:
47+
print("closing receiver")
48+
# if you want you can add cleanup operations here
49+
# event.receiver.close()
50+
# event.connection.close()
51+
52+
def on_connection_closed(self, event: Event):
53+
# if you want you can add cleanup operations here
54+
print("connection closed")
55+
56+
def on_link_closed(self, event: Event) -> None:
57+
# if you want you can add cleanup operations here
58+
print("link closed")
59+
60+
61+
def create_connection() -> Connection:
62+
connection = Connection("amqp://guest:guest@localhost:5672/")
63+
connection.dial()
64+
65+
return connection
66+
67+
1168
def main() -> None:
69+
1270
exchange_name = "test-exchange"
1371
queue_name = "example-queue"
1472
routing_key = "routing-key"
15-
connection = Connection("amqp://guest:guest@localhost:5672/")
73+
messages_to_publish = 100
1674

1775
print("connection to amqp server")
18-
connection.dial()
76+
connection = create_connection()
1977

2078
management = connection.management()
2179

2280
print("declaring exchange and queue")
2381
management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))
2482

25-
management.declare_queue(QuorumQueueSpecification(name=queue_name))
83+
management.declare_queue(
84+
QuorumQueueSpecification(name=queue_name)
85+
# QuorumQueueSpecification(name=queue_name, dead_letter_exchange="dead-letter")
86+
)
2687

2788
print("binding queue to exchange")
2889
bind_name = management.bind(
@@ -33,21 +94,44 @@ def main() -> None:
3394
)
3495
)
3596

36-
addr = exchange_address(exchange_name, routing_key)
97+
addr = AddressHelper.exchange_address(exchange_name, routing_key)
98+
99+
addr_queue = AddressHelper.queue_address(queue_name)
37100

38101
print("create a publisher and publish a test message")
39102
publisher = connection.publisher(addr)
40103

41-
publisher.publish(Message(body="test"))
104+
print("purging the queue")
105+
messages_purged = management.purge_queue(queue_name)
106+
107+
print("messages purged: " + str(messages_purged))
108+
# management.close()
109+
110+
# publish 10 messages
111+
for i in range(messages_to_publish):
112+
publisher.publish(Message(body="test"))
42113

43114
publisher.close()
44115

116+
print(
117+
"create a consumer and consume the test message - press control + c to terminate to consume"
118+
)
119+
consumer = connection.consumer(addr_queue, handler=MyMessageHandler())
120+
121+
try:
122+
consumer.run()
123+
except KeyboardInterrupt:
124+
pass
125+
126+
print("cleanup")
127+
consumer.close()
128+
# once we finish consuming if we close the connection we need to create a new one
129+
# connection = create_connection()
130+
# management = connection.management()
131+
45132
print("unbind")
46133
management.unbind(bind_name)
47134

48-
print("purging the queue")
49-
management.purge_queue(queue_name)
50-
51135
print("delete queue")
52136
management.delete_queue(queue_name)
53137

@@ -56,7 +140,9 @@ def main() -> None:
56140

57141
print("closing connections")
58142
management.close()
143+
print("after management closing")
59144
connection.close()
145+
print("after connection closing")
60146

61147

62148
if __name__ == "__main__":

rabbitmq_amqp_python_client/__init__.py

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,22 @@
11
from importlib import metadata
22

3-
from .address_helper import exchange_address, queue_address
4-
from .common import QueueType
3+
from .address_helper import AddressHelper
4+
from .amqp_consumer_handler import AMQPMessagingHandler
5+
from .common import ExchangeType, QueueType
56
from .connection import Connection
7+
from .consumer import Consumer
68
from .entities import (
79
BindingSpecification,
810
ExchangeSpecification,
911
)
12+
from .exceptions import ArgumentOutOfRangeException
1013
from .management import Management
1114
from .publisher import Publisher
15+
from .qpid.proton._data import symbol # noqa: E402
16+
from .qpid.proton._delivery import Delivery
17+
from .qpid.proton._events import Event
1218
from .qpid.proton._message import Message
19+
from .qpid.proton.handlers import MessagingHandler
1320
from .queues import (
1421
ClassicQueueSpecification,
1522
QuorumQueueSpecification,
@@ -35,7 +42,14 @@
3542
"BindingSpecification",
3643
"QueueType",
3744
"Publisher",
38-
"exchange_address",
39-
"queue_address",
4045
"Message",
46+
"Consumer",
47+
"MessagingHandler",
48+
"Event",
49+
"Delivery",
50+
"symbol",
51+
"ExchangeType",
52+
"AddressHelper",
53+
"AMQPMessagingHandler",
54+
"ArgumentOutOfRangeException",
4155
]
Lines changed: 44 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from .entities import BindingSpecification
22

33

4-
def is_unreserved(char: str) -> bool:
4+
def _is_unreserved(char: str) -> bool:
55
# According to RFC 3986, unreserved characters are A-Z, a-z, 0-9, '-', '.', '_', and '~'
66
return char.isalnum() or char in "-._~"
77

@@ -12,7 +12,7 @@ def encode_path_segment(input_string: str) -> str:
1212
# Iterate over each character in the input string
1313
for char in input_string:
1414
# Check if the character is an unreserved character
15-
if is_unreserved(char):
15+
if _is_unreserved(char):
1616
encoded.append(char) # Append as is
1717
else:
1818
# Encode character to %HH format
@@ -21,49 +21,54 @@ def encode_path_segment(input_string: str) -> str:
2121
return "".join(encoded)
2222

2323

24-
def exchange_address(exchange_name: str, routing_key: str = "") -> str:
25-
if routing_key == "":
26-
path = "/exchanges/" + encode_path_segment(exchange_name)
27-
else:
28-
path = (
29-
"/exchanges/"
30-
+ encode_path_segment(exchange_name)
31-
+ "/"
32-
+ encode_path_segment(routing_key)
33-
)
34-
35-
return path
36-
24+
class AddressHelper:
3725

38-
def queue_address(queue_name: str) -> str:
39-
path = "/queues/" + encode_path_segment(queue_name)
40-
41-
return path
26+
@staticmethod
27+
def exchange_address(exchange_name: str, routing_key: str = "") -> str:
28+
if routing_key == "":
29+
path = "/exchanges/" + encode_path_segment(exchange_name)
30+
else:
31+
path = (
32+
"/exchanges/"
33+
+ encode_path_segment(exchange_name)
34+
+ "/"
35+
+ encode_path_segment(routing_key)
36+
)
4237

38+
return path
4339

44-
def purge_queue_address(queue_name: str) -> str:
45-
path = "/queues/" + encode_path_segment(queue_name) + "/messages"
40+
@staticmethod
41+
def queue_address(queue_name: str) -> str:
42+
path = "/queues/" + encode_path_segment(queue_name)
4643

47-
return path
44+
return path
4845

46+
@staticmethod
47+
def purge_queue_address(queue_name: str) -> str:
48+
path = "/queues/" + encode_path_segment(queue_name) + "/messages"
4949

50-
def path_address() -> str:
51-
path = "/bindings"
50+
return path
5251

53-
return path
52+
@staticmethod
53+
def path_address() -> str:
54+
path = "/bindings"
5455

56+
return path
5557

56-
def binding_path_with_exchange_queue(bind_specification: BindingSpecification) -> str:
57-
binding_path_wth_exchange_queue_key = (
58-
"/bindings"
59-
+ "/"
60-
+ "src="
61-
+ encode_path_segment(bind_specification.source_exchange)
62-
+ ";"
63-
+ "dstq="
64-
+ encode_path_segment(bind_specification.destination_queue)
65-
+ ";key="
66-
+ encode_path_segment(bind_specification.binding_key)
67-
+ ";args="
68-
)
69-
return binding_path_wth_exchange_queue_key
58+
@staticmethod
59+
def binding_path_with_exchange_queue(
60+
bind_specification: BindingSpecification,
61+
) -> str:
62+
binding_path_wth_exchange_queue_key = (
63+
"/bindings"
64+
+ "/"
65+
+ "src="
66+
+ encode_path_segment(bind_specification.source_exchange)
67+
+ ";"
68+
+ "dstq="
69+
+ encode_path_segment(bind_specification.destination_queue)
70+
+ ";key="
71+
+ encode_path_segment(bind_specification.binding_key)
72+
+ ";args="
73+
)
74+
return binding_path_wth_exchange_queue_key
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
from .delivery_context import DeliveryContext
2+
from .qpid.proton.handlers import MessagingHandler
3+
4+
"""
5+
AMQPMessagingHandler extends the QPID MessagingHandler.
6+
It is an helper to set the default values needed for manually accepting and settling messages.
7+
self.delivery_context is an instance of DeliveryContext, which is used to accept, reject,
8+
requeue or requeue with annotations a message.
9+
It is not mandatory to use this class, but it is a good practice to use it.
10+
"""
11+
12+
13+
class AMQPMessagingHandler(MessagingHandler): # type: ignore
14+
15+
def __init__(self, auto_accept: bool = False, auto_settle: bool = True):
16+
"""
17+
:param auto_accept: if True, the message is automatically accepted
18+
by default is false, so the user has to manually accept the message and decide with the
19+
different methods of the delivery_context what to do with the message
20+
"""
21+
super().__init__(auto_accept=auto_accept, auto_settle=auto_settle)
22+
self.delivery_context: DeliveryContext = DeliveryContext()

rabbitmq_amqp_python_client/connection.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
import logging
2+
from typing import Optional
23

4+
from .consumer import Consumer
35
from .management import Management
46
from .publisher import Publisher
7+
from .qpid.proton._handlers import MessagingHandler
58
from .qpid.proton.utils import BlockingConnection
69

710
logger = logging.getLogger(__name__)
@@ -34,3 +37,9 @@ def close(self) -> None:
3437
def publisher(self, destination: str) -> Publisher:
3538
publisher = Publisher(self._conn, destination)
3639
return publisher
40+
41+
def consumer(
42+
self, destination: str, handler: Optional[MessagingHandler] = None
43+
) -> Consumer:
44+
consumer = Consumer(self._conn, destination, handler)
45+
return consumer

0 commit comments

Comments
 (0)