Skip to content

Environment and improvements for doc compatibility #33

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,13 @@ poetry run python ./examples/getting_started/main.py

### Creating a connection

A connection to the RabbitMQ AMQP 1.0 server can be established using the Connection object.
A connection to the RabbitMQ AMQP 1.0 server can be established using the Environment object.

For example:

```python
connection = Connection("amqp://guest:guest@localhost:5672/")
environment = Environment()
connection = environment.connection("amqp://guest:guest@localhost:5672/")
connection.dial()
```

Expand Down
31 changes: 22 additions & 9 deletions examples/getting_started/basic_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
AMQPMessagingHandler,
BindingSpecification,
Connection,
Disposition,
Environment,
Event,
ExchangeSpecification,
Message,
OutcomeState,
QuorumQueueSpecification,
)

Expand Down Expand Up @@ -61,8 +62,19 @@ def on_link_closed(self, event: Event) -> None:
print("link closed")


def create_connection() -> Connection:
connection = Connection("amqp://guest:guest@localhost:5672/")
def create_connection(environment: Environment) -> Connection:
connection = environment.connection("amqp://guest:guest@localhost:5672/")
# in case of SSL enablement
# ca_cert_file = ".ci/certs/ca_certificate.pem"
# client_cert = ".ci/certs/client_certificate.pem"
# client_key = ".ci/certs/client_key.pem"
# connection = Connection(
# "amqps://guest:guest@localhost:5671/",
# ssl_context=SslConfigurationContext(
# ca_cert=ca_cert_file,
# client_cert=ClientCert(client_cert=client_cert, client_key=client_key),
# ),
# )
connection.dial()

return connection
Expand All @@ -75,12 +87,13 @@ def main() -> None:
routing_key = "routing-key"

print("connection to amqp server")
connection = create_connection()
environment = Environment()
connection = create_connection(environment)

management = connection.management()

print("declaring exchange and queue")
management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))
management.declare_exchange(ExchangeSpecification(name=exchange_name))

management.declare_queue(
QuorumQueueSpecification(name=queue_name)
Expand Down Expand Up @@ -113,11 +126,11 @@ def main() -> None:
for i in range(MESSAGES_TO_PUBLISH):
print("publishing")
status = publisher.publish(Message(body="test"))
if status.remote_state == Disposition.ACCEPTED:
if status.remote_state == OutcomeState.ACCEPTED:
print("message accepted")
elif status.remote_state == Disposition.RELEASED:
elif status.remote_state == OutcomeState.RELEASED:
print("message not routed")
elif status.remote_state == Disposition.REJECTED:
elif status.remote_state == OutcomeState.REJECTED:
print("message not rejected")

publisher.close()
Expand Down Expand Up @@ -150,7 +163,7 @@ def main() -> None:
print("closing connections")
management.close()
print("after management closing")
connection.close()
environment.close()
print("after connection closing")


Expand Down
23 changes: 18 additions & 5 deletions examples/getting_started/example_with_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
AddressHelper,
AMQPMessagingHandler,
Connection,
Environment,
Event,
Message,
OffsetSpecification,
Expand Down Expand Up @@ -65,8 +66,19 @@ def on_link_closed(self, event: Event) -> None:
print("link closed")


def create_connection() -> Connection:
connection = Connection("amqp://guest:guest@localhost:5672/")
def create_connection(environment: Environment) -> Connection:
connection = environment.connection("amqp://guest:guest@localhost:5672/")
# in case of SSL enablement
# ca_cert_file = ".ci/certs/ca_certificate.pem"
# client_cert = ".ci/certs/client_certificate.pem"
# client_key = ".ci/certs/client_key.pem"
# connection = Connection(
# "amqps://guest:guest@localhost:5671/",
# ssl_context=SslConfigurationContext(
# ca_cert=ca_cert_file,
# client_cert=ClientCert(client_cert=client_cert, client_key=client_key),
# ),
# )
connection.dial()

return connection
Expand All @@ -76,15 +88,16 @@ def main() -> None:
queue_name = "example-queue"

print("connection to amqp server")
connection = create_connection()
environment = Environment()
connection = create_connection(environment)

management = connection.management()

management.declare_queue(StreamSpecification(name=queue_name))

addr_queue = AddressHelper.queue_address(queue_name)

consumer_connection = create_connection()
consumer_connection = create_connection(environment)

stream_filter_options = StreamOptions()
# can be first, last, next or an offset long
Expand Down Expand Up @@ -135,7 +148,7 @@ def main() -> None:
print("closing connections")
management.close()
print("after management closing")
connection.close()
environment.close()
print("after connection closing")


Expand Down
12 changes: 8 additions & 4 deletions examples/getting_started/reconnection_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
Connection,
ConnectionClosed,
Consumer,
Environment,
Event,
ExchangeSpecification,
Management,
Expand All @@ -20,6 +21,8 @@
QuorumQueueSpecification,
)

