Skip to content

Commit 797b6f3

Browse files
author
DanielePalaia
committed
refresh_token
1 parent c3a35ea commit 797b6f3

File tree

4 files changed

+72
-16
lines changed

4 files changed

+72
-16
lines changed

rabbitmq_amqp_python_client/common.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ class CommonValues(enum.Enum):
1818
key = "key"
1919
queue = "queues"
2020
bindings = "bindings"
21+
path_tokens = "/auth/tokens"
2122

2223

2324
class ExchangeType(enum.Enum):

rabbitmq_amqp_python_client/connection.py

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -134,25 +134,24 @@ def _open_connections(self, reconnect_handlers: bool = False) -> None:
134134

135135
def _create_connection(self) -> None:
136136

137-
user = None
138-
password = None
139-
mechs = None
137+
self._user = None
138+
self._password = None
139+
self._mechs = None
140140

141141
if self._oauth2_options is not None:
142-
user = "no"
143-
password = self._oauth2_options.token
144-
mechs = "PLAIN"
145-
print("password, mechs: " + user + " " + password)
142+
self._user = "no"
143+
self._password = self._oauth2_options.token
144+
self._mechs = "PLAIN"
146145

147146
if self._recovery_configuration.active_recovery is False:
148147
self._conn = BlockingConnection(
149148
url=self._addr,
150149
urls=self._addrs,
151150
oauth2_options=self._oauth2_options,
152151
ssl_domain=self._ssl_domain,
153-
allowed_mechs=mechs,
154-
user=user,
155-
password=password,
152+
allowed_mechs=self._mechs,
153+
user=self._user,
154+
password=self._password,
156155
)
157156
else:
158157
self._conn = BlockingConnection(
@@ -161,9 +160,9 @@ def _create_connection(self) -> None:
161160
oauth2_options=self._oauth2_options,
162161
ssl_domain=self._ssl_domain,
163162
on_disconnection_handler=self._on_disconnection,
164-
allowed_mechs=mechs,
165-
user=user,
166-
password=password,
163+
allowed_mechs=self._mechs,
164+
user=self._user,
165+
password=self._password,
167166
)
168167

169168
def dial(self) -> None:
@@ -383,3 +382,25 @@ def active_producers(self) -> int:
383382
def active_consumers(self) -> int:
384383
"""Returns the number of active consumers"""
385384
return len(self._consumers)
385+
386+
def refresh_token(self, token: str) -> None:
387+
"""
388+
Refresh the oauth token
389+
390+
Args:
391+
token: the oauth token to refresh
392+
393+
Raises:
394+
ValidationCodeException: If oauth is not enabled
395+
"""
396+
397+
if self._oauth2_options is None:
398+
raise ValidationCodeException("the connection is not oauth enabled")
399+
400+
# update credentials (for reconnection)
401+
self._user = "no"
402+
self._password = self._oauth2_options.token
403+
self._mechs = "PLAIN"
404+
405+
management = self.management()
406+
management.refresh_token(token)

rabbitmq_amqp_python_client/management.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -573,3 +573,14 @@ def queue_info(self, name: str) -> QueueInfo:
573573
message_count=queue_info["message_count"],
574574
consumer_count=queue_info["consumer_count"],
575575
)
576+
577+
def refresh_token(self, token: str) -> None:
578+
579+
self.request(
580+
token.encode(),
581+
CommonValues.path_tokens.value,
582+
CommonValues.command_put.value,
583+
[
584+
CommonValues.response_code_204.value,
585+
],
586+
)

tests/test_connection.py

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import time
2-
from datetime import timedelta
2+
from datetime import datetime, timedelta
33

44
from rabbitmq_amqp_python_client import (
55
ConnectionClosed,
@@ -11,6 +11,7 @@
1111
)
1212

1313
from .http_requests import delete_all_connections
14+
from .utils import token
1415

1516

1617
def on_disconnected():
@@ -44,7 +45,7 @@ def test_connection_ssl(ssl_context) -> None:
4445
environment.close()
4546

4647

47-
def test_connection_auth(environment_auth: Environment) -> None:
48+
def test_connection_oauth(environment_auth: Environment) -> None:
4849

4950
connection = environment_auth.connection()
5051
connection.dial()
@@ -54,7 +55,7 @@ def test_connection_auth(environment_auth: Environment) -> None:
5455
connection.close()
5556

5657

57-
def test_connection_auth_with_timeout(environment_auth: Environment) -> None:
58+
def test_connection_oauth_with_timeout(environment_auth: Environment) -> None:
5859

5960
connection = environment_auth.connection()
6061
connection.dial()
@@ -65,10 +66,32 @@ def test_connection_auth_with_timeout(environment_auth: Environment) -> None:
6566
try:
6667
management = connection.management()
6768
management.declare_queue(QuorumQueueSpecification(name="test-queue"))
69+
management.close()
6870
except Exception:
6971
raised = True
7072

7173
assert raised is True
74+
75+
connection.close()
76+
77+
78+
def test_connection_oauth_refresh_token(environment_auth: Environment) -> None:
79+
80+
connection = environment_auth.connection()
81+
connection.dial()
82+
# let the token expire
83+
time.sleep(1)
84+
raised = False
85+
# token expired, refresh
86+
connection.refresh_token(token(datetime.now() + timedelta(milliseconds=5000)))
87+
time.sleep(3)
88+
try:
89+
management = connection.management()
90+
management.declare_queue(QuorumQueueSpecification(name="test-queue"))
91+
except Exception:
92+
raised = True
93+
94+
assert raised is False
7295
connection.close()
7396

7497

0 commit comments

Comments
 (0)