feat: connector indexing decoupling #4893
Conversation
deployment/helm/charts/onyx/templates/celery-worker-document-processing.yaml (resolved)
try:
    with get_session_with_current_tenant() as db_session_temp:
-       index_attempt = get_index_attempt(db_session_temp, index_attempt_id)
+       index_attempt = get_index_attempt(
this eager load stuff seems to be exposing some DB behavior ... maybe a bit problematic
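For readers outside this thread: the concern is that eager loading changes when the database is hit and which relationships stay usable once the session closes. A generic, self-contained SQLAlchemy sketch of the pattern, using toy models rather than Onyx's actual schema:

```python
from sqlalchemy import ForeignKey, create_engine
from sqlalchemy.orm import (
    DeclarativeBase,
    Mapped,
    Session,
    joinedload,
    mapped_column,
    relationship,
)


class Base(DeclarativeBase):
    pass


class CCPair(Base):
    __tablename__ = "cc_pair"
    id: Mapped[int] = mapped_column(primary_key=True)


class IndexAttempt(Base):
    __tablename__ = "index_attempt"
    id: Mapped[int] = mapped_column(primary_key=True)
    cc_pair_id: Mapped[int] = mapped_column(ForeignKey("cc_pair.id"))
    cc_pair: Mapped[CCPair] = relationship()


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(IndexAttempt(id=1, cc_pair=CCPair(id=1)))
    session.commit()
    attempt = session.get(IndexAttempt, 1, options=[joinedload(IndexAttempt.cc_pair)])

assert attempt is not None
# without the joinedload, touching attempt.cc_pair after the session closes
# would raise DetachedInstanceError; with it, the relationship is already loaded
print(attempt.cc_pair.id)
```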
backend/onyx/background/celery/versioned_apps/document_processing.py (outdated, resolved)
backend/onyx/background/celery/tasks/document_processing/tasks.py (3 outdated, resolved threads)
Force-pushed af01b09 to 9592aa7
Force-pushed 9592aa7 to 512e122
Force-pushed 512e122 to ee30d7b
PR Summary
Major architectural change to decouple document fetching from processing in the indexing pipeline, allowing parallel execution through a new blob store intermediary layer.
- Split the monolithic indexing process into docfetching_task (fetches source documents) and document_indexing_pipeline_task (handles processing/embedding/storage), coordinated via Redis locks
- Added a new DocumentBatchStorage system using the FileStore abstraction to manage intermediate document storage between the fetching and processing stages (a minimal sketch follows below)
- Implemented robust error handling and state management for the decoupled tasks, including proper cleanup of temporary storage
- Updated run_indexing.py to support the new pipeline architecture with separate DocExtractionContext and DocIndexingContext models
- Simplified session handling in S3BackedFileStore by moving db_session from constructor to method parameters for better dependency injection
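To make the storage handoff concrete, here is a minimal, self-contained sketch of a batch-storage intermediary in the spirit of the DocumentBatchStorage described above. The class name, local-directory layout, and JSON format are illustrative assumptions; the actual implementation goes through the FileStore abstraction so it can also be backed by S3-style stores.

```python
# Illustrative sketch only, not the PR's DocumentBatchStorage API.
import json
from pathlib import Path


class DocumentBatchStorageSketch:
    """Holds fetched document batches so an independent task can consume them."""

    def __init__(self, root: Path, index_attempt_id: int) -> None:
        self._dir = root / f"attempt_{index_attempt_id}"
        self._dir.mkdir(parents=True, exist_ok=True)

    def write_batch(self, batch_num: int, docs: list[dict]) -> None:
        # each batch is an independent object, so the fetching and processing
        # sides never contend on a shared file
        (self._dir / f"batch_{batch_num}.json").write_text(json.dumps(docs))

    def read_batch(self, batch_num: int) -> list[dict]:
        return json.loads((self._dir / f"batch_{batch_num}.json").read_text())

    def cleanup(self) -> None:
        # called once the attempt finishes (or fails) to drop temporary storage
        for f in self._dir.glob("batch_*.json"):
            f.unlink()
```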
35 files reviewed, 9 comments (Greptile)
# Default parameters for creation
DEFAULT_KWARGS = {
    "http2": True,
    "limits": lambda: httpx.Limits(),
}
style: Remove empty comment and blank line as they no longer serve a purpose since DEFAULT_KWARGS was moved
backend/onyx/configs/constants.py
Outdated
POSTGRES_CELERY_WORKER_LIGHT_APP_NAME = "celery_worker_light"
POSTGRES_CELERY_WORKER_HEAVY_APP_NAME = "celery_worker_heavy"
POSTGRES_CELERY_WORKER_INDEXING_APP_NAME = "celery_worker_indexing"
POSTGRES_CELERY_WORKER_docfetching_APP_NAME = "celery_worker_docfetching"
style: Use consistent casing ('docfetching' should be 'DOCFETCHING')
- POSTGRES_CELERY_WORKER_docfetching_APP_NAME = "celery_worker_docfetching"
+ POSTGRES_CELERY_WORKER_DOCFETCHING_APP_NAME = "celery_worker_docfetching"
def parallel_yield_from_funcs(
    funcs: list[Callable[..., R]],
    max_workers: int = 10,
) -> Iterator[R]:
style: Consider allowing args and kwargs parameters to support functions that take arguments. Currently only supports nullary functions.
  def parallel_yield_from_funcs(
-     funcs: list[Callable[..., R]],
+     funcs: list[Callable[[], R]],
      max_workers: int = 10,
  ) -> Iterator[R]:
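One low-churn way to get the effect this comment asks for is to keep the nullary signature and have callers bind arguments with functools.partial. A runnable sketch; the helper body here is a plausible reconstruction, not the repo's actual implementation:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import partial
from typing import Callable, Iterator, TypeVar

R = TypeVar("R")


def parallel_yield_from_funcs(
    funcs: list[Callable[[], R]],
    max_workers: int = 10,
) -> Iterator[R]:
    # run every function on a thread pool and yield results as they finish
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(f) for f in funcs]
        for future in as_completed(futures):
            yield future.result()


# callers bind arguments up front, so the helper itself stays nullary
powers = sorted(parallel_yield_from_funcs([partial(pow, 2, n) for n in range(5)]))
assert powers == [1, 2, 4, 8, 16]
```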
"onyx.background.celery.tasks.docfetching", | ||
"onyx.background.celery.tasks.indexing", # TODO: remove this and move the task to docfetching |
logic: The indexing tasks dependency creates coupling that contradicts the PR's decoupling goal. Need to complete the TODO task migration before merging
+1
# TODO: change to doc extraction if it doesnt break things
callback.progress("_run_indexing", 0)
logic: Callback still references '_run_indexing' but should be updated to 'doc_extraction' for clarity
+1
Also should docfetching specific stuff be moved into a docfetching folder?
if batches_processed > last_batches_completed:
    last_batches_completed = batches_processed
    last_progress_time = time.monotonic()
elif time.monotonic() - last_progress_time > 3600 * 6:
style: 6 hour timeout hardcoded - consider making this configurable via constant or config
+1
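For reference, a tiny sketch of the configurable-timeout suggestion; the constant name and env var are illustrative, not existing Onyx config:

```python
import os
import time

# hypothetical constant; the default preserves the current 6-hour behavior
DOCFETCHING_STALL_TIMEOUT_SECONDS = int(
    os.environ.get("DOCFETCHING_STALL_TIMEOUT_SECONDS", 6 * 3600)
)


def is_stalled(last_progress_time: float) -> bool:
    return time.monotonic() - last_progress_time > DOCFETCHING_STALL_TIMEOUT_SECONDS
```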
if not self.file_store.has_file(
    file_id=file_name,
    file_origin=FileOrigin.OTHER,
    file_type="application/json",
):
style: file_type should be constant, define FILE_TYPE = 'application/json' at class level
except Exception as e:
    logger.warning(f"Failed to delete extraction state: {e}")
logic: error message indicates 'extraction state' but this is in a loop over all state types
  except Exception as e:
-     logger.warning(f"Failed to delete extraction state: {e}")
+     logger.warning(f"Failed to delete {state_type.value} state: {e}")
prefer logger.exception here (prints out the stack trace nicely)
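For readers skimming the thread: logger.exception logs at ERROR level and automatically appends the active traceback, which is the behavior being suggested over a plain warning:

```python
import logging

logger = logging.getLogger(__name__)

try:
    raise ValueError("boom")
except Exception:
    # the traceback is attached automatically; no need to format the exception
    logger.exception("Failed to delete batch state")
```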
raise
logic: The bare except block here could mask important errors. Consider handling specific exceptions (like ClientError for S3 operations and SQLAlchemyError for database operations) separately.
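A hedged sketch of the split being suggested; the function shape and cleanup callables are illustrative, though ClientError and SQLAlchemyError are the real exception types raised by botocore and SQLAlchemy respectively:

```python
import logging
from typing import Callable

from botocore.exceptions import ClientError
from sqlalchemy.exc import SQLAlchemyError

logger = logging.getLogger(__name__)


def delete_state(
    delete_from_s3: Callable[[], None],
    delete_db_row: Callable[[], None],
) -> None:
    try:
        delete_from_s3()
        delete_db_row()
    except ClientError:
        # storage-level failure: the object may still exist, surface for retry
        logger.exception("S3 deletion failed")
        raise
    except SQLAlchemyError:
        # database-level failure: the row may now be orphaned
        logger.exception("DB deletion failed")
        raise
```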
Let's discuss how we want to do the monitoring, but overall looks good 🧐
"onyx.background.celery.tasks.docfetching", | ||
"onyx.background.celery.tasks.indexing", # TODO: remove this and move the task to docfetching |
+1
except Exception as e:
    logger.warning(f"Failed to delete extraction state: {e}")
prefer logger.exception here (prints out the stack trace nicely)
# Get batch storage (transition to IN_PROGRESS is handled by run_indexing_entrypoint)
with get_session_with_current_tenant() as db_session:
    batch_storage = get_document_batch_storage(
this seems slightly problematic (it's also in a few other places). The db session will no longer be valid outside of the with statement
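To make the failure mode concrete, here is a self-contained toy version of the pattern being flagged; FakeSession stands in for the real SQLAlchemy session:

```python
from contextlib import contextmanager
from typing import Iterator


class FakeSession:
    closed = False

    def query(self) -> str:
        if self.closed:
            raise RuntimeError("session is closed")
        return "row"


@contextmanager
def get_session() -> Iterator[FakeSession]:
    session = FakeSession()
    try:
        yield session
    finally:
        session.closed = True  # mirrors the real session closing at block exit


# buggy shape: the object escapes the block but still holds the dead session
with get_session() as db_session:
    storage = db_session
# storage.query() here would raise RuntimeError("session is closed")

# safe shape: every session-dependent call happens before the block exits
with get_session() as db_session:
    row = db_session.query()
```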
# TODO: change to doc extraction if it doesnt break things
callback.progress("_run_indexing", 0)
+1
        logger.error(f"Failed to store {state_type} state: {e}")
        raise

    def _get_state(self, state_type: str) -> DocumentStorageState | None:
should state_type be an Enum type?
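A small sketch of the Enum being suggested; the member names are guesses based on the surrounding code ("extraction" shows up in the delete path above), not the repo's actual values:

```python
from enum import Enum


class StorageStateType(str, Enum):
    EXTRACTION = "extraction"
    INDEXING = "indexing"


def state_file_name(state_type: StorageStateType) -> str:
    # typo-proof compared to passing raw strings around
    return f"{state_type.value}_state.json"


assert state_file_name(StorageStateType.EXTRACTION) == "extraction_state.json"
```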
)
# Get the document batch storage
with get_session_with_current_tenant() as db_session:
    storage = get_document_batch_storage(tenant_id, index_attempt_id, db_session)
same comment about db_session
if batches_processed > last_batches_completed:
    last_batches_completed = batches_processed
    last_progress_time = time.monotonic()
elif time.monotonic() - last_progress_time > 3600 * 6:
+1
)

with get_session_with_current_tenant() as db_session:
    storage = get_document_batch_storage(tenant_id, index_attempt_id, db_session)
same comment about db_session
    tenant_id: str,
) -> int | None:
    """
    TODO: update docstring to reflect docfetching
can we do this TODO now 🥺
Force-pushed bf3c890 to da665a6
Force-pushed 56e6b41 to 9046d88
Force-pushed 4355e8f to 1407e26
Force-pushed 0ad3f95 to 0358f17
* WIP
* renamed and moved tasks (WIP)
* minio migration
* bug fixes and finally add document batch storage
* WIP: can suceed but status is error
* WIP
* import fixes
* working v1 of decoupled
* catastrophe handling
* refactor
* remove unused db session in prep for new approach
* renaming and docstrings (untested)
* renames
* WIP with no more indexing fences
* robustness improvements
* clean up rebase
* migration and salesforce rate limits
* minor tweaks
* test fix
* connector pausing behavior
* correct checkpoint resumption logic
* cleanups in docfetching
* add heartbeat file
* update template jsonc
* deployment fixes
* fix vespa httpx pool
* error handling
* cosmetic fixes
* dumb
* logging improvements and non checkpointed connector fixes
* didnt save
* misc fixes
* fix import
* fix deletion of old files
* add in attempt prefix
* fix attempt prefix
* tiny log improvement
* minor changes
* fixed resumption behavior
* passing int tests
* fix unit test
* fixed unit tests
* trying timeout bump to see if int tests pass
* trying timeout bump to see if int tests pass
* fix autodiscovery
* helm chart fixes
* helm and logging
Description
Addresses https://linear.app/danswer/issue/DAN-2151/decoupled-indexing-pipeline
Note: the following summary describes a single connector indexing run, ignoring the mechanisms we use for preventing race conditions, overlapping connector runs, etc.
The indexing pipeline before this PR:
0. The "check for indexing" task runs, which calls "try_creating_indexing_task", which spawns a new process for a new scheduled index attempt. That new process (which runs the connector_indexing_task function) does the following:
Note that steps 1-6 are all done in the same spawned process. In this (draft) PR, we decouple step 1 from the remaining steps 2-6 so that they can run in parallel. At a high level, the approach is to run step (1) in a task that writes its results to a blob store, and to have steps (2-6) run in an independent task that reads from that blob store as input (a sketch of this handoff follows below).
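A hedged sketch of that handoff as two Celery tasks, reusing the DocumentBatchStorageSketch from the PR Summary above; the task names, broker URL, and stub fetcher are illustrative assumptions, not this PR's actual code:

```python
from pathlib import Path

from celery import Celery

app = Celery("sketch", broker="redis://localhost:6379/0")


def fetch_document_batches() -> list[list[dict]]:
    # stand-in for a real connector run that yields document batches
    return [[{"id": "doc-1"}], [{"id": "doc-2"}]]


@app.task
def docfetching_task(index_attempt_id: int) -> None:
    storage = DocumentBatchStorageSketch(Path("/tmp/batches"), index_attempt_id)
    for batch_num, batch in enumerate(fetch_document_batches()):  # step (1)
        storage.write_batch(batch_num, batch)
        # hand each batch off immediately so processing overlaps with fetching
        document_indexing_pipeline_task.delay(index_attempt_id, batch_num)


@app.task
def document_indexing_pipeline_task(index_attempt_id: int, batch_num: int) -> None:
    storage = DocumentBatchStorageSketch(Path("/tmp/batches"), index_attempt_id)
    batch = storage.read_batch(batch_num)
    # steps (2-6): chunk, embed, and index `batch`, then delete the stored file
```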
An early implementation had the following issues:
a) both new tasks are run directly with send_task
b) the blob store implementations are only local file storage and S3; Azure and GCP still need to be implemented. The implementation code also needs to be checked to make sure it leverages existing code.
c) several minor features from the old implementation are missing, most notably the indexing callback. It isn't clear to me how to restructure that callback across multiple new tasks (perhaps each task gets its own callback, with information synchronized by index attempt and tenant id?)
^^ these are now addressed; remaining issues:
a) some of the code is located in the wrong files, and there is some dead code (_run_indexing in particular, since I was using it as a reference)
b) the error raised when pausing a connector mid-run isn't caught correctly, leading to an "error" state when it should be "cancelled"
c) need to test with multi-tenant
d) want to test with a variety of connectors and varying levels of parallelism
e) TODO: tell docfetching to back off if the indexing jobs are piling up. We'll also need logic for fully stopping docfetching and marking the attempt as an error if, e.g., the index tasks are never picking anything up.
How Has This Been Tested?
N/A, but planning to test across the main connectors in the UI and with all blob store solutions.
Backporting (check the box to trigger backport action)
Note: You have to check that the action passes, otherwise resolve the conflicts manually and tag the patches.