Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions backend/alembic/versions/46b7a812670f_add_cc_pair_id_to_user__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
"""Change primary key of user__external_user_group_id
Revision ID: 46b7a812670f
Revises: bd2921608c3a
Create Date: 2024-09-23 12:58:03.894038
"""
from alembic import op

# revision identifiers, used by Alembic.
revision = "46b7a812670f"
down_revision = "bd2921608c3a"
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Replace the single-column PK with a composite (user_id, external_user_group_id) PK.

    models.py already declares both columns as primary; this migration makes
    the database schema agree with the model.
    """
    # Drop the existing primary key (previously covering only user_id)
    op.drop_constraint(
        "user__external_user_group_id_pkey",
        "user__external_user_group_id",
        type_="primary",
    )

    # Add the new composite primary key
    op.create_primary_key(
        "user__external_user_group_id_pkey",
        "user__external_user_group_id",
        ["user_id", "external_user_group_id"],
    )


def downgrade() -> None:
    """Revert to the original single-column primary key on user_id."""
    # Remove the composite (user_id, external_user_group_id) primary key
    op.drop_constraint(
        "user__external_user_group_id_pkey",
        "user__external_user_group_id",
        type_="primary",
    )

    # Restore the original primary key covering only user_id
    op.create_primary_key(
        "user__external_user_group_id_pkey",
        "user__external_user_group_id",
        ["user_id"],
    )
22 changes: 22 additions & 0 deletions backend/danswer/connectors/confluence/confluence_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import bs4


Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I exported this function into a utils file so that when I generate the id in the permission sync and when I generate the id in the connector, they match.

Maybe even more of the logic can be exported to here to make sure they match but this is it for now

def generate_confluence_document_id(base_url: str, content_url: str) -> str:
    """Join the Confluence base url and a content url into the canonical document id.

    Shared by the connector and the permission sync so the ids they produce match.
    """
    return base_url + content_url


def get_used_attachments(text: str) -> list[str]:
    """Parse a Confluence HTML page and list the attachments it references.

    Args:
        text: The raw HTML (storage-format) content of the page.

    Returns:
        The filenames of all attachments currently in use on the page.
    """
    soup = bs4.BeautifulSoup(text, "html.parser")
    # find_all is the non-deprecated bs4 4.x spelling of findAll
    return [
        attachment.attrs["ri:filename"]
        for attachment in soup.find_all("ri:attachment")
    ]
34 changes: 12 additions & 22 deletions backend/danswer/connectors/confluence/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
from danswer.configs.app_configs import CONTINUE_ON_CONNECTOR_FAILURE
from danswer.configs.app_configs import INDEX_BATCH_SIZE
from danswer.configs.constants import DocumentSource
from danswer.connectors.confluence.confluence_utils import (
generate_confluence_document_id,
)
from danswer.connectors.confluence.confluence_utils import get_used_attachments
from danswer.connectors.confluence.rate_limit_handler import (
make_confluence_call_handle_rate_limit,
)
Expand Down Expand Up @@ -105,24 +109,6 @@ def parse_html_page(text: str, confluence_client: Confluence) -> str:
return format_document_soup(soup)


Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved to utils file

def get_used_attachments(text: str, confluence_client: Confluence) -> list[str]:
"""Parse a Confluence html page to generate a list of current
attachment in used

Args:
text (str): The page content
confluence_client (Confluence): Confluence client

