35
35
WinSslConfigurationContext ,
36
36
)
37
37
38
+ from .entities import OAuth2Options
39
+
38
40
logger = logging .getLogger (__name__ )
39
41
40
42
MT = TypeVar ("MT" )
@@ -60,6 +62,7 @@ def __init__(
60
62
ssl_context : Union [
61
63
PosixSslConfigurationContext , WinSslConfigurationContext , None
62
64
] = None ,
65
+ oauth2_options : Optional [OAuth2Options ] = None ,
63
66
recovery_configuration : RecoveryConfiguration = RecoveryConfiguration (),
64
67
):
65
68
"""
@@ -93,6 +96,7 @@ def __init__(
93
96
self ._index : int = - 1
94
97
self ._publishers : list [Publisher ] = []
95
98
self ._consumers : list [Consumer ] = []
99
+ self ._oauth2_options = oauth2_options
96
100
97
101
# Some recovery_configuration validation
98
102
if recovery_configuration .back_off_reconnect_interval < timedelta (seconds = 1 ):
@@ -109,19 +113,8 @@ def _set_environment_connection_list(self, connections: []): # type: ignore
109
113
def _open_connections (self , reconnect_handlers : bool = False ) -> None :
110
114
111
115
logger .debug ("inside connection._open_connections creating connection" )
112
- if self ._recovery_configuration .active_recovery is False :
113
- self ._conn = BlockingConnection (
114
- url = self ._addr ,
115
- urls = self ._addrs ,
116
- ssl_domain = self ._ssl_domain ,
117
- )
118
- else :
119
- self ._conn = BlockingConnection (
120
- url = self ._addr ,
121
- urls = self ._addrs ,
122
- ssl_domain = self ._ssl_domain ,
123
- on_disconnection_handler = self ._on_disconnection ,
124
- )
116
+
117
+ self ._create_connection ()
125
118
126
119
if reconnect_handlers is True :
127
120
logger .debug ("reconnecting managements, publishers and consumers handlers" )
@@ -137,6 +130,35 @@ def _open_connections(self, reconnect_handlers: bool = False) -> None:
137
130
# Update the broken connection and sender in the consumer
138
131
self ._consumers [i ]._update_connection (self ._conn )
139
132
133
+ def _create_connection (self ):
134
+
135
+ user = None
136
+ password = None
137
+
138
+ if self ._oauth2_options is not None :
139
+ user = ""
140
+ password = self ._oauth2_options .token
141
+
142
+ if self ._recovery_configuration .active_recovery is False :
143
+ self ._conn = BlockingConnection (
144
+ url = self ._addr ,
145
+ urls = self ._addrs ,
146
+ oauth2_options = self ._oauth2_options ,
147
+ ssl_domain = self ._ssl_domain ,
148
+ user = user ,
149
+ password = password ,
150
+ )
151
+ else :
152
+ self ._conn = BlockingConnection (
153
+ url = self ._addr ,
154
+ urls = self ._addrs ,
155
+ oauth2_options = self ._oauth2_options ,
156
+ ssl_domain = self ._ssl_domain ,
157
+ on_disconnection_handler = self ._on_disconnection ,
158
+ user = user ,
159
+ password = password ,
160
+ )
161
+
140
162
def dial (self ) -> None :
141
163
"""
142
164
Establish a connection to the AMQP server.
0 commit comments