Skip to content

Commit 8fdd752

Browse files
author
DanielePalaia
committed
implementing bind
1 parent 9e7bec5 commit 8fdd752

File tree

8 files changed

+132
-32
lines changed

8 files changed

+132
-32
lines changed

examples/getting_started/Makefile

Lines changed: 0 additions & 11 deletions
This file was deleted.

examples/getting_started/main.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from rabbitmq_amqp_python_client import (
2+
BindingSpecification,
23
Connection,
34
ExchangeSpecification,
45
QueueSpecification,
@@ -7,8 +8,9 @@
78

89

910
def main() -> None:
10-
exchange_name = "getting-started-exchange"
11+
exchange_name = "test-exchange"
1112
queue_name = "example-queue"
13+
routing_key = "routing-key"
1214
connection = Connection("amqp://guest:guest@localhost:5672/")
1315

1416
connection.dial()
@@ -21,10 +23,13 @@ def main() -> None:
2123
QueueSpecification(name=queue_name, queue_type=QueueType.quorum, arguments={})
2224
)
2325

24-
"""
25-
#management.bind(BindingSpecification(source_exchange=exchange_name, destination_queue=queue_name, \
26-
binding_key=routing_key))
27-
"""
26+
binding_exchange_queue_path = management.bind(
27+
BindingSpecification(
28+
source_exchange=exchange_name,
29+
destination_queue=queue_name,
30+
binding_key=routing_key,
31+
)
32+
)
2833

2934
"""
3035
addr = exchange_address(exchange_name, routing_key)
@@ -44,9 +49,7 @@ def main() -> None:
4449
publisher.close()
4550
"""
4651

47-
"""
48-
management.unbind(binding_path)
49-
"""
52+
management.unbind(binding_exchange_queue_path)
5053

