-
Notifications
You must be signed in to change notification settings - Fork 130
Fix parallel workers stuck #1379
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Reviewer's GuideThis PR implements a fail-fast strategy in run_udf_parallel by stopping new task scheduling on the first worker error, sending STOP signals with suppressed errors, forcibly terminating lingering processes, and simplifying queue teardown; it also updates put_into_queue to drop non-critical OK/NOTIFY messages when the queue is full to avoid blocking. File-Level Changes
Possibly linked issues
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR attempts to fix an issue with parallel workers getting stuck by implementing a fail-fast approach and avoiding blocking on non-critical queue operations.
- Adds early exit for progress notifications when queue is full to prevent blocking
- Implements immediate worker termination on failures instead of graceful shutdown
- Replaces blocking queue draining with non-blocking cleanup using exception suppression
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 6 comments.
File | Description |
---|---|
src/datachain/query/queue.py | Adds logic to drop progress notifications when queue is full |
src/datachain/query/dispatch.py | Replaces graceful worker shutdown with immediate termination and non-blocking cleanup |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
status = item.get("status") if isinstance(item, dict) else None | ||
if status in (OK_STATUS, NOTIFY_STATUS): |
Copilot
AI
Oct 4, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The constants OK_STATUS and NOTIFY_STATUS are referenced but not imported or defined in this file. This will cause a NameError at runtime.
Copilot uses AI. Check for mistakes.
|
||
self.done_queue.close() | ||
self.done_queue.join_thread() | ||
with contextlib.suppress(Exception): |
Copilot
AI
Oct 4, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The module 'contextlib' is used but not imported. This will cause a NameError when these lines execute.
Copilot uses AI. Check for mistakes.
put_into_queue(self.task_queue, STOP_SIGNAL) | ||
|
||
# Close task queue to unblock any pending puts/gets. | ||
with contextlib.suppress(Exception): |
Copilot
AI
Oct 4, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The module 'contextlib' is used but not imported. This will cause a NameError when these lines execute.
Copilot uses AI. Check for mistakes.
|
||
# Terminate any remaining workers immediately to avoid deadlocks | ||
for p in pool: | ||
with contextlib.suppress(Exception): |
Copilot
AI
Oct 4, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The module 'contextlib' is used but not imported. This will cause a NameError when these lines execute.
Copilot uses AI. Check for mistakes.
p.terminate() | ||
|
||
# Do not block draining done_queue indefinitely; just close it. | ||
with contextlib.suppress(Exception): |
Copilot
AI
Oct 4, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The module 'contextlib' is used but not imported. This will cause a NameError when these lines execute.
Copilot uses AI. Check for mistakes.
with contextlib.suppress(Exception): | ||
put_into_queue(self.task_queue, STOP_SIGNAL) |
Copilot
AI
Oct 4, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suppressing all exceptions when sending STOP_SIGNAL could hide important errors. Consider being more specific about which exceptions to suppress (e.g., only queue-related exceptions like Full or BrokenPipeError).
Copilot uses AI. Check for mistakes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Deploying datachain-documentation with
|
Latest commit: |
4a53833
|
Status: | ✅ Deploy successful! |
Preview URL: | https://1d91a4fe.datachain-documentation.pages.dev |
Branch Preview URL: | https://fix-stuck-parallel-workers.datachain-documentation.pages.dev |
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #1379 +/- ##
==========================================
- Coverage 87.78% 87.75% -0.03%
==========================================
Files 160 160
Lines 14994 14999 +5
Branches 2149 2150 +1
==========================================
+ Hits 13162 13163 +1
- Misses 1345 1348 +3
- Partials 487 488 +1
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
Fix in #1384 |
Trying to fix issue with parallel workers stuck using AI.
Draft, experimenting, update and reasoning required.
Summary by Sourcery
Improve fault handling and cleanup in parallel UDF execution to prevent workers from hanging and avoid queue blocking
Bug Fixes:
Enhancements: