@dreadatour dreadatour commented Oct 4, 2025

Trying to fix the issue with parallel workers getting stuck, with help from AI.

This is a draft and still an experiment; updates and reasoning are still required.

Summary by Sourcery

Improve fault handling and cleanup in parallel UDF execution to prevent workers from hanging and avoid queue blocking

Bug Fixes:

  • Fail fast on first worker exception by stopping task scheduling and terminating remaining workers to prevent deadlocks

Enhancements:

  • Use contextlib.suppress to safely close queues and terminate processes during cleanup without blocking
  • Drop non-essential status updates when the task queue is full to avoid producer blocking in put_into_queue

@dreadatour dreadatour self-assigned this Oct 4, 2025
@dreadatour dreadatour marked this pull request as draft October 4, 2025 04:42

sourcery-ai bot commented Oct 4, 2025

Reviewer's Guide

This PR implements a fail-fast strategy in run_udf_parallel by stopping new task scheduling on the first worker error, sending STOP signals with suppressed errors, forcibly terminating lingering processes, and simplifying queue teardown; it also updates put_into_queue to drop non-critical OK/NOTIFY messages when the queue is full to avoid blocking.
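
The "forcibly terminating lingering processes" step can be sketched roughly as below. This is a minimal illustration, not the code from this PR: `shutdown_workers` and `join_timeout` are hypothetical names, and the sketch assumes each worker exposes `is_alive()`, `terminate()` and `join(timeout)`, as `multiprocessing.Process` does.

```python
import contextlib


def shutdown_workers(pool, join_timeout=1.0):
    """Terminate every worker that is still alive, then reap all of them.

    `pool` is a list of objects exposing is_alive(), terminate() and
    join(timeout), for example multiprocessing.Process instances.
    Returns the number of workers that had to be terminated.
    (Hypothetical helper, written for illustration only.)
    """
    terminated = 0
    for worker in pool:
        if not worker.is_alive():
            continue
        # A worker may exit or be closed between the check and the call;
        # only those expected races are ignored.
        with contextlib.suppress(OSError, ValueError):
            worker.terminate()
            terminated += 1
    for worker in pool:
        # Bounded join so cleanup itself cannot hang forever.
        with contextlib.suppress(OSError, ValueError):
            worker.join(join_timeout)
    return terminated
```

Joining with a timeout after `terminate()` keeps the parent from blocking on a worker that does not exit promptly, which seems to be the failure mode this PR is aimed at.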

File-Level Changes

Change: Fail-fast termination and robust cleanup in run_udf_parallel
File: src/datachain/query/dispatch.py
Details:
  • Stop scheduling new tasks and decrement worker count immediately on first failure
  • Use contextlib.suppress when sending STOP_SIGNAL to avoid blocking or errors
  • Close and join the task_queue within suppress to unblock pending operations
  • Forcibly terminate any alive worker processes to prevent deadlocks
  • Close and join the done_queue with suppress instead of manual draining

Change: Non-blocking drop of non-critical queue messages when full
File: src/datachain/query/queue.py
Details:
  • Inspect item status and return immediately for OK_STATUS or NOTIFY_STATUS when the queue is full
  • Fallback to original sleep-and-retry for other items
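
A rough sketch of the drop-when-full behaviour described for put_into_queue follows. It is not the code from this PR: the status values, the `retry_delay` and `max_retries` parameters and the boolean return value are assumptions made so the example is self-contained and testable. It relies only on `put_nowait` raising `queue.Full`, which both `queue.Queue` and `multiprocessing.Queue` do.

```python
import queue
import time

# Hypothetical status values; the real constants are defined in the project.
OK_STATUS = "OK"
NOTIFY_STATUS = "NOTIFY"
FINISHED_STATUS = "FINISHED"


def put_into_queue(q, item, retry_delay=0.01, max_retries=None):
    """Enqueue `item` without blocking; return True if it was enqueued.

    Progress-style messages (OK/NOTIFY) are dropped when the queue is full.
    Everything else is retried after a short sleep. `max_retries` is an
    assumption added for this sketch; None means retry indefinitely.
    """
    droppable = isinstance(item, dict) and item.get("status") in (
        OK_STATUS,
        NOTIFY_STATUS,
    )
    attempts = 0
    while True:
        try:
            q.put_nowait(item)
            return True
        except queue.Full:
            if droppable:
                # Losing a progress update is acceptable; blocking is not.
                return False
            attempts += 1
            if max_retries is not None and attempts >= max_retries:
                raise
            time.sleep(retry_delay)
```

The trade-off is that progress reporting may under-count while the queue is saturated, so this only makes sense if consumers treat those messages as advisory.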

Possibly linked issues

  • #File error in parallel mode: PR prevents parallel workers from getting stuck by improving error handling and ensuring proper worker termination.


@Copilot Copilot AI left a comment

Pull Request Overview

This PR attempts to fix an issue with parallel workers getting stuck by implementing a fail-fast approach and avoiding blocking on non-critical queue operations.

  • Adds early exit for progress notifications when queue is full to prevent blocking
  • Implements immediate worker termination on failures instead of graceful shutdown
  • Replaces blocking queue draining with non-blocking cleanup using exception suppression

Reviewed Changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 6 comments.

File                               Description
src/datachain/query/queue.py       Adds logic to drop progress notifications when queue is full
src/datachain/query/dispatch.py    Replaces graceful worker shutdown with immediate termination and non-blocking cleanup

Comment on lines +57 to +58
status = item.get("status") if isinstance(item, dict) else None
if status in (OK_STATUS, NOTIFY_STATUS):

Copilot AI Oct 4, 2025

The constants OK_STATUS and NOTIFY_STATUS are referenced but not imported or defined in this file. This will cause a NameError at runtime.

Copilot uses AI. Check for mistakes.


self.done_queue.close()
self.done_queue.join_thread()
with contextlib.suppress(Exception):

Copilot AI Oct 4, 2025

The module 'contextlib' is used but not imported. This will cause a NameError when these lines execute.

put_into_queue(self.task_queue, STOP_SIGNAL)

# Close task queue to unblock any pending puts/gets.
with contextlib.suppress(Exception):

Copilot AI Oct 4, 2025

The module 'contextlib' is used but not imported. This will cause a NameError when these lines execute.


# Terminate any remaining workers immediately to avoid deadlocks
for p in pool:
    with contextlib.suppress(Exception):

Copilot AI Oct 4, 2025

The module 'contextlib' is used but not imported. This will cause a NameError when these lines execute.

p.terminate()

# Do not block draining done_queue indefinitely; just close it.
with contextlib.suppress(Exception):

Copilot AI Oct 4, 2025

The module 'contextlib' is used but not imported. This will cause a NameError when these lines execute.

Comment on lines +333 to +334
with contextlib.suppress(Exception):
    put_into_queue(self.task_queue, STOP_SIGNAL)

Copilot AI Oct 4, 2025

Suppressing all exceptions when sending STOP_SIGNAL could hide important errors. Consider suppressing only the exceptions that are expected here (e.g., queue-related ones such as queue.Full or BrokenPipeError).
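
One way to narrow the suppression is sketched below. It is only an illustration: `send_stop_signal`, the `STOP_SIGNAL` value and the exact exception list are assumptions, not taken from the project. `ValueError` is included because `multiprocessing.Queue` raises it when putting into a queue that is already closed.

```python
import contextlib
import queue

STOP_SIGNAL = "STOP"  # hypothetical sentinel; the project defines its own


def send_stop_signal(task_queue, n_workers):
    """Best-effort: enqueue one STOP_SIGNAL per worker.

    Returns how many signals were actually enqueued. Only the failures
    expected during teardown are suppressed; anything else propagates.
    """
    sent = 0
    for _ in range(n_workers):
        # queue.Full: no room left; BrokenPipeError: the other end is gone;
        # ValueError: the queue has already been closed.
        with contextlib.suppress(queue.Full, BrokenPipeError, ValueError):
            task_queue.put_nowait(STOP_SIGNAL)
            sent += 1
    return sent
```

With this shape, an unexpected error such as a `RuntimeError` raised by the queue still surfaces to the caller instead of being swallowed during cleanup.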


@sourcery-ai sourcery-ai bot left a comment

Hey there - I've reviewed your changes and they look great!



Deploying datachain-documentation with Cloudflare Pages

Latest commit: 4a53833
Status: ✅  Deploy successful!
Preview URL: https://1d91a4fe.datachain-documentation.pages.dev
Branch Preview URL: https://fix-stuck-parallel-workers.datachain-documentation.pages.dev


codecov bot commented Oct 4, 2025

Codecov Report

❌ Patch coverage is 6.66667% with 14 lines in your changes missing coverage. Please review.
✅ Project coverage is 87.75%. Comparing base (67bc6e1) to head (4a53833).
⚠️ Report is 2 commits behind head on main.

Files with missing lines           Patch %   Lines
src/datachain/query/dispatch.py    0.00%     12 Missing ⚠️
src/datachain/query/queue.py       33.33%    1 Missing and 1 partial ⚠️
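
The patch coverage figure can be reproduced from the per-file numbers above. The sketch below assumes partial lines count as not covered, and it infers that queue.py has one covered line (33.33% with two uncovered lines implies three changed lines); neither assumption is stated in the report itself. `patch_coverage` is a hypothetical helper.

```python
def patch_coverage(files):
    """Return patch coverage in percent.

    `files` is an iterable of (covered, missing, partial) changed-line
    counts per file; partial lines are treated as not covered.
    """
    covered = sum(c for c, _, _ in files)
    total = sum(c + m + p for c, m, p in files)
    return 100.0 * covered / total


# dispatch.py: 0 covered, 12 missing
# queue.py: 1 covered (inferred), 1 missing, 1 partial
changed = [(0, 12, 0), (1, 1, 1)]
print(round(patch_coverage(changed), 5))  # 6.66667
```

That is 1 covered line out of 15 changed lines, with the remaining 14 matching the "14 lines in your changes missing coverage" message.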
Additional details and impacted files

@@            Coverage Diff             @@
##             main    #1379      +/-   ##
==========================================
- Coverage   87.78%   87.75%   -0.03%     
==========================================
  Files         160      160              
  Lines       14994    14999       +5     
  Branches     2149     2150       +1     
==========================================
+ Hits        13162    13163       +1     
- Misses       1345     1348       +3     
- Partials      487      488       +1     
Flag        Coverage Δ
datachain   87.68% <6.66%> (-0.03%) ⬇️

Files with missing lines           Coverage Δ
src/datachain/query/queue.py       55.55% <33.33%> (-1.12%) ⬇️
src/datachain/query/dispatch.py    22.13% <0.00%> (-0.19%) ⬇️
@dreadatour dreadatour closed this Oct 6, 2025
@dreadatour
Contributor Author

Fix in #1384
