Skip to content

Commit 1be8172

Browse files
author
DanielePalaia
committed
Merge branch 'consumer_implementation' into consumer_imp
2 parents ba88885 + 8465bf3 commit 1be8172

File tree

8 files changed

+354
-5
lines changed

8 files changed

+354
-5
lines changed

examples/getting_started/main.py

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,40 @@
1+
# type: ignore
2+
13
from rabbitmq_amqp_python_client import (
24
BindingSpecification,
35
Connection,
6+
Event,
47
ExchangeSpecification,
58
Message,
9+
MessagingHandler,
610
QuorumQueueSpecification,
711
exchange_address,
12+
queue_address,
813
)
914

1015

16+
class MyMessageHandler(MessagingHandler):
17+
18+
def __init__(self):
19+
super().__init__()
20+
21+
def on_message(self, event: Event):
22+
print("received message: " + event.message.body)
23+
self.accept(event.delivery)
24+
25+
def on_connection_closed(self, event: Event):
26+
print("connection closed")
27+
28+
def on_connection_cloing(self, event: Event):
29+
print("connection closed")
30+
31+
def on_link_closed(self, event: Event) -> None:
32+
print("link closed")
33+
34+
def on_rejected(self, event: Event) -> None:
35+
print("rejected")
36+
37+
1138
def main() -> None:
1239
exchange_name = "test-exchange"
1340
queue_name = "example-queue"
@@ -35,21 +62,33 @@ def main() -> None:
3562

3663
addr = exchange_address(exchange_name, routing_key)
3764

65+
addr_queue = queue_address(queue_name)
66+
3867
print("create a publisher and publish a test message")
3968
publisher = connection.publisher(addr)
4069

4170
publisher.publish(Message(body="test"))
4271

72+
print("purging the queue")
73+
messages_purged = management.purge_queue(queue_name)
74+
75+
print("messages purged: " + str(messages_purged))
76+
77+
for i in range(10):
78+
publisher.publish(Message(body="test"))
79+
4380
publisher.close()
4481

82+
print("create a consumer and consume the test message")
83+
84+
consumer = connection.consumer(addr_queue, handler=MyMessageHandler())
85+
4586
print("unbind")
4687
management.unbind(bind_name)
4788

48-
print("purging the queue")
49-
management.purge_queue(queue_name)
50-
89+
consumer.close()
5190
print("delete queue")
52-
management.delete_queue(queue_name)
91+
# management.delete_queue(queue_name)
5392

5493
print("delete exchange")
5594
management.delete_exchange(exchange_name)

examples/getting_started/main.pytmp

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
# type: ignore
2+
3+
from rabbitmq_amqp_python_client import (
4+
BindingSpecification,
5+
Connection,
6+
Event,
7+
ExchangeSpecification,
8+
Message,
9+
MessagingHandler,
10+
QuorumQueueSpecification,
11+
exchange_address,
12+
queue_address,
13+
)
14+
15+
16+
class MyMessageHandler(MessagingHandler):
17+
18+
def __init__(self):
19+
super().__init__()
20+
21+
def on_message(self, event: Event):
22+
print("received message: " + event.message.body)
23+
self.accept(event.delivery)
24+
25+
def on_connection_closed(self, event: Event):
26+
print("connection closed")
27+
28+
def on_connection_cloing(self, event: Event):
29+
print("connection closed")
30+
31+
def on_link_closed(self, event: Event) -> None:
32+
print("link closed")
33+
34+
def on_rejected(self, event: Event) -> None:
35+
print("rejected")
36+
37+
38+
def main() -> None:
39+
exchange_name = "test-exchange"
40+
queue_name = "example-queue"
41+
routing_key = "routing-key"
42+
connection = Connection("amqp://guest:guest@localhost:5672/")
43+
44+
print("connection to amqp server")
45+
connection.dial()
46+
47+
management = connection.management()
48+
49+
print("declaring exchange and queue")
50+
management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))
51+
52+
management.declare_queue(QuorumQueueSpecification(name=queue_name))
53+
54+
print("binding queue to exchange")
55+
bind_name = management.bind(
56+
BindingSpecification(
57+
source_exchange=exchange_name,
58+
destination_queue=queue_name,
59+
binding_key=routing_key,
60+
)
61+
)
62+
63+
addr = exchange_address(exchange_name, routing_key)
64+
65+
addr_queue = queue_address(queue_name)
66+
67+
print("create a publisher and publish a test message")
68+
publisher = connection.publisher(addr)
69+
70+
publisher.publish(Message(body="test"))
71+
72+
print("purging the queue")
73+
messages_purged = management.purge_queue(queue_name)
74+
75+
print("messages purged: " + str(messages_purged))
76+
77+
for i in range(10):
78+
publisher.publish(Message(body="test"))
79+
80+
publisher.close()
81+
82+
print("create a consumer and consume the test message")
83+
84+
consumer = connection.consumer(addr_queue, handler=MyMessageHandler())
85+
86+
print("unbind")
87+
management.unbind(bind_name)
88+
89+
consumer.close()
90+
print("delete queue")
91+
# management.delete_queue(queue_name)
92+
93+
print("delete exchange")
94+
management.delete_exchange(exchange_name)
95+
96+
print("closing connections")
97+
management.close()
98+
connection.close()
99+
100+
101+
if __name__ == "__main__":
102+
main()

