Skip to content

Commit 2e80684

Browse files
committed
hook up permission syncs to celery task
1 parent 7e65811 commit 2e80684

File tree

3 files changed

+367
-10
lines changed

3 files changed

+367
-10
lines changed

backend/ee/onyx/background/celery/tasks/doc_permission_syncing/tasks.py

Lines changed: 70 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,18 @@
5656
from onyx.db.enums import SyncStatus
5757
from onyx.db.enums import SyncType
5858
from onyx.db.models import ConnectorCredentialPair
59+
from onyx.db.permission_sync_attempt import create_doc_permission_sync_attempt
60+
from onyx.db.permission_sync_attempt import (
61+
mark_doc_permission_sync_attempt_completed_with_errors,
62+
)
63+
from onyx.db.permission_sync_attempt import mark_doc_permission_sync_attempt_failed
64+
from onyx.db.permission_sync_attempt import (
65+
mark_doc_permission_sync_attempt_in_progress,
66+
)
67+
from onyx.db.permission_sync_attempt import (
68+
mark_doc_permission_sync_attempt_succeeded,
69+
)
70+
from onyx.db.permission_sync_attempt import update_doc_permission_sync_progress
5971
from onyx.db.sync_record import insert_sync_record
6072
from onyx.db.sync_record import update_sync_record_status
6173
from onyx.db.users import batch_add_ext_perm_user_if_not_exists
@@ -379,6 +391,15 @@ def connector_permission_sync_generator_task(
379391
doc_permission_sync_ctx_dict["request_id"] = self.request.id
380392
doc_permission_sync_ctx.set(doc_permission_sync_ctx_dict)
381393

394+
with get_session_with_current_tenant() as db_session:
395+
attempt_id = create_doc_permission_sync_attempt(
396+
connector_credential_pair_id=cc_pair_id,
397+
db_session=db_session,
398+
)
399+
task_logger.info(
400+
f"Created doc permission sync attempt: {attempt_id} for cc_pair={cc_pair_id}"
401+
)
402+
382403
redis_connector = RedisConnector(tenant_id, cc_pair_id)
383404

384405
r = get_redis_client()
@@ -483,6 +504,8 @@ def connector_permission_sync_generator_task(
483504

484505
logger.info(f"Syncing docs for {source_type} with cc_pair={cc_pair_id}")
485506

507+
mark_doc_permission_sync_attempt_in_progress(attempt_id, db_session)
508+
486509
payload = redis_connector.permissions.payload
487510
if not payload:
488511
raise ValueError(f"No fence payload found: cc_pair={cc_pair_id}")
@@ -533,22 +556,53 @@ def fetch_all_existing_docs_ids_fn() -> list[str]:
533556
)
534557

535558
tasks_generated = 0
559+
docs_with_errors = 0
536560
for doc_external_access in document_external_accesses:
537-
redis_connector.permissions.update_db(
538-
lock=lock,
539-
new_permissions=[doc_external_access],
540-
source_string=source_type,
541-
connector_id=cc_pair.connector.id,
542-
credential_id=cc_pair.credential.id,
543-
task_logger=task_logger,
544-
)
545-
tasks_generated += 1
561+
try:
562+
redis_connector.permissions.update_db(
563+
lock=lock,
564+
new_permissions=[doc_external_access],
565+
source_string=source_type,
566+
connector_id=cc_pair.connector.id,
567+
credential_id=cc_pair.credential.id,
568+
task_logger=task_logger,
569+
)
570+
tasks_generated += 1
571+
except Exception as e:
572+
docs_with_errors += 1
573+
task_logger.exception(
574+
f"Error updating permissions for doc {doc_external_access.doc_id}: {e}"
575+
)
576+
# Continue processing other documents
546577

547578
task_logger.info(
548579
f"RedisConnector.permissions.generate_tasks finished. "
549-
f"cc_pair={cc_pair_id} tasks_generated={tasks_generated}"
580+
f"cc_pair={cc_pair_id} tasks_generated={tasks_generated} docs_with_errors={docs_with_errors}"
550581
)
551582

583+
update_doc_permission_sync_progress(
584+
db_session=db_session,
585+
attempt_id=attempt_id,
586+
total_docs_synced=tasks_generated,
587+
docs_with_permission_errors=docs_with_errors,
588+
)
589+
task_logger.info(
590+
f"Updated progress for attempt {attempt_id}: {tasks_generated} docs, {docs_with_errors} errors"
591+
)
592+
593+
if docs_with_errors > 0:
594+
mark_doc_permission_sync_attempt_completed_with_errors(
595+
attempt_id, db_session
596+
)
597+
task_logger.info(
598+
f"Marked doc permission sync attempt {attempt_id} as completed with errors"
599+
)
600+
else:
601+
mark_doc_permission_sync_attempt_succeeded(attempt_id, db_session)
602+
task_logger.info(
603+
f"Marked doc permission sync attempt {attempt_id} as succeeded"
604+
)
605+
552606
redis_connector.permissions.generator_complete = tasks_generated
553607

554608
except Exception as e:
@@ -561,6 +615,12 @@ def fetch_all_existing_docs_ids_fn() -> list[str]:
561615
f"Permission sync exceptioned: cc_pair={cc_pair_id} payload_id={payload_id}"
562616
)
563617

