Skip to content

Commit 7a04eaf

Browse files
author
DanielePalaia
committed
refactoring and adding tests
1 parent 8931dd5 commit 7a04eaf

File tree

13 files changed

+447
-134
lines changed

13 files changed

+447
-134
lines changed
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
# type: ignore
2+
3+
from rabbitmq_amqp_python_client import ( # SSlConfigurationContext,; SslConfigurationContext,; ClientCert,
4+
AddressHelper,
5+
AMQPMessagingHandler,
6+
Connection,
7+
Event,
8+
Message,
9+
OffsetSpecification,
10+
StreamFilterOptions,
11+
StreamSpecification,
12+
)
13+
14+
15+
class MyMessageHandler(AMQPMessagingHandler):
16+
17+
def __init__(self):
18+
super().__init__()
19+
self._count = 0
20+
21+
def on_message(self, event: Event):
22+
print("received message: " + str(event.message.body))
23+
24+
# accepting
25+
self.delivery_context.accept(event)
26+
27+
# in case of rejection (+eventually deadlettering)
28+
# self.delivery_context.discard(event)
29+
30+
# in case of requeuing
31+
# self.delivery_context.requeue(event)
32+
33+
# annotations = {}
34+
# annotations[symbol('x-opt-string')] = 'x-test1'
35+
# in case of requeuing with annotations added
36+
# self.delivery_context.requeue_with_annotations(event, annotations)
37+
38+
# in case of rejection with annotations added
39+
# self.delivery_context.discard_with_annotations(event)
40+
41+
print("count " + str(self._count))
42+
43+
self._count = self._count + 1
44+
45+
if self._count == 100:
46+
print("closing receiver")
47+
# if you want you can add cleanup operations here
48+
# event.receiver.close()
49+
# event.connection.close()
50+
51+
def on_connection_closed(self, event: Event):
52+
# if you want you can add cleanup operations here
53+
print("connection closed")
54+
55+
def on_link_closed(self, event: Event) -> None:
56+
# if you want you can add cleanup operations here
57+
print("link closed")
58+
59+
60+
def create_connection() -> Connection:
61+
connection = Connection("amqp://guest:guest@localhost:5672/")
62+
# in case of SSL enablement
63+
# ca_cert_file = ".ci/certs/ca_certificate.pem"
64+
# client_cert = ".ci/certs/client_certificate.pem"
65+
# client_key = ".ci/certs/client_key.pem"
66+
# connection = Connection(
67+
# "amqps://guest:guest@localhost:5671/",
68+
# ssl_context=SslConfigurationContext(
69+
# ca_cert=ca_cert_file,
70+
# client_cert=ClientCert(client_cert=client_cert, client_key=client_key),
71+
# ),
72+
# )
73+
connection.dial()
74+
75+
return connection
76+
77+
78+
def main() -> None:
79+
queue_name = "example-queue"
80+
messages_to_publish = 100
81+
82+
print("connection to amqp server")
83+
connection = create_connection()
84+
85+
management = connection.management()
86+
87+
management.declare_queue(StreamSpecification(name=queue_name))
88+
89+
addr_queue = AddressHelper.queue_address(queue_name)
90+
91+
consumer_connection = create_connection()
92+
93+
stream_filter_options = StreamFilterOptions()
94+
# can be first, last, next or an offset long
95+
stream_filter_options.offset(OffsetSpecification.first)
96+
97+
consumer = consumer_connection.consumer(
98+
addr_queue,
99+
handler=MyMessageHandler(),
100+
stream_filter_options=stream_filter_options,
101+
)
102+
print(
103+
"create a consumer and consume the test message - press control + c to terminate to consume"
104+
)
105+
106+
# print("create a publisher and publish a test message")
107+
publisher = connection.publisher(addr_queue)
108+
109+
for i in range(messages_to_publish):
110+
publisher.publish(Message(body="test: " + str(i)))
111+
112+
publisher.close()
113+
114+
try:
115+
consumer.run()
116+
except KeyboardInterrupt:
117+
pass
118+
119+
#
120+
print("delete queue")
121+
# management.delete_queue(queue_name)
122+
123+
print("closing connections")
124+
management.close()
125+
print("after management closing")
126+
connection.close()
127+
print("after connection closing")
128+
129+
130+
if __name__ == "__main__":
131+
main()

examples/getting_started/main.py

