Skip to content

Commit 7fcbb05

Browse files
author
DanielePalaia
committed
improvements
1 parent 658af34 commit 7fcbb05

File tree

6 files changed

+170
-25
lines changed

6 files changed

+170
-25
lines changed

rabbitmq_amqp_python_client/amqp_message.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import Optional, Union, cast
1+
from typing import Union, cast
22
from uuid import UUID
33

44
from proton._data import Described
@@ -16,13 +16,14 @@ def __init__( # type: ignore
1616
**kwargs,
1717
):
1818
super().__init__(body=body, **kwargs)
19-
self._addr: Optional[str] = None
19+
self._addr: str = ""
2020
self._native_message = None
2121

2222
def to_address(self, addr: str) -> None:
23+
self.address = addr
2324
self._addr = addr
2425

25-
def address(self) -> Optional[str]:
26+
def get_address(self) -> str:
2627
return self._addr
2728

2829
def native_message(self) -> Message:

rabbitmq_amqp_python_client/connection.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,12 @@ def close(self) -> None:
8282
self._conn.close()
8383
self._connections.remove(self)
8484

85-
def publisher(self, destination: str) -> Publisher:
86-
if validate_address(destination) is False:
87-
raise ArgumentOutOfRangeException(
88-
"destination address must start with /queues or /exchanges"
89-
)
85+
def publisher(self, destination: str = "") -> Publisher:
86+
if destination != "":
87+
if validate_address(destination) is False:
88+
raise ArgumentOutOfRangeException(
89+
"destination address must start with /queues or /exchanges"
90+
)
9091
publisher = Publisher(self._conn, destination)
9192
return publisher
9293

rabbitmq_amqp_python_client/publisher.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import logging
22
from typing import Optional
33

