Skip to content

Commit 03faa76

Browse files
author
DanielePalaia
committed
adding tests and improvementes
1 parent da56b4c commit 03faa76

File tree

10 files changed

+330
-113
lines changed

10 files changed

+330
-113
lines changed

examples/getting_started/main.py

Lines changed: 47 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,69 +1,78 @@
11
# type: ignore
2-
import time
3-
42
from rabbitmq_amqp_python_client import (
53
BindingSpecification,
64
Connection,
5+
Delivery,
76
Event,
87
ExchangeSpecification,
98
Message,
109
MessagingHandler,
1110
QuorumQueueSpecification,
1211
exchange_address,
1312
queue_address,
14-
Delivery,
1513
)
1614

1715

18-
1916
class MyMessageHandler(MessagingHandler):
2017

2118
def __init__(self):
2219
super().__init__(auto_accept=False, auto_settle=False)
20+
self._count = 0
2321

2422
def on_message(self, event: Event):
2523
print("received message: " + event.message.body)
2624

2725
dlv = event.delivery
28-
#dlv.update(Delivery.REJECTED)
26+
2927
dlv.update(Delivery.ACCEPTED)
30-
# dlv.settle()
28+
dlv.settle()
3129

30+
print("count " + str(self._count))
3231

33-
#self.reject(event.delivery)
34-
#self.settle(event.delivery, Delivery.REJECTED)
32+
self._count = self._count + 1
3533

34+
if self._count == 100000:
35+
print("closing receiver")
36+
event.receiver.close()
37+
event.connection.close()
3638

3739
def on_connection_closed(self, event: Event):
3840
print("connection closed")
3941

40-
def on_connection_cloing(self, event: Event):
41-
print("connection closed")
42-
4342
def on_link_closed(self, event: Event) -> None:
4443
print("link closed")
4544

4645
def on_rejected(self, event: Event) -> None:
4746
print("rejected")
4847

4948

50-
def main() -> None:
49+
def create_connection() -> Connection:
50+
connection = Connection("amqp://guest:guest@localhost:5672/")
51+
connection.dial()
52+
53+
return connection
54+
5155

56+
def main() -> None:
5257

5358
exchange_name = "test-exchange"
5459
queue_name = "example-queue"
5560
routing_key = "routing-key"
56-
connection = Connection("amqp://guest:guest@localhost:5672/")
61+
messages_to_publish = 100000
5762

5863
print("connection to amqp server")
59-
connection.dial()
64+
connection = create_connection()
6065

6166
management = connection.management()
6267

6368
print("declaring exchange and queue")
6469
management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))
6570

66-
management.declare_queue(QuorumQueueSpecification(name=queue_name))
71+
management.declare_queue(
72+
QuorumQueueSpecification(
73+
name=queue_name, dead_letter_exchange="dead-letter-test"
74+
)
75+
)
6776

6877
print("binding queue to exchange")
6978
bind_name = management.bind(
@@ -81,34 +90,48 @@ def main() -> None:
8190
print("create a publisher and publish a test message")
8291
publisher = connection.publisher(addr)
8392

84-
8593
print("purging the queue")
86-
#messages_purged = management.purge_queue(queue_name)
94+
messages_purged = management.purge_queue(queue_name)
8795

88-
#print("messages purged: " + str(messages_purged))
96+
print("messages purged: " + str(messages_purged))
97+
management.close()
8998

90-
# for i in range(1):
91-
publisher.publish(Message(body="test"))
99+
# publish 10 messages
100+
for i in range(messages_to_publish):
101+
publisher.publish(Message(body="test"))
92102

93103
publisher.close()
94104

95-
print("create a consumer and consume the test message")
96-
105+
print(
106+
"create a consumer and consume the test message - press control + c to terminate to consume"
107+
)
97108
consumer = connection.consumer(addr_queue, handler=MyMessageHandler())
98109

110+
try:
111+
consumer.run()
112+
except KeyboardInterrupt:
113+
pass
114+
115+
print("cleanup")
116+
# once we finish consuming we close the connection so we need to create a new one
117+
connection = create_connection()
118+
119+
management = connection.management()
99120
print("unbind")
100121
management.unbind(bind_name)
101122

102-
#consumer.close()
103123
print("delete queue")
104-
# management.delete_queue(queue_name)
124+
management.delete_queue(queue_name)
105125

106126
print("delete exchange")
107127
management.delete_exchange(exchange_name)
108128

109129
print("closing connections")
110130
management.close()
131+
consumer.close()
132+
print("after management closing")
111133
connection.close()
134+
print("after connection closing")
112135

113136

114137
if __name__ == "__main__":

rabbitmq_amqp_python_client/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
)
1111
from .management import Management
1212
from .publisher import Publisher
13+
from .qpid.proton._delivery import Delivery
1314
from .qpid.proton._events import Event
1415
from .qpid.proton._message import Message
1516
from .qpid.proton.handlers import MessagingHandler
@@ -19,8 +20,6 @@
1920
StreamSpecification,
2021
)
2122

22-
from .qpid.proton._delivery import Delivery
23-
2423
try:
2524
__version__ = metadata.version(__package__)
2625
__license__ = metadata.metadata(__package__)["license"]

rabbitmq_amqp_python_client/consumer.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import logging
22
from typing import Optional
33

