Closed
41 changes: 22 additions & 19 deletions src/datachain/query/dispatch.py
@@ -310,6 +310,9 @@ def run_udf_parallel( # noqa: C901, PLR0912
elif status == FINISHED_STATUS:
n_workers -= 1 # Worker finished
else: # Failed / error
# On first failure, stop scheduling more work and raise.
# Reduce worker count for the failing one;
# cleanup in finally will terminate others.
n_workers -= 1
if exc := result.get("exception"):
raise exc
@@ -325,26 +328,26 @@ def run_udf_parallel( # noqa: C901, PLR0912
normal_completion = True
finally:
if not normal_completion:
# Stop all workers if there is an unexpected exception
# Failure path: fail fast. Stop feeding tasks and terminate workers.
for _ in pool:
put_into_queue(self.task_queue, STOP_SIGNAL)

# This allows workers (and this process) to exit without
# consuming any remaining data in the queues.
# (If they exit due to an exception.)
self.task_queue.close()
self.task_queue.join_thread()

# Flush all items from the done queue.
# This is needed if any workers are still running.
while n_workers > 0:
result = get_from_queue(self.done_queue)
status = result["status"]
if status != OK_STATUS:
n_workers -= 1

self.done_queue.close()
self.done_queue.join_thread()
with contextlib.suppress(Exception):
Copilot AI Oct 4, 2025

The module 'contextlib' is used but not imported. This will cause a NameError when these lines execute.

put_into_queue(self.task_queue, STOP_SIGNAL)
Comment on lines +333 to +334
Copilot AI Oct 4, 2025

Suppressing all exceptions when sending STOP_SIGNAL could hide important errors. Consider being more specific about which exceptions to suppress (e.g., only queue-related exceptions like Full or BrokenPipeError).
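
One way to narrow the suppression, sketched under stated assumptions: the helper name `send_stop_signals`, the `STOP_SIGNAL` value, and the exact exception set are illustrative and not taken from the PR. The sketch also uses `put_nowait` directly rather than the project's `put_into_queue`. On recent Python versions a closed `multiprocessing` queue raises `ValueError` on put, and `BrokenPipeError` is a subclass of `OSError`, so those are the candidates listed here; the right set for this code base may differ.

```python
import contextlib
from queue import Full, Queue

STOP_SIGNAL = "STOP"  # placeholder value, assumed for this sketch

# Errors a queue put can plausibly raise during shutdown:
# Full (no free slot), ValueError (queue already closed),
# OSError (broken pipe and similar; BrokenPipeError is a subclass).
EXPECTED_SHUTDOWN_ERRORS = (Full, ValueError, OSError)


def send_stop_signals(task_queue, n_workers: int) -> int:
    """Try to enqueue one STOP_SIGNAL per worker; return how many were sent."""
    sent = 0
    for _ in range(n_workers):
        # Only the expected shutdown errors are swallowed; anything else
        # (for example a TypeError from a programming mistake) propagates.
        with contextlib.suppress(*EXPECTED_SHUTDOWN_ERRORS):
            task_queue.put_nowait(STOP_SIGNAL)
            sent += 1
    return sent
```

With a bounded queue that has room for two items, `send_stop_signals(q, 3)` enqueues two signals and silently skips the third, while an unexpected exception type still surfaces to the caller.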


# Close task queue to unblock any pending puts/gets.
with contextlib.suppress(Exception):
Copilot AI Oct 4, 2025

The module 'contextlib' is used but not imported. This will cause a NameError when these lines execute.

self.task_queue.close()
self.task_queue.join_thread()

# Terminate any remaining workers immediately to avoid deadlocks
for p in pool:
with contextlib.suppress(Exception):
Copilot AI Oct 4, 2025

The module 'contextlib' is used but not imported. This will cause a NameError when these lines execute.

if p.is_alive():
p.terminate()

# Do not block draining done_queue indefinitely; just close it.
with contextlib.suppress(Exception):
Copilot AI Oct 4, 2025

The module 'contextlib' is used but not imported. This will cause a NameError when these lines execute.

self.done_queue.close()
self.done_queue.join_thread()

# Wait for workers to stop
for p in pool:
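
The failure path in this hunk (terminate live workers, then wait for them) could be factored roughly as follows. This is a sketch, not the PR's code: `terminate_workers` and `join_timeout` are hypothetical names, and the function only assumes each worker exposes `is_alive()`, `terminate()`, and `join(timeout)`, as `multiprocessing.Process` does. The bounded join is an assumption added here so that cleanup cannot hang on a worker that ignores termination.

```python
import contextlib
from typing import Any, List, Sequence


def terminate_workers(pool: Sequence[Any], join_timeout: float = 5.0) -> List[Any]:
    """Terminate live workers, join each with a timeout, return stragglers."""
    for p in pool:
        # A worker may already be closed or reaped; tolerate only the
        # errors that situation is expected to produce.
        with contextlib.suppress(OSError, ValueError):
            if p.is_alive():
                p.terminate()

    stragglers = []
    for p in pool:
        # Bounded wait so the failure path itself cannot block forever.
        p.join(timeout=join_timeout)
        if p.is_alive():
            stragglers.append(p)
    return stragglers
```

A caller could log or escalate on a non-empty return value instead of blocking, which keeps the fail-fast intent of the change.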
5 changes: 5 additions & 0 deletions src/datachain/query/queue.py
@@ -52,6 +52,11 @@ def put_into_queue(queue: Queue, item: Any) -> None:
queue.put_nowait(item)
return
except Full:
# If queue is full, avoid blocking on progress notifications
# that can be lossy without affecting correctness.
status = item.get("status") if isinstance(item, dict) else None
if status in (OK_STATUS, NOTIFY_STATUS):
Comment on lines +57 to +58
Copilot AI Oct 4, 2025

The constants OK_STATUS and NOTIFY_STATUS are referenced but not imported or defined in this file. This will cause a NameError at runtime.

return
sleep(0.01)
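
For reference, a self-contained sketch of the lossy-put idea in this hunk, with the status constants defined locally so the names resolve. The constant values, the `put_lossy` name, the `LOSSY_STATUSES` set, and the boolean return are assumptions made for illustration; in the real module the constants would presumably be imported from wherever the project defines them. Whether a given status is actually safe to drop depends on how the consumer uses it, which this sketch does not settle.

```python
import time
from queue import Full, Queue
from typing import Any

# Placeholder values, assumed for this sketch only.
OK_STATUS = "OK"
NOTIFY_STATUS = "NOTIFY"
FINISHED_STATUS = "FINISHED"

# Statuses treated as droppable when the queue is full.
LOSSY_STATUSES = frozenset({OK_STATUS, NOTIFY_STATUS})


def put_lossy(queue: Queue, item: Any, retry_delay: float = 0.01) -> bool:
    """Put item without blocking; drop lossy items if the queue is full.

    Returns True if the item was enqueued, False if it was dropped.
    """
    while True:
        try:
            queue.put_nowait(item)
            return True
        except Full:
            status = item.get("status") if isinstance(item, dict) else None
            if status in LOSSY_STATUSES:
                return False  # dropped rather than blocking the producer
            # Non-lossy items are retried until a slot frees up.
            time.sleep(retry_delay)
```

Returning a flag instead of `None` is a design choice of the sketch: it lets a caller count dropped notifications, which the original signature does not expose.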

