-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Added confluence permission syncing #2537
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
0874927
b9ab7ac
d84048f
5b55ce2
3e44719
bb79b88
8ddccda
b864f48
120aac4
c3ab8ed
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
"""Change primary key of user__external_user_group_id | ||
Revision ID: 46b7a812670f | ||
Revises: bd2921608c3a | ||
Create Date: 2024-09-23 12:58:03.894038 | ||
""" | ||
from alembic import op | ||
|
||
# revision identifiers, used by Alembic. | ||
revision = "46b7a812670f" | ||
down_revision = "bd2921608c3a" | ||
branch_labels = None | ||
depends_on = None | ||
|
||
|
||
def upgrade() -> None: | ||
# Drop the existing primary key | ||
op.drop_constraint( | ||
"user__external_user_group_id_pkey", | ||
"user__external_user_group_id", | ||
type_="primary", | ||
) | ||
|
||
# Add the new composite primary key | ||
op.create_primary_key( | ||
"user__external_user_group_id_pkey", | ||
"user__external_user_group_id", | ||
["user_id", "external_user_group_id"], | ||
) | ||
|
||
|
||
def downgrade() -> None: | ||
hagen-danswer marked this conversation as resolved.
Show resolved
Hide resolved
|
||
# Drop the composite primary key | ||
op.drop_constraint( | ||
"user__external_user_group_id_pkey", | ||
"user__external_user_group_id", | ||
type_="primary", | ||
) | ||
|
||
# Recreate the original primary key on user_id | ||
op.create_primary_key( | ||
"user__external_user_group_id_pkey", "user__external_user_group_id", ["user_id"] | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
import bs4 | ||
|
||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I exported this function into a utils file so when I generate the id in the permission sync and when i generate the id in the connector, they match. Maybe even more of the logic can be exported to here to make sure they match but this is it for now |
||
def generate_confluence_document_id(base_url: str, content_url: str) -> str: | ||
hagen-danswer marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
return f"{base_url}{content_url}" | ||
|
||
|
||
def get_used_attachments(text: str) -> list[str]: | ||
"""Parse a Confluence html page to generate a list of current | ||
attachment in used | ||
Args: | ||
text (str): The page content | ||
Returns: | ||
list[str]: List of filename currently in used | ||
hagen-danswer marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
""" | ||
files_in_used = [] | ||
soup = bs4.BeautifulSoup(text, "html.parser") | ||
for attachment in soup.findAll("ri:attachment"): | ||
files_in_used.append(attachment.attrs["ri:filename"]) | ||
return files_in_used |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,10 @@ | |
from danswer.configs.app_configs import CONTINUE_ON_CONNECTOR_FAILURE | ||
from danswer.configs.app_configs import INDEX_BATCH_SIZE | ||
from danswer.configs.constants import DocumentSource | ||
from danswer.connectors.confluence.confluence_utils import ( | ||
generate_confluence_document_id, | ||
) | ||
from danswer.connectors.confluence.confluence_utils import get_used_attachments | ||
from danswer.connectors.confluence.rate_limit_handler import ( | ||
make_confluence_call_handle_rate_limit, | ||
) | ||
|
@@ -105,24 +109,6 @@ def parse_html_page(text: str, confluence_client: Confluence) -> str: | |
return format_document_soup(soup) | ||
|
||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. moved to utils file |
||
def get_used_attachments(text: str, confluence_client: Confluence) -> list[str]: | ||
"""Parse a Confluence html page to generate a list of current | ||
attachment in used | ||
|
||
Args: | ||
text (str): The page content | ||
confluence_client (Confluence): Confluence client | ||
|
||
Returns: | ||
list[str]: List of filename currently in used | ||
""" | ||
files_in_used = [] | ||
soup = bs4.BeautifulSoup(text, "html.parser") | ||
for attachment in soup.findAll("ri:attachment"): | ||
files_in_used.append(attachment.attrs["ri:filename"]) | ||
return files_in_used | ||
|
||
|
||
def _comment_dfs( | ||
comments_str: str, | ||
comment_pages: Collection[dict[str, Any]], | ||
|
@@ -624,13 +610,16 @@ def _get_doc_batch( | |
page_html = ( | ||
page["body"].get("storage", page["body"].get("view", {})).get("value") | ||
) | ||
page_url = self.wiki_base + page["_links"]["webui"] | ||
# The url and the id are the same | ||
page_url = generate_confluence_document_id( | ||
self.wiki_base, page["_links"]["webui"] | ||
) | ||
if not page_html: | ||
logger.debug("Page is empty, skipping: %s", page_url) | ||
continue | ||
page_text = parse_html_page(page_html, self.confluence_client) | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. removed unused argument from function and function calls |
||
files_in_used = get_used_attachments(page_html, self.confluence_client) | ||
files_in_used = get_used_attachments(page_html) | ||
attachment_text, unused_page_attachments = self._fetch_attachments( | ||
self.confluence_client, page_id, files_in_used | ||
) | ||
|
@@ -683,8 +672,9 @@ def _get_attachment_batch( | |
if time_filter and not time_filter(last_updated): | ||
continue | ||
|
||
attachment_url = self._attachment_to_download_link( | ||
self.confluence_client, attachment | ||
# The url and the id are the same | ||
attachment_url = generate_confluence_document_id( | ||
self.wiki_base, attachment["_links"]["download"] | ||
) | ||
attachment_content = self._attachment_to_content( | ||
self.confluence_client, attachment | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,6 @@ | ||
from sqlalchemy.orm import Session | ||
|
||
from danswer.db.search_settings import get_current_search_settings | ||
from danswer.document_index.interfaces import DocumentIndex | ||
from danswer.document_index.vespa.index import VespaIndex | ||
|
||
|
@@ -13,3 +16,11 @@ def get_default_document_index( | |
return VespaIndex( | ||
index_name=primary_index_name, secondary_index_name=secondary_index_name | ||
) | ||
|
||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. they may be able to be used elsewhere |
||
def get_current_primary_default_document_index(db_session: Session) -> DocumentIndex: | ||
search_settings = get_current_search_settings(db_session) | ||
hagen-danswer marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return get_default_document_index( | ||
primary_index_name=search_settings.index_name, | ||
secondary_index_name=None, | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -220,8 +220,8 @@ def index_doc_batch_prepare( | |
|
||
document_ids = [document.id for document in documents] | ||
db_docs: list[DBDocument] = get_documents_by_ids( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixing the calling order wherever i can to make the codebase uniform :p |
||
document_ids=document_ids, | ||
db_session=db_session, | ||
document_ids=document_ids, | ||
) | ||
|
||
# Skip indexing docs that don't have a newer updated at | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,12 +11,25 @@ | |
from danswer.utils.logger import setup_logger | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Everything in this module is just refactoring the group and doc permission syncing to be seperate |
||
from danswer.utils.variable_functionality import global_version | ||
from ee.danswer.background.celery_utils import should_perform_chat_ttl_check | ||
from ee.danswer.background.celery_utils import should_perform_external_permissions_check | ||
from ee.danswer.background.celery_utils import ( | ||
should_perform_external_doc_permissions_check, | ||
) | ||
from ee.danswer.background.celery_utils import ( | ||
should_perform_external_group_permissions_check, | ||
) | ||
from ee.danswer.background.task_name_builders import name_chat_ttl_task | ||
from ee.danswer.background.task_name_builders import name_sync_external_permissions_task | ||
from ee.danswer.background.task_name_builders import ( | ||
name_sync_external_doc_permissions_task, | ||
) | ||
from ee.danswer.background.task_name_builders import ( | ||
name_sync_external_group_permissions_task, | ||
) | ||
from ee.danswer.db.connector_credential_pair import get_all_auto_sync_cc_pairs | ||
from ee.danswer.external_permissions.permission_sync import ( | ||
run_permission_sync_entrypoint, | ||
run_external_doc_permission_sync, | ||
) | ||
from ee.danswer.external_permissions.permission_sync import ( | ||
run_external_group_permission_sync, | ||
) | ||
from ee.danswer.server.reporting.usage_export_generation import create_new_usage_report | ||
|
||
|
@@ -26,11 +39,18 @@ | |
global_version.set_ee() | ||
|
||
|
||
@build_celery_task_wrapper(name_sync_external_permissions_task) | ||
@build_celery_task_wrapper(name_sync_external_doc_permissions_task) | ||
@celery_app.task(soft_time_limit=JOB_TIMEOUT) | ||
def sync_external_permissions_task(cc_pair_id: int) -> None: | ||
def sync_external_doc_permissions_task(cc_pair_id: int) -> None: | ||
with Session(get_sqlalchemy_engine()) as db_session: | ||
run_permission_sync_entrypoint(db_session=db_session, cc_pair_id=cc_pair_id) | ||
run_external_doc_permission_sync(db_session=db_session, cc_pair_id=cc_pair_id) | ||
|
||
|
||
@build_celery_task_wrapper(name_sync_external_group_permissions_task) | ||
@celery_app.task(soft_time_limit=JOB_TIMEOUT) | ||
def sync_external_group_permissions_task(cc_pair_id: int) -> None: | ||
with Session(get_sqlalchemy_engine()) as db_session: | ||
run_external_group_permission_sync(db_session=db_session, cc_pair_id=cc_pair_id) | ||
|
||
|
||
@build_celery_task_wrapper(name_chat_ttl_task) | ||
|
@@ -44,18 +64,35 @@ def perform_ttl_management_task(retention_limit_days: int) -> None: | |
# Periodic Tasks | ||
##### | ||
@celery_app.task( | ||
name="check_sync_external_permissions_task", | ||
name="check_sync_external_doc_permissions_task", | ||
soft_time_limit=JOB_TIMEOUT, | ||
) | ||
def check_sync_external_permissions_task() -> None: | ||
def check_sync_external_doc_permissions_task() -> None: | ||
"""Runs periodically to sync external permissions""" | ||
with Session(get_sqlalchemy_engine()) as db_session: | ||
cc_pairs = get_all_auto_sync_cc_pairs(db_session) | ||
for cc_pair in cc_pairs: | ||
if should_perform_external_permissions_check( | ||
if should_perform_external_doc_permissions_check( | ||
cc_pair=cc_pair, db_session=db_session | ||
): | ||
sync_external_permissions_task.apply_async( | ||
sync_external_doc_permissions_task.apply_async( | ||
kwargs=dict(cc_pair_id=cc_pair.id), | ||
) | ||
|
||
|
||
@celery_app.task( | ||
name="check_sync_external_group_permissions_task", | ||
soft_time_limit=JOB_TIMEOUT, | ||
) | ||
def check_sync_external_group_permissions_task() -> None: | ||
"""Runs periodically to sync external group permissions""" | ||
with Session(get_sqlalchemy_engine()) as db_session: | ||
cc_pairs = get_all_auto_sync_cc_pairs(db_session) | ||
for cc_pair in cc_pairs: | ||
if should_perform_external_group_permissions_check( | ||
cc_pair=cc_pair, db_session=db_session | ||
): | ||
sync_external_group_permissions_task.apply_async( | ||
kwargs=dict(cc_pair_id=cc_pair.id), | ||
) | ||
|
||
|
@@ -94,9 +131,13 @@ def autogenerate_usage_report_task() -> None: | |
# Celery Beat (Periodic Tasks) Settings | ||
##### | ||
celery_app.conf.beat_schedule = { | ||
"sync-external-permissions": { | ||
"task": "check_sync_external_permissions_task", | ||
"schedule": timedelta(seconds=60), # TODO: optimize this | ||
"sync-external-doc-permissions": { | ||
"task": "check_sync_external_doc_permissions_task", | ||
"schedule": timedelta(seconds=5), # TODO: optimize this | ||
}, | ||
"sync-external-group-permissions": { | ||
"task": "check_sync_external_group_permissions_task", | ||
"schedule": timedelta(seconds=5), # TODO: optimize this | ||
}, | ||
"autogenerate_usage_report": { | ||
"task": "autogenerate_usage_report_task", | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,7 +12,12 @@ | |
from danswer.db.tasks import get_latest_task | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Everything in this module is just refactoring the group and doc permission syncing to be seperate |
||
from danswer.utils.logger import setup_logger | ||
from ee.danswer.background.task_name_builders import name_chat_ttl_task | ||
from ee.danswer.background.task_name_builders import name_sync_external_permissions_task | ||
from ee.danswer.background.task_name_builders import ( | ||
name_sync_external_doc_permissions_task, | ||
) | ||
from ee.danswer.background.task_name_builders import ( | ||
name_sync_external_group_permissions_task, | ||
) | ||
from ee.danswer.db.user_group import delete_user_group | ||
from ee.danswer.db.user_group import fetch_user_group | ||
from ee.danswer.db.user_group import mark_user_group_as_synced | ||
|
@@ -38,13 +43,32 @@ def should_perform_chat_ttl_check( | |
return True | ||
|
||
|
||
def should_perform_external_permissions_check( | ||
def should_perform_external_doc_permissions_check( | ||
hagen-danswer marked this conversation as resolved.
Show resolved
Hide resolved
|
||
cc_pair: ConnectorCredentialPair, db_session: Session | ||
) -> bool: | ||
if cc_pair.access_type != AccessType.SYNC: | ||
return False | ||
|
||
task_name = name_sync_external_permissions_task(cc_pair_id=cc_pair.id) | ||
task_name = name_sync_external_doc_permissions_task(cc_pair_id=cc_pair.id) | ||
|
||
latest_task = get_latest_task(task_name, db_session) | ||
if not latest_task: | ||
return True | ||
|
||
if check_task_is_live_and_not_timed_out(latest_task, db_session): | ||
logger.debug(f"{task_name} is already being performed. Skipping.") | ||
return False | ||
|
||
return True | ||
|
||
|
||
def should_perform_external_group_permissions_check( | ||
cc_pair: ConnectorCredentialPair, db_session: Session | ||
) -> bool: | ||
if cc_pair.access_type != AccessType.SYNC: | ||
return False | ||
|
||
task_name = name_sync_external_group_permissions_task(cc_pair_id=cc_pair.id) | ||
|
||
latest_task = get_latest_task(task_name, db_session) | ||
if not latest_task: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,5 +2,9 @@ def name_chat_ttl_task(retention_limit_days: int) -> str: | |
return f"chat_ttl_{retention_limit_days}_days" | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Everything in this module is just refactoring the group and doc permission syncing to be seperate |
||
|
||
def name_sync_external_permissions_task(cc_pair_id: int) -> str: | ||
return f"sync_external_permissions_task__{cc_pair_id}" | ||
def name_sync_external_doc_permissions_task(cc_pair_id: int) -> str: | ||
return f"sync_external_doc_permissions_task__{cc_pair_id}" | ||
|
||
|
||
def name_sync_external_group_permissions_task(cc_pair_id: int) -> str: | ||
return f"sync_external_group_permissions_task__{cc_pair_id}" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
from typing import Any | ||
|
||
from atlassian import Confluence # type:ignore | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. moved to utils since this is done for both group and doc sync |
||
|
||
def build_confluence_client( | ||
connector_specific_config: dict[str, Any], raw_credentials_json: dict[str, Any] | ||
) -> Confluence: | ||
is_cloud = connector_specific_config.get("is_cloud", False) | ||
return Confluence( | ||
api_version="cloud" if is_cloud else "latest", | ||
# Remove trailing slash from wiki_base if present | ||
url=connector_specific_config["wiki_base"].rstrip("/"), | ||
# passing in username causes issues for Confluence data center | ||
username=raw_credentials_json["confluence_username"] if is_cloud else None, | ||
password=raw_credentials_json["confluence_access_token"] if is_cloud else None, | ||
token=raw_credentials_json["confluence_access_token"] if not is_cloud else None, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
last pr, I indicated that the user_id and the external_user_group_id were both primary in the models.py, but not in the alembic file. so this fixes that