Skip to content

Commit 72f5f1e

Browse files
make sure the permission client uses the proper retry handler (onyx-dot-app#4737)
* make sure the permission client uses the proper retry handler * fix client --------- Co-authored-by: Richard Kuo <rkuo@rkuo.com>
1 parent 96a0099 commit 72f5f1e

File tree

2 files changed

+64
-32
lines changed

2 files changed

+64
-32
lines changed

backend/ee/onyx/external_permissions/slack/doc_sync.py

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from onyx.connectors.slack.connector import SlackConnector
1313
from onyx.db.models import ConnectorCredentialPair
1414
from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface
15+
from onyx.redis.redis_pool import get_redis_client
1516
from onyx.utils.logger import setup_logger
1617
from shared_configs.contextvars import get_current_tenant_id
1718

@@ -99,18 +100,10 @@ def _fetch_channel_permissions(
99100

100101

101102
def _get_slack_document_access(
102-
cc_pair: ConnectorCredentialPair,
103+
slack_connector: SlackConnector,
103104
channel_permissions: dict[str, ExternalAccess],
104105
callback: IndexingHeartbeatInterface | None,
105106
) -> Generator[DocExternalAccess, None, None]:
106-
slack_connector = SlackConnector(**cc_pair.connector.connector_specific_config)
107-
108-
# Use credentials provider instead of directly loading credentials
109-
provider = OnyxDBCredentialsProvider(
110-
get_current_tenant_id(), "slack", cc_pair.credential.id
111-
)
112-
slack_connector.set_credentials_provider(provider)
113-
114107
slim_doc_generator = slack_connector.retrieve_all_slim_documents(callback=callback)
115108

116109
for doc_metadata_batch in slim_doc_generator:
@@ -141,9 +134,18 @@ def slack_doc_sync(
141134
it in postgres so that when it gets created later, the permissions are
142135
already populated
143136
"""
144-
slack_client = WebClient(
145-
token=cc_pair.credential.credential_json["slack_bot_token"]
137+
# Use credentials provider instead of directly loading credentials
138+
139+
tenant_id = get_current_tenant_id()
140+
provider = OnyxDBCredentialsProvider(tenant_id, "slack", cc_pair.credential.id)
141+
r = get_redis_client(tenant_id=tenant_id)
142+
slack_client = SlackConnector.make_slack_web_client(
143+
provider.get_provider_key(),
144+
cc_pair.credential.credential_json["slack_bot_token"],
145+
SlackConnector.MAX_RETRIES,
146+
r,
146147
)
148+
147149
user_id_to_email_map = fetch_user_id_to_email_map(slack_client)
148150
if not user_id_to_email_map:
149151
raise ValueError(
@@ -160,8 +162,11 @@ def slack_doc_sync(
160162
user_id_to_email_map=user_id_to_email_map,
161163
)
162164

165+
slack_connector = SlackConnector(**cc_pair.connector.connector_specific_config)
166+
slack_connector.set_credentials_provider(provider)
167+
163168
yield from _get_slack_document_access(
164-
cc_pair=cc_pair,
169+
slack_connector,
165170
channel_permissions=channel_permissions,
166171
callback=callback,
167172
)

backend/onyx/connectors/slack/connector.py

Lines changed: 47 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from typing import cast
1414

1515
from pydantic import BaseModel
16+
from redis import Redis
1617
from slack_sdk import WebClient
1718
from slack_sdk.errors import SlackApiError
1819
from slack_sdk.http_retry import ConnectionErrorRetryHandler
@@ -538,8 +539,44 @@ def __init__(
538539
self.user_cache: dict[str, BasicExpertInfo | None] = {}
539540
self.credentials_provider: CredentialsProviderInterface | None = None
540541
self.credential_prefix: str | None = None
541-
self.delay_lock: str | None = None # the redis key for the shared lock
542-
self.delay_key: str | None = None # the redis key for the shared delay
542+
# self.delay_lock: str | None = None # the redis key for the shared lock
543+
# self.delay_key: str | None = None # the redis key for the shared delay
544+
545+
@staticmethod
546+
def make_credential_prefix(key: str) -> str:
547+
return f"connector:slack:credential_{key}"
548+
549+
@staticmethod
550+
def make_delay_lock(prefix: str) -> str:
551+
return f"{prefix}:delay_lock"
552+
553+
@staticmethod
554+
def make_delay_key(prefix: str) -> str:
555+
return f"{prefix}:delay"
556+
557+
@staticmethod
558+
def make_slack_web_client(
559+
prefix: str, token: str, max_retry_count: int, r: Redis
560+
) -> WebClient:
561+
delay_lock = SlackConnector.make_delay_lock(prefix)
562+
delay_key = SlackConnector.make_delay_key(prefix)
563+
564+
# NOTE: slack has a built in RateLimitErrorRetryHandler, but it isn't designed
565+
# for concurrent workers. We've extended it with OnyxRedisSlackRetryHandler.
566+
connection_error_retry_handler = ConnectionErrorRetryHandler()
567+
onyx_rate_limit_error_retry_handler = OnyxRedisSlackRetryHandler(
568+
max_retry_count=max_retry_count,
569+
delay_lock=delay_lock,
570+
delay_key=delay_key,
571+
r=r,
572+
)
573+
custom_retry_handlers: list[RetryHandler] = [
574+
connection_error_retry_handler,
575+
onyx_rate_limit_error_retry_handler,
576+
]
577+
578+
client = WebClient(token=token, retry_handlers=custom_retry_handlers)
579+
return client
543580

544581
@property
545582
def channels(self) -> list[str] | None:
@@ -559,30 +596,20 @@ def set_credentials_provider(
559596
) -> None:
560597
credentials = credentials_provider.get_credentials()
561598
tenant_id = credentials_provider.get_tenant_id()
599+
if not tenant_id:
600+
raise ValueError("tenant_id cannot be None!")
601+
562602
self.redis = get_redis_client(tenant_id=tenant_id)
563603

564-
self.credential_prefix = (
565-
f"connector:slack:credential_{credentials_provider.get_provider_key()}"
604+
self.credential_prefix = SlackConnector.make_credential_prefix(
605+
credentials_provider.get_provider_key()
566606
)
567-
self.delay_lock = f"{self.credential_prefix}:delay_lock"
568-
self.delay_key = f"{self.credential_prefix}:delay"
569607

570-
# NOTE: slack has a built in RateLimitErrorRetryHandler, but it isn't designed
571-
# for concurrent workers. We've extended it with OnyxRedisSlackRetryHandler.
572-
connection_error_retry_handler = ConnectionErrorRetryHandler()
573-
onyx_rate_limit_error_retry_handler = OnyxRedisSlackRetryHandler(
574-
max_retry_count=self.MAX_RETRIES,
575-
delay_lock=self.delay_lock,
576-
delay_key=self.delay_key,
577-
r=self.redis,
608+
bot_token = credentials["slack_bot_token"]
609+
self.client = SlackConnector.make_slack_web_client(
610+
self.credential_prefix, bot_token, self.MAX_RETRIES, self.redis
578611
)
579-
custom_retry_handlers: list[RetryHandler] = [
580-
connection_error_retry_handler,
581-
onyx_rate_limit_error_retry_handler,
582-
]
583612

584-
bot_token = credentials["slack_bot_token"]
585-
self.client = WebClient(token=bot_token, retry_handlers=custom_retry_handlers)
586613
# use for requests that must return quickly (e.g. realtime flows where user is waiting)
587614
self.fast_client = WebClient(
588615
token=bot_token, timeout=SlackConnector.FAST_TIMEOUT

0 commit comments

Comments
 (0)