From 653d1dab4988e83293cc15e6cf3c2782a2134a0d Mon Sep 17 00:00:00 2001 From: mohammadehsanansari Date: Mon, 12 May 2025 00:42:02 +0530 Subject: [PATCH 1/3] feat: add batch delete documents support to API, SDK, CLI, and tests - Added /documents/batch_delete API endpoint for deleting multiple documents at once - Implemented service method to handle deletion and error resilience - Updated SDK (sync + async) to expose batch_delete_documents() - Added CLI command: `morphik batch-delete ` - Added test cases in both async and sync SDKs to verify batch deletion --- core/api.py | 25 +++++++++++++++++++++-- core/models/request.py | 9 +++++++++ core/services/document_service.py | 27 ++++++++++++++++++++++--- sdks/python/morphik/async_.py | 5 +++++ sdks/python/morphik/sync.py | 5 +++++ sdks/python/morphik/tests/test_async.py | 24 ++++++++++++++++++++++ sdks/python/morphik/tests/test_sync.py | 24 ++++++++++++++++++++++ 7 files changed, 114 insertions(+), 5 deletions(-) diff --git a/core/api.py b/core/api.py index 2d1986d1..0ea41a3a 100644 --- a/core/api.py +++ b/core/api.py @@ -35,6 +35,7 @@ from core.models.prompts import validate_prompt_overrides_with_http_exception from core.models.request import ( AgentQueryRequest, + BatchDeleteRequest, BatchIngestResponse, CompletionQueryRequest, CreateGraphRequest, @@ -1058,6 +1059,27 @@ async def delete_document(document_id: str, auth: AuthContext = Depends(verify_t raise HTTPException(status_code=403, detail=str(e)) +@app.post("/documents/batch_delete") +@telemetry.track(operation_type="batch_delete_documents", metadata_resolver=telemetry.document_delete_metadata) +async def batch_delete_documents(request: BatchDeleteRequest, auth: AuthContext = Depends(verify_token)): + """ + Batch delete documents by their IDs. + + Args: + request: List of document IDs in JSON. + auth: AuthContext + + Returns: + Dict: Status and count of deleted documents + """ + try: + deleted_count = await document_service.delete_documents(request.document_ids, auth) + return {"status": "success", "deleted": deleted_count, "requested": len(request.document_ids)} + except Exception as e: + logger.error(f"Batch deletion failed: {e}") + raise HTTPException(status_code=500, detail="Batch deletion failed") + + @app.get("/documents/filename/{filename}", response_model=Document) async def get_document_by_filename( filename: str, @@ -2050,8 +2072,7 @@ async def set_folder_rule( except Exception as rule_apply_error: last_error = rule_apply_error logger.warning( - f"Metadata extraction attempt {retry_count + 1} failed: " - f"{rule_apply_error}" + f"Metadata extraction attempt {retry_count + 1} failed: {rule_apply_error}" ) if retry_count == max_retries - 1: # Last attempt logger.error(f"All {max_retries} metadata extraction attempts failed") diff --git a/core/models/request.py b/core/models/request.py index f4220fdf..b0bc6988 100644 --- a/core/models/request.py +++ b/core/models/request.py @@ -143,3 +143,12 @@ class AgentQueryRequest(BaseModel): """Request model for agent queries""" query: str = Field(..., description="Natural language query for the Morphik agent") + + +class BatchDeleteRequest(BaseModel): + """Request model for delete batch documents""" + + document_ids: List[str] = Field( + ..., + description="List of document IDs to be deleted. Must be a list of strings.", + ) diff --git a/core/services/document_service.py b/core/services/document_service.py index 1a78e5a3..b461e079 100644 --- a/core/services/document_service.py +++ b/core/services/document_service.py @@ -244,7 +244,7 @@ async def retrieve_chunks( chunks = await self.reranker.rerank(query, chunks) chunks.sort(key=lambda x: x.score, reverse=True) chunks = chunks[:k] - logger.debug(f"Reranked {k*10} chunks and selected the top {k}") + logger.debug(f"Reranked {k * 10} chunks and selected the top {k}") # Combine multiple chunk sources if needed chunks = await self._combine_multi_and_regular_chunks( @@ -852,7 +852,7 @@ async def ingest_file_content( if folder_name: try: await self._ensure_folder_exists(folder_name, doc.external_id, auth) - logger.debug(f"Ensured folder '{folder_name}' exists " f"and contains document {doc.external_id}") + logger.debug(f"Ensured folder '{folder_name}' exists and contains document {doc.external_id}") except Exception as e: logger.error( f"Error during _ensure_folder_exists for doc {doc.external_id}" @@ -1210,7 +1210,7 @@ async def store_document_with_retry(): current_retry_delay *= 2 else: logger.error( - f"All database connection attempts failed " f"after {max_retries} retries: {error_msg}" + f"All database connection attempts failed after {max_retries} retries: {error_msg}" ) raise Exception("Failed to store document metadata after multiple retries") else: @@ -2036,6 +2036,27 @@ def _update_metadata_and_version( history.append(entry) + async def delete_documents(self, document_ids: List[str], auth: AuthContext) -> int: + """ + Batch delete documents and their associated data. + + Args: + document_ids: List of document IDs to delete. + auth: Authentication context. + + Returns: + int: Number of documents successfully deleted. + """ + deleted = 0 + for doc_id in document_ids: + try: + success = await self.delete_document(doc_id, auth) + if success: + deleted += 1 + except Exception as e: + logger.warning(f"Failed to delete document {doc_id}: {e}") + return deleted + # ------------------------------------------------------------------ # Helper – choose bucket per app (isolation) # ------------------------------------------------------------------ diff --git a/sdks/python/morphik/async_.py b/sdks/python/morphik/async_.py index d33d3138..53adcfd2 100644 --- a/sdks/python/morphik/async_.py +++ b/sdks/python/morphik/async_.py @@ -2540,3 +2540,8 @@ async def wait_for_graph_completion( raise RuntimeError(graph.error or "Graph processing failed") await asyncio.sleep(check_interval_seconds) raise TimeoutError("Timed out waiting for graph completion") + + async def batch_delete_documents(self, document_ids: List[str]) -> int: + """Delete multiple documents by their IDs (async).""" + response = await self._request("POST", "documents/batch_delete", data={"document_ids": document_ids}) + return response["deleted"] diff --git a/sdks/python/morphik/sync.py b/sdks/python/morphik/sync.py index 577a97d3..fc34d33e 100644 --- a/sdks/python/morphik/sync.py +++ b/sdks/python/morphik/sync.py @@ -2735,3 +2735,8 @@ def wait_for_graph_completion( raise RuntimeError(graph.error or "Graph processing failed") time.sleep(check_interval_seconds) raise TimeoutError("Timed out waiting for graph completion") + + def batch_delete_documents(self, document_ids: List[str]) -> int: + """Delete multiple documents by their IDs.""" + response = self._request("POST", "documents/batch_delete", data={"document_ids": document_ids}) + return response["deleted"] diff --git a/sdks/python/morphik/tests/test_async.py b/sdks/python/morphik/tests/test_async.py index 1880be49..e33120a7 100644 --- a/sdks/python/morphik/tests/test_async.py +++ b/sdks/python/morphik/tests/test_async.py @@ -382,3 +382,27 @@ async def test_query_with_dict_schema(self, db): finally: await db.delete_document(doc.external_id) + + @pytest.mark.asyncio + async def test_batch_delete_documents(self, db): + """Test batch deleting multiple documents""" + # Given + doc1 = await db.ingest_text( + content="This is batch delete document 1", + filename=f"batch_del_1_{uuid.uuid4().hex[:6]}.txt", + metadata={"test_case": "batch_delete"}, + ) + doc2 = await db.ingest_text( + content="This is batch delete document 2", + filename=f"batch_del_2_{uuid.uuid4().hex[:6]}.txt", + metadata={"test_case": "batch_delete"}, + ) + + # Then + assert doc1.external_id and doc2.external_id + + # When + deleted_count = await db.batch_delete_documents([doc1.external_id, doc2.external_id]) + + # Then + assert deleted_count == 2 diff --git a/sdks/python/morphik/tests/test_sync.py b/sdks/python/morphik/tests/test_sync.py index 6a73b22a..e17dd3d0 100644 --- a/sdks/python/morphik/tests/test_sync.py +++ b/sdks/python/morphik/tests/test_sync.py @@ -369,3 +369,27 @@ def test_query_with_dict_schema(self, db): finally: db.delete_document(doc.external_id) + + def test_batch_delete_documents(self, db): + """Test batch deleting multiple documents (sync)""" + # Given + doc1 = db.ingest_text( + content="First document to test batch delete", + filename=f"batch_delete_1_{uuid.uuid4().hex[:6]}.txt", + metadata={"test_id": "sync_batch_delete_test"}, + ) + doc2 = db.ingest_text( + content="Second document to test batch delete", + filename=f"batch_delete_2_{uuid.uuid4().hex[:6]}.txt", + metadata={"test_id": "sync_batch_delete_test"}, + ) + + # Then + assert doc1.external_id is not None + assert doc2.external_id is not None + + # When + deleted = db.batch_delete_documents([doc1.external_id, doc2.external_id]) + + # Then + assert deleted == 2 From 44f3f462104d62cbeb87c2702e64e757defee625 Mon Sep 17 00:00:00 2001 From: mohammadehsanansari Date: Mon, 12 May 2025 01:10:48 +0530 Subject: [PATCH 2/3] feat(api): Add telemetry and improved error handling to batch document deletion - Added limit check for batch size (MAX_BATCH_DELETE) to prevent abuse - Integrated telemetry metadata tracking for batch_delete_documents API - Enhanced error handling with specific logs and user-safe error messages --- core/api.py | 9 +++++++++ core/services/telemetry.py | 14 +++++++++++++- shell.py | 13 +++++++++++++ 3 files changed, 35 insertions(+), 1 deletion(-) diff --git a/core/api.py b/core/api.py index 0ea41a3a..171fb37e 100644 --- a/core/api.py +++ b/core/api.py @@ -1059,6 +1059,9 @@ async def delete_document(document_id: str, auth: AuthContext = Depends(verify_t raise HTTPException(status_code=403, detail=str(e)) +MAX_BATCH_DELETE = 100 + + @app.post("/documents/batch_delete") @telemetry.track(operation_type="batch_delete_documents", metadata_resolver=telemetry.document_delete_metadata) async def batch_delete_documents(request: BatchDeleteRequest, auth: AuthContext = Depends(verify_token)): @@ -1072,6 +1075,12 @@ async def batch_delete_documents(request: BatchDeleteRequest, auth: AuthContext Returns: Dict: Status and count of deleted documents """ + document_ids = request.document_ids + if not document_ids: + raise HTTPException(status_code=400, detail="No document IDs provided for deletion") + if len(document_ids) > MAX_BATCH_DELETE: + raise HTTPException(status_code=400, detail=f"Batch size exceeds maximum limit of {MAX_BATCH_DELETE}") + try: deleted_count = await document_service.delete_documents(request.document_ids, auth) return {"status": "success", "deleted": deleted_count, "requested": len(request.document_ids)} diff --git a/core/services/telemetry.py b/core/services/telemetry.py index cf759c79..e872c77f 100644 --- a/core/services/telemetry.py +++ b/core/services/telemetry.py @@ -295,7 +295,7 @@ def export(self, spans): # Use exponential backoff delay = self.retry_delay * (2 ** (retries - 1)) self.logger.warning( - f"Honeycomb trace export attempt {retries} failed: {str(e)}. " f"Retrying in {delay}s..." + f"Honeycomb trace export attempt {retries} failed: {str(e)}. Retrying in {delay}s..." ) time.sleep(delay) else: @@ -809,6 +809,18 @@ def _setup_metadata_extractors(self): ] ) + self.batch_document_delete_metadata = MetadataExtractor( + [ + MetadataField( + "document_count", + "request", + transform=lambda req: len(req.document_ids) if req and hasattr(req, "document_ids") else 0, + ), + MetadataField("folder_name", "request"), + MetadataField("end_user_id", "request"), + ] + ) + def track(self, operation_type: Optional[str] = None, metadata_resolver: Optional[Callable] = None): """ Decorator for tracking API operations with telemetry. diff --git a/shell.py b/shell.py index ed101666..c29e036e 100644 --- a/shell.py +++ b/shell.py @@ -811,6 +811,18 @@ def list_graphs(self) -> list: graphs = self._client.list_graphs() return [graph.model_dump() for graph in graphs] if graphs else [] + def batch_delete_documents(self, document_ids: List[str]) -> dict: + """ + Delete multiple documents in a single batch. + + Args: + document_ids: List of document external_ids to delete + + Returns: + dict: Deletion result from the server + """ + return self._client.batch_delete_documents(document_ids) + def close(self): """Close the client connection""" self._client.close() @@ -899,6 +911,7 @@ def query(self, query: str, max_tokens: int = None, temperature: float = None) - print(" db.create_graph('knowledge_graph', filters={'category': 'research'})") print(" db.query('How does X relate to Y?', graph_name='knowledge_graph', include_paths=True)") print("Type help(db) for documentation.") + print(" db.batch_delete_documents(['doc_id1', 'doc_id2'])") # Start the shell shell.interact(banner="") From 22b1711d624a718c8d024921846daa9b8ea9cfae Mon Sep 17 00:00:00 2001 From: mohammadehsanansari Date: Mon, 12 May 2025 01:20:11 +0530 Subject: [PATCH 3/3] added testcase --- core/api.py | 10 ++++++++-- core/services/document_service.py | 18 +++++++++++++----- sdks/python/morphik/tests/test_async.py | 19 +++++++++++++++++-- sdks/python/morphik/tests/test_sync.py | 18 ++++++++++++++++-- 4 files changed, 54 insertions(+), 11 deletions(-) diff --git a/core/api.py b/core/api.py index 171fb37e..dcf4ba7f 100644 --- a/core/api.py +++ b/core/api.py @@ -1082,8 +1082,14 @@ async def batch_delete_documents(request: BatchDeleteRequest, auth: AuthContext raise HTTPException(status_code=400, detail=f"Batch size exceeds maximum limit of {MAX_BATCH_DELETE}") try: - deleted_count = await document_service.delete_documents(request.document_ids, auth) - return {"status": "success", "deleted": deleted_count, "requested": len(request.document_ids)} + success, failed = await document_service.delete_documents(request.document_ids, auth) + return { + "status": "partial_success" if failed else "success", + "deleted_count": len(success), + "failed_count": len(failed), + "deleted_ids": success, + "failed_ids": failed, + } except Exception as e: logger.error(f"Batch deletion failed: {e}") raise HTTPException(status_code=500, detail="Batch deletion failed") diff --git a/core/services/document_service.py b/core/services/document_service.py index b461e079..a6763c04 100644 --- a/core/services/document_service.py +++ b/core/services/document_service.py @@ -2036,7 +2036,11 @@ def _update_metadata_and_version( history.append(entry) - async def delete_documents(self, document_ids: List[str], auth: AuthContext) -> int: + async def delete_documents( + self, + document_ids: List[str], + auth: AuthContext, + ) -> tuple[list[str | None], list[str | None]]: """ Batch delete documents and their associated data. @@ -2045,17 +2049,21 @@ async def delete_documents(self, document_ids: List[str], auth: AuthContext) -> auth: Authentication context. Returns: - int: Number of documents successfully deleted. + tuple: A tuple containing two lists: + - List of successfully deleted document IDs. + - List of failed document IDs. """ - deleted = 0 + success = [] + failed = [] for doc_id in document_ids: try: success = await self.delete_document(doc_id, auth) if success: - deleted += 1 + success.append(doc_id) except Exception as e: logger.warning(f"Failed to delete document {doc_id}: {e}") - return deleted + failed.append(doc_id) + return success, failed # ------------------------------------------------------------------ # Helper – choose bucket per app (isolation) diff --git a/sdks/python/morphik/tests/test_async.py b/sdks/python/morphik/tests/test_async.py index e33120a7..91acc6c8 100644 --- a/sdks/python/morphik/tests/test_async.py +++ b/sdks/python/morphik/tests/test_async.py @@ -402,7 +402,22 @@ async def test_batch_delete_documents(self, db): assert doc1.external_id and doc2.external_id # When - deleted_count = await db.batch_delete_documents([doc1.external_id, doc2.external_id]) + success, failed = await db.batch_delete_documents([doc1.external_id, doc2.external_id]) # Then - assert deleted_count == 2 + assert len(success) == 2 + assert len(failed) == 0 + + @pytest.mark.asyncio + async def test_batch_delete_with_invalid_id(self, db): + doc = await db.ingest_text( + content="This is batch delete document 1", + filename=f"batch_del_1_{uuid.uuid4().hex[:6]}.txt", + metadata={"test_case": "batch_delete"}, + ) + + # Include a fake ID + success, failed = db.batch_delete_documents([doc.external_id, "nonexistent_id_123"]) + + assert doc.external_id in success + assert "nonexistent_id_123" in failed diff --git a/sdks/python/morphik/tests/test_sync.py b/sdks/python/morphik/tests/test_sync.py index e17dd3d0..94e9ea8b 100644 --- a/sdks/python/morphik/tests/test_sync.py +++ b/sdks/python/morphik/tests/test_sync.py @@ -389,7 +389,21 @@ def test_batch_delete_documents(self, db): assert doc2.external_id is not None # When - deleted = db.batch_delete_documents([doc1.external_id, doc2.external_id]) + success, failed = db.batch_delete_documents([doc1.external_id, doc2.external_id]) # Then - assert deleted == 2 + assert len(success) == 2 + assert len(failed) == 0 + + def test_batch_delete_with_invalid_id(self, db): + doc = db.ingest_text( + content="Valid document", + filename=f"batch_delete_valid_{uuid.uuid4().hex[:6]}.txt", + metadata={"test_id": "partial_batch_delete_test"}, + ) + + # Include a fake ID + success, failed = db.batch_delete_documents([doc.external_id, "nonexistent_id_123"]) + + assert doc.external_id in success + assert "nonexistent_id_123" in failed