
Conversation

Member

@shcheklein shcheklein commented Oct 6, 2025

Fixes hanging of the UDF in parallel mode. A simple script to reproduce is below; a more advanced script to test this is here:

https://gist.github.com/shcheklein/c988ab49da95628fc7f4b871d4e033b6

TODO:

  • Review and add tests

Basic script to reproduce

# ruff: noqa: INP001

"""Quick local check of running a UDF with parallel execution.

Run this script directly to see the UDF results printed to stdout.
"""
import multiprocess as mp

import datachain as dc

_SEEN_WORKERS: set[str] = set()


def compute_metrics(word: str) -> tuple[str, int]:
    """Return the upper-cased word and its length."""
    proc = mp.current_process()  # type: ignore[attr-defined]
    if proc.name not in _SEEN_WORKERS:
        print(f"running on {proc.name} (pid={proc.pid})")
        _SEEN_WORKERS.add(proc.name)

    if proc.name == "Worker-UDF-1":
        # Intentionally crash one worker (AttributeError on None) to reproduce the hang.
        return None.upper(), len(word)
    return word.upper(), len(word)


def main() -> None:
    base_words = ["alpha", "beta", "gamma", "delta", "epsilon"]
    words = base_words * 20_000  # 5 * 20_000 = 100_000 entries

    chain = (
        dc.read_values(word=words)
        .settings(parallel=2)
        .map(
            compute_metrics,
            params=["word"],
            output={"upper": str, "length": int},
        )
        .select("word", "upper", "length")
    )

    total = 0
    for total, (original, upper, length) in enumerate(chain, start=1):
        if total <= 10:
            print(f"{original!r} -> {upper!r}, length={length}")

    print(f"processed {total} rows")


if __name__ == "__main__":
    main()

A slightly more advanced script, testing different failure modes, is in the gist linked above.


Summary by Sourcery

Simplify and robustly handle parallel UDF termination by switching to non-blocking queue reads with exit code monitoring, ensuring timely worker shutdown and avoiding deadlocks.

Bug Fixes:

  • Prevent UDF hanging in parallel mode by using non-blocking queue reads and actively monitoring worker exit codes.
  • Ensure worker processes are properly terminated and joined on errors or completion to avoid deadlocks.
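The non-blocking read loop described above can be sketched as follows. This is a minimal, thread-based simulation (the real dispatcher uses multiprocess workers and their exitcode attribute; names here are illustrative):

```python
import queue
import threading
import time

def drain_results(done_queue, workers, n_expected):
    """Poll the done queue without blocking, checking worker health between reads."""
    results = []
    while len(results) < n_expected:
        try:
            results.append(done_queue.get_nowait())
        except queue.Empty:
            # A blocking get() here is what could hang forever if a worker died
            # without producing a result; instead, poll and inspect the workers.
            if any(not w.is_alive() and getattr(w, "exitcode", 0) for w in workers):
                raise RuntimeError("worker exited unexpectedly")
            time.sleep(0.01)
    return results

done: queue.Queue = queue.Queue()
workers = [threading.Thread(target=done.put, args=(i * i,)) for i in range(4)]
for w in workers:
    w.start()
print(sorted(drain_results(done, workers, 4)))  # [0, 1, 4, 9]
```

With real processes, a non-zero `exitcode` on a dead peer triggers immediate shutdown instead of waiting on a queue item that will never arrive.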

Enhancements:

  • Remove complex normal_completion flag and streamline cleanup by always terminating alive workers and cancelling join threads on queues.
  • Rename queue type hints to MultiprocessQueue and use multiprocess.get_context for process spawning.
  • Add assertions to verify task and done queues are initialized before worker creation.
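The context change can be illustrated with the stdlib multiprocessing module, whose API the multiprocess package mirrors (a sketch, not the actual dispatcher code):

```python
import multiprocessing

# Pin the start method explicitly instead of relying on the platform default
# (fork on Linux, spawn on macOS/Windows).
ctx = multiprocessing.get_context("spawn")
task_queue = ctx.Queue()
done_queue = ctx.Queue()
# Workers would then be created as ctx.Process(target=..., args=(task_queue, done_queue))
# so every process uses the same, explicit start method.
print(type(ctx).__name__)  # SpawnContext
```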

Tests:

  • Add functional test to verify that all peer UDF workers exit immediately when one worker fails under various failure modes with correct exit codes and messages.

Contributor

sourcery-ai bot commented Oct 6, 2025

Reviewer's Guide

Simplify and fix termination handling in the UDF parallel dispatcher to prevent hanging, ensure proper worker cleanup using non-blocking queue polling and unified shutdown, and add parameterized tests verifying immediate termination across multiple failure modes.

Sequence diagram for improved UDF parallel dispatcher termination handling

sequenceDiagram
  participant MainProcess
  participant Worker1
  participant Worker2
  participant TaskQueue
  participant DoneQueue

  MainProcess->>TaskQueue: Put tasks
  MainProcess->>Worker1: Start
  MainProcess->>Worker2: Start
  loop Process tasks
    Worker1->>TaskQueue: Get task
    Worker1->>DoneQueue: Put result
    Worker2->>TaskQueue: Get task
    Worker2->>DoneQueue: Put result
    MainProcess->>DoneQueue: Non-blocking get result
    alt Worker exits unexpectedly
      MainProcess->>Worker1: Detect exitcode != 0
      MainProcess->>Worker1: Terminate
    end
  end
  MainProcess->>Worker1: Terminate if alive
  MainProcess->>Worker2: Terminate if alive
  MainProcess->>Worker1: Join
  MainProcess->>Worker2: Join
  MainProcess->>TaskQueue: cancel_join_thread, close
  MainProcess->>DoneQueue: cancel_join_thread, close

Class diagram for updated UDFDispatcher and related queue usage

classDiagram
  class UDFDispatcher {
    -task_queue: MultiprocessQueue | None
    -done_queue: MultiprocessQueue | None
    -ctx: multiprocess.get_context
    +__init__(udf_info, buffer_size)
    +run_udf_parallel(...)
  }
  class UDFWorker {
    +__init__(catalog, udf, task_queue, done_queue, ...)
  }
  class DownloadCallback {
    +__init__(queue: MultiprocessQueue)
  }
  class ProcessedCallback {
    +__init__(name, queue: MultiprocessQueue)
  }
  UDFDispatcher "1" o-- "*" UDFWorker
  UDFDispatcher "1" o-- "1" DownloadCallback
  UDFDispatcher "1" o-- "1" ProcessedCallback
  UDFDispatcher "1" --> "1" MultiprocessQueue : task_queue
  UDFDispatcher "1" --> "1" MultiprocessQueue : done_queue
  UDFWorker "1" --> "1" MultiprocessQueue : task_queue
  UDFWorker "1" --> "1" MultiprocessQueue : done_queue
  DownloadCallback "1" --> "1" MultiprocessQueue
  ProcessedCallback "1" --> "1" MultiprocessQueue

File-Level Changes

Change Details Files
Refactor UDFDispatcher for simplified termination and queue handling
  • Replaced blocking queue.get calls with non-blocking get_nowait loop handling Empty and checking worker exit codes
  • Removed normal_completion flag and consolidated cleanup to terminate alive workers, join them, then cancel and close queues
  • Switched to multiprocess.get_context("spawn") and introduced MultiprocessQueue type annotations for task_queue/done_queue
  • Added asserts in worker creation to guarantee queues are initialized before spawning workers
src/datachain/query/dispatch.py
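A minimal sketch of that unified teardown, using stdlib multiprocessing as a stand-in for multiprocess (names are illustrative, not the actual dispatch.py code):

```python
import multiprocessing as mp

def shutdown(pool, task_queue, done_queue):
    """Unconditionally stop workers, then release queue feeder threads."""
    for p in pool:
        if p.is_alive():
            p.terminate()
    for p in pool:
        p.join()
    for q in (task_queue, done_queue):
        # Without cancel_join_thread(), interpreter exit can block waiting for
        # unflushed queue buffers to drain -- one source of the hang.
        q.cancel_join_thread()
        q.close()

task_q, done_q = mp.Queue(), mp.Queue()
shutdown([], task_q, done_q)  # with an empty pool this only closes the queues
```

Because the same path runs on success and on error, there is no longer a separate flag to keep in sync with the cleanup logic.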
Add parameterized tests for parallel worker failure handling
  • Added a pytest parameterized test covering exception, KeyboardInterrupt, sys.exit, and os._exit in worker threads
  • Used filesystem barrier to ensure all workers start before failure triggers
  • Asserted immediate shutdown of all peers, correct exit codes, meaningful error messages, and no semaphore leaks
tests/func/test_udf.py
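The filesystem barrier mentioned above could look roughly like this (a hypothetical helper, not the actual test code): each worker drops a marker file, then spins until all peers have arrived.

```python
import os
import tempfile
import time

def wait_at_barrier(barrier_dir, worker_id, n_workers, timeout=5.0):
    """Block until n_workers marker files exist in barrier_dir."""
    open(os.path.join(barrier_dir, f"ready-{worker_id}"), "w").close()
    deadline = time.monotonic() + timeout
    while len(os.listdir(barrier_dir)) < n_workers:
        if time.monotonic() > deadline:
            raise TimeoutError("not all workers reached the barrier")
        time.sleep(0.01)

with tempfile.TemporaryDirectory() as d:
    wait_at_barrier(d, 0, 1)  # single-process demo: the barrier opens immediately
```

A filesystem barrier works across spawned processes without sharing any Python objects, which makes it convenient in tests that deliberately kill workers.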

Possibly linked issues

  • #issue_id_placeholder: The PR prevents jobs from hanging in parallel mode by improving worker termination and error handling, which directly addresses the issue's symptom of jobs getting stuck when a file error occurs.

self.done_queue.join_thread()

# Wait for workers to stop
for p in pool:
Member Author


I don't see a reason off the top of my head to complicate it much tbh ... Let's see tests ...

@shcheklein shcheklein force-pushed the fix-parallel-udf-dispatcher branch from 9f25d25 to ece96a2 Compare October 6, 2025 02:55

cloudflare-workers-and-pages bot commented Oct 6, 2025

Deploying datachain-documentation with Cloudflare Pages

Latest commit: 66cef22
Status: ✅  Deploy successful!
Preview URL: https://3e3b42ce.datachain-documentation.pages.dev
Branch Preview URL: https://fix-parallel-udf-dispatcher.datachain-documentation.pages.dev


@shcheklein shcheklein force-pushed the fix-parallel-udf-dispatcher branch 2 times, most recently from 0a5c5eb to 6b4aa33 Compare October 6, 2025 02:59

codecov bot commented Oct 6, 2025

Codecov Report

❌ Patch coverage is 25.00000% with 42 lines in your changes missing coverage. Please review.
✅ Project coverage is 87.69%. Comparing base (6c17b68) to head (66cef22).
⚠️ Report is 7 commits behind head on main.

Files with missing lines Patch % Lines
src/datachain/query/dispatch.py 22.22% 42 Missing ⚠️
Additional details and impacted files


@@            Coverage Diff             @@
##             main    #1384      +/-   ##
==========================================
- Coverage   87.80%   87.69%   -0.12%     
==========================================
  Files         159      159              
  Lines       14983    15013      +30     
  Branches     2158     2161       +3     
==========================================
+ Hits        13156    13165       +9     
- Misses       1330     1351      +21     
  Partials      497      497              
Flag Coverage Δ
datachain 87.64% <25.00%> (-0.12%) ⬇️


Files with missing lines Coverage Δ
src/datachain/query/queue.py 57.37% <100.00%> (+0.71%) ⬆️
src/datachain/query/dispatch.py 22.55% <22.22%> (+0.61%) ⬆️

@shcheklein shcheklein force-pushed the fix-parallel-udf-dispatcher branch 9 times, most recently from 2a51fcf to d14914b Compare October 12, 2025 05:47
@shcheklein shcheklein marked this pull request as ready for review October 13, 2025 00:10
Contributor

@sourcery-ai sourcery-ai bot left a comment


Hey there - I've reviewed your changes - here's some feedback:

  • Consider replacing the busy‐polling loop around get_nowait and sleep(0.01) with a blocking get call and a timeout—this will simplify the logic and reduce CPU spin.
  • The inline queue cleanup and state resets at the end of run_udf_parallel could be extracted into a dedicated helper to improve readability and ensure consistent teardown.
  • The 1-second barrier wait in the new parallel worker failure test may be too tight on slower CI agents—consider increasing or parameterizing that timeout to prevent intermittent flakes.
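Sourcery's first suggestion can be sketched like this (illustrative names; the real loop lives in run_udf_parallel). A blocking get with a timeout replaces the get_nowait + sleep spin:

```python
import queue

def drain(done_queue, workers, n_expected, poll_timeout=0.1):
    """Blocking get with a timeout: same health checks, far fewer wakeups."""
    results = []
    while len(results) < n_expected:
        try:
            results.append(done_queue.get(timeout=poll_timeout))
        except queue.Empty:
            # Worker health is still checked, but once per timeout interval
            # rather than every 10 ms of a busy spin.
            if any(not w.is_alive() for w in workers):
                raise RuntimeError("worker died before finishing")
    return results

q = queue.Queue()
for i in range(3):
    q.put(i)
print(drain(q, [], 3))  # [0, 1, 2]
```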
## Individual Comments

### Comment 1
<location> `src/datachain/query/dispatch.py:142-143` </location>
<code_context>
         udf: UDFAdapter = loads(self.udf_data)
         # Ensure all registered DataModels have rebuilt schemas in worker processes.
         ModelStore.rebuild_all()
+        assert self.task_queue is not None
+        assert self.done_queue is not None
         return UDFWorker(
             self.catalog,
</code_context>

<issue_to_address>
**suggestion (bug_risk):** Using assert for runtime checks may be bypassed in optimized mode.

Since assertions can be disabled with the -O flag, replace them with explicit exception raising (e.g., ValueError) to ensure these checks are always enforced.

```suggestion
        if self.task_queue is None:
            raise ValueError("self.task_queue must not be None")
        if self.done_queue is None:
            raise ValueError("self.done_queue must not be None")
```
</issue_to_address>

### Comment 2
<location> `tests/func/test_udf.py:670-574` </location>
<code_context>
+        f"Expected exit code {expected_exit_code}, got: {error_message}"
+    )
+
+    if error_marker:
+        assert error_marker in captured.err, (
+            f"Expected '{error_marker}' in stderr for {failure_mode} mode. "
+            f"stderr output: {captured.err[:500]}"
+        )
+
+    assert "semaphore" not in captured.err
</code_context>

<issue_to_address>
**suggestion (testing):** Test does not cover the case where all workers complete successfully.

Please add a test case for successful completion to verify that normal execution does not trigger error handling or cleanup logic.

Suggested implementation:

```python
    assert "semaphore" not in captured.err


def test_udf_all_workers_success(monkeypatch, capfd):
    """
    Test that verifies all workers complete successfully and no error handling or cleanup logic is triggered.
    """
    # Simulate successful worker completion
    class DummyUDF:
        def run(self):
            return "success"

    # Patch the UDF runner to use DummyUDF
    monkeypatch.setattr("udf_module.UDF", DummyUDF)

    # Run the UDF
    try:
        udf_instance = DummyUDF()
        result = udf_instance.run()
    except Exception as exc:
        pytest.fail(f"Unexpected exception raised: {exc}")

    captured = capfd.readouterr()

    # Assert that the result is as expected
    assert result == "success"

    # Assert that no error messages are present in stderr
    assert "UDF Execution Failed!" not in captured.err
    assert "semaphore" not in captured.err
    assert captured.err.strip() == ""



@pytest.mark.parametrize(
    "cloud_type,version_aware",
    [("s3", True)],

```

- Replace `"udf_module.UDF"` in the monkeypatch line with the actual import path of your UDF class if it differs.
- If your test setup requires more context (e.g., fixtures, configuration), adapt the test accordingly.
- Ensure the DummyUDF class matches the interface expected by your UDF runner.
</issue_to_address>

### Comment 3
<location> `tests/func/test_udf.py:597-598` </location>
<code_context>

</code_context>

<issue_to_address>
**issue (code-quality):** Avoid conditionals in tests. ([`no-conditionals-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-conditionals-in-tests))

<details><summary>Explanation</summary>Avoid complex code, like conditionals, in test functions.

Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals

Some ways to fix this:

* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.

> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.

Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>

### Comment 4
<location> `tests/func/test_udf.py:614-623` </location>
<code_context>

</code_context>

<issue_to_address>
**issue (code-quality):** Avoid loops in tests. ([`no-loop-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-loop-in-tests))

</issue_to_address>

### Comment 5
<location> `tests/func/test_udf.py:621-622` </location>
<code_context>

</code_context>

<issue_to_address>
**issue (code-quality):** Avoid conditionals in tests. ([`no-conditionals-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-conditionals-in-tests))

</issue_to_address>

### Comment 6
<location> `tests/func/test_udf.py:625-633` </location>
<code_context>

</code_context>

<issue_to_address>
**issue (code-quality):** Avoid conditionals in tests. ([`no-conditionals-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-conditionals-in-tests))

</issue_to_address>

### Comment 7
<location> `tests/func/test_udf.py:626-627` </location>
<code_context>

</code_context>

<issue_to_address>
**issue (code-quality):** Avoid conditionals in tests. ([`no-conditionals-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-conditionals-in-tests))

</issue_to_address>

### Comment 8
<location> `tests/func/test_udf.py:628-629` </location>
<code_context>

</code_context>

<issue_to_address>
**issue (code-quality):** Avoid conditionals in tests. ([`no-conditionals-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-conditionals-in-tests))

</issue_to_address>

### Comment 9
<location> `tests/func/test_udf.py:630-631` </location>
<code_context>

</code_context>

<issue_to_address>
**issue (code-quality):** Avoid conditionals in tests. ([`no-conditionals-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-conditionals-in-tests))

</issue_to_address>

### Comment 10
<location> `tests/func/test_udf.py:632-633` </location>
<code_context>

</code_context>

<issue_to_address>
**issue (code-quality):** Avoid conditionals in tests. ([`no-conditionals-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-conditionals-in-tests))

</issue_to_address>

### Comment 11
<location> `tests/func/test_udf.py:670-674` </location>
<code_context>

</code_context>

<issue_to_address>
**issue (code-quality):** Avoid conditionals in tests. ([`no-conditionals-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-conditionals-in-tests))

</issue_to_address>


@shcheklein shcheklein force-pushed the fix-parallel-udf-dispatcher branch from d14914b to a2bdf72 Compare October 13, 2025 01:20
@shcheklein shcheklein force-pushed the fix-parallel-udf-dispatcher branch from a2bdf72 to 66cef22 Compare October 13, 2025 01:26
@shcheklein shcheklein requested review from a team, Copilot and dreadatour October 13, 2025 01:40
Copy link
Contributor

@Copilot Copilot AI left a comment


Pull Request Overview

This PR fixes hanging issues in UDF parallel mode by simplifying termination handling and removing complex completion tracking. The main fix switches from blocking to non-blocking queue operations with active monitoring of worker exit codes to detect failures immediately.

  • Replaces blocking queue reads with non-blocking reads and exit code monitoring to prevent deadlocks
  • Removes the complex normal_completion flag and simplifies cleanup by always terminating alive workers
  • Adds comprehensive test coverage for different worker failure scenarios

Reviewed Changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.

File Description
src/datachain/query/dispatch.py Core fix implementing non-blocking queue reads, exit code monitoring, and simplified worker termination handling
src/datachain/query/queue.py Updates queue imports to use multiprocess.queues.Queue instead of standard library queue
tests/func/test_udf.py Adds comprehensive test for parallel worker failure scenarios with different exit modes
Comments suppressed due to low confidence (1)

src/datachain/query/dispatch.py:1

  • The f-string formatting is missing for the message variables. The variables p.name and exitcode are not being interpolated into the string.
import contextlib
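The flagged pattern can be reproduced in isolation (hypothetical message text; the actual string lives in dispatch.py):

```python
name, exitcode = "Worker-UDF-1", 1
broken = "UDF worker {name} exited with code {exitcode}"  # missing f prefix: braces stay literal
fixed = f"UDF worker {name} exited with code {exitcode}"  # interpolates name and exitcode
print(fixed)  # UDF worker Worker-UDF-1 exited with code 1
assert "{name}" in broken and "{name}" not in fixed
```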


Comment on lines +107 to +108
task_queue: MultiprocessQueue | None = None
done_queue: MultiprocessQueue | None = None

Copilot AI Oct 13, 2025


Consider adding assertions in the run_udf_parallel method to verify these queues are initialized before creating workers, as suggested in the PR description.


Contributor

@dreadatour dreadatour left a comment


Looks good to me! 👍🔥

@shcheklein shcheklein merged commit 406a0a1 into main Oct 16, 2025
307 of 314 checks passed
@shcheklein shcheklein deleted the fix-parallel-udf-dispatcher branch October 16, 2025 03:13