Lines changed: 43 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# type: ignore
2-
import threading
2+
33

44
from rabbitmq_amqp_python_client import ( # SSlConfigurationContext,; SslConfigurationContext,; ClientCert,
55
AddressHelper,
@@ -10,8 +10,6 @@
1010
ExchangeSpecification,
1111
Message,
1212
QuorumQueueSpecification,
13-
StreamSpecification,
14-
StreamFilterOptions
1513
)
1614

1715

@@ -78,90 +76,81 @@ def create_connection() -> Connection:
7876
return connection
7977

8078

81-
def threaded_function(addr_queue):
82-
connection = create_connection()
83-
offset_specification = StreamFilterOptions()
84-
offset_specification.offset(10)
85-
consumer = connection.consumer(addr_queue, handler=MyMessageHandler(), stream_filter_options=offset_specification)
86-
try:
87-
consumer.run()
88-
except KeyboardInterrupt:
89-
pass
90-
91-
9279
def main() -> None:
80+
9381
exchange_name = "test-exchange"
9482
queue_name = "example-queue"
9583
routing_key = "routing-key"
96-
messages_to_publish = 1000
84+
messages_to_publish = 100000
9785

9886
print("connection to amqp server")
9987
connection = create_connection()
10088

10189
management = connection.management()
10290

10391
print("declaring exchange and queue")
104-
# management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))
92+
management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))
10593

106-
#management.declare_queue(
107-
# StreamSpecification(name=queue_name)
94+
management.declare_queue(
95+
QuorumQueueSpecification(name=queue_name)
10896
# QuorumQueueSpecification(name=queue_name, dead_letter_exchange="dead-letter")
109-
#)
97+
)
11098

11199
print("binding queue to exchange")
112-
# bind_name = management.bind(
113-
# BindingSpecification(
114-
# source_exchange=exchange_name,
115-
# destination_queue=queue_name,
116-
# binding_key=routing_key,
117-
# )
118-
# )
100+
bind_name = management.bind(
101+
BindingSpecification(
102+
source_exchange=exchange_name,
103+
destination_queue=queue_name,
104+
binding_key=routing_key,
105+
)
106+
)
119107

120-
# addr = AddressHelper.exchange_address(exchange_name, routing_key)
108+
addr = AddressHelper.exchange_address(exchange_name, routing_key)
121109

122110
addr_queue = AddressHelper.queue_address(queue_name)
123111

124-
thread = threading.Thread(target=threaded_function, args=(addr_queue,))
125-
thread.start()
126-
## press control + c to terminate the consumer
127-
128-
# print("create a publisher and publish a test message")
129-
#publisher = connection.publisher(addr_queue)
112+
print("create a publisher and publish a test message")
113+
publisher = connection.publisher(addr)
130114

131-
# print("purging the queue")
132-
# messages_purged = management.purge_queue(queue_name)
115+
print("purging the queue")
116+
messages_purged = management.purge_queue(queue_name)
133117

134-
# print("messages purged: " + str(messages_purged))
118+
print("messages purged: " + str(messages_purged))
135119
# management.close()
136120

137121
# publish 10 messages
138-
#for i in range(messages_to_publish):
139-
# status = publisher.publish(Message(body="test"))
140-
# # if status.ACCEPTED:
141-
# # print("message accepted")
142-
# # elif status.RELEASED:
143-
# # print("message not routed")
144-
# # elif status.REJECTED:
145-
# # print("message not rejected")
146-
#
147-
#publisher.close()
148-
#
149-
# print(
150-
# "create a consumer and consume the test message - press control + c to terminate to consume"
151-
# )
122+
for i in range(messages_to_publish):
123+
status = publisher.publish(Message(body="test"))
124+
if status.ACCEPTED:
125+
print("message accepted")
126+
elif status.RELEASED:
127+
print("message not routed")
128+
elif status.REJECTED:
129+
print("message not rejected")
130+
131+
publisher.close()
132+
133+
print(
134+
"create a consumer and consume the test message - press control + c to terminate to consume"
135+
)
136+
consumer = connection.consumer(addr_queue, handler=MyMessageHandler())
137+
138+
try:
139+
consumer.run()
140+
except KeyboardInterrupt:
141+
pass
152142

153-
input("Press Enter to continue...")
154143
print("cleanup")
155-
# consumer.close()
144+
consumer.close()
156145
# once we finish consuming if we close the connection we need to create a new one
157146
# connection = create_connection()
158147
# management = connection.management()
159148

