Skip to content

Commit ec2535b

Browse files
author
DanielePalaia
committed
adding code documentation plus few minor improvements
1 parent ddb95f7 commit ec2535b

File tree

8 files changed

+564
-4
lines changed

8 files changed

+564
-4
lines changed

rabbitmq_amqp_python_client/address_helper.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,25 @@ def encode_path_segment(input_string: Optional[str]) -> str:
3131

3232

3333
class AddressHelper:
34+
"""
35+
Helper class for constructing and managing AMQP addresses.
36+
37+
This class provides static methods for creating properly formatted addresses
38+
for various AMQP operations including exchanges, queues, and bindings.
39+
"""
3440

3541
@staticmethod
3642
def exchange_address(exchange_name: str, routing_key: str = "") -> str:
43+
"""
44+
Create an address for an exchange, optionally with a routing key.
45+
46+
Args:
47+
exchange_name: The name of the exchange
48+
routing_key: Optional routing key
49+
50+
Returns:
51+
str: The formatted exchange address
52+
"""
3753
if routing_key == "":
3854
path = "/exchanges/" + encode_path_segment(exchange_name)
3955
else:
@@ -48,12 +64,30 @@ def exchange_address(exchange_name: str, routing_key: str = "") -> str:
4864

4965
@staticmethod
5066
def queue_address(name: str) -> str:
67+
"""
68+
Create an address for a queue.
69+
70+
Args:
71+
name: The name of the queue
72+
73+
Returns:
74+
str: The formatted queue address
75+
"""
5176
path = "/queues/" + encode_path_segment(name)
5277

5378
return path
5479

5580
@staticmethod
5681
def purge_queue_address(name: str) -> str:
82+
"""
83+
Create an address for purging a queue.
84+
85+
Args:
86+
name: The name of the queue to purge
87+
88+
Returns:
89+
str: The formatted purge queue address
90+
"""
5791
path = "/queues/" + encode_path_segment(name) + "/messages"
5892

