diff --git a/backend/onyx/connectors/web/connector.py b/backend/onyx/connectors/web/connector.py index 2c3ea064d86..f15632b1037 100644 --- a/backend/onyx/connectors/web/connector.py +++ b/backend/onyx/connectors/web/connector.py @@ -359,6 +359,7 @@ def load_from_state(self) -> GenerateDocumentsOutput: continue parsed_html = web_html_cleanup(soup, self.mintlify_cleanup) + doc_batch.append( Document( id=current_url, diff --git a/backend/onyx/db/models.py b/backend/onyx/db/models.py index 730a69de284..47170f93b22 100644 --- a/backend/onyx/db/models.py +++ b/backend/onyx/db/models.py @@ -504,7 +504,6 @@ class Document(Base): last_synced: Mapped[datetime.datetime | None] = mapped_column( DateTime(timezone=True), nullable=True, index=True ) - # The following are not attached to User because the account/email may not be known # within Onyx # Something like the document creator diff --git a/backend/onyx/document_index/interfaces.py b/backend/onyx/document_index/interfaces.py index 3d27415a069..1f6386b09ea 100644 --- a/backend/onyx/document_index/interfaces.py +++ b/backend/onyx/document_index/interfaces.py @@ -148,6 +148,7 @@ class Indexable(abc.ABC): def index( self, chunks: list[DocMetadataAwareIndexChunk], + fresh_index: bool = False, ) -> set[DocumentInsertionRecord]: """ Takes a list of document chunks and indexes them in the document index @@ -165,9 +166,15 @@ def index( only needs to index chunks into the PRIMARY index. Do not update the secondary index here, it is done automatically outside of this code. + NOTE: The fresh_index parameter, when set to True, assumes no documents have been previously + indexed for the given index/tenant. This can be used to optimize the indexing process for + new or empty indices. + Parameters: - chunks: Document chunks with all of the information needed for indexing to the document index. + - fresh_index: Boolean indicating whether this is a fresh index with no existing documents. + Returns: List of document ids which map to unique documents and are used for deduping chunks when updating, as well as if the document is newly indexed or already existed and diff --git a/backend/onyx/document_index/vespa/app_config/schemas/danswer_chunk.sd b/backend/onyx/document_index/vespa/app_config/schemas/danswer_chunk.sd index 4f2aaa9a7d4..2fd861b779e 100644 --- a/backend/onyx/document_index/vespa/app_config/schemas/danswer_chunk.sd +++ b/backend/onyx/document_index/vespa/app_config/schemas/danswer_chunk.sd @@ -10,9 +10,6 @@ schema DANSWER_CHUNK_NAME { field chunk_id type int { indexing: summary | attribute } - field current_index_time type int { - indexing: summary | attribute - } # Displayed in the UI as the main identifier for the doc field semantic_identifier type string { indexing: summary | attribute diff --git a/backend/onyx/document_index/vespa/index.py b/backend/onyx/document_index/vespa/index.py index abdd37d1baf..1b7478f8cd3 100644 --- a/backend/onyx/document_index/vespa/index.py +++ b/backend/onyx/document_index/vespa/index.py @@ -42,15 +42,12 @@ from onyx.document_index.vespa.indexing_utils import batch_index_vespa_chunks from onyx.document_index.vespa.indexing_utils import clean_chunk_id_copy from onyx.document_index.vespa.indexing_utils import ( - find_existing_docs_in_vespa_by_doc_id, + get_existing_documents_from_chunks, ) from onyx.document_index.vespa.shared_utils.utils import get_vespa_http_client from onyx.document_index.vespa.shared_utils.utils import ( replace_invalid_doc_id_characters, ) -from onyx.document_index.vespa.shared_utils.vespa_request_builders import ( - build_deletion_selection_query, -) from onyx.document_index.vespa.shared_utils.vespa_request_builders import ( build_vespa_filters, ) @@ -310,35 +307,47 @@ def register_multitenant_indices( def index( self, chunks: list[DocMetadataAwareIndexChunk], + fresh_index: bool = False, ) -> set[DocumentInsertionRecord]: - """ - Index a list of chunks into Vespa. We rely on 'current_index_time' - to keep track of when each chunk was added/updated in the index. We also raise a ValueError - if any chunk is missing a 'current_index_time' timestamp. - """ - - # Clean chunks if needed (remove invalid chars, etc.) + """Receive a list of chunks from a batch of documents and index the chunks into Vespa along + with updating the associated permissions. Assumes that a document will not be split into + multiple chunk batches calling this function multiple times, otherwise only the last set of + chunks will be kept""" + # IMPORTANT: This must be done one index at a time, do not use secondary index here cleaned_chunks = [clean_chunk_id_copy(chunk) for chunk in chunks] - # We will store the set of doc_ids that previously existed in Vespa - doc_ids_to_current_index_time = { - chunk.source_document.id: chunk.current_index_time - for chunk in cleaned_chunks - } - existing_doc_ids = set() - - with get_vespa_http_client() as http_client, concurrent.futures.ThreadPoolExecutor( - max_workers=NUM_THREADS - ) as executor: - # a) Find which docs already exist in Vespa - existing_doc_ids = find_existing_docs_in_vespa_by_doc_id( - doc_ids=list(doc_ids_to_current_index_time.keys()), - index_name=self.index_name, - http_client=http_client, - executor=executor, - ) + existing_docs: set[str] = set() + + # NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is beneficial for + # indexing / updates / deletes since we have to make a large volume of requests. + with ( + concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor, + get_vespa_http_client() as http_client, + ): + if not fresh_index: + # Check for existing documents, existing documents need to have all of their chunks deleted + # prior to indexing as the document size (num chunks) may have shrunk + first_chunks = [ + chunk for chunk in cleaned_chunks if chunk.chunk_id == 0 + ] + for chunk_batch in batch_generator(first_chunks, BATCH_SIZE): + existing_docs.update( + get_existing_documents_from_chunks( + chunks=chunk_batch, + index_name=self.index_name, + http_client=http_client, + executor=executor, + ) + ) + + for doc_id_batch in batch_generator(existing_docs, BATCH_SIZE): + delete_vespa_docs( + document_ids=doc_id_batch, + index_name=self.index_name, + http_client=http_client, + executor=executor, + ) - # b) Feed new/updated chunks in batches for chunk_batch in batch_generator(cleaned_chunks, BATCH_SIZE): batch_index_vespa_chunks( chunks=chunk_batch, @@ -348,34 +357,14 @@ def index( executor=executor, ) - # c) Remove chunks with using versioning scheme 'current_index_time' - for doc_id in existing_doc_ids: - version_cutoff = int(doc_ids_to_current_index_time[doc_id].timestamp()) - query_str = build_deletion_selection_query( - doc_id=doc_id, - version_cutoff=version_cutoff, - doc_type=self.index_name, - ) - delete_url = ( - f"{DOCUMENT_ID_ENDPOINT.format(index_name=self.index_name)}/" - f"?{query_str}&cluster={DOCUMENT_INDEX_NAME}" - ) - try: - resp = http_client.delete(delete_url) - resp.raise_for_status() - except httpx.HTTPStatusError: - logger.exception( - f"Selection-based delete failed for doc_id='{doc_id}'" - ) - raise + all_doc_ids = {chunk.source_document.id for chunk in cleaned_chunks} - # Produce insertion records specifying which documents existed prior return { DocumentInsertionRecord( document_id=doc_id, - already_existed=(doc_id in existing_doc_ids), + already_existed=doc_id in existing_docs, ) - for doc_id in doc_ids_to_current_index_time + for doc_id in all_doc_ids } @staticmethod diff --git a/backend/onyx/document_index/vespa/indexing_utils.py b/backend/onyx/document_index/vespa/indexing_utils.py index 1d52c2e67e7..bfb0bd94163 100644 --- a/backend/onyx/document_index/vespa/indexing_utils.py +++ b/backend/onyx/document_index/vespa/indexing_utils.py @@ -1,11 +1,8 @@ import concurrent.futures import json -import urllib.parse from datetime import datetime from datetime import timezone from http import HTTPStatus -from typing import List -from typing import Set import httpx from retry import retry @@ -24,7 +21,6 @@ from onyx.document_index.vespa_constants import CHUNK_ID from onyx.document_index.vespa_constants import CONTENT from onyx.document_index.vespa_constants import CONTENT_SUMMARY -from onyx.document_index.vespa_constants import CURRENT_INDEX_TIME from onyx.document_index.vespa_constants import DOC_UPDATED_AT from onyx.document_index.vespa_constants import DOCUMENT_ID from onyx.document_index.vespa_constants import DOCUMENT_ID_ENDPOINT @@ -36,7 +32,6 @@ from onyx.document_index.vespa_constants import METADATA_SUFFIX from onyx.document_index.vespa_constants import NUM_THREADS from onyx.document_index.vespa_constants import PRIMARY_OWNERS -from onyx.document_index.vespa_constants import SEARCH_ENDPOINT from onyx.document_index.vespa_constants import SECONDARY_OWNERS from onyx.document_index.vespa_constants import SECTION_CONTINUATION from onyx.document_index.vespa_constants import SEMANTIC_IDENTIFIER @@ -173,7 +168,6 @@ def _index_vespa_chunk( METADATA_SUFFIX: chunk.metadata_suffix_keyword, EMBEDDINGS: embeddings_name_vector_map, TITLE_EMBEDDING: chunk.title_embedding, - CURRENT_INDEX_TIME: _vespa_get_updated_at_attribute(chunk.current_index_time), DOC_UPDATED_AT: _vespa_get_updated_at_attribute(document.doc_updated_at), PRIMARY_OWNERS: get_experts_stores_representations(document.primary_owners), SECONDARY_OWNERS: get_experts_stores_representations(document.secondary_owners), @@ -254,85 +248,3 @@ def clean_chunk_id_copy( } ) return clean_chunk - - -def _does_doc_exist_in_vespa( - doc_id: str, - index_name: str, - http_client: httpx.Client, -) -> bool: - """ - Checks whether there's a chunk/doc matching doc_id in Vespa using YQL. - """ - encoded_doc_id = urllib.parse.quote(doc_id) - - # Construct the URL with YQL query - url = ( - f"{SEARCH_ENDPOINT}" - f'?yql=select+*+from+sources+{index_name}+where+document_id+contains+"{encoded_doc_id}"' - "&hits=0" - ) - - logger.debug(f"Checking existence for doc_id={doc_id} with URL={url}") - resp = http_client.get(url) - - if resp.status_code == 200: - data = resp.json() - try: - total_count = data["root"]["fields"]["totalCount"] - return total_count > 0 - except (KeyError, TypeError): - logger.exception(f"Unexpected JSON structure from {url}: {data}") - raise - - elif resp.status_code == 404: - return False - - else: - logger.warning( - f"Unexpected HTTP {resp.status_code} checking doc existence for doc_id={doc_id}" - ) - return False - - -def find_existing_docs_in_vespa_by_doc_id( - doc_ids: List[str], - index_name: str, - http_client: httpx.Client, - executor: concurrent.futures.ThreadPoolExecutor | None = None, -) -> Set[str]: - """ - For each doc_id in doc_ids, returns whether it already exists in Vespa. - We do this concurrently for performance if doc_ids is large. - """ - if not doc_ids: - return set() - - external_executor = True - if executor is None: - # Create our own if not given - external_executor = False - executor = concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) - - existing_doc_ids = set() - - try: - future_map = { - executor.submit( - _does_doc_exist_in_vespa, doc_id, index_name, http_client - ): doc_id - for doc_id in doc_ids - } - for future in concurrent.futures.as_completed(future_map): - doc_id = future_map[future] - try: - if future.result(): - existing_doc_ids.add(doc_id) - except Exception: - logger.exception(f"Error checking doc existence for doc_id={doc_id}") - raise - - finally: - if not external_executor: - executor.shutdown(wait=True) - return existing_doc_ids diff --git a/backend/onyx/document_index/vespa/shared_utils/vespa_request_builders.py b/backend/onyx/document_index/vespa/shared_utils/vespa_request_builders.py index 85ce5e48e3b..dda75c85337 100644 --- a/backend/onyx/document_index/vespa/shared_utils/vespa_request_builders.py +++ b/backend/onyx/document_index/vespa/shared_utils/vespa_request_builders.py @@ -1,14 +1,12 @@ from datetime import datetime from datetime import timedelta from datetime import timezone -from urllib.parse import urlencode from onyx.configs.constants import INDEX_SEPARATOR from onyx.context.search.models import IndexFilters from onyx.document_index.interfaces import VespaChunkRequest from onyx.document_index.vespa_constants import ACCESS_CONTROL_LIST from onyx.document_index.vespa_constants import CHUNK_ID -from onyx.document_index.vespa_constants import CURRENT_INDEX_TIME from onyx.document_index.vespa_constants import DOC_UPDATED_AT from onyx.document_index.vespa_constants import DOCUMENT_ID from onyx.document_index.vespa_constants import DOCUMENT_SETS @@ -108,24 +106,3 @@ def build_vespa_id_based_retrieval_yql( id_based_retrieval_yql_section += ")" return id_based_retrieval_yql_section - - -def build_deletion_selection_query( - doc_id: str, version_cutoff: int, doc_type: str -) -> str: - """ - Build a Vespa selection expression that includes: - - {doc_type}.document_id == - - {doc_type}.current_index_time < version_cutoff - - Returns the URL-encoded selection query parameter. - """ - # Escape single quotes by doubling them for Vespa selection expressions - escaped_doc_id = doc_id.replace("'", "''") - - filter_str = ( - f"({doc_type}.document_id=='{escaped_doc_id}') and " - f"({doc_type}.{CURRENT_INDEX_TIME} < {version_cutoff})" - ) - - return urlencode({"selection": filter_str}) diff --git a/backend/onyx/document_index/vespa_constants.py b/backend/onyx/document_index/vespa_constants.py index 3bf5ddb9e16..aff4e85566d 100644 --- a/backend/onyx/document_index/vespa_constants.py +++ b/backend/onyx/document_index/vespa_constants.py @@ -52,7 +52,6 @@ TENANT_ID = "tenant_id" DOCUMENT_ID = "document_id" -CURRENT_INDEX_TIME = "current_index_time" CHUNK_ID = "chunk_id" BLURB = "blurb" CONTENT = "content" diff --git a/backend/onyx/indexing/indexing_pipeline.py b/backend/onyx/indexing/indexing_pipeline.py index bf15a71a6bf..f9ed3eb7b8e 100644 --- a/backend/onyx/indexing/indexing_pipeline.py +++ b/backend/onyx/indexing/indexing_pipeline.py @@ -1,7 +1,5 @@ import traceback from collections.abc import Callable -from datetime import datetime -from datetime import timezone from functools import partial from http import HTTPStatus from typing import Protocol @@ -402,8 +400,6 @@ def index_doc_batch( else DEFAULT_BOOST ), tenant_id=tenant_id, - # Use a timezone-aware datetime, here we set to current UTC time - current_index_time=datetime.now(tz=timezone.utc), ) for chunk in chunks_with_embeddings ] diff --git a/backend/onyx/indexing/models.py b/backend/onyx/indexing/models.py index b09d5570a45..e9a155d172b 100644 --- a/backend/onyx/indexing/models.py +++ b/backend/onyx/indexing/models.py @@ -1,4 +1,3 @@ -import datetime from typing import TYPE_CHECKING from pydantic import BaseModel @@ -74,14 +73,12 @@ class DocMetadataAwareIndexChunk(IndexChunk): of. This is used for filtering / personas. boost: influences the ranking of this chunk at query time. Positive -> ranked higher, negative -> ranked lower. - current_index_time: the timestamp of when this chunk is being indexed. """ tenant_id: str | None = None access: "DocumentAccess" document_sets: set[str] boost: int - current_index_time: datetime.datetime @classmethod def from_index_chunk( @@ -91,7 +88,6 @@ def from_index_chunk( document_sets: set[str], boost: int, tenant_id: str | None, - current_index_time: datetime.datetime, ) -> "DocMetadataAwareIndexChunk": index_chunk_data = index_chunk.model_dump() return cls( @@ -100,7 +96,6 @@ def from_index_chunk( document_sets=document_sets, boost=boost, tenant_id=tenant_id, - current_index_time=current_index_time, ) diff --git a/backend/onyx/seeding/load_docs.py b/backend/onyx/seeding/load_docs.py index ed818ce6991..b629b6ac3bc 100644 --- a/backend/onyx/seeding/load_docs.py +++ b/backend/onyx/seeding/load_docs.py @@ -86,7 +86,6 @@ def _create_indexable_chunks( access=default_public_access, document_sets=set(), boost=DEFAULT_BOOST, - current_index_time=datetime.datetime.now(datetime.timezone.utc), ) chunks.append(chunk) @@ -218,7 +217,7 @@ def seed_initial_documents( # as we just sent over the Vespa schema and there is a slight delay index_with_retries = retry_builder(tries=15)(document_index.index) - index_with_retries(chunks=chunks) + index_with_retries(chunks=chunks, fresh_index=cohere_enabled) # Mock a run for the UI even though it did not actually call out to anything mock_successful_index_attempt( diff --git a/backend/scripts/query_time_check/seed_dummy_docs.py b/backend/scripts/query_time_check/seed_dummy_docs.py index 79d506c1d5c..e7a7805690f 100644 --- a/backend/scripts/query_time_check/seed_dummy_docs.py +++ b/backend/scripts/query_time_check/seed_dummy_docs.py @@ -10,7 +10,6 @@ """ import random from datetime import datetime -from datetime import timezone from onyx.access.models import DocumentAccess from onyx.configs.constants import DocumentSource @@ -97,7 +96,6 @@ def generate_dummy_chunk( document_sets={document_set for document_set in document_set_names}, boost=random.randint(-1, 1), tenant_id=POSTGRES_DEFAULT_SCHEMA, - current_index_time=datetime.now(tz=timezone.utc), ) diff --git a/web/src/app/chat/documentSidebar/ChatFilters.tsx b/web/src/app/chat/documentSidebar/ChatFilters.tsx index 98fb9b36c37..f91529f918f 100644 --- a/web/src/app/chat/documentSidebar/ChatFilters.tsx +++ b/web/src/app/chat/documentSidebar/ChatFilters.tsx @@ -81,6 +81,7 @@ export const ChatFilters = forwardRef( const dedupedDocuments = removeDuplicateDocs(currentDocuments || []); const tokenLimitReached = selectedDocumentTokens > maxTokens - 75; + console.log("SELECTED MESSAGE is", selectedMessage); const hasSelectedDocuments = selectedDocumentIds.length > 0;