Fix parallel workers stuck #1379
@@ -310,6 +310,9 @@ def run_udf_parallel( # noqa: C901, PLR0912
                     elif status == FINISHED_STATUS:
                         n_workers -= 1  # Worker finished
                     else:  # Failed / error
+                        # On first failure, stop scheduling more work and raise.
+                        # Reduce worker count for the failing one;
+                        # cleanup in finally will terminate others.
                         n_workers -= 1
                         if exc := result.get("exception"):
                             raise exc
@@ -325,26 +328,26 @@ def run_udf_parallel( # noqa: C901, PLR0912
             normal_completion = True
         finally:
             if not normal_completion:
-                # Stop all workers if there is an unexpected exception
+                # Failure path: fail fast. Stop feeding tasks and terminate workers.
                 for _ in pool:
-                    put_into_queue(self.task_queue, STOP_SIGNAL)
-
-                # This allows workers (and this process) to exit without
-                # consuming any remaining data in the queues.
-                # (If they exit due to an exception.)
-                self.task_queue.close()
-                self.task_queue.join_thread()
-
-                # Flush all items from the done queue.
-                # This is needed if any workers are still running.
-                while n_workers > 0:
-                    result = get_from_queue(self.done_queue)
-                    status = result["status"]
-                    if status != OK_STATUS:
-                        n_workers -= 1
-
-                self.done_queue.close()
-                self.done_queue.join_thread()
+                    with contextlib.suppress(Exception):
+                        put_into_queue(self.task_queue, STOP_SIGNAL)

Copilot review comment on lines +333 to +334: Suppressing all exceptions when sending STOP_SIGNAL could hide important errors. Consider being more specific about which exceptions to suppress (e.g., only queue-related exceptions like Full or BrokenPipeError).

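One way to act on this suggestion, offered as a minimal sketch and not as the PR's actual code: suppress only the exceptions a saturated or already-closed queue is expected to raise, and let anything else propagate. The helper name `send_stop_signals` and the `STOP_SIGNAL` value are hypothetical stand-ins, and the sketch calls `put_nowait` directly instead of the project's `put_into_queue` helper.

```python
import contextlib
from queue import Full

STOP_SIGNAL = "STOP"  # hypothetical placeholder for the project's constant


def send_stop_signals(task_queue, n_workers: int) -> int:
    """Try to enqueue one stop signal per worker; return how many were enqueued."""
    sent = 0
    for _ in range(n_workers):
        # Full: the queue is saturated.
        # BrokenPipeError: named in the review comment as a queue-related error.
        # ValueError: multiprocessing.Queue raises this once it is closed (Python 3.8+).
        with contextlib.suppress(Full, BrokenPipeError, ValueError):
            task_queue.put_nowait(STOP_SIGNAL)
            sent += 1
    return sent
```

Any other exception type (for example a programming error inside the queue wrapper) still surfaces, which is the point of narrowing the suppression.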
+
+                # Close task queue to unblock any pending puts/gets.
+                with contextlib.suppress(Exception):

Copilot review comment: The module 'contextlib' is used but not imported. This will cause a NameError when these lines execute.

+                    self.task_queue.close()
+                    self.task_queue.join_thread()
+
+                # Terminate any remaining workers immediately to avoid deadlocks
+                for p in pool:
+                    with contextlib.suppress(Exception):

Copilot review comment (repeated on this line): 'contextlib' is used but not imported.

+                        if p.is_alive():
+                            p.terminate()
+
+                # Do not block draining done_queue indefinitely; just close it.
+                with contextlib.suppress(Exception):

Copilot review comment (repeated on this line): 'contextlib' is used but not imported.

+                    self.done_queue.close()
+                    self.done_queue.join_thread()
 
             # Wait for workers to stop
             for p in pool:
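
The failure path in this hunk (terminate workers, close both queues without draining them, then join) can be sketched in isolation. This is an illustration under assumptions, not the PR's code: `fail_fast_shutdown` and `join_timeout` are hypothetical names, and the sketch swaps in `cancel_join_thread()` where the PR calls `join_thread()`, since `join_thread()` blocks until buffered items are flushed to the pipe.

```python
def fail_fast_shutdown(pool, task_queue, done_queue, join_timeout: float = 5.0) -> None:
    """Terminate workers and release both queues without draining them.

    `pool` is expected to hold multiprocessing.Process-like objects and the
    queues multiprocessing.Queue-like objects; only is_alive(), terminate(),
    join(), cancel_join_thread() and close() are used.
    """
    # Terminate first so no worker stays blocked on a queue operation.
    for p in pool:
        if p.is_alive():
            p.terminate()

    for q in (task_queue, done_queue):
        # Do not wait for buffered items to be flushed; on this path
        # any data still in the queues is intentionally discarded.
        q.cancel_join_thread()
        q.close()

    # Bounded wait so a stuck worker cannot hang the parent process.
    for p in pool:
        p.join(join_timeout)
```

The trade-off is that anything still buffered in either queue is lost, which is acceptable only because the caller is already raising the first worker exception.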
@@ -52,6 +52,11 @@ def put_into_queue(queue: Queue, item: Any) -> None:
             queue.put_nowait(item)
             return
         except Full:
+            # If queue is full, avoid blocking on progress notifications
+            # that can be lossy without affecting correctness.
+            status = item.get("status") if isinstance(item, dict) else None
+            if status in (OK_STATUS, NOTIFY_STATUS):

Copilot review comment on lines +57 to +58: The constants OK_STATUS and NOTIFY_STATUS are referenced but not imported or defined in this file. This will cause a NameError at runtime.

+                return
             sleep(0.01)
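
A self-contained sketch of the lossy-put behaviour in this hunk follows, with the status constants defined locally so the names resolve. The constant values are placeholders (the real ones live elsewhere in the project), `LOSSY_STATUSES` is a hypothetical name, and the boolean return value is an addition for illustration; the function in the diff returns `None`.

```python
from queue import Full, Queue
from time import sleep
from typing import Any

# Placeholder values; the project's real constants are assumed to be imported instead.
OK_STATUS = "OK"
NOTIFY_STATUS = "NOTIFY"
LOSSY_STATUSES = (OK_STATUS, NOTIFY_STATUS)


def put_into_queue(queue: Queue, item: Any) -> bool:
    """Enqueue `item`, retrying while the queue is full.

    Progress-style messages are dropped instead of retried.
    Returns True if the item was enqueued, False if it was dropped.
    """
    while True:
        try:
            queue.put_nowait(item)
            return True
        except Full:
            status = item.get("status") if isinstance(item, dict) else None
            if status in LOSSY_STATUSES:
                return False  # dropped: losing a progress update is tolerated
            sleep(0.01)  # anything else must be delivered, so retry
```

With a full queue, a message such as `{"status": NOTIFY_STATUS}` is discarded immediately, while a stop signal or error report keeps retrying until a consumer frees a slot.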