From f85c2059d3f80a425f229868533f740a12e40af9 Mon Sep 17 00:00:00 2001
From: pablodanswer
Date: Fri, 25 Oct 2024 18:56:07 -0700
Subject: [PATCH 1/2] tenant seeding docs

---
 backend/danswer/background/celery/apps/beat.py      |  1 +
 backend/danswer/document_index/vespa/index.py       |  6 ++++++
 .../danswer/document_index/vespa/indexing_utils.py  |  3 ++-
 backend/danswer/document_index/vespa_constants.py   |  1 +
 backend/danswer/main.py                             |  2 +-
 backend/danswer/seeding/load_docs.py                | 13 +++++++------
 backend/danswer/setup.py                            |  4 ++--
 backend/ee/danswer/server/tenants/api.py            |  2 +-
 8 files changed, 21 insertions(+), 11 deletions(-)

diff --git a/backend/danswer/background/celery/apps/beat.py b/backend/danswer/background/celery/apps/beat.py
index 8ddc17efc52..f88295fa139 100644
--- a/backend/danswer/background/celery/apps/beat.py
+++ b/backend/danswer/background/celery/apps/beat.py
@@ -78,6 +78,7 @@ def on_setup_logging(
     },
 ]
 
+
 # Build the celery beat schedule dynamically
 beat_schedule = {}
 
diff --git a/backend/danswer/document_index/vespa/index.py b/backend/danswer/document_index/vespa/index.py
index 86bc481e573..59522157727 100644
--- a/backend/danswer/document_index/vespa/index.py
+++ b/backend/danswer/document_index/vespa/index.py
@@ -311,6 +311,8 @@ def index(
         with updating the associated permissions. Assumes that a document will not be split into
         multiple chunk batches calling this function multiple times, otherwise only the last set of
         chunks will be kept"""
+        print("INDEXING")
+        print(chunks[0].tenant_id)
         # IMPORTANT: This must be done one index at a time, do not use secondary index here
         cleaned_chunks = [clean_chunk_id_copy(chunk) for chunk in chunks]
 
@@ -322,10 +324,12 @@ def index(
             concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor,
             get_vespa_http_client() as http_client,
         ):
+            print("existing docs")
             # Check for existing documents, existing documents need to have all of their chunks deleted
             # prior to indexing as the document size (num chunks) may have shrunk
             first_chunks = [chunk for chunk in cleaned_chunks if chunk.chunk_id == 0]
             for chunk_batch in batch_generator(first_chunks, BATCH_SIZE):
+                print("batch")
                 existing_docs.update(
                     get_existing_documents_from_chunks(
                         chunks=chunk_batch,
                         index_name=self.index_name,
                         http_client=http_client,
                         executor=executor,
                     )
                 )
+            print("delete docs ")
 
             for doc_id_batch in batch_generator(existing_docs, BATCH_SIZE):
                 delete_vespa_docs(
                     document_ids=doc_id_batch,
                     index_name=self.index_name,
                     http_client=http_client,
                     executor=executor,
                 )
 
+            print("index chunks")
             for chunk_batch in batch_generator(cleaned_chunks, BATCH_SIZE):
                 batch_index_vespa_chunks(
                     chunks=chunk_batch,
diff --git a/backend/danswer/document_index/vespa/indexing_utils.py b/backend/danswer/document_index/vespa/indexing_utils.py
index 8ecdc22672b..1e7d7a6d25c 100644
--- a/backend/danswer/document_index/vespa/indexing_utils.py
+++ b/backend/danswer/document_index/vespa/indexing_utils.py
@@ -57,7 +57,7 @@ def _does_document_exist(
     chunk. This checks for whether the chunk exists already in the index"""
     doc_url = f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{doc_chunk_id}"
     doc_fetch_response = http_client.get(doc_url)
-
+    print("doc fetch response")
     if doc_fetch_response.status_code == 404:
         return False
 
@@ -96,6 +96,7 @@ def get_existing_documents_from_chunks(
     document_ids: set[str] = set()
 
     try:
+        print("chunk existence future")
         chunk_existence_future = {
             executor.submit(
                 _does_document_exist,
diff --git a/backend/danswer/document_index/vespa_constants.py b/backend/danswer/document_index/vespa_constants.py
index d4a36ef9725..30039922f1a 100644
--- a/backend/danswer/document_index/vespa_constants.py
+++ b/backend/danswer/document_index/vespa_constants.py
@@ -29,6 +29,7 @@
 
 # main search application
 VESPA_APP_CONTAINER_URL = VESPA_CLOUD_URL or f"http://{VESPA_HOST}:{VESPA_PORT}"
+
 # danswer_chunk below is defined in vespa/app_configs/schemas/danswer_chunk.sd
 DOCUMENT_ID_ENDPOINT = (
     f"{VESPA_APP_CONTAINER_URL}/document/v1/default/{{index_name}}/docid"
diff --git a/backend/danswer/main.py b/backend/danswer/main.py
index a6a338b4c4c..5244d5f1ad7 100644
--- a/backend/danswer/main.py
+++ b/backend/danswer/main.py
@@ -183,7 +183,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator:
 
         # If we are multi-tenant, we need to only set up initial public tables
         with Session(engine) as db_session:
-            setup_danswer(db_session)
+            setup_danswer(db_session, None)
     else:
         setup_multitenant_danswer()
 
diff --git a/backend/danswer/seeding/load_docs.py b/backend/danswer/seeding/load_docs.py
index 2756e7ddf61..edd52d9b590 100644
--- a/backend/danswer/seeding/load_docs.py
+++ b/backend/danswer/seeding/load_docs.py
@@ -31,7 +31,6 @@
 from danswer.key_value_store.interface import KvKeyNotFoundError
 from danswer.server.documents.models import ConnectorBase
 from danswer.utils.logger import setup_logger
-from danswer.utils.retry_wrapper import retry_builder
 
 logger = setup_logger()
 
@@ -39,6 +38,7 @@
 
 def _create_indexable_chunks(
     preprocessed_docs: list[dict],
+    tenant_id: str | None,
 ) -> tuple[list[Document], list[DocMetadataAwareIndexChunk]]:
     ids_to_documents = {}
     chunks = []
@@ -80,7 +80,7 @@
                 mini_chunk_embeddings=[],
             ),
             title_embedding=preprocessed_doc["title_embedding"],
-            tenant_id=None,
+            tenant_id=tenant_id,
             access=default_public_access,
             document_sets=set(),
             boost=DEFAULT_BOOST,
@@ -90,7 +90,7 @@
     return list(ids_to_documents.values()), chunks
 
 
-def seed_initial_documents(db_session: Session) -> None:
+def seed_initial_documents(db_session: Session, tenant_id: str | None) -> None:
    """
    Seed initial documents so users don't have an empty index to start
 
@@ -177,7 +177,7 @@ def seed_initial_documents(db_session: Session) -> None:
     )
 
     processed_docs = json.load(open(initial_docs_path))
-    docs, chunks = _create_indexable_chunks(processed_docs)
+    docs, chunks = _create_indexable_chunks(processed_docs, tenant_id)
 
     index_doc_batch_prepare(
         document_batch=docs,
@@ -198,8 +198,9 @@ def seed_initial_documents(db_session: Session) -> None:
 
     # Retries here because the index may take a few seconds to become ready
     # as we just sent over the Vespa schema and there is a slight delay
-    index_with_retries = retry_builder()(document_index.index)
-    index_with_retries(chunks=chunks)
+
+    document_index.index(chunks=chunks)
+    # index_with_retries(chunks=chunks)
 
     # Mock a run for the UI even though it did not actually call out to anything
     mock_successful_index_attempt(
diff --git a/backend/danswer/setup.py b/backend/danswer/setup.py
index a27580a2b15..426dc5e4b59 100644
--- a/backend/danswer/setup.py
+++ b/backend/danswer/setup.py
@@ -58,7 +58,7 @@
 logger = setup_logger()
 
 
-def setup_danswer(db_session: Session) -> None:
+def setup_danswer(db_session: Session, tenant_id: str | None) -> None:
     """
     Setup Danswer for a particular tenant. In the Single Tenant case, it will set it up for the default schema
     on server startup. In the MT case, it will be called when the tenant is created.
@@ -147,7 +147,7 @@ def setup_danswer(db_session: Session) -> None:
     # update multipass indexing setting based on GPU availability
     update_default_multipass_indexing(db_session)
 
-    seed_initial_documents(db_session)
+    seed_initial_documents(db_session, tenant_id)
 
 
 def translate_saved_search_settings(db_session: Session) -> None:
diff --git a/backend/ee/danswer/server/tenants/api.py b/backend/ee/danswer/server/tenants/api.py
index 342554c1c43..3776c726e1d 100644
--- a/backend/ee/danswer/server/tenants/api.py
+++ b/backend/ee/danswer/server/tenants/api.py
@@ -59,7 +59,7 @@ def create_tenant(
         run_alembic_migrations(tenant_id)
 
         with get_session_with_tenant(tenant_id) as db_session:
-            setup_danswer(db_session)
+            setup_danswer(db_session, tenant_id)
 
         add_users_to_tenant([email], tenant_id)
 

From 2313a1909db289f6920ce8f083f98ac2b710da7a Mon Sep 17 00:00:00 2001
From: pablodanswer
Date: Fri, 25 Oct 2024 18:59:10 -0700
Subject: [PATCH 2/2] Remove debug prints and restore retry-wrapped indexing

---
 backend/danswer/document_index/vespa/index.py          | 6 ------
 backend/danswer/document_index/vespa/indexing_utils.py | 2 --
 backend/danswer/seeding/load_docs.py                   | 5 +++--
 3 files changed, 3 insertions(+), 10 deletions(-)

diff --git a/backend/danswer/document_index/vespa/index.py b/backend/danswer/document_index/vespa/index.py
index 59522157727..86bc481e573 100644
--- a/backend/danswer/document_index/vespa/index.py
+++ b/backend/danswer/document_index/vespa/index.py
@@ -311,8 +311,6 @@ def index(
         with updating the associated permissions. Assumes that a document will not be split into
         multiple chunk batches calling this function multiple times, otherwise only the last set of
         chunks will be kept"""
-        print("INDEXING")
-        print(chunks[0].tenant_id)
         # IMPORTANT: This must be done one index at a time, do not use secondary index here
         cleaned_chunks = [clean_chunk_id_copy(chunk) for chunk in chunks]
 
@@ -324,12 +322,10 @@ def index(
             concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor,
             get_vespa_http_client() as http_client,
         ):
-            print("existing docs")
             # Check for existing documents, existing documents need to have all of their chunks deleted
             # prior to indexing as the document size (num chunks) may have shrunk
             first_chunks = [chunk for chunk in cleaned_chunks if chunk.chunk_id == 0]
             for chunk_batch in batch_generator(first_chunks, BATCH_SIZE):
-                print("batch")
                 existing_docs.update(
                     get_existing_documents_from_chunks(
                         chunks=chunk_batch,
                         index_name=self.index_name,
                         http_client=http_client,
                         executor=executor,
                     )
                 )
-            print("delete docs ")
 
             for doc_id_batch in batch_generator(existing_docs, BATCH_SIZE):
                 delete_vespa_docs(
                     document_ids=doc_id_batch,
                     index_name=self.index_name,
                     http_client=http_client,
                     executor=executor,
                 )
 
-            print("index chunks")
             for chunk_batch in batch_generator(cleaned_chunks, BATCH_SIZE):
                 batch_index_vespa_chunks(
                     chunks=chunk_batch,
diff --git a/backend/danswer/document_index/vespa/indexing_utils.py b/backend/danswer/document_index/vespa/indexing_utils.py
index 1e7d7a6d25c..aafc6bf4efe 100644
--- a/backend/danswer/document_index/vespa/indexing_utils.py
+++ b/backend/danswer/document_index/vespa/indexing_utils.py
@@ -57,7 +57,6 @@ def _does_document_exist(
     chunk. This checks for whether the chunk exists already in the index"""
     doc_url = f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{doc_chunk_id}"
     doc_fetch_response = http_client.get(doc_url)
-    print("doc fetch response")
     if doc_fetch_response.status_code == 404:
         return False
 
@@ -96,7 +95,6 @@ def get_existing_documents_from_chunks(
     document_ids: set[str] = set()
 
     try:
-        print("chunk existence future")
         chunk_existence_future = {
             executor.submit(
                 _does_document_exist,
diff --git a/backend/danswer/seeding/load_docs.py b/backend/danswer/seeding/load_docs.py
index edd52d9b590..2e9c13b10ba 100644
--- a/backend/danswer/seeding/load_docs.py
+++ b/backend/danswer/seeding/load_docs.py
@@ -31,6 +31,7 @@
 from danswer.key_value_store.interface import KvKeyNotFoundError
 from danswer.server.documents.models import ConnectorBase
 from danswer.utils.logger import setup_logger
+from danswer.utils.retry_wrapper import retry_builder
 
 logger = setup_logger()
 
@@ -199,8 +200,8 @@ def seed_initial_documents(db_session: Session, tenant_id: str | None) -> None:
     # Retries here because the index may take a few seconds to become ready
     # as we just sent over the Vespa schema and there is a slight delay
 
-    document_index.index(chunks=chunks)
-    # index_with_retries(chunks=chunks)
+    index_with_retries = retry_builder()(document_index.index)
+    index_with_retries(chunks=chunks)
 
     # Mock a run for the UI even though it did not actually call out to anything
     mock_successful_index_attempt(
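
Net effect of the series: setup_danswer now takes a tenant_id and threads it through seed_initial_documents into _create_indexable_chunks, so documents seeded at tenant creation are stamped with the tenant they belong to, and the second patch drops the temporary debug prints and returns indexing to the retry wrapper. Below is a minimal sketch of the resulting call sites, assuming only the imports visible in the diff; provision_tenant and index_seed_chunks are hypothetical wrapper names for illustration, not functions in the codebase.

    from typing import Any

    from sqlalchemy.orm import Session

    from danswer.setup import setup_danswer
    from danswer.utils.retry_wrapper import retry_builder


    def provision_tenant(db_session: Session, tenant_id: str | None) -> None:
        # Single-tenant startup (main.py lifespan) passes None; multi-tenant
        # create_tenant (ee/danswer/server/tenants/api.py) passes the new
        # tenant's id, which flows down into every seeded chunk.
        setup_danswer(db_session, tenant_id)


    def index_seed_chunks(document_index: Any, chunks: list[Any]) -> None:
        # Vespa can take a few seconds to become ready after the schema is
        # pushed, so the final state of the series wraps indexing in retries
        # rather than calling document_index.index(chunks=chunks) directly.
        index_with_retries = retry_builder()(document_index.index)
        index_with_retries(chunks=chunks)

Stamping tenant_id on the chunks at creation time, inside _create_indexable_chunks, rather than rewriting them after indexing keeps the seeded DocMetadataAwareIndexChunks consistent with whichever tenant the session was opened for.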