Skip to content

Commit b88894a

Browse files
author
DanielePalaia
committed
adding closing connections logic
1 parent f2807b3 commit b88894a

File tree

5 files changed

+52
-5
lines changed

5 files changed

+52
-5
lines changed

rabbitmq_amqp_python_client/connection.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ def __init__(
3737
self._on_disconnection_handler = on_disconnection_handler
3838
self._conf_ssl_context: Optional[SslConfigurationContext] = ssl_context
3939
self._ssl_domain = None
40+
self._connections = [] # type: ignore
41+
self._index: int = -1
42+
43+
def _set_environment_connection_list(self, connections: []): # type: ignore
44+
self._connections = connections
4045

4146
def dial(self) -> None:
4247
logger.debug("Establishing a connection to the amqp server")
@@ -72,9 +77,11 @@ def management(self) -> Management:
7277
return self._management
7378

7479
# closes the connection to the AMQP 1.0 server.
80+
# This method should be called just from Environment and not from the user
7581
def _close(self) -> None:
7682
logger.debug("Closing connection")
7783
self._conn.close()
84+
self._connections.remove(self)
7885

7986
def publisher(self, destination: str) -> Publisher:
8087
if validate_address(destination) is False:

rabbitmq_amqp_python_client/environment.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
1+
# For the moment this is just a Connection pooler to keep compatibility with other clients
2+
import logging
13
from typing import Annotated, Callable, Optional, TypeVar
24

35
from .connection import Connection
46
from .ssl_configuration import SslConfigurationContext
57

8+
logger = logging.getLogger(__name__)
9+
610
MT = TypeVar("MT")
711
CB = Annotated[Callable[[MT], None], "Message callback type"]
812

@@ -11,7 +15,7 @@ class Environment:
1115

1216
def __init__(self): # type: ignore
1317

14-
self._connections = []
18+
self._connections: list[Connection] = []
1519

1620
def connection(
1721
self,
@@ -28,10 +32,15 @@ def connection(
2832
ssl_context=ssl_context,
2933
on_disconnection_handler=on_disconnection_handler,
3034
)
31-
35+
logger.debug("Environment: Creating and returning a new connection")
3236
self._connections.append(connection)
37+
connection._set_environment_connection_list(self._connections)
3338
return connection
3439

3540
def close(self) -> None:
41+
logger.debug("Environment: Closing all pending connections")
3642
for connection in self._connections:
3743
connection._close()
44+
45+
def connections(self) -> list[Connection]:
46+
return self._connections

rabbitmq_amqp_python_client/qpid/proton/_tracing.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,10 @@
3232

3333
import proton
3434
from proton import Sender as ProtonSender
35-
from proton.handlers import IncomingMessageHandler as ProtonIncomingMessageHandler
36-
from proton.handlers import OutgoingMessageHandler as ProtonOutgoingMessageHandler
35+
from proton.handlers import \
36+
IncomingMessageHandler as ProtonIncomingMessageHandler
37+
from proton.handlers import \
38+
OutgoingMessageHandler as ProtonOutgoingMessageHandler
3739

3840
_tracer = None
3941
_trace_key = proton.symbol("x-opt-qpid-tracestate")

rabbitmq_amqp_python_client/qpid/proton/_transport.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,8 @@
139139

140140
if TYPE_CHECKING:
141141
from ._condition import Condition
142-
from ._endpoints import Connection # would produce circular import
142+
from ._endpoints import \
143+
Connection # would produce circular import
143144

144145

145146
class TraceAdapter:

tests/test_connection.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,34 @@ def test_connection_ssl() -> None:
4343
environment.close()
4444

4545

46+
def test_environment_connections_management() -> None:
47+
48+
environment = Environment()
49+
connection = environment.connection("amqp://guest:guest@localhost:5672/")
50+
connection.dial()
51+
connection2 = environment.connection("amqp://guest:guest@localhost:5672/")
52+
connection2.dial()
53+
connection3 = environment.connection("amqp://guest:guest@localhost:5672/")
54+
connection3.dial()
55+
56+
assert len(environment.connections()) == 3
57+
58+
# this shouldn't happen but we test it anyway
59+
connection._close()
60+
61+
assert len(environment.connections()) == 2
62+
63+
connection2._close()
64+
65+
assert len(environment.connections()) == 1
66+
67+
connection3._close()
68+
69+
assert len(environment.connections()) == 0
70+
71+
environment.close()
72+
73+
4674
def test_connection_reconnection() -> None:
4775

4876
reconnected = False

0 commit comments

Comments
 (0)