Skip to content

Commit 99b6020

Browse files
authored
feat(fcm) Add send_each_async and send_each_for_multicast_async for FCM async and HTTP/2 support (#882)
* Added minimal support for sending FCM messages in async using HTTP/2 (#870) * httpx async_send_each prototype * Clean up code and lint * fix: Add extra dependancy for http2 * fix: reset message batch limit to 500 * fix: Add new import to `setup.py` * Refactored retry config into `_retry.py` and added support for exponential backoff and `Retry-After` header (#871) * Refactored retry config to `_retry.py` and added support for backoff and Retry-After * Added unit tests for `_retry.py` * Updated unit tests for HTTPX request errors * Address review comments * Added `HttpxAsyncClient` wrapper for `httpx.AsyncClient` and support for `send_each_for_multicast_async()` (#878) * Refactored retry config to `_retry.py` and added support for backoff and Retry-After * Added unit tests for `_retry.py` * Updated unit tests for HTTPX request errors * Add HttpxAsyncClient to wrap httpx.AsyncClient * Added forced refresh to google auth credential flow and fixed lint * Added unit tests for `GoogleAuthCredentialFlow` and `HttpxAsyncClient` * Removed duplicate export * Added support for `send_each_for_multicast_async()` and updated doc string and type hints * Remove duplicate auth class * Cover auth request error case when `requests` request fails in HTTPX auth flow * Update test for `send_each_for_multicast_async()` * Address review comments * fix lint and some types * Address review comments and removed unused code * Update metric header test logic for `TestHttpxAsyncClient` * Add `send_each_for_multicast_async` to `__all__` * Apply suggestions from TW review
1 parent e0599f9 commit 99b6020

File tree

11 files changed

+1990
-141
lines changed

11 files changed

+1990
-141
lines changed

firebase_admin/_http_client.py

Lines changed: 208 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,23 @@
1414

1515
"""Internal HTTP client module.
1616
17-
This module provides utilities for making HTTP calls using the requests library.
18-
"""
19-
20-
from google.auth import transport
21-
import requests
17+
This module provides utilities for making HTTP calls using the requests library.
18+
"""
19+
20+
from __future__ import annotations
21+
import logging
22+
from typing import Any, Dict, Generator, Optional, Tuple, Union
23+
import httpx
24+
import requests.adapters
2225
from requests.packages.urllib3.util import retry # pylint: disable=import-error
26+
from google.auth import credentials
27+
from google.auth import transport
28+
from google.auth.transport import requests as google_auth_requests
2329

2430
from firebase_admin import _utils
31+
from firebase_admin._retry import HttpxRetry, HttpxRetryTransport
32+
33+
logger = logging.getLogger(__name__)
2534

2635
if hasattr(retry.Retry.DEFAULT, 'allowed_methods'):
2736
_ANY_METHOD = {'allowed_methods': None}
@@ -34,6 +43,9 @@
3443
connect=1, read=1, status=4, status_forcelist=[500, 503],
3544
raise_on_status=False, backoff_factor=0.5, **_ANY_METHOD)
3645

46+
DEFAULT_HTTPX_RETRY_CONFIG = HttpxRetry(
47+
max_retries=4, status_forcelist=[500, 503], backoff_factor=0.5)
48+
3749

3850
DEFAULT_TIMEOUT_SECONDS = 120
3951

@@ -144,7 +156,6 @@ def close(self):
144156
self._session.close()
145157
self._session = None
146158

147-
148159
class JsonHttpClient(HttpClient):
149160
"""An HTTP client that parses response messages as JSON."""
150161

@@ -153,3 +164,194 @@ def __init__(self, **kwargs):
153164

154165
def parse_body(self, resp):
155166
return resp.json()
167+
168+
class GoogleAuthCredentialFlow(httpx.Auth):
169+
"""Google Auth Credential Auth Flow"""
170+
def __init__(self, credential: credentials.Credentials):
171+
self._credential = credential
172+
self._max_refresh_attempts = 2
173+
self._refresh_status_codes = (401,)
174+
175+
def apply_auth_headers(
176+
self,
177+
request: httpx.Request,
178+
auth_request: google_auth_requests.Request
179+
) -> None:
180+
"""A helper function that refreshes credentials if needed and mutates the request headers
181+
to contain access token and any other Google Auth headers."""
182+
183+
logger.debug(
184+
'Attempting to apply auth headers. Credential validity before: %s',
185+
self._credential.valid
186+
)
187+
self._credential.before_request(
188+
auth_request, request.method, str(request.url), request.headers
189+
)
190+
logger.debug('Auth headers applied. Credential validity after: %s', self._credential.valid)
191+
192+
def auth_flow(self, request: httpx.Request) -> Generator[httpx.Request, httpx.Response, None]:
193+
_original_headers = request.headers.copy()
194+
_credential_refresh_attempt = 0
195+
196+
# Create a Google auth request object to be used for refreshing credentials
197+
auth_request = google_auth_requests.Request()
198+
199+
while True:
200+
# Copy original headers for each attempt
201+
request.headers = _original_headers.copy()
202+
203+
# Apply auth headers (which might include an implicit refresh if token is expired)
204+
self.apply_auth_headers(request, auth_request)
205+
206+
logger.debug(
207+
'Dispatching request, attempt %d of %d',
208+
_credential_refresh_attempt, self._max_refresh_attempts
209+
)
210+
response: httpx.Response = yield request
211+
212+
if response.status_code in self._refresh_status_codes:
213+
if _credential_refresh_attempt < self._max_refresh_attempts:
214+
logger.debug(
215+
'Received status %d. Attempting explicit credential refresh. \
216+
Attempt %d of %d.',
217+
response.status_code,
218+
_credential_refresh_attempt + 1,
219+
self._max_refresh_attempts
220+
)
221+
# Explicitly force a credentials refresh
222+
self._credential.refresh(auth_request)
223+
_credential_refresh_attempt += 1
224+
else:
225+
logger.debug(
226+
'Received status %d, but max auth refresh attempts (%d) reached. \
227+
Returning last response.',
228+
response.status_code, self._max_refresh_attempts
229+
)
230+
break
231+
else:
232+
# Status code is not one that requires a refresh, so break and return response
233+
logger.debug(
234+
'Status code %d does not require refresh. Returning response.',
235+
response.status_code
236+
)
237+
break
238+
# The last yielded response is automatically returned by httpx's auth flow.
239+
240+
class HttpxAsyncClient():
241+
"""Async HTTP client used to make HTTP/2 calls using HTTPX.
242+
243+
HttpxAsyncClient maintains an async HTTPX client, handles request authentication, and retries
244+
if necessary.
245+
"""
246+
def __init__(
247+
self,
248+
credential: Optional[credentials.Credentials] = None,
249+
base_url: str = '',
250+
headers: Optional[Union[httpx.Headers, Dict[str, str]]] = None,
251+
retry_config: HttpxRetry = DEFAULT_HTTPX_RETRY_CONFIG,
252+
timeout: int = DEFAULT_TIMEOUT_SECONDS,
253+
http2: bool = True
254+
) -> None:
255+
"""Creates a new HttpxAsyncClient instance from the provided arguments.
256+
257+
If a credential is provided, initializes a new async HTTPX client authorized with it.
258+
Otherwise, initializes a new unauthorized async HTTPX client.
259+
260+
Args:
261+
credential: A Google credential that can be used to authenticate requests (optional).
262+
base_url: A URL prefix to be added to all outgoing requests (optional).
263+
headers: A map of headers to be added to all outgoing requests (optional).
264+
retry_config: A HttpxRetry configuration. Default settings would retry up to 4 times for
265+
HTTP 500 and 503 errors (optional).
266+
timeout: HTTP timeout in seconds. Defaults to 120 seconds when not specified (optional).
267+
http2: A boolean indicating if HTTP/2 support should be enabled. Defaults to `True` when
268+
not specified (optional).
269+
"""
270+
self._base_url = base_url
271+
self._timeout = timeout
272+
self._headers = {**headers, **METRICS_HEADERS} if headers else {**METRICS_HEADERS}
273+
self._retry_config = retry_config
274+
275+
# Only set up retries on urls starting with 'http://' and 'https://'
276+
self._mounts = {
277+
'http://': HttpxRetryTransport(retry=self._retry_config, http2=http2),
278+
'https://': HttpxRetryTransport(retry=self._retry_config, http2=http2)
279+
}
280+
281+
if credential:
282+
self._async_client = httpx.AsyncClient(
283+
http2=http2,
284+
timeout=self._timeout,
285+
headers=self._headers,
286+
auth=GoogleAuthCredentialFlow(credential), # Add auth flow for credentials.
287+
mounts=self._mounts
288+
)
289+
else:
290+
self._async_client = httpx.AsyncClient(
291+
http2=http2,
292+
timeout=self._timeout,
293+
headers=self._headers,
294+
mounts=self._mounts
295+
)
296+
297+
@property
298+
def base_url(self):
299+
return self._base_url
300+
301+
@property
302+
def timeout(self):
303+
return self._timeout
304+
305+
@property
306+
def async_client(self):
307+
return self._async_client
308+
309+
async def request(self, method: str, url: str, **kwargs: Any) -> httpx.Response:
310+
"""Makes an HTTP call using the HTTPX library.
311+
312+
This is the sole entry point to the HTTPX library. All other helper methods in this
313+
class call this method to send HTTP requests out. Refer to
314+
https://www.python-httpx.org/api/ for more information on supported options
315+
and features.
316+
317+
Args:
318+
method: HTTP method name as a string (e.g. get, post).
319+
url: URL of the remote endpoint.
320+
**kwargs: An additional set of keyword arguments to be passed into the HTTPX API
321+
(e.g. json, params, timeout).
322+
323+
Returns:
324+
Response: An HTTPX response object.
325+
326+
Raises:
327+
HTTPError: Any HTTPX exceptions encountered while making the HTTP call.
328+
RequestException: Any requests exceptions encountered while making the HTTP call.
329+
"""
330+
if 'timeout' not in kwargs:
331+
kwargs['timeout'] = self.timeout
332+
resp = await self._async_client.request(method, self.base_url + url, **kwargs)
333+
return resp.raise_for_status()
334+
335+
async def headers(self, method: str, url: str, **kwargs: Any) -> httpx.Headers:
336+
resp = await self.request(method, url, **kwargs)
337+
return resp.headers
338+
339+
async def body_and_response(
340+
self, method: str, url: str, **kwargs: Any) -> Tuple[Any, httpx.Response]:
341+
resp = await self.request(method, url, **kwargs)
342+
return self.parse_body(resp), resp
343+
344+
async def body(self, method: str, url: str, **kwargs: Any) -> Any:
345+
resp = await self.request(method, url, **kwargs)
346+
return self.parse_body(resp)
347+
348+
async def headers_and_body(
349+
self, method: str, url: str, **kwargs: Any) -> Tuple[httpx.Headers, Any]:
350+
resp = await self.request(method, url, **kwargs)
351+
return resp.headers, self.parse_body(resp)
352+
353+
def parse_body(self, resp: httpx.Response) -> Any:
354+
return resp.json()
355+
356+
async def aclose(self) -> None:
357+
await self._async_client.aclose()

0 commit comments

Comments
 (0)