-
Notifications
You must be signed in to change notification settings - Fork 132
Fix parallel workers stuck #1379
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -310,6 +310,9 @@ def run_udf_parallel( # noqa: C901, PLR0912 | |
| elif status == FINISHED_STATUS: | ||
| n_workers -= 1 # Worker finished | ||
| else: # Failed / error | ||
| # On first failure, stop scheduling more work and raise. | ||
| # Reduce worker count for the failing one; | ||
| # cleanup in finally will terminate others. | ||
| n_workers -= 1 | ||
| if exc := result.get("exception"): | ||
| raise exc | ||
|
|
@@ -325,26 +328,26 @@ def run_udf_parallel( # noqa: C901, PLR0912 | |
| normal_completion = True | ||
| finally: | ||
| if not normal_completion: | ||
| # Stop all workers if there is an unexpected exception | ||
| # Failure path: fail fast. Stop feeding tasks and terminate workers. | ||
| for _ in pool: | ||
| put_into_queue(self.task_queue, STOP_SIGNAL) | ||
|
|
||
| # This allows workers (and this process) to exit without | ||
| # consuming any remaining data in the queues. | ||
| # (If they exit due to an exception.) | ||
| self.task_queue.close() | ||
| self.task_queue.join_thread() | ||
|
|
||
| # Flush all items from the done queue. | ||
| # This is needed if any workers are still running. | ||
| while n_workers > 0: | ||
| result = get_from_queue(self.done_queue) | ||
| status = result["status"] | ||
| if status != OK_STATUS: | ||
| n_workers -= 1 | ||
|
|
||
| self.done_queue.close() | ||
| self.done_queue.join_thread() | ||
| with contextlib.suppress(Exception): | ||
| put_into_queue(self.task_queue, STOP_SIGNAL) | ||
|
Comment on lines
+333
to
+334
|
||
|
|
||
| # Close task queue to unblock any pending puts/gets. | ||
| with contextlib.suppress(Exception): | ||
|
||
| self.task_queue.close() | ||
| self.task_queue.join_thread() | ||
|
|
||
| # Terminate any remaining workers immediately to avoid deadlocks | ||
| for p in pool: | ||
| with contextlib.suppress(Exception): | ||
|
||
| if p.is_alive(): | ||
| p.terminate() | ||
|
|
||
| # Do not block draining done_queue indefinitely; just close it. | ||
| with contextlib.suppress(Exception): | ||
|
||
| self.done_queue.close() | ||
| self.done_queue.join_thread() | ||
|
|
||
| # Wait for workers to stop | ||
| for p in pool: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -52,6 +52,11 @@ def put_into_queue(queue: Queue, item: Any) -> None: | |
| queue.put_nowait(item) | ||
| return | ||
| except Full: | ||
| # If queue is full, avoid blocking on progress notifications | ||
| # that can be lossy without affecting correctness. | ||
| status = item.get("status") if isinstance(item, dict) else None | ||
| if status in (OK_STATUS, NOTIFY_STATUS): | ||
|
Comment on lines
+57
to
+58
|
||
| return | ||
| sleep(0.01) | ||
|
|
||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
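The `contextlib` module is used here (via `contextlib.suppress`) but is not imported in this file. Add `import contextlib` at the top of the module; otherwise these lines will raise a `NameError` when they execute.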