environment = Environment()


# here we keep track of the objects we need to reconnect
@dataclass
Expand Down Expand Up @@ -118,8 +121,9 @@ def create_connection() -> Connection:
# "amqp://ha_tls-rabbit_node2-1:5602/",
# ]
# connection = Connection(uris=uris, on_disconnection_handler=on_disconnected)
connection = Connection(
uri="amqp://guest:guest@localhost:5672/",

connection = environment.connection(
url="amqp://guest:guest@localhost:5672/",
on_disconnection_handler=on_disconnection,
)
connection.dial()
Expand All @@ -146,7 +150,7 @@ def main() -> None:

print("declaring exchange and queue")
connection_configuration.management.declare_exchange(
ExchangeSpecification(name=exchange_name, arguments={})
ExchangeSpecification(name=exchange_name)
)

connection_configuration.management.declare_queue(
Expand Down Expand Up @@ -242,7 +246,7 @@ def main() -> None:
print("closing connections")
connection_configuration.management.close()
print("after management closing")
connection_configuration.connection.close()
environment.close()
print("after connection closing")


Expand Down
12 changes: 7 additions & 5 deletions examples/getting_started/tls_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
BindingSpecification,
ClientCert,
Connection,
Environment,
Event,
ExchangeSpecification,
Message,
Expand Down Expand Up @@ -62,12 +63,12 @@ def on_link_closed(self, event: Event) -> None:
print("link closed")


def create_connection() -> Connection:
def create_connection(environment: Environment) -> Connection:
# in case of SSL enablement
ca_cert_file = ".ci/certs/ca_certificate.pem"
client_cert = ".ci/certs/client_certificate.pem"
client_key = ".ci/certs/client_key.pem"
connection = Connection(
connection = environment.connection(
"amqps://guest:guest@localhost:5671/",
ssl_context=SslConfigurationContext(
ca_cert=ca_cert_file,
Expand All @@ -84,14 +85,15 @@ def main() -> None:
exchange_name = "test-exchange"
queue_name = "example-queue"
routing_key = "routing-key"
environment = Environment()

print("connection to amqp server")
connection = create_connection()
connection = create_connection(environment)

management = connection.management()

print("declaring exchange and queue")
management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))
management.declare_exchange(ExchangeSpecification(name=exchange_name))

management.declare_queue(
QuorumQueueSpecification(name=queue_name)
Expand Down Expand Up @@ -160,7 +162,7 @@ def main() -> None:
print("closing connections")
management.close()
print("after management closing")
connection.close()
environment.close()
print("after connection closing")


Expand Down
7 changes: 5 additions & 2 deletions rabbitmq_amqp_python_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
OffsetSpecification,
StreamOptions,
)
from .environment import Environment
from .exceptions import ArgumentOutOfRangeException
from .management import Management
from .publisher import Publisher
Expand Down Expand Up @@ -39,6 +40,8 @@

del metadata

OutcomeState = Disposition

__all__ = [
"Connection",
"Management",
Expand All @@ -61,9 +64,9 @@
"ArgumentOutOfRangeException",
"SslConfigurationContext",
"ClientCert",
"Delivery",
"ConnectionClosed",
"StreamOptions",
"OffsetSpecification",
"Disposition",
"OutcomeState",
"Environment",
]
9 changes: 8 additions & 1 deletion rabbitmq_amqp_python_client/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ def __init__(
self._on_disconnection_handler = on_disconnection_handler
self._conf_ssl_context: Optional[SslConfigurationContext] = ssl_context
self._ssl_domain = None
self._connections = [] # type: ignore
self._index: int = -1

def _set_environment_connection_list(self, connections: []): # type: ignore
self._connections = connections

def dial(self) -> None:
logger.debug("Establishing a connection to the amqp server")
Expand Down Expand Up @@ -72,9 +77,11 @@ def management(self) -> Management:
return self._management

# closes the connection to the AMQP 1.0 server.
def close(self) -> None:
# This method should be called just from Environment and not from the user
def _close(self) -> None:
logger.debug("Closing connection")
self._conn.close()
self._connections.remove(self)

def publisher(self, destination: str) -> Publisher:
if validate_address(destination) is False:
Expand Down
6 changes: 3 additions & 3 deletions rabbitmq_amqp_python_client/entities.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from dataclasses import dataclass
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Optional, Union

Expand All @@ -13,7 +13,7 @@
@dataclass
class ExchangeSpecification:
name: str
arguments: dict[str, str]
arguments: dict[str, str] = field(default_factory=dict)
exchange_type: ExchangeType = ExchangeType.direct
is_auto_delete: bool = False
is_internal: bool = False
Expand All @@ -24,7 +24,7 @@ class ExchangeSpecification:
class QueueInfo:
name: str
arguments: dict[str, Any]
queue_type: QueueType = QueueType.quorum
queue_type: QueueType = QueueType.classic
is_exclusive: Optional[bool] = None
is_auto_delete: bool = False
is_durable: bool = True
Expand Down
46 changes: 46 additions & 0 deletions rabbitmq_amqp_python_client/environment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# For the moment this is just a Connection pooler to keep compatibility with other clients
import logging
from typing import Annotated, Callable, Optional, TypeVar

from .connection import Connection
from .ssl_configuration import SslConfigurationContext

logger = logging.getLogger(__name__)

MT = TypeVar("MT")
CB = Annotated[Callable[[MT], None], "Message callback type"]


class Environment:

def __init__(self): # type: ignore

self._connections: list[Connection] = []

def connection(
self,
# single-node mode
uri: Optional[str] = None,
# multi-node mode
uris: Optional[list[str]] = None,
ssl_context: Optional[SslConfigurationContext] = None,
on_disconnection_handler: Optional[CB] = None, # type: ignore
) -> Connection:
connection = Connection(
uri=uri,
uris=uris,
ssl_context=ssl_context,
on_disconnection_handler=on_disconnection_handler,
)
logger.debug("Environment: Creating and returning a new connection")
self._connections.append(connection)
connection._set_environment_connection_list(self._connections)
return connection

def close(self) -> None:
logger.debug("Environment: Closing all pending connections")
for connection in self._connections:
connection._close()

def connections(self) -> list[Connection]:
return self._connections
2 changes: 2 additions & 0 deletions rabbitmq_amqp_python_client/management.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,4 +369,6 @@ def queue_info(self, name: str) -> QueueInfo:
leader=queue_info["leader"],
members=queue_info["replicas"],
arguments=queue_info["arguments"],
message_count=queue_info["message_count"],
consumer_count=queue_info["consumer_count"],
)
2 changes: 1 addition & 1 deletion rabbitmq_amqp_python_client/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class ClassicQueueSpecification(QueueSpecification):

@dataclass
class QuorumQueueSpecification(QueueSpecification):
deliver_limit: Optional[str] = None
deliver_limit: Optional[int] = None
dead_letter_strategy: Optional[str] = None
quorum_initial_group_size: Optional[int] = None
cluster_target_size: Optional[int] = None
Expand Down
Loading