Speed up prepdocs for file strategy with parallel async pools #2553

Conversation
At c=10 it hit the rate limit for the embeddings API. Most of the time is spent on the embeddings API.

FYI, I am working on a change that will also add in calls to the Vision API and Chat Completions API during file processing, for developers that want support for multimodal documents. Those calls are currently made after file processing in the current version of multimodal ingestion, but I'm moving them to the …

Also FYI @mattgotteiner is porting prepdocs to an Azure Function, to be used as a skillset by AI Search. That will use a specific FunctionFileStrategy though, similar to UploadFileStrategy (per-file basis), so it may be orthogonal to this change.

Please merge

This change introduces the drawback that the log from prepdocs becomes harder to follow, since things are done in different orders. I'm okay with that drawback now that the logging is prettier. I'll push those changes to the branch.
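Since the comments above concern keeping log output readable across concurrently processed files, here is a minimal sketch of the idea, assuming a per-file prefix convention; the function names and exact message format are illustrative, not the PR's actual code.

```python
import asyncio
import logging

logger = logging.getLogger("prepdocs_sketch")

async def process(filename: str) -> None:
    # Prefixing each message with the quoted filename keeps interleaved
    # output from concurrently processed files attributable to one file.
    logger.info("'%s': parsing", filename)
    await asyncio.sleep(0.05)
    logger.info("'%s': computing embeddings", filename)
    await asyncio.sleep(0.05)
    logger.info("'%s': indexing", filename)

async def main() -> None:
    await asyncio.gather(process("a.pdf"), process("b.pdf"))

logging.basicConfig(level=logging.INFO)
asyncio.run(main())
```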
This looks good, now that I've cleaned up the logging output to be easier to read across multiple concurrently processed files.
Pull Request Overview
This PR adds parallel asynchronous processing for the file ingestion strategy and harmonizes log message formats across modules.
- Unified and improved log message formatting (consistent quoting and prefixes).
- Introduced a `concurrency` parameter in `FileStrategy` and the CLI, leveraging `asyncio.Semaphore` and `asyncio.gather` for parallel file processing (see the sketch after this list).
- Changed some high-volume logs from INFO to DEBUG in `embeddings.py`.
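As a concrete illustration of the second bullet, here is a minimal, self-contained sketch of a semaphore-bounded async pool; `process_file`, the `worker` helper, and the sample file list are hypothetical stand-ins, not the PR's actual code.

```python
import asyncio
import logging

logger = logging.getLogger("prepdocs_sketch")

DEFAULT_CONCURRENCY = 4  # same default value shown in the review context below

async def process_file(file: str) -> None:
    """Stand-in for the real per-file ingestion (parse, embed, index)."""
    logger.info("Processing '%s'", file)
    await asyncio.sleep(0.1)  # simulate I/O-bound work such as embeddings calls

async def run(files: list[str], concurrency: int = DEFAULT_CONCURRENCY) -> None:
    semaphore = asyncio.Semaphore(concurrency)

    async def worker(file: str) -> None:
        # The semaphore caps how many files are in flight at once, which
        # also bounds the rate of calls to the embeddings API.
        async with semaphore:
            await process_file(file)

    await asyncio.gather(*(worker(f) for f in files))

logging.basicConfig(level=logging.INFO)
asyncio.run(run([f"doc{i}.pdf" for i in range(10)], concurrency=4))
```

Raising `concurrency` trades throughput against rate limits; as the conversation above notes, c=10 already hit the embeddings API rate limit.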
Reviewed Changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| app/backend/prepdocslib/searchmanager.py | Standardized quoting in log messages |
| app/backend/prepdocslib/pdfparser.py | Updated log prefix format for local parsing |
| app/backend/prepdocslib/mediadescriber.py | Removed trailing ellipses in log entries |
| app/backend/prepdocslib/listfilestrategy.py | Adjusted log message format for MD5 checks |
| app/backend/prepdocslib/integratedvectorizerstrategy.py | Revised info log formatting |
| app/backend/prepdocslib/htmlparser.py | Updated log prefix for HTML parsing |
| app/backend/prepdocslib/filestrategy.py | Added concurrency support and parallel processing |
| app/backend/prepdocslib/embeddings.py | Lowered log level for embedding computation |
| app/backend/prepdocslib/blobmanager.py | Consistent log formatting for blob operations |
| app/backend/prepdocs.py | Introduced `--concurrency` CLI arg and adjusted log level setting |
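The last table row mentions a new `--concurrency` CLI argument. A plausible argparse wiring is sketched below, assuming the default of 4 from the `DEFAULT_CONCURRENCY` constant shown in the review context; everything else is illustrative rather than the PR's actual code.

```python
import argparse

parser = argparse.ArgumentParser(prog="prepdocs")
parser.add_argument(
    "--concurrency",
    type=int,
    default=4,  # assumed to mirror DEFAULT_CONCURRENCY = 4
    help="Maximum number of files to process concurrently",
)
args = parser.parse_args(["--concurrency", "8"])  # sample invocation
print(args.concurrency)  # -> 8
```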
Comments suppressed due to low confidence (2)

app/backend/prepdocslib/filestrategy.py:45

- [nitpick] Add a note in the class docstring explaining the purpose of `concurrency` and acceptable value ranges to improve discoverability.

      DEFAULT_CONCURRENCY = 4

app/backend/prepdocslib/filestrategy.py:104

- Introduce tests that simulate multiple files and verify the parallel processing behavior under different `concurrency` settings to ensure reliability.
    async def run(self):
        logger.info("Running with concurrency: %d", self.concurrency)
        semaphore = asyncio.Semaphore(self.concurrency)
        tasks = [process_file_worker(semaphore, file) async for file in files]
        await asyncio.gather(*tasks)
Consider using `asyncio.gather(*tasks, return_exceptions=True)` or handling exceptions within `process_file_worker` so that a single task failure doesn't cancel the entire batch.

Suggested change:

    results = await asyncio.gather(*tasks, return_exceptions=True)
    for result in results:
        if isinstance(result, Exception):
            logger.error("Task failed with exception: %s", str(result), exc_info=True)
@tonybaloney Thoughts on this suggestion from Copilot? Is it correct?
Purpose
Implements #2516