Skip to content

Commit 259ec58

Browse files
author
Richard Kuo (Onyx)
committed
code review
1 parent 73b9148 commit 259ec58

File tree

3 files changed

+13
-12
lines changed

3 files changed

+13
-12
lines changed

backend/onyx/connectors/slack/onyx_retry_handler.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,9 @@ class OnyxRedisSlackRetryHandler(RetryHandler):
1717
"""
1818
This class uses Redis to share a rate limit among multiple threads.
1919
20-
Threads that encounter a rate limit will observe the shared delay, increment the
21-
shared delay with the retry value, and use the new shared value as a wait interval.
22-
23-
This has the effect of serializing calls when a rate limit is hit, which is what
24-
needs to happens if the server punishes us with additional limiting when we make
25-
a call too early. We believe this is what Slack is doing based on empirical
26-
observation, meaning we see indefinite hangs if we're too aggressive.
20+
This just sets the desired retry delay with TTL in redis. In conjunction with
21+
a custom subclass of the client, the value is read and obeyed prior to an API call
22+
and also serialized.
2723
2824
Another way to do this is just to do exponential backoff. Might be easier?
2925

backend/onyx/connectors/slack/onyx_slack_web_client.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ class OnyxSlackWebClient(WebClient):
3030
def __init__(
3131
self, delay_lock: str, delay_key: str, r: Redis, *args: Any, **kwargs: Any
3232
) -> None:
33-
""""""
3433
super().__init__(*args, **kwargs)
3534
self._delay_key = delay_key
3635
self._delay_lock = delay_lock
@@ -56,7 +55,6 @@ def _perform_urllib_http_request(
5655
start = time.monotonic()
5756
while True:
5857
acquired = lock.acquire(blocking_timeout=ONYX_SLACK_LOCK_BLOCKING_TIMEOUT)
59-
6058
if acquired:
6159
break
6260

@@ -93,8 +91,10 @@ def _perform_urllib_http_request_internal(
9391
url: str,
9492
req: Request,
9593
) -> Dict[str, Any]:
94+
"""Overrides the internal method which is mostly the direct call to
95+
urllib/urlopen ... so this is a good place to perform our delay."""
9696

97-
# if we can get the lock, then read and extend the ttl
97+
# read and execute the delay
9898
ttl_ms = cast(int, self._redis.pttl(self._delay_key))
9999
if ttl_ms < 0: # negative values are error status codes ... see docs
100100
ttl_ms = 0

backend/tests/integration/connector_job_tests/slack/slack_api_utils.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -216,8 +216,13 @@ def get_and_provision_available_slack_channels(
216216

217217
@staticmethod
218218
def build_slack_user_email_id_map(slack_client: WebClient) -> dict[str, str]:
219-
users_results = slack_client.users_list()
220-
users: list[dict[str, Any]] = users_results.get("members", [])
219+
users: list[dict[str, Any]] = []
220+
221+
for users_results in make_paginated_slack_api_call(
222+
slack_client.users_list,
223+
):
224+
users.extend(users_results.get("members", []))
225+
221226
user_email_id_map = {}
222227
for user in users:
223228
if not (email := user.get("profile", {}).get("email")):

0 commit comments

Comments
 (0)