rabbitmq_amqp_python_client/__init__.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,22 @@
33
from .address_helper import exchange_address, queue_address
44
from .common import QueueType
55
from .connection import Connection
6+
<<<<<<< HEAD
7+
=======
8+
from .consumer import Consumer
9+
>>>>>>> consumer_implementation
610
from .entities import (
711
BindingSpecification,
812
ExchangeSpecification,
913
)
1014
from .management import Management
1115
from .publisher import Publisher
16+
17+
from .qpid.proton._message import Message
18+
19+
from .qpid.proton._events import Event
1220
from .qpid.proton._message import Message
21+
from .qpid.proton.handlers import MessagingHandler
1322
from .queues import (
1423
ClassicQueueSpecification,
1524
QuorumQueueSpecification,
@@ -38,4 +47,8 @@
3847
"exchange_address",
3948
"queue_address",
4049
"Message",
50+
51+
"Consumer",
52+
"MessagingHandler",
53+
"Event",
4154
]

rabbitmq_amqp_python_client/connection.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
import logging
22

3+
from typing import Optional
4+
5+
from .consumer import Consumer
36
from .management import Management
47
from .publisher import Publisher
8+
from .qpid.proton._handlers import MessagingHandler
59
from .qpid.proton.utils import BlockingConnection
610

711
logger = logging.getLogger(__name__)
@@ -34,3 +38,10 @@ def close(self) -> None:
3438
def publisher(self, destination: str) -> Publisher:
3539
publisher = Publisher(self._conn, destination)
3640
return publisher
41+
42+
43+
def consumer(
44+
self, destination: str, handler: Optional[MessagingHandler] = None
45+
) -> Consumer:
46+
consumer = Consumer(self._conn, destination, handler)
47+
return consumer
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import logging
2+
from typing import Optional
3+
4+
from .options import ReceiverOption
5+
from .qpid.proton._handlers import MessagingHandler
6+
from .qpid.proton._message import Message
7+
from .qpid.proton.utils import (
8+
BlockingConnection,
9+
BlockingReceiver,
10+
)
11+
12+
logger = logging.getLogger(__name__)
13+
14+
15+
class Consumer:
16+
def __init__(
17+
self,
18+
conn: BlockingConnection,
19+
addr: str,
20+
handler: Optional[MessagingHandler] = None,
21+
):
22+
self._receiver: Optional[BlockingReceiver] = None
23+
self._conn = conn
24+
self._addr = addr
25+
self._handler = handler
26+
self._open()
27+
28+
def _open(self) -> None:
29+
if self._receiver is None:
30+
logger.debug("Creating Sender")
31+
self._receiver = self._create_receiver(self._addr)
32+
33+
def consume(self) -> Message:
34+
if self._receiver is not None:
35+
return self._receiver.receive()
36+
37+
def close(self) -> None:
38+
logger.debug("Closing Sender and Receiver")
39+
if self._receiver is not None:
40+
self._receiver.close()
41+
if self._receiver is not None:
42+
self._receiver.close()
43+
44+
def _create_receiver(self, addr: str) -> BlockingReceiver:
45+
return self._conn.create_receiver(
46+
addr, options=ReceiverOption(addr), handler=self._handler
47+
)
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import logging
2+
from typing import Optional
3+
4+
from .options import ReceiverOption
5+
from .qpid.proton._handlers import MessagingHandler
6+
from .qpid.proton._message import Message
7+
from .qpid.proton.utils import (
8+
BlockingConnection,
9+
BlockingReceiver,
10+
)
11+
12+
logger = logging.getLogger(__name__)
13+
14+
15+
class Consumer:
16+
def __init__(
17+
self,
18+
conn: BlockingConnection,
19+
addr: str,
20+
handler: Optional[MessagingHandler] = None,
21+
):
22+
self._receiver: Optional[BlockingReceiver] = None
23+
self._conn = conn
24+
self._addr = addr
25+
self._handler = handler
26+
self._open()
27+
28+
def _open(self) -> None:
29+
if self._receiver is None:
30+
logger.debug("Creating Sender")
31+
self._receiver = self._create_receiver(self._addr)
32+
33+
def consume(self) -> Message:
34+
if self._receiver is not None:
35+
return self._receiver.receive()
36+
37+
def close(self) -> None:
38+
logger.debug("Closing Sender and Receiver")
39+
if self._receiver is not None:
40+
self._receiver.close()
41+
if self._receiver is not None:
42+
self._receiver.close()
43+
44+
def _create_receiver(self, addr: str) -> BlockingReceiver:
45+
return self._conn.create_receiver(
46+
addr, options=ReceiverOption(addr), handler=self._handler
47+
)

rabbitmq_amqp_python_client/qpid/proton/_utils.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,7 @@ def __init__(
272272
)
273273
if credit:
274274
receiver.flow(credit)
275+
275276
self.fetcher = fetcher
276277
self.container = connection.container
277278

@@ -526,7 +527,8 @@ def create_receiver(
526527
handler=handler or fetcher,
527528
options=options,
528529
),
529-
fetcher,
530+
531+
handler or fetcher,
530532
credit=prefetch,
531533
)
532534

0 commit comments

Comments
 (0)