Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 17 additions & 10 deletions backend/ee/onyx/external_permissions/slack/doc_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from onyx.connectors.slack.connector import SlackConnector
from onyx.db.models import ConnectorCredentialPair
from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface
from onyx.redis.redis_pool import get_redis_client
from onyx.utils.logger import setup_logger
from shared_configs.contextvars import get_current_tenant_id

Expand Down Expand Up @@ -99,18 +100,10 @@ def _fetch_channel_permissions(


def _get_slack_document_access(
cc_pair: ConnectorCredentialPair,
slack_connector: SlackConnector,
channel_permissions: dict[str, ExternalAccess],
callback: IndexingHeartbeatInterface | None,
) -> Generator[DocExternalAccess, None, None]:
slack_connector = SlackConnector(**cc_pair.connector.connector_specific_config)

# Use credentials provider instead of directly loading credentials
provider = OnyxDBCredentialsProvider(
get_current_tenant_id(), "slack", cc_pair.credential.id
)
slack_connector.set_credentials_provider(provider)

slim_doc_generator = slack_connector.retrieve_all_slim_documents(callback=callback)

for doc_metadata_batch in slim_doc_generator:
Expand Down Expand Up @@ -141,6 +134,17 @@ def slack_doc_sync(
it in postgres so that when it gets created later, the permissions are
already populated
"""
# Use credentials provider instead of directly loading credentials

tenant_id = get_current_tenant_id()
provider = OnyxDBCredentialsProvider(tenant_id, "slack", cc_pair.credential.id)
r = get_redis_client(tenant_id=tenant_id)
slack_client = SlackConnector.make_slack_web_client(
provider.get_provider_key(),
cc_pair.credential.credential_json["slack_bot_token"],
SlackConnector.MAX_RETRIES,
r,
)
slack_client = WebClient(
token=cc_pair.credential.credential_json["slack_bot_token"]
)
Expand All @@ -160,8 +164,11 @@ def slack_doc_sync(
user_id_to_email_map=user_id_to_email_map,
)

slack_connector = SlackConnector(**cc_pair.connector.connector_specific_config)
slack_connector.set_credentials_provider(provider)

yield from _get_slack_document_access(
cc_pair=cc_pair,
slack_connector,
channel_permissions=channel_permissions,
callback=callback,
)
67 changes: 47 additions & 20 deletions backend/onyx/connectors/slack/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from typing import cast

from pydantic import BaseModel
from redis import Redis
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
from slack_sdk.http_retry import ConnectionErrorRetryHandler
Expand Down Expand Up @@ -538,8 +539,44 @@ def __init__(
self.user_cache: dict[str, BasicExpertInfo | None] = {}
self.credentials_provider: CredentialsProviderInterface | None = None
self.credential_prefix: str | None = None
self.delay_lock: str | None = None # the redis key for the shared lock
self.delay_key: str | None = None # the redis key for the shared delay
# self.delay_lock: str | None = None # the redis key for the shared lock
# self.delay_key: str | None = None # the redis key for the shared delay

@staticmethod
def make_credential_prefix(key: str) -> str:
    """Return the key prefix used for Slack connector state tied to a credential.

    Args:
        key: The credential provider key to embed in the prefix.

    Returns:
        The string ``connector:slack:credential_<key>``.
    """
    credential_prefix = f"connector:slack:credential_{key}"
    return credential_prefix

@staticmethod
def make_delay_lock(prefix: str) -> str:
    """Return the redis key for the shared delay lock under ``prefix``.

    Args:
        prefix: The credential-scoped key prefix.

    Returns:
        ``prefix`` with ``:delay_lock`` appended.
    """
    lock_key = f"{prefix}:delay_lock"
    return lock_key

@staticmethod
def make_delay_key(prefix: str) -> str:
    """Return the redis key for the shared delay value under ``prefix``.

    Args:
        prefix: The credential-scoped key prefix.

    Returns:
        ``prefix`` with ``:delay`` appended.
    """
    shared_delay_key = f"{prefix}:delay"
    return shared_delay_key

@staticmethod
def make_slack_web_client(
    prefix: str, token: str, max_retry_count: int, r: Redis
) -> WebClient:
    """Build a Slack ``WebClient`` wired up with the custom retry handlers.

    Args:
        prefix: Key prefix from which the delay-lock and delay redis keys are
            derived. NOTE(review): presumably callers should pass the value
            returned by ``make_credential_prefix`` so every client for the
            same credential derives the same keys -- confirm at call sites.
        token: The Slack token handed to the ``WebClient``.
        max_retry_count: Retry limit forwarded to the rate-limit handler.
        r: Redis client forwarded to the rate-limit handler.

    Returns:
        A ``WebClient`` configured with a connection-error retry handler and
        the redis-backed rate-limit retry handler.
    """
    # NOTE: slack has a built in RateLimitErrorRetryHandler, but it isn't designed
    # for concurrent workers. We've extended it with OnyxRedisSlackRetryHandler.
    retry_handlers: list[RetryHandler] = [
        ConnectionErrorRetryHandler(),
        OnyxRedisSlackRetryHandler(
            max_retry_count=max_retry_count,
            delay_lock=SlackConnector.make_delay_lock(prefix),
            delay_key=SlackConnector.make_delay_key(prefix),
            r=r,
        ),
    ]

    return WebClient(token=token, retry_handlers=retry_handlers)

@property
def channels(self) -> list[str] | None:
Expand All @@ -559,30 +596,20 @@ def set_credentials_provider(
) -> None:
credentials = credentials_provider.get_credentials()
tenant_id = credentials_provider.get_tenant_id()
if not tenant_id:
raise ValueError("tenant_id cannot be None!")

self.redis = get_redis_client(tenant_id=tenant_id)

self.credential_prefix = (
f"connector:slack:credential_{credentials_provider.get_provider_key()}"
self.credential_prefix = SlackConnector.make_credential_prefix(
credentials_provider.get_provider_key()
)
self.delay_lock = f"{self.credential_prefix}:delay_lock"
self.delay_key = f"{self.credential_prefix}:delay"

# NOTE: slack has a built in RateLimitErrorRetryHandler, but it isn't designed
# for concurrent workers. We've extended it with OnyxRedisSlackRetryHandler.
connection_error_retry_handler = ConnectionErrorRetryHandler()
onyx_rate_limit_error_retry_handler = OnyxRedisSlackRetryHandler(
max_retry_count=self.MAX_RETRIES,
delay_lock=self.delay_lock,
delay_key=self.delay_key,
r=self.redis,
bot_token = credentials["slack_bot_token"]
self.client = SlackConnector.make_slack_web_client(
self.credential_prefix, bot_token, self.MAX_RETRIES, self.redis
)
custom_retry_handlers: list[RetryHandler] = [
connection_error_retry_handler,
onyx_rate_limit_error_retry_handler,
]

bot_token = credentials["slack_bot_token"]
self.client = WebClient(token=bot_token, retry_handlers=custom_retry_handlers)
# use for requests that must return quickly (e.g. realtime flows where user is waiting)
self.fast_client = WebClient(
token=bot_token, timeout=SlackConnector.FAST_TIMEOUT
Expand Down
Loading