5993
return path
@@ -68,6 +102,15 @@ def path_address() -> str:
68102
def binding_path_with_exchange_queue(
69103
bind_specification: ExchangeToQueueBindingSpecification,
70104
) -> str:
105+
"""
106+
Create a binding path for an exchange-to-queue binding.
107+
108+
Args:
109+
bind_specification: The specification for the binding
110+
111+
Returns:
112+
str: The formatted binding path
113+
"""
71114
if bind_specification.binding_key is not None:
72115
key = ";key=" + encode_path_segment(bind_specification.binding_key)
73116
else:
@@ -90,6 +133,15 @@ def binding_path_with_exchange_queue(
90133
def binding_path_with_exchange_exchange(
91134
bind_specification: ExchangeToExchangeBindingSpecification,
92135
) -> str:
136+
"""
137+
Create a binding path for an exchange-to-exchange binding.
138+
139+
Args:
140+
bind_specification: The specification for the binding
141+
142+
Returns:
143+
str: The formatted binding path
144+
"""
93145
binding_path_wth_exchange_exchange_key = (
94146
"/bindings"
95147
+ "/"
@@ -106,6 +158,16 @@ def binding_path_with_exchange_exchange(
106158

107159
@staticmethod
108160
def message_to_address_helper(message: Message, address: str) -> Message:
161+
"""
162+
Set the address on a message.
163+
164+
Args:
165+
message: The message to modify
166+
address: The address to set
167+
168+
Returns:
169+
Message: The modified message with the new address
170+
"""
109171
message.address = address
110172
return message
111173

rabbitmq_amqp_python_client/connection.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,15 @@
1919

2020

2121
class Connection:
22+
"""
23+
Main connection class for interacting with RabbitMQ via AMQP 1.0 protocol.
24+
25+
This class manages the connection to RabbitMQ and provides factory methods for
26+
creating publishers, consumers, and management interfaces. It supports both
27+
single-node and multi-node configurations, as well as SSL/TLS connections.
28+
29+
"""
30+
2231
def __init__(
2332
self,
2433
# single-node mode
@@ -28,6 +37,18 @@ def __init__(
2837
ssl_context: Optional[SslConfigurationContext] = None,
2938
on_disconnection_handler: Optional[CB] = None, # type: ignore
3039
):
40+
"""
41+
Initialize a new Connection instance.
42+
43+
Args:
44+
uri: Single node connection URI
45+
uris: List of URIs for multi-node setup
46+
ssl_context: SSL configuration for secure connections
47+
on_disconnection_handler: Callback for handling disconnection events
48+
49+
Raises:
50+
ValueError: If neither uri nor uris is provided
51+
"""
3152
if uri is None and uris is None:
3253
raise ValueError("You need to specify at least an addr or a list of addr")
3354
self._addr: Optional[str] = uri
@@ -44,6 +65,12 @@ def _set_environment_connection_list(self, connections: []): # type: ignore
4465
self._connections = connections
4566

4667
def dial(self) -> None:
68+
"""
69+
Establish a connection to the AMQP server.
70+
71+
Configures SSL if specified and establishes the connection using the
72+
provided URI(s). Also initializes the management interface.
73+
"""
4774
logger.debug("Establishing a connection to the amqp server")
4875
if self._conf_ssl_context is not None:
4976
logger.debug("Enabling SSL")
@@ -74,15 +101,38 @@ def _open(self) -> None:
74101
self._management.open()
75102

76103
def management(self) -> Management:
104+
"""
105+
Get the management interface for this connection.
106+
107+
Returns:
108+
Management: The management interface for performing administrative tasks
109+
"""
77110
return self._management
78111

79112
# closes the connection to the AMQP 1.0 server.
80113
def close(self) -> None:
114+
"""
115+
Close the connection to the AMQP 1.0 server.
116+
117+
Closes the underlying connection and removes it from the connection list.
118+
"""
81119
logger.debug("Closing connection")
82120
self._conn.close()
83121
self._connections.remove(self)
84122

85123
def publisher(self, destination: str = "") -> Publisher:
124+
"""
125+
Create a new publisher instance.
126+
127+
Args:
128+
destination: Optional default destination for published messages
129+
130+
Returns:
131+
Publisher: A new publisher instance
132+
133+
Raises:
134+
ArgumentOutOfRangeException: If destination address format is invalid
135+
"""
86136
if destination != "":
87137
if validate_address(destination) is False:
88138
raise ArgumentOutOfRangeException(
@@ -98,6 +148,21 @@ def consumer(
98148
stream_filter_options: Optional[StreamOptions] = None,
99149
credit: Optional[int] = None,
100150
) -> Consumer:
151+
"""
152+
Create a new consumer instance.
153+
154+
Args:
155+
destination: The address to consume from
156+
message_handler: Optional handler for processing messages
157+
stream_filter_options: Optional configuration for stream consumption
158+
credit: Optional credit value for flow control
159+
160+
Returns:
161+
Consumer: A new consumer instance
162+
163+
Raises:
164+
ArgumentOutOfRangeException: If destination address format is invalid
165+
"""
101166
if validate_address(destination) is False:
102167
raise ArgumentOutOfRangeException(
103168
"destination address must start with /queues or /exchanges"

rabbitmq_amqp_python_client/consumer.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,22 @@
1717

1818

1919
class Consumer:
20+
"""
21+
A consumer class for receiving messages from RabbitMQ via AMQP 1.0 protocol.
22+
23+
This class handles the consumption of messages from a specified address in RabbitMQ.
24+
It supports both standard message consumption and stream-based consumption with
25+
optional filtering capabilities.
26+
27+
Attributes:
28+
_receiver (Optional[BlockingReceiver]): The receiver for consuming messages
29+
_conn (BlockingConnection): The underlying connection to RabbitMQ
30+
_addr (str): The address to consume from
31+
_handler (Optional[MessagingHandler]): Optional message handling callback
32+
_stream_options (Optional[StreamOptions]): Configuration for stream consumption
33+
_credit (Optional[int]): Flow control credit value
34+
"""
35+
2036
def __init__(
2137
self,
2238
conn: BlockingConnection,
@@ -25,6 +41,16 @@ def __init__(
2541
stream_options: Optional[StreamOptions] = None,
2642
credit: Optional[int] = None,
2743
):
44+
"""
45+
Initialize a new Consumer instance.
46+
47+
Args:
48+
conn: The blocking connection to use for consuming
49+
addr: The address to consume from
50+
handler: Optional message handler for processing received messages
51+
stream_options: Optional configuration for stream-based consumption
52+
credit: Optional credit value for flow control
53+
"""
2854
self._receiver: Optional[BlockingReceiver] = None
2955
self._conn = conn
3056
self._addr = addr
@@ -39,21 +65,52 @@ def _open(self) -> None:
3965
self._receiver = self._create_receiver(self._addr)
4066

4167
def consume(self, timeout: Union[None, Literal[False], float] = False) -> Message:
68+
"""
69+
Consume a message from the queue.
70+
71+
Args:
72+
timeout: The time to wait for a message.
73+
None: Defaults to 60s
74+
float: Wait for specified number of seconds
75+
76+
Returns:
77+
Message: The received message
78+
79+
Note:
80+
The return type might be None if no message is available and timeout occurs,
81+
but this is handled by the cast to Message.
82+
"""
4283
if self._receiver is not None:
4384
message = self._receiver.receive(timeout=timeout)
4485
return cast(Message, message)
4586

4687
def close(self) -> None:
88+
"""
89+
Close the consumer connection.
90+
91+
Closes the receiver if it exists and cleans up resources.
92+
"""
4793
logger.debug("Closing the receiver")
4894
if self._receiver is not None:
4995
self._receiver.close()
5096

5197
def run(self) -> None:
98+
"""
99+
Run the consumer in continuous mode.
100+
101+
Starts the consumer's container to process messages continuously.
102+
"""
52103
logger.debug("Running the consumer: starting to consume")
53104
if self._receiver is not None:
54105
self._receiver.container.run()
55106

56107
def stop(self) -> None:
108+
"""
109+
Stop the consumer's continuous processing.
110+
111+
Stops the consumer's container, halting message processing.
112+
This should be called to cleanly stop a consumer that was started with run().
113+
"""
57114
logger.debug("Stopping the consumer: starting to consume")
58115
if self._receiver is not None:
59116
self._receiver.container.stop_events()

0 commit comments

Comments
 (0)