5154
"""
5255
management.purge_queue(queue_info.name)

examples/getting_started/requirements.txt

Lines changed: 0 additions & 1 deletion
This file was deleted.

rabbitmq_amqp_python_client/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from .common import QueueType
44
from .connection import Connection
55
from .entities import (
6+
BindingSpecification,
67
ExchangeSpecification,
78
QueueSpecification,
89
)
@@ -20,5 +21,6 @@
2021
"Connection",
2122
"ExchangeSpecification",
2223
"QueueSpecification",
24+
"BindingSpecification",
2325
"QueueType",
2426
]

rabbitmq_amqp_python_client/address_helper.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
from .entities import BindingSpecification
2+
3+
14
def exchange_address(name: str) -> str:
25
path = "/exchanges/" + name
36

@@ -8,3 +11,25 @@ def queue_address(name: str) -> str:
811
path = "/queues/" + name
912

1013
return path
14+
15+
16+
def path_address() -> str:
17+
path = "/bindings"
18+
19+
return path
20+
21+
22+
def binding_path_with_exchange_queue(bind_specification: BindingSpecification) -> str:
23+
binding_path_wth_exchange_queue_key = (
24+
"/bindings"
25+
+ "/"
26+
+ "src="
27+
+ bind_specification.source_exchange
28+
+ ";"
29+
+ "dstq="
30+
+ bind_specification.destination_queue
31+
+ ";key="
32+
+ bind_specification.binding_key
33+
+ ";args="
34+
)
35+
return binding_path_wth_exchange_queue_key

rabbitmq_amqp_python_client/entities.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,11 @@ class QueueSpecification:
2525
dead_letter_exchange: str = ""
2626
is_auto_delete: bool = False
2727
is_durable: bool = True
28+
29+
30+
@dataclass
31+
class BindingSpecification:
32+
source_exchange: str
33+
destination_queue: str
34+
# destination_exchange: str
35+
binding_key: str

rabbitmq_amqp_python_client/management.py

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,15 @@
99
BlockingSender,
1010
)
1111

12-
from .address_helper import exchange_address, queue_address
12+
from .address_helper import (
13+
binding_path_with_exchange_queue,
14+
exchange_address,
15+
path_address,
16+
queue_address,
17+
)
1318
from .common import CommonValues
1419
from .entities import (
20+
BindingSpecification,
1521
ExchangeSpecification,
1622
QueueSpecification,
1723
)
@@ -80,9 +86,6 @@ def _request(
8086

8187
self._validate_reponse_code(int(msg.subject), expected_response_codes)
8288

83-
# TODO
84-
# def delete_queue(self, name:str):
85-
8689
def declare_exchange(
8790
self, exchange_specification: ExchangeSpecification
8891
) -> ExchangeSpecification:
@@ -123,8 +126,6 @@ def declare_queue(
123126

124127
path = queue_address(queue_specification.name)
125128

126-
print(path)
127-
128129
self.request(
129130
body,
130131
path,
@@ -155,8 +156,6 @@ def delete_exchange(self, exchange_name: str) -> None:
155156
def delete_queue(self, queue_name: str) -> None:
156157
path = queue_address(queue_name)
157158

158-
print(path)
159-
160159
self.request(
161160
None,
162161
path,
@@ -169,6 +168,7 @@ def delete_queue(self, queue_name: str) -> None:
169168
def _validate_reponse_code(
170169
self, response_code: int, expected_response_codes: list[int]
171170
) -> None:
171+
print("response_code received: " + str(response_code))
172172
if response_code == CommonValues.response_code_409.value:
173173
# TODO replace with a new defined Exception
174174
raise Exception("ErrPreconditionFailed")
@@ -180,10 +180,37 @@ def _validate_reponse_code(
180180
raise Exception("wrong response code received")
181181

182182
# TODO
183-
# def bind(self, bind_specification:BindSpecification):
183+
def bind(self, bind_specification: BindingSpecification) -> str:
184+
body = {}
185+
body["binding_key"] = bind_specification.binding_key
186+
body["source"] = bind_specification.source_exchange
187+
body["destination_queue"] = bind_specification.destination_queue
188+
body["arguments"] = {} # type: ignore
189+
190+
path = path_address()
191+
192+
self.request(
193+
body,
194+
path,
195+
CommonValues.command_post.value,
196+
[
197+
CommonValues.response_code_204.value,
198+
],
199+
)
200+
201+
binding_path_with_queue = binding_path_with_exchange_queue(bind_specification)
202+
return binding_path_with_queue
184203

185204
# TODO
186-
# def unbind(self, binding_path:str):
205+
def unbind(self, binding_exchange_queue_path: str) -> None:
206+
self.request(
207+
None,
208+
binding_exchange_queue_path,
209+
CommonValues.command_delete.value,
210+
[
211+
CommonValues.response_code_200.value,
212+
],
213+
)
187214

188215
# TODO
189216
# def queue_info(self, queue_name:str):

tests/test_management.py

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from rabbitmq_amqp_python_client import (
2+
BindingSpecification,
23
Connection,
34
ExchangeSpecification,
45
QueueSpecification,
@@ -32,13 +33,59 @@ def test_declare_delete_queue() -> None:
3233
queue_name = "test-queue"
3334
management = connection.management()
3435

35-
exchange_info = management.declare_queue(
36+
queue_info = management.declare_queue(
3637
QueueSpecification(name=queue_name, queue_type=QueueType.quorum, arguments={})
3738
)
3839

39-
assert exchange_info.name == queue_name
40+
assert queue_info.name == queue_name
4041

4142
# Still not working
4243
# management.delete_queue(queue_name)
4344

4445
connection.close()
46+
47+
48+
def test_bind_exchange_to_queue() -> None:
49+
connection = Connection("amqp://guest:guest@localhost:5672/")
50+
connection.dial()
51+
52+
exchange_name = "test-bind-exchange-to-queue-exchange"
53+
queue_name = "test-bind-exchange-to-queue-queue"
54+
routing_key = "routing-key"
55+
management = connection.management()
56+
57+
management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))
58+
59+
management.declare_queue(
60+
QueueSpecification(name=queue_name, queue_type=QueueType.quorum, arguments={})
61+
)
62+
63+
binding_exchange_queue_path = management.bind(
64+
BindingSpecification(
65+
source_exchange=exchange_name,
66+
destination_queue=queue_name,
67+
binding_key=routing_key,
68+
)
69+
)
70+
71+
print(binding_exchange_queue_path)
72+
73+
assert (
74+
binding_exchange_queue_path
75+
== "/bindings/src="
76+
+ exchange_name
77+
+ ";dstq="
78+
+ queue_name
79+
+ ";key="
80+
+ routing_key
81+
+ ";args="
82+
)
83+
84+
# Still not working
85+
# management.delete_exchange(exchange_name)
86+
87+
# Still not working
88+
# management.delete_queue(queue_name)
89+
90+
# Still not working
91+
# management.delete_bind(binding_exchange_queue_path)

0 commit comments

Comments
 (0)