618+
with get_session_with_current_tenant() as db_session:
619+
mark_doc_permission_sync_attempt_failed(attempt_id, db_session)
620+
task_logger.info(
621+
f"Marked doc permission sync attempt {attempt_id} as failed"
622+
)
623+
564624
redis_connector.permissions.generator_clear()
565625
redis_connector.permissions.taskset_clear()
566626
redis_connector.permissions.set_fence(None)

backend/ee/onyx/background/celery/tasks/external_group_syncing/tasks.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,21 @@
4949
from onyx.db.enums import SyncStatus
5050
from onyx.db.enums import SyncType
5151
from onyx.db.models import ConnectorCredentialPair
52+
from onyx.db.permission_sync_attempt import (
53+
create_external_group_sync_attempt,
54+
)
55+
from onyx.db.permission_sync_attempt import (
56+
mark_external_group_sync_attempt_failed,
57+
)
58+
from onyx.db.permission_sync_attempt import (
59+
mark_external_group_sync_attempt_in_progress,
60+
)
61+
from onyx.db.permission_sync_attempt import (
62+
mark_external_group_sync_attempt_succeeded,
63+
)
64+
from onyx.db.permission_sync_attempt import (
65+
update_external_group_sync_progress,
66+
)
5267
from onyx.db.sync_record import insert_sync_record
5368
from onyx.db.sync_record import update_sync_record_status
5469
from onyx.redis.redis_connector import RedisConnector
@@ -449,6 +464,16 @@ def _perform_external_group_sync(
449464
cc_pair_id: int,
450465
tenant_id: str,
451466
) -> None:
467+
# Create attempt record at the start
468+
with get_session_with_current_tenant() as db_session:
469+
attempt_id = create_external_group_sync_attempt(
470+
connector_credential_pair_id=cc_pair_id,
471+
db_session=db_session,
472+
)
473+
logger.info(
474+
f"Created external group sync attempt: {attempt_id} for cc_pair={cc_pair_id}"
475+
)
476+
452477
with get_session_with_current_tenant() as db_session:
453478
cc_pair = get_connector_credential_pair_from_id(
454479
db_session=db_session,
@@ -477,14 +502,27 @@ def _perform_external_group_sync(
477502
)
478503
mark_old_external_groups_as_stale(db_session, cc_pair_id)
479504

505+
# Mark attempt as in progress
506+
mark_external_group_sync_attempt_in_progress(attempt_id, db_session)
507+
logger.info(f"Marked external group sync attempt {attempt_id} as in progress")
508+
480509
logger.info(
481510
f"Syncing external groups for {source_type} for cc_pair: {cc_pair_id}"
482511
)
483512
external_user_group_batch: list[ExternalUserGroup] = []
513+
total_users_processed = 0
514+
total_groups_processed = 0
515+
total_group_memberships_synced = 0
484516
try:
485517
external_user_group_generator = ext_group_sync_func(tenant_id, cc_pair)
486518
for external_user_group in external_user_group_generator:
487519
external_user_group_batch.append(external_user_group)
520+
521+
# Track progress
522+
total_groups_processed += 1
523+
total_users_processed += len(external_user_group.user_emails)
524+
total_group_memberships_synced += len(external_user_group.user_emails)
525+
488526
if len(external_user_group_batch) >= _EXTERNAL_GROUP_BATCH_SIZE:
489527
logger.debug(
490528
f"New external user groups: {external_user_group_batch}"
@@ -506,6 +544,10 @@ def _perform_external_group_sync(
506544
source=cc_pair.connector.source,
507545
)
508546
except Exception as e:
547+
# Mark attempt as failed
548+
mark_external_group_sync_attempt_failed(attempt_id, db_session)
549+
logger.info(f"Marked external group sync attempt {attempt_id} as failed")
550+
509551
# TODO: add some notification to the admins here
510552
logger.exception(
511553
f"Error syncing external groups for {source_type} for cc_pair: {cc_pair_id} {e}"
@@ -517,6 +559,23 @@ def _perform_external_group_sync(
517559
)
518560
remove_stale_external_groups(db_session, cc_pair_id)
519561

562+
# Update progress and mark as succeeded
563+
update_external_group_sync_progress(
564+
db_session=db_session,
565+
attempt_id=attempt_id,
566+
total_users_processed=total_users_processed,
567+
total_groups_processed=total_groups_processed,
568+
total_group_memberships_synced=total_group_memberships_synced,
569+
)
570+
logger.info(
571+
f"Updated progress for attempt {attempt_id}: "
572+
f"{total_groups_processed} groups, {total_users_processed} users, "
573+
f"{total_group_memberships_synced} memberships"
574+
)
575+
576+
mark_external_group_sync_attempt_succeeded(attempt_id, db_session)
577+
logger.info(f"Marked external group sync attempt {attempt_id} as succeeded")
578+
520579
mark_all_relevant_cc_pairs_as_external_group_synced(db_session, cc_pair)
521580

522581

0 commit comments

Comments
 (0)