4-
from .options import ReceiverOption
4+
from .options import ReceiverOptionUnsettled
55
from .qpid.proton._handlers import MessagingHandler
66
from .qpid.proton._message import Message
77
from .qpid.proton.utils import (
@@ -35,13 +35,20 @@ def consume(self) -> Message:
3535
return self._receiver.receive()
3636

3737
def close(self) -> None:
38-
logger.debug("Closing Sender and Receiver")
38+
logger.debug("Closing the receiver")
3939
if self._receiver is not None:
4040
self._receiver.close()
41+
42+
def run(self) -> None:
4143
if self._receiver is not None:
42-
self._receiver.close()
44+
self._receiver.container.run()
45+
46+
def stop(self) -> None:
47+
if self._receiver is not None:
48+
self._receiver.container.stop_events()
49+
self._receiver.container.stop()
4350

4451
def _create_receiver(self, addr: str) -> BlockingReceiver:
4552
return self._conn.create_receiver(
46-
addr, options=ReceiverOption(addr), handler=self._handler
53+
addr, options=ReceiverOptionUnsettled(addr), handler=self._handler
4754
)

rabbitmq_amqp_python_client/consumer.pytmp

Lines changed: 0 additions & 47 deletions
This file was deleted.

rabbitmq_amqp_python_client/options.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,18 @@ class ReceiverOption(LinkOption): # type: ignore
2525
def __init__(self, addr: str):
2626
self._addr = addr
2727

28+
def apply(self, link: Link) -> None:
29+
link.target.address = self._addr
30+
link.snd_settle_mode = Link.SND_SETTLED
31+
link.rcv_settle_mode = Link.RCV_FIRST
32+
link.properties = PropertyDict({symbol("paired"): True})
33+
link.source.dynamic = False
34+
35+
36+
class ReceiverOptionUnsettled(LinkOption): # type: ignore
37+
def __init__(self, addr: str):
38+
self._addr = addr
39+
2840
def apply(self, link: Link) -> None:
2941
link.target.address = self._addr
3042
link.snd_settle_mode = Link.SND_UNSETTLED

rabbitmq_amqp_python_client/qpid/proton/_handlers.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,6 @@ class IncomingMessageHandler(Handler, Acking):
247247
def __init__(
248248
self, auto_accept: bool = True, delegate: Optional[Handler] = None
249249
) -> None:
250-
print("auto accept is" + str(auto_accept))
251250
self.delegate = delegate
252251
self.auto_accept = auto_accept
253252

@@ -266,10 +265,8 @@ def on_delivery(self, event: Event) -> None:
266265
dlv.settle()
267266
else:
268267
try:
269-
print("sending on message")
270268
self.on_message(event)
271269
if self.auto_accept:
272-
print("accepting")
273270
dlv.update(Delivery.ACCEPTED)
274271
dlv.settle()
275272
except Reject:

tests/conftest.py

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
import pytest
22

3-
from rabbitmq_amqp_python_client import Connection
3+
from rabbitmq_amqp_python_client import (
4+
Connection,
5+
Delivery,
6+
Event,
7+
MessagingHandler,
8+
)
49

510

611
@pytest.fixture()
@@ -25,3 +30,87 @@ def management(pytestconfig):
2530
finally:
2631
management.close()
2732
connection.close()
33+
34+
35+
class ConsumerTestException(BaseException):
36+
# Constructor or Initializer
37+
def __init__(self, msg: str):
38+
self.msg = msg
39+
40+
def __str__(self) -> str:
41+
return repr(self.msg)
42+
43+
44+
class MyMessageHandlerAccept(MessagingHandler):
45+
46+
def __init__(self):
47+
super().__init__(auto_accept=True, auto_settle=True)
48+
self._received = 0
49+
50+
def on_message(self, event: Event):
51+
self._received = self._received + 1
52+
if self._received == 1000:
53+
event.connection.close()
54+
raise ConsumerTestException("consumed")
55+
56+
57+
class MyMessageHandlerNoack(MessagingHandler):
58+
59+
def __init__(self):
60+
super().__init__(auto_accept=False, auto_settle=False)
61+
self._received = 0
62+
63+
def on_message(self, event: Event):
64+
self._received = self._received + 1
65+
if self._received == 1000:
66+
event.receiver.close()
67+
event.connection.close()
68+
raise ConsumerTestException("consumed")
69+
70+
def on_connection_closed(self, event: Event):
71+
print("connection closed")
72+
73+
def on_link_closed(self, event: Event) -> None:
74+
print("link closed")
75+
76+
def on_rejected(self, event: Event) -> None:
77+
print("rejected")
78+
79+
80+
class MyMessageHandlerReject(MessagingHandler):
81+
82+
def __init__(self):
83+
super().__init__(auto_accept=False, auto_settle=True)
84+
self._received = 0
85+
86+
def on_message(self, event: Event):
87+
dlv = event.delivery
88+
dlv.update(Delivery.REJECTED)
89+
dlv.settle()
90+
self._received = self._received + 1
91+
if self._received == 1000:
92+
event.connection.close()
93+
raise ConsumerTestException("consumed")
94+
95+
96+
class MyMessageHandlerRequeue(MessagingHandler):
97+
98+
def __init__(self):
99+
super().__init__(auto_accept=False, auto_settle=True)
100+
self._received = 0
101+
102+
def on_message(self, event: Event):
103+
dlv = event.delivery
104+
dlv.update(Delivery.RELEASED)
105+
dlv.settle()
106+
self._received = self._received + 1
107+
if self._received == 1000:
108+
event.connection.close()
109+
raise ConsumerTestException("consumed")
110+
111+
112+
def create_connection() -> Connection:
113+
connection_consumer = Connection("amqp://guest:guest@localhost:5672/")
114+
connection_consumer.dial()
115+
116+
return connection_consumer

0 commit comments

Comments
 (0)