Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/pr-Integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ jobs:
-e SLACK_BOT_TOKEN=${SLACK_BOT_TOKEN} \
-e TEST_WEB_HOSTNAME=test-runner \
danswer/danswer-integration:test \
/app/tests/integration/tests
/app/tests/integration/tests \
/app/tests/integration/connector_job_tests
continue-on-error: true
id: run_tests

Expand Down
4 changes: 2 additions & 2 deletions .vscode/launch.template.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@
"--loglevel=INFO",
"--hostname=light@%n",
"-Q",
"vespa_metadata_sync,connector_deletion",
"vespa_metadata_sync,connector_deletion,doc_permissions_upsert",
],
"presentation": {
"group": "2",
Expand Down Expand Up @@ -232,7 +232,7 @@
"--loglevel=INFO",
"--hostname=heavy@%n",
"-Q",
"connector_pruning",
"connector_pruning,connector_doc_permissions_sync,connector_external_group_sync",
],
"presentation": {
"group": "2",
Expand Down
30 changes: 30 additions & 0 deletions backend/alembic/versions/2daa494a0851_add_group_sync_time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""add-group-sync-time
Revision ID: 2daa494a0851
Revises: c0fd6e4da83a
Create Date: 2024-11-11 10:57:22.991157
"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = "2daa494a0851"
down_revision = "c0fd6e4da83a"
branch_labels = None
depends_on = None


def upgrade() -> None:
op.add_column(
"connector_credential_pair",
sa.Column(
"last_time_external_group_sync",
sa.DateTime(timezone=True),
nullable=True,
),
)


def downgrade() -> None:
op.drop_column("connector_credential_pair", "last_time_external_group_sync")
35 changes: 35 additions & 0 deletions backend/danswer/access/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,41 @@ class ExternalAccess:
is_public: bool


@dataclass(frozen=True)
class DocExternalAccess:
external_access: ExternalAccess
# The document ID
doc_id: str

def to_dict(self) -> dict:
return {
"external_access": {
"external_user_emails": list(self.external_access.external_user_emails),
"external_user_group_ids": list(
self.external_access.external_user_group_ids
),
"is_public": self.external_access.is_public,
},
"doc_id": self.doc_id,
}

@classmethod
def from_dict(cls, data: dict) -> "DocExternalAccess":
external_access = ExternalAccess(
external_user_emails=set(
data["external_access"].get("external_user_emails", [])
),
external_user_group_ids=set(
data["external_access"].get("external_user_group_ids", [])
),
is_public=data["external_access"]["is_public"],
)
return cls(
external_access=external_access,
doc_id=data["doc_id"],
)


@dataclass(frozen=True)
class DocumentAccess(ExternalAccess):
# User emails for Danswer users, None indicates admin
Expand Down
18 changes: 18 additions & 0 deletions backend/danswer/background/celery/apps/app_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
from danswer.redis.redis_connector import RedisConnector
from danswer.redis.redis_connector_credential_pair import RedisConnectorCredentialPair
from danswer.redis.redis_connector_delete import RedisConnectorDelete
from danswer.redis.redis_connector_doc_perm_sync import RedisConnectorPermissionSync
from danswer.redis.redis_connector_ext_group_sync import RedisConnectorExternalGroupSync
from danswer.redis.redis_connector_prune import RedisConnectorPrune
from danswer.redis.redis_document_set import RedisDocumentSet
from danswer.redis.redis_pool import get_redis_client
Expand Down Expand Up @@ -136,6 +138,22 @@ def on_task_postrun(
RedisConnectorPrune.remove_from_taskset(int(cc_pair_id), task_id, r)
return

if task_id.startswith(RedisConnectorPermissionSync.SUBTASK_PREFIX):
cc_pair_id = RedisConnector.get_id_from_task_id(task_id)
if cc_pair_id is not None:
RedisConnectorPermissionSync.remove_from_taskset(
int(cc_pair_id), task_id, r
)
return

if task_id.startswith(RedisConnectorExternalGroupSync.SUBTASK_PREFIX):
cc_pair_id = RedisConnector.get_id_from_task_id(task_id)
if cc_pair_id is not None:
RedisConnectorExternalGroupSync.remove_from_taskset(
int(cc_pair_id), task_id, r
)
return


def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None:
"""The first signal sent on celery worker startup"""
Expand Down
2 changes: 2 additions & 0 deletions backend/danswer/background/celery/apps/heavy.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,5 +91,7 @@ def on_setup_logging(
celery_app.autodiscover_tasks(
[
"danswer.background.celery.tasks.pruning",
"danswer.background.celery.tasks.doc_permission_syncing",
"danswer.background.celery.tasks.external_group_syncing",
]
)
1 change: 1 addition & 0 deletions backend/danswer/background/celery/apps/light.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,5 +92,6 @@ def on_setup_logging(
"danswer.background.celery.tasks.shared",
"danswer.background.celery.tasks.vespa",
"danswer.background.celery.tasks.connector_deletion",
"danswer.background.celery.tasks.doc_permission_syncing",
]
)
8 changes: 8 additions & 0 deletions backend/danswer/background/celery/apps/primary.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
from danswer.db.engine import SqlEngine
from danswer.redis.redis_connector_credential_pair import RedisConnectorCredentialPair
from danswer.redis.redis_connector_delete import RedisConnectorDelete
from danswer.redis.redis_connector_doc_perm_sync import RedisConnectorPermissionSync
from danswer.redis.redis_connector_ext_group_sync import RedisConnectorExternalGroupSync
from danswer.redis.redis_connector_index import RedisConnectorIndex
from danswer.redis.redis_connector_prune import RedisConnectorPrune
from danswer.redis.redis_connector_stop import RedisConnectorStop
Expand Down Expand Up @@ -134,6 +136,10 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:

RedisConnectorStop.reset_all(r)

RedisConnectorPermissionSync.reset_all(r)

RedisConnectorExternalGroupSync.reset_all(r)


@worker_ready.connect
def on_worker_ready(sender: Any, **kwargs: Any) -> None:
Expand Down Expand Up @@ -233,6 +239,8 @@ def stop(self, worker: Any) -> None:
"danswer.background.celery.tasks.connector_deletion",
"danswer.background.celery.tasks.indexing",
"danswer.background.celery.tasks.periodic",
"danswer.background.celery.tasks.doc_permission_syncing",
"danswer.background.celery.tasks.external_group_syncing",
"danswer.background.celery.tasks.pruning",
"danswer.background.celery.tasks.shared",
"danswer.background.celery.tasks.vespa",
Expand Down
2 changes: 1 addition & 1 deletion backend/danswer/background/celery/celery_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def extract_ids_from_runnable_connector(
callback: RunIndexingCallbackInterface | None = None,
) -> set[str]:
"""
If the PruneConnector hasnt been implemented for the given connector, just pull
If the SlimConnector hasnt been implemented for the given connector, just pull
all docs using the load_from_state and grab out the IDs.

Optionally, a callback can be passed to handle the length of each document batch.
Expand Down
12 changes: 12 additions & 0 deletions backend/danswer/background/celery/tasks/beat_schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,18 @@
"schedule": timedelta(seconds=5),
"options": {"priority": DanswerCeleryPriority.HIGH},
},
{
"name": "check-for-doc-permissions-sync",
"task": "check_for_doc_permissions_sync",
"schedule": timedelta(seconds=30),
"options": {"priority": DanswerCeleryPriority.HIGH},
},
{
"name": "check-for-external-group-sync",
"task": "check_for_external_group_sync",
"schedule": timedelta(seconds=20),
"options": {"priority": DanswerCeleryPriority.HIGH},
},
]


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,12 @@ def try_generate_document_cc_pair_cleanup_tasks(
f"cc_pair={cc_pair_id}"
)

if redis_connector.permissions.fenced:
raise TaskDependencyError(
f"Connector deletion - Delayed (permissions in progress): "
f"cc_pair={cc_pair_id}"
)

# add tasks to celery and build up the task set to monitor in redis
redis_connector.delete.taskset_clear()

Expand Down
Loading
Loading