
Commit 4c7a2e4
fix: skip huge files on sdk fallback (#5421)
1 parent 01e0ba6

File tree: 2 files changed, +92 -38 lines
  backend/onyx/connectors/sharepoint/connector.py
  backend/onyx/file_processing/extract_file_text.py

backend/onyx/connectors/sharepoint/connector.py

Lines changed: 60 additions & 29 deletions
@@ -142,6 +142,10 @@ class SharepointAuthMethod(Enum):
     CERTIFICATE = "certificate"
 
 
+class SizeCapExceeded(Exception):
+    """Exception raised when the size cap is exceeded."""
+
+
 def load_certificate_from_pfx(pfx_data: bytes, password: str) -> CertificateData | None:
     """Load certificate from .pfx file for MSAL authentication"""
     try:
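The dedicated exception class replaces the earlier `RuntimeError("size_cap_exceeded")` sentinel, so callers can catch by type instead of substring-matching the message (as the old code below did). A minimal sketch of the intended contract; the stage labels are the ones this diff uses:

```python
# Sketch only: the stage travels as the exception message, so a single
# except clause can report where the cap tripped.
try:
    raise SizeCapExceeded("pre_download")
except SizeCapExceeded as e:
    stage = str(e)  # "pre_download", "during_download", or "during_sdk_download"
    print(f"size cap exceeded at stage: {stage}")
```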
@@ -240,7 +244,7 @@ def _download_with_cap(url: str, timeout: int, cap: int) -> bytes:
     Behavior:
     - Checks `Content-Length` first and aborts early if it exceeds `cap`.
     - Otherwise streams the body in chunks and stops once `cap` is surpassed.
-    - Raises `RuntimeError('size_cap_exceeded')` when the cap would be exceeded.
+    - Raises `SizeCapExceeded` when the cap would be exceeded.
     - Returns the full bytes if the content fits within `cap`.
     """
     with requests.get(url, stream=True, timeout=timeout) as resp:
@@ -254,7 +258,7 @@ def _download_with_cap(url: str, timeout: int, cap: int) -> bytes:
             logger.warning(
                 f"Content-Length {content_len} exceeds cap {cap}; skipping download."
             )
-            raise RuntimeError("size_cap_exceeded")
+            raise SizeCapExceeded("pre_download")
 
         buf = io.BytesIO()
         # Stream in 64KB chunks; adjust if needed for slower networks.
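For orientation, the capped-download pattern these hunks adjust looks roughly like the following. This is a condensed sketch, not the connector's exact code; logging and error handling are trimmed:

```python
import io

import requests


def download_with_cap(url: str, timeout: int, cap: int) -> bytes:
    """Condensed sketch of _download_with_cap's two-stage size check."""
    with requests.get(url, stream=True, timeout=timeout) as resp:
        resp.raise_for_status()
        # Stage 1: trust Content-Length when present and abort before reading.
        declared = int(resp.headers.get("Content-Length", -1))
        if declared > cap:
            raise SizeCapExceeded("pre_download")
        # Stage 2: the header can be absent or wrong, so also enforce the
        # cap while streaming the body in 64KB chunks.
        buf = io.BytesIO()
        for chunk in resp.iter_content(chunk_size=64 * 1024):
            buf.write(chunk)
            if buf.tell() > cap:
                raise SizeCapExceeded("during_download")
        return buf.getvalue()
```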
@@ -267,11 +271,32 @@ def _download_with_cap(url: str, timeout: int, cap: int) -> bytes:
                 logger.warning(
                     f"Streaming download exceeded cap {cap} bytes; aborting early."
                 )
-                raise RuntimeError("size_cap_exceeded")
+                raise SizeCapExceeded("during_download")
 
         return buf.getvalue()
 
 
+def _download_via_sdk_with_cap(
+    driveitem: DriveItem, bytes_allowed: int, chunk_size: int = 64 * 1024
+) -> bytes:
+    """Use the Office365 SDK streaming download with a hard byte cap.
+
+    Raises SizeCapExceeded("during_sdk_download") if the cap would be exceeded.
+    """
+    buf = io.BytesIO()
+
+    def on_chunk(bytes_read: int) -> None:
+        # bytes_read is total bytes seen so far per SDK contract
+        if bytes_read > bytes_allowed:
+            raise SizeCapExceeded("during_sdk_download")
+
+    # modifies the driveitem to change its download behavior
+    driveitem.download_session(buf, chunk_downloaded=on_chunk, chunk_size=chunk_size)
+    # Execute the configured request with retries using existing helper
+    sleep_and_retry(driveitem.context, "download_session")
+    return buf.getvalue()
+
+
 def _convert_driveitem_to_document_with_permissions(
     driveitem: DriveItem,
     drive_name: str,
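The new helper leans on the SDK calling `chunk_downloaded` with the running byte total, so raising from inside the callback aborts the transfer. An alternative shape (not what this commit does) is to enforce the cap in the sink itself, which would work with any writer that streams into a file-like object; a hypothetical sketch:

```python
import io


class CappedBytesIO(io.BytesIO):
    """Hypothetical alternative: a buffer that refuses to grow past a cap."""

    def __init__(self, cap: int) -> None:
        super().__init__()
        self._cap = cap

    def write(self, data: bytes) -> int:
        if self.tell() + len(data) > self._cap:
            raise SizeCapExceeded("during_sdk_download")
        return super().write(data)
```

Keeping the check in the callback, as the commit does, leaves the cap visible at the call site and avoids subclassing the buffer.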
@@ -322,19 +347,16 @@ def _convert_driveitem_to_document_with_permissions(
     content_bytes: bytes | None = None
     if download_url:
         try:
+            # Use this to test the sdk size cap
+            # raise requests.RequestException("test")
             content_bytes = _download_with_cap(
                 download_url,
                 REQUEST_TIMEOUT_SECONDS,
                 SHAREPOINT_CONNECTOR_SIZE_THRESHOLD,
             )
-        except RuntimeError as e:
-            if "size_cap_exceeded" in str(e):
-                logger.warning(
-                    f"Skipping '{driveitem.name}' exceeded size cap during streaming."
-                )
-                return None
-            else:
-                raise
+        except SizeCapExceeded as e:
+            logger.warning(f"Skipping '{driveitem.name}' exceeded size cap: {str(e)}")
+            return None
         except requests.RequestException as e:
             status = e.response.status_code if e.response is not None else -1
             logger.warning(
@@ -343,13 +365,15 @@ def _convert_driveitem_to_document_with_permissions(
 
     # Fallback to SDK content if needed
     if content_bytes is None:
-        content = sleep_and_retry(driveitem.get_content(), "get_content")
-        if content is None or not isinstance(
-            getattr(content, "value", None), (bytes, bytearray)
-        ):
-            logger.warning(f"Could not access content for '{driveitem.name}'")
-            raise ValueError(f"Could not access content for '{driveitem.name}'")
-        content_bytes = bytes(content.value)
+        try:
+            content_bytes = _download_via_sdk_with_cap(
+                driveitem, SHAREPOINT_CONNECTOR_SIZE_THRESHOLD
+            )
+        except SizeCapExceeded:
+            logger.warning(
+                f"Skipping '{driveitem.name}' exceeded size cap during SDK streaming."
+            )
+            return None
 
     sections: list[TextSection | ImageSection] = []
     file_ext = driveitem.name.split(".")[-1]
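Together, the last two hunks give the converter a two-tier download strategy with the cap enforced on both tiers, where `None` means "skip this file". Condensed into one hypothetical helper (assuming this module's names; the real code inlines this logic):

```python
def fetch_capped_content(
    driveitem: DriveItem, download_url: str | None, cap: int
) -> bytes | None:
    """Sketch: direct download URL first, SDK streaming as the fallback."""
    if download_url:
        try:
            return _download_with_cap(download_url, REQUEST_TIMEOUT_SECONDS, cap)
        except SizeCapExceeded:
            return None  # already over the cap; the SDK copy is no smaller
        except requests.RequestException:
            pass  # direct download failed; fall through to the SDK path
    try:
        return _download_via_sdk_with_cap(driveitem, cap)
    except SizeCapExceeded:
        return None
```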
@@ -370,24 +394,27 @@ def _convert_driveitem_to_document_with_permissions(
             sections.append(image_section)
     else:
         # Note: we don't process Onyx metadata for connectors like Drive & Sharepoint, but could
-        extraction_result = extract_text_and_images(
-            file=io.BytesIO(content_bytes), file_name=driveitem.name
-        )
-        if extraction_result.text_content:
-            sections.append(
-                TextSection(link=driveitem.web_url, text=extraction_result.text_content)
-            )
-
-        for idx, (img_data, img_name) in enumerate(extraction_result.embedded_images):
+        def _store_embedded_image(img_data: bytes, img_name: str) -> None:
             image_section, _ = store_image_and_create_section(
                 image_data=img_data,
-                file_id=f"{driveitem.id}_img_{idx}",
-                display_name=img_name or f"{driveitem.name} - image {idx}",
+                file_id=f"{driveitem.id}_img_{len(sections)}",
+                display_name=img_name or f"{driveitem.name} - image {len(sections)}",
                 file_origin=FileOrigin.CONNECTOR,
             )
             image_section.link = driveitem.web_url
             sections.append(image_section)
 
+        extraction_result = extract_text_and_images(
+            file=io.BytesIO(content_bytes),
+            file_name=driveitem.name,
+            image_callback=_store_embedded_image,
+        )
+        if extraction_result.text_content:
+            sections.append(
+                TextSection(link=driveitem.web_url, text=extraction_result.text_content)
+            )
+        # Any embedded images were stored via the callback; the returned list may be empty.
+
     if include_permissions and ctx is not None:
         logger.info(f"Getting external access for {driveitem.name}")
         external_access = get_sharepoint_external_access(
@@ -729,6 +756,7 @@ def _get_drive_items_for_drive_name(
         for folder_part in site_descriptor.folder_path.split("/"):
             root_folder = root_folder.get_by_path(folder_part)
 
+    # TODO: consider ways to avoid materializing the entire list of files in memory
     query = root_folder.get_files(
         recursive=True,
         page_size=1000,
@@ -837,6 +865,7 @@ def _fetch_driveitems(
             root_folder = root_folder.get_by_path(folder_part)
 
         # Get all items recursively
+        # TODO: consider ways to avoid materializing the entire list of files in memory
         query = root_folder.get_files(
             recursive=True,
             page_size=1000,
@@ -985,6 +1014,8 @@ def _fetch_site_pages(
         all_pages = pages_data.get("value", [])
 
         # Handle pagination if there are more pages
+        # TODO: This accumulates all pages in memory and can be heavy on large tenants.
+        # We should process each page incrementally to avoid unbounded growth.
         while "@odata.nextLink" in pages_data:
             next_url = pages_data["@odata.nextLink"]
             response = requests.get(
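The TODO names the problem but not a shape for the fix. One hypothetical incremental variant, following the `@odata.nextLink` chain and yielding records instead of accumulating `all_pages` (names and signature are illustrative, not part of this commit):

```python
from collections.abc import Iterator

import requests


def iter_site_pages(
    first_url: str, headers: dict[str, str], timeout: int = 30
) -> Iterator[dict]:
    """Yield site-page records one at a time instead of building a list."""
    url: str | None = first_url
    while url:
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()
        data = response.json()
        yield from data.get("value", [])
        url = data.get("@odata.nextLink")  # absent on the last page, ending the loop
```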

backend/onyx/file_processing/extract_file_text.py

Lines changed: 32 additions & 9 deletions
@@ -242,7 +242,10 @@ def pdf_to_text(file: IO[Any], pdf_pass: str | None = None) -> str:
 
 
 def read_pdf_file(
-    file: IO[Any], pdf_pass: str | None = None, extract_images: bool = False
+    file: IO[Any],
+    pdf_pass: str | None = None,
+    extract_images: bool = False,
+    image_callback: Callable[[bytes, str], None] | None = None,
 ) -> tuple[str, dict[str, Any], Sequence[tuple[bytes, str]]]:
     """
     Returns the text, basic PDF metadata, and optionally extracted images.
@@ -292,7 +295,11 @@ def read_pdf_file(
                         f"page_{page_num + 1}_image_{image_file_object.name}."
                         f"{image.format.lower() if image.format else 'png'}"
                     )
-                    extracted_images.append((img_bytes, image_name))
+                    if image_callback is not None:
+                        # Stream image out immediately
+                        image_callback(img_bytes, image_name)
+                    else:
+                        extracted_images.append((img_bytes, image_name))
 
     return text, metadata, extracted_images
 
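With the new parameter, a caller that wants images streamed rather than accumulated passes a handler. A hypothetical usage sketch (the path and handler are illustrative; the function itself is from this file):

```python
def persist_image(img_bytes: bytes, image_name: str) -> None:
    # Illustrative handler: a real caller would upload or index the image.
    with open(f"/tmp/{image_name}", "wb") as out:
        out.write(img_bytes)


with open("example.pdf", "rb") as pdf_file:
    text, metadata, images = read_pdf_file(
        pdf_file, extract_images=True, image_callback=persist_image
    )
# With a callback supplied, `images` comes back empty; each image was
# handed to persist_image as it was extracted.
```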

@@ -304,28 +311,32 @@ def read_pdf_file(
         return "", metadata, []
 
 
-def extract_docx_images(docx_bytes: IO[Any]) -> list[tuple[bytes, str]]:
+def extract_docx_images(docx_bytes: IO[Any]) -> Iterator[tuple[bytes, str]]:
     """
     Given the bytes of a docx file, extract all the images.
     Returns a list of tuples (image_bytes, image_name).
     """
-    out = []
     try:
         with zipfile.ZipFile(docx_bytes) as z:
             for name in z.namelist():
                 if name.startswith("word/media/"):
-                    out.append((z.read(name), name.split("/")[-1]))
+                    yield (z.read(name), name.split("/")[-1])
     except Exception:
         logger.exception("Failed to extract all docx images")
-    return out
 
 
 def docx_to_text_and_images(
-    file: IO[Any], file_name: str = ""
+    file: IO[Any],
+    file_name: str = "",
+    image_callback: Callable[[bytes, str], None] | None = None,
 ) -> tuple[str, Sequence[tuple[bytes, str]]]:
     """
     Extract text from a docx.
     Return (text_content, list_of_images).
+
+    The caller can choose to provide a callback to handle images with the intent
+    of avoiding materializing the list of images in memory.
+    The images list returned is empty in this case.
     """
     md = MarkItDown(enable_plugins=False)
     try:
@@ -349,7 +360,15 @@ def docx_to_text_and_images(
         return text_content_raw or "", []
 
     file.seek(0)
-    return doc.markdown, extract_docx_images(to_bytesio(file))
+    if image_callback is None:
+        return doc.markdown, list(extract_docx_images(to_bytesio(file)))
+    # If a callback is provided, iterate and stream images without accumulating
+    try:
+        for img_file_bytes, img_file_name in extract_docx_images(to_bytesio(file)):
+            image_callback(img_file_bytes, img_file_name)
+    except Exception:
+        logger.exception("Failed to stream docx images")
+    return doc.markdown, []
 
 
 def pptx_to_text(file: IO[Any], file_name: str = "") -> str:
@@ -506,6 +525,7 @@ def extract_text_and_images(
     file_name: str,
     pdf_pass: str | None = None,
     content_type: str | None = None,
+    image_callback: Callable[[bytes, str], None] | None = None,
 ) -> ExtractionResult:
     """
     Primary new function for the updated connector.
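The parameter threads through to the format-specific readers below. The contract this diff establishes: when `image_callback` is provided, embedded images are delivered through it and `embedded_images` on the result stays empty. A hypothetical end-to-end sketch ("report.docx" is an illustrative path):

```python
import io

collected_names: list[str] = []


def on_image(img_bytes: bytes, img_name: str) -> None:
    collected_names.append(img_name)  # a real callback would persist the bytes


with open("report.docx", "rb") as f:
    result = extract_text_and_images(
        file=io.BytesIO(f.read()),
        file_name="report.docx",
        image_callback=on_image,
    )
assert result.embedded_images == []  # images went through on_image instead
```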
@@ -539,7 +559,9 @@ def extract_text_and_images(
 
     # docx example for embedded images
     if extension == ".docx":
-        text_content, images = docx_to_text_and_images(file, file_name)
+        text_content, images = docx_to_text_and_images(
+            file, file_name, image_callback=image_callback
+        )
         return ExtractionResult(
             text_content=text_content, embedded_images=images, metadata={}
         )
@@ -551,6 +573,7 @@ def extract_text_and_images(
             file,
             pdf_pass,
             extract_images=get_image_extraction_and_analysis_enabled(),
+            image_callback=image_callback,
         )
         return ExtractionResult(
             text_content=text_content, embedded_images=images, metadata=pdf_metadata