Returns:
list[str]: List of filename currently in used
"""
files_in_used = []
soup = bs4.BeautifulSoup(text, "html.parser")
for attachment in soup.findAll("ri:attachment"):
files_in_used.append(attachment.attrs["ri:filename"])
return files_in_used


def _comment_dfs(
comments_str: str,
comment_pages: Collection[dict[str, Any]],
Expand Down Expand Up @@ -624,13 +610,16 @@ def _get_doc_batch(
page_html = (
page["body"].get("storage", page["body"].get("view", {})).get("value")
)
page_url = self.wiki_base + page["_links"]["webui"]
# The url and the id are the same
page_url = generate_confluence_document_id(
self.wiki_base, page["_links"]["webui"]
)
if not page_html:
logger.debug("Page is empty, skipping: %s", page_url)
continue
page_text = parse_html_page(page_html, self.confluence_client)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed unused argument from function and function calls

files_in_used = get_used_attachments(page_html, self.confluence_client)
files_in_used = get_used_attachments(page_html)
attachment_text, unused_page_attachments = self._fetch_attachments(
self.confluence_client, page_id, files_in_used
)
Expand Down Expand Up @@ -683,8 +672,9 @@ def _get_attachment_batch(
if time_filter and not time_filter(last_updated):
continue

attachment_url = self._attachment_to_download_link(
self.confluence_client, attachment
# The url and the id are the same
attachment_url = generate_confluence_document_id(
self.wiki_base, attachment["_links"]["download"]
)
attachment_content = self._attachment_to_content(
self.confluence_client, attachment
Expand Down
14 changes: 13 additions & 1 deletion backend/danswer/db/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,18 @@ def construct_document_select_for_connector_credential_pair(
return stmt


def get_document_ids_for_connector_credential_pair(
    db_session: Session, connector_id: int, credential_id: int, limit: int | None = None
) -> list[str]:
    """Return the ids of all documents indexed under a connector/credential pair.

    Args:
        db_session: Active SQLAlchemy session.
        connector_id: Id of the connector half of the cc-pair.
        credential_id: Id of the credential half of the cc-pair.
        limit: Optional cap on the number of ids returned.
            (Previously accepted but silently ignored — now applied.)
    """
    doc_ids_stmt = select(DocumentByConnectorCredentialPair.id).where(
        and_(
            DocumentByConnectorCredentialPair.connector_id == connector_id,
            DocumentByConnectorCredentialPair.credential_id == credential_id,
        )
    )
    if limit is not None:
        doc_ids_stmt = doc_ids_stmt.limit(limit)
    return list(db_session.execute(doc_ids_stmt).scalars().all())


def get_documents_for_connector_credential_pair(
db_session: Session, connector_id: int, credential_id: int, limit: int | None = None
) -> Sequence[DbDocument]:
Expand All @@ -120,8 +132,8 @@ def get_documents_for_connector_credential_pair(


def get_documents_by_ids(
document_ids: list[str],
db_session: Session,
document_ids: list[str],
) -> list[DbDocument]:
stmt = select(DbDocument).where(DbDocument.id.in_(document_ids))
documents = db_session.execute(stmt).scalars().all()
Expand Down
11 changes: 11 additions & 0 deletions backend/danswer/document_index/factory.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from sqlalchemy.orm import Session

from danswer.db.search_settings import get_current_search_settings
from danswer.document_index.interfaces import DocumentIndex
from danswer.document_index.vespa.index import VespaIndex

Expand All @@ -13,3 +16,11 @@ def get_default_document_index(
return VespaIndex(
index_name=primary_index_name, secondary_index_name=secondary_index_name
)


Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

they may be able to be used elsewhere

def get_current_primary_default_document_index(db_session: Session) -> DocumentIndex:
    """Build the default DocumentIndex for the currently-active search settings.

    No secondary index is attached — only the primary index is used.
    """
    current_settings = get_current_search_settings(db_session)
    return get_default_document_index(
        primary_index_name=current_settings.index_name,
        secondary_index_name=None,
    )
2 changes: 1 addition & 1 deletion backend/danswer/indexing/indexing_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,8 @@ def index_doc_batch_prepare(

document_ids = [document.id for document in documents]
db_docs: list[DBDocument] = get_documents_by_ids(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixing the calling order wherever i can to make the codebase uniform :p

document_ids=document_ids,
db_session=db_session,
document_ids=document_ids,
)

# Skip indexing docs that don't have a newer updated at
Expand Down
67 changes: 54 additions & 13 deletions backend/ee/danswer/background/celery/celery_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,25 @@
from danswer.utils.logger import setup_logger
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Everything in this module is just refactoring the group and doc permission syncing to be separate

from danswer.utils.variable_functionality import global_version
from ee.danswer.background.celery_utils import should_perform_chat_ttl_check
from ee.danswer.background.celery_utils import should_perform_external_permissions_check
from ee.danswer.background.celery_utils import (
should_perform_external_doc_permissions_check,
)
from ee.danswer.background.celery_utils import (
should_perform_external_group_permissions_check,
)
from ee.danswer.background.task_name_builders import name_chat_ttl_task
from ee.danswer.background.task_name_builders import name_sync_external_permissions_task
from ee.danswer.background.task_name_builders import (
name_sync_external_doc_permissions_task,
)
from ee.danswer.background.task_name_builders import (
name_sync_external_group_permissions_task,
)
from ee.danswer.db.connector_credential_pair import get_all_auto_sync_cc_pairs
from ee.danswer.external_permissions.permission_sync import (
run_permission_sync_entrypoint,
run_external_doc_permission_sync,
)
from ee.danswer.external_permissions.permission_sync import (
run_external_group_permission_sync,
)
from ee.danswer.server.reporting.usage_export_generation import create_new_usage_report

Expand All @@ -26,11 +39,18 @@
global_version.set_ee()


@build_celery_task_wrapper(name_sync_external_doc_permissions_task)
@celery_app.task(soft_time_limit=JOB_TIMEOUT)
def sync_external_doc_permissions_task(cc_pair_id: int) -> None:
    """Sync external document permissions for one connector/credential pair."""
    with Session(get_sqlalchemy_engine()) as db_session:
        run_external_doc_permission_sync(db_session=db_session, cc_pair_id=cc_pair_id)


@build_celery_task_wrapper(name_sync_external_group_permissions_task)
@celery_app.task(soft_time_limit=JOB_TIMEOUT)
def sync_external_group_permissions_task(cc_pair_id: int) -> None:
    """Sync external group permissions for one connector/credential pair."""
    with Session(get_sqlalchemy_engine()) as db_session:
        run_external_group_permission_sync(db_session=db_session, cc_pair_id=cc_pair_id)


@build_celery_task_wrapper(name_chat_ttl_task)
Expand All @@ -44,18 +64,35 @@ def perform_ttl_management_task(retention_limit_days: int) -> None:
# Periodic Tasks
#####
@celery_app.task(
    name="check_sync_external_doc_permissions_task",
    soft_time_limit=JOB_TIMEOUT,
)
def check_sync_external_doc_permissions_task() -> None:
    """Runs periodically to sync external doc permissions"""
    with Session(get_sqlalchemy_engine()) as db_session:
        # Only cc-pairs configured for auto permission sync are considered
        cc_pairs = get_all_auto_sync_cc_pairs(db_session)
        for cc_pair in cc_pairs:
            if should_perform_external_doc_permissions_check(
                cc_pair=cc_pair, db_session=db_session
            ):
                sync_external_doc_permissions_task.apply_async(
                    kwargs=dict(cc_pair_id=cc_pair.id),
                )


@celery_app.task(
    name="check_sync_external_group_permissions_task",
    soft_time_limit=JOB_TIMEOUT,
)
def check_sync_external_group_permissions_task() -> None:
    """Runs periodically to sync external group permissions"""
    with Session(get_sqlalchemy_engine()) as db_session:
        for pair in get_all_auto_sync_cc_pairs(db_session):
            # Skip pairs that are not due (or not eligible) for a group sync
            if not should_perform_external_group_permissions_check(
                cc_pair=pair, db_session=db_session
            ):
                continue
            sync_external_group_permissions_task.apply_async(
                kwargs={"cc_pair_id": pair.id},
            )

Expand Down Expand Up @@ -94,9 +131,13 @@ def autogenerate_usage_report_task() -> None:
# Celery Beat (Periodic Tasks) Settings
#####
celery_app.conf.beat_schedule = {
"sync-external-permissions": {
"task": "check_sync_external_permissions_task",
"schedule": timedelta(seconds=60), # TODO: optimize this
"sync-external-doc-permissions": {
"task": "check_sync_external_doc_permissions_task",
"schedule": timedelta(seconds=5), # TODO: optimize this
},
"sync-external-group-permissions": {
"task": "check_sync_external_group_permissions_task",
"schedule": timedelta(seconds=5), # TODO: optimize this
},
"autogenerate_usage_report": {
"task": "autogenerate_usage_report_task",
Expand Down
30 changes: 27 additions & 3 deletions backend/ee/danswer/background/celery_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@
from danswer.db.tasks import get_latest_task
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Everything in this module is just refactoring the group and doc permission syncing to be separate

from danswer.utils.logger import setup_logger
from ee.danswer.background.task_name_builders import name_chat_ttl_task
from ee.danswer.background.task_name_builders import name_sync_external_permissions_task
from ee.danswer.background.task_name_builders import (
name_sync_external_doc_permissions_task,
)
from ee.danswer.background.task_name_builders import (
name_sync_external_group_permissions_task,
)
from ee.danswer.db.user_group import delete_user_group
from ee.danswer.db.user_group import fetch_user_group
from ee.danswer.db.user_group import mark_user_group_as_synced
Expand All @@ -38,13 +43,32 @@ def should_perform_chat_ttl_check(
return True


def should_perform_external_doc_permissions_check(
    cc_pair: ConnectorCredentialPair, db_session: Session
) -> bool:
    """Decide whether a doc-permission sync should be scheduled for this cc-pair.

    Returns False when the pair is not in SYNC access mode, or when a sync
    task for it is already live and not timed out; True otherwise.
    """
    if cc_pair.access_type != AccessType.SYNC:
        return False

    task_name = name_sync_external_doc_permissions_task(cc_pair_id=cc_pair.id)

    latest_task = get_latest_task(task_name, db_session)
    if not latest_task:
        # Never run before — safe to schedule
        return True

    if check_task_is_live_and_not_timed_out(latest_task, db_session):
        logger.debug(f"{task_name} is already being performed. Skipping.")
        return False

    return True


def should_perform_external_group_permissions_check(
cc_pair: ConnectorCredentialPair, db_session: Session
) -> bool:
if cc_pair.access_type != AccessType.SYNC:
return False

task_name = name_sync_external_group_permissions_task(cc_pair_id=cc_pair.id)

latest_task = get_latest_task(task_name, db_session)
if not latest_task:
Expand Down
8 changes: 6 additions & 2 deletions backend/ee/danswer/background/task_name_builders.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,9 @@ def name_chat_ttl_task(retention_limit_days: int) -> str:
return f"chat_ttl_{retention_limit_days}_days"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Everything in this module is just refactoring the group and doc permission syncing to be separate


def name_sync_external_doc_permissions_task(cc_pair_id: int) -> str:
    """Build the unique task name for a doc-permission sync of one cc-pair."""
    return f"sync_external_doc_permissions_task__{cc_pair_id}"


def name_sync_external_group_permissions_task(cc_pair_id: int) -> str:
    """Build the unique task name for a group-permission sync of one cc-pair."""
    return "sync_external_group_permissions_task__" + str(cc_pair_id)
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from typing import Any

from atlassian import Confluence # type:ignore

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved to utils since this is done for both group and doc sync


def build_confluence_client(
    connector_specific_config: dict[str, Any], raw_credentials_json: dict[str, Any]
) -> Confluence:
    """Construct a Confluence client for either cloud or data-center deployments.

    Shared by the connector and the permission syncs so they authenticate
    the same way.
    """
    is_cloud = connector_specific_config.get("is_cloud", False)

    # Remove trailing slash from wiki_base if present
    base_url = connector_specific_config["wiki_base"].rstrip("/")
    access_token = raw_credentials_json["confluence_access_token"]

    if is_cloud:
        username = raw_credentials_json["confluence_username"]
        password = access_token
        token = None
    else:
        # passing in username causes issues for Confluence data center,
        # so authenticate with the bearer token only
        username = None
        password = None
        token = access_token

    return Confluence(
        api_version="cloud" if is_cloud else "latest",
        url=base_url,
        username=username,
        password=password,
        token=token,
    )
Loading
Loading