160149
print("unbind")
161-
# management.unbind(bind_name)
150+
management.unbind(bind_name)
162151

163152
print("delete queue")
164-
# management.delete_queue(queue_name)
153+
management.delete_queue(queue_name)
165154

166155
print("delete exchange")
167156
management.delete_exchange(exchange_name)

rabbitmq_amqp_python_client/__init__.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
from .entities import (
99
BindingSpecification,
1010
ExchangeSpecification,
11+
OffsetSpecification,
12+
StreamFilterOptions,
1113
)
1214
from .exceptions import ArgumentOutOfRangeException
1315
from .management import Management
@@ -28,8 +30,6 @@
2830
SslConfigurationContext,
2931
)
3032

31-
from .entities import StreamFilterOptions
32-
3333
try:
3434
__version__ = metadata.version(__package__)
3535
__license__ = metadata.metadata(__package__)["license"]
@@ -64,4 +64,5 @@
6464
"Delivery",
6565
"ConnectionClosed",
6666
"StreamFilterOptions",
67+
"OffsetSpecification",
6768
]

rabbitmq_amqp_python_client/connection.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
from .address_helper import validate_address
55
from .consumer import Consumer
6+
from .entities import StreamFilterOptions
67
from .exceptions import ArgumentOutOfRangeException
78
from .management import Management
89
from .publisher import Publisher
@@ -11,8 +12,6 @@
1112
from .qpid.proton.utils import BlockingConnection
1213
from .ssl_configuration import SslConfigurationContext
1314

14-
from.entities import StreamFilterOptions
15-
1615
logger = logging.getLogger(__name__)
1716

1817
MT = TypeVar("MT")
@@ -86,7 +85,10 @@ def publisher(self, destination: str) -> Publisher:
8685
return publisher
8786

8887
def consumer(
89-
self, destination: str, handler: Optional[MessagingHandler] = None, stream_filter_options: Optional[StreamFilterOptions] = None
88+
self,
89+
destination: str,
90+
handler: Optional[MessagingHandler] = None,
91+
stream_filter_options: Optional[StreamFilterOptions] = None,
9092
) -> Consumer:
9193
if validate_address(destination) is False:
9294
raise ArgumentOutOfRangeException(

rabbitmq_amqp_python_client/consumer.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
11
import logging
22
from typing import Optional
33

4-
from .options import ReceiverOptionUnsettled, ReceiverOptionUnsettledWithFilters
4+
from .entities import StreamFilterOptions
5+
from .options import (
6+
ReceiverOptionUnsettled,
7+
ReceiverOptionUnsettledWithFilters,
8+
)
59
from .qpid.proton._handlers import MessagingHandler
610
from .qpid.proton._message import Message
711
from .qpid.proton.utils import (
812
BlockingConnection,
913
BlockingReceiver,
1014
)
1115

12-
from.entities import StreamFilterOptions
13-
1416
logger = logging.getLogger(__name__)
1517

1618

@@ -20,7 +22,7 @@ def __init__(
2022
conn: BlockingConnection,
2123
addr: str,
2224
handler: Optional[MessagingHandler] = None,
23-
stream_options: Optional[StreamFilterOptions] = None
25+
stream_options: Optional[StreamFilterOptions] = None,
2426
):
2527
self._receiver: Optional[BlockingReceiver] = None
2628
self._conn = conn
@@ -58,15 +60,15 @@ def _create_receiver(self, addr: str) -> BlockingReceiver:
5860
logger.debug("Creating the receiver")
5961
if self._stream_options is None:
6062
receiver = self._conn.create_receiver(
61-
addr, options=ReceiverOptionUnsettled(addr), handler=self._handler
63+
addr, options=ReceiverOptionUnsettled(addr), handler=self._handler
6264
)
6365
receiver.credit = 1
6466
else:
65-
print("stream option is not None")
6667
receiver = self._conn.create_receiver(
67-
addr, options=ReceiverOptionUnsettledWithFilters(addr, self._stream_options), handler=self._handler
68+
addr,
69+
options=ReceiverOptionUnsettledWithFilters(addr, self._stream_options),
70+
handler=self._handler,
6871
)
6972
receiver.credit = 1
7073

7174
return receiver
72-

0 commit comments

Comments
 (0)