4+
from .address_helper import validate_address
45
from .amqp_message import AmqpMessage
6+
from .exceptions import ArgumentOutOfRangeException
57
from .options import SenderOptionUnseattle
68
from .qpid.proton._delivery import Delivery
79
from .qpid.proton.utils import (
@@ -25,8 +27,18 @@ def _open(self) -> None:
2527
self._sender = self._create_sender(self._addr)
2628

2729
def publish(self, message: AmqpMessage) -> Delivery:
28-
if self._sender is not None:
29-
return self._sender.send(message.native_message())
30+
if self._addr != "":
31+
if self._sender is not None:
32+
return self._sender.send(message.native_message())
33+
else:
34+
if message.get_address() != "":
35+
if validate_address(message.get_address()) is False:
36+
raise ArgumentOutOfRangeException(
37+
"destination address must start with /queues or /exchanges"
38+
)
39+
if self._sender is not None:
40+
delivery = self._sender.send(message.native_message())
41+
return delivery
3042

3143
def close(self) -> None:
3244
logger.debug("Closing Sender")

rabbitmq_amqp_python_client/qpid/proton/_message.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,6 @@ def _check(self, err: int) -> int:
140140
def _check_property_keys(self) -> None:
141141
"""
142142
AMQP allows only string keys for properties. This function checks that this requirement is met
143-
and raises a MessageException if not. However, in certain cases, conversions to string are
144143
automatically performed:
145144
146145
1. When a key is a user-defined (non-AMQP) subclass of str.

tests/test_publisher.py

Lines changed: 118 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,16 @@
44
AddressHelper,
55
AmqpMessage,
66
ArgumentOutOfRangeException,
7-
BindingSpecification,
87
Connection,
98
ConnectionClosed,
109
Environment,
11-
ExchangeSpecification,
10+
OutcomeState,
1211
QuorumQueueSpecification,
1312
StreamSpecification,
1413
)
1514

1615
from .http_requests import delete_all_connections
16+
from .utils import create_binding, publish_per_message
1717

1818

1919
def test_publish_queue(connection: Connection) -> None:
@@ -31,18 +31,64 @@ def test_publish_queue(connection: Connection) -> None:
3131
try:
3232
publisher = connection.publisher("/queues/" + queue_name)
3333
status = publisher.publish(AmqpMessage(body="test"))
34-
if status.ACCEPTED:
34+
if status.remote_state == OutcomeState.ACCEPTED:
35+
accepted = True
36+
except Exception:
37+
raised = True
38+
39+
if publisher is not None:
40+
publisher.close()
41+
42+
management.delete_queue(queue_name)
43+
management.close()
44+
45+
assert accepted is True
46+
assert raised is False
47+
48+
49+
def test_publish_per_message(connection: Connection) -> None:
50+
51+
queue_name = "test-queue-1"
52+
queue_name_2 = "test-queue-2"
53+
management = connection.management()
54+
55+
management.declare_queue(QuorumQueueSpecification(name=queue_name))
56+
management.declare_queue(QuorumQueueSpecification(name=queue_name_2))
57+
58+
raised = False
59+
60+
publisher = None
61+
accepted = False
62+
accepted_2 = True
63+
64+
try:
65+
publisher = connection.publisher()
66+
status = publish_per_message(
67+
publisher, addr=AddressHelper.queue_address(queue_name)
68+
)
69+
if status.remote_state == OutcomeState.ACCEPTED:
3570
accepted = True
71+
status = publish_per_message(
72+
publisher, addr=AddressHelper.queue_address(queue_name_2)
73+
)
74+
if status.remote_state == OutcomeState.ACCEPTED:
75+
accepted_2 = True
3676
except Exception:
3777
raised = True
3878

3979
if publisher is not None:
4080
publisher.close()
4181

82+
purged_messages_queue_1 = management.purge_queue(queue_name)
83+
purged_messages_queue_2 = management.purge_queue(queue_name_2)
4284
management.delete_queue(queue_name)
85+
management.delete_queue(queue_name_2)
4386
management.close()
4487

4588
assert accepted is True
89+
assert accepted_2 is True
90+
assert purged_messages_queue_1 == 1
91+
assert purged_messages_queue_2 == 1
4692
assert raised is False
4793

4894

@@ -90,24 +136,36 @@ def test_publish_to_invalid_destination(connection: Connection) -> None:
90136
assert raised is True
91137

92138

139+
def test_publish_per_message_to_invalid_destination(connection: Connection) -> None:
140+
141+
queue_name = "test-queue-1"
142+
raised = False
143+
144+
message = AmqpMessage(body="test")
145+
message.to_address("/invalid_destination/" + queue_name)
146+
publisher = connection.publisher()
147+
148+
try:
149+
publisher.publish(message)
150+
except ArgumentOutOfRangeException:
151+
raised = True
152+
except Exception:
153+
raised = False
154+
155+
if publisher is not None:
156+
publisher.close()
157+
158+
assert raised is True
159+
160+
93161
def test_publish_exchange(connection: Connection) -> None:
94162

95163
exchange_name = "test-exchange"
96164
queue_name = "test-queue"
97165
management = connection.management()
98166
routing_key = "routing-key"
99167

100-
management.declare_exchange(ExchangeSpecification(name=exchange_name))
101-
102-
management.declare_queue(QuorumQueueSpecification(name=queue_name))
103-
104-
management.bind(
105-
BindingSpecification(
106-
source_exchange=exchange_name,
107-
destination_queue=queue_name,
108-
binding_key=routing_key,
109-
)
110-
)
168+
bind_name = create_binding(management, exchange_name, queue_name, routing_key)
111169

112170
addr = AddressHelper.exchange_address(exchange_name, routing_key)
113171

@@ -124,6 +182,7 @@ def test_publish_exchange(connection: Connection) -> None:
124182

125183
publisher.close()
126184

185+
management.unbind(bind_name)
127186
management.delete_exchange(exchange_name)
128187
management.delete_queue(queue_name)
129188
management.close()
@@ -265,3 +324,48 @@ def test_queue_info_for_stream_with_validations(connection: Connection) -> None:
265324
for i in range(messages_to_send):
266325

267326
publisher.publish(AmqpMessage(body="test"))
327+
328+
329+
def test_publish_per_message_exchange(connection: Connection) -> None:
330+
331+
exchange_name = "test-exchange-per-message"
332+
queue_name = "test-queue-per-message"
333+
management = connection.management()
334+
routing_key = "routing-key-per-message"
335+
336+
bind_name = create_binding(management, exchange_name, queue_name, routing_key)
337+
338+
raised = False
339+
340+
publisher = None
341+
accepted = False
342+
accepted_2 = True
343+
344+
try:
345+
publisher = connection.publisher()
346+
status = publish_per_message(
347+
publisher, addr=AddressHelper.exchange_address(exchange_name, routing_key)
348+
)
349+
if status.remote_state == OutcomeState.ACCEPTED:
350+
accepted = True
351+
status = publish_per_message(
352+
publisher, addr=AddressHelper.queue_address(queue_name)
353+
)
354+
if status.remote_state == OutcomeState.ACCEPTED:
355+
accepted_2 = True
356+
except Exception:
357+
raised = True
358+
359+
# if publisher is not None:
360+
publisher.close()
361+
362+
purged_messages_queue = management.purge_queue(queue_name)
363+
management.unbind(bind_name)
364+
management.delete_exchange(exchange_name)
365+
management.delete_queue(queue_name)
366+
management.close()
367+
368+
assert accepted is True
369+
assert accepted_2 is True
370+
assert purged_messages_queue == 2
371+
assert raised is False

tests/utils.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@
44
AmqpMessage,
55
BindingSpecification,
66
Connection,
7+
Delivery,
78
ExchangeSpecification,
89
ExchangeType,
910
Management,
11+
Publisher,
1012
QuorumQueueSpecification,
1113
)
1214

@@ -29,6 +31,13 @@ def publish_messages(
2931
publisher.close()
3032

3133

34+
def publish_per_message(publisher: Publisher, addr: str) -> Delivery:
35+
message = AmqpMessage(body="test")
36+
message.to_address(addr)
37+
status = publisher.publish(message)
38+
return status
39+
40+
3241
def setup_dead_lettering(management: Management) -> str:
3342

3443
exchange_dead_lettering = "exchange-dead-letter"
@@ -55,6 +64,25 @@ def setup_dead_lettering(management: Management) -> str:
5564
return bind_path
5665

5766

67+
def create_binding(
68+
management: Management, exchange_name: str, queue_name: str, routing_key: str
69+
) -> str:
70+
71+
management.declare_exchange(ExchangeSpecification(name=exchange_name))
72+
73+
management.declare_queue(QuorumQueueSpecification(name=queue_name))
74+
75+
bind_name = management.bind(
76+
BindingSpecification(
77+
source_exchange=exchange_name,
78+
destination_queue=queue_name,
79+
binding_key=routing_key,
80+
)
81+
)
82+
83+
return bind_name
84+
85+
5886
def cleanup_dead_lettering(management: Management, bind_path: str) -> None:
5987

6088
exchange_dead_lettering = "exchange-dead-letter"

0 commit comments

Comments
 (0)