-
Notifications
You must be signed in to change notification settings - Fork 129
fix(dispatcher): simplify and fix termination handling #1384
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Reviewer's GuideSimplify and fix termination handling in the UDF parallel dispatcher to prevent hanging, ensure proper worker cleanup using non-blocking queue polling and unified shutdown, and add parameterized tests verifying immediate termination across multiple failure modes. Sequence diagram for improved UDF parallel dispatcher termination handlingsequenceDiagram
participant MainProcess
participant Worker1
participant Worker2
participant TaskQueue
participant DoneQueue
MainProcess->>TaskQueue: Put tasks
MainProcess->>Worker1: Start
MainProcess->>Worker2: Start
loop Process tasks
Worker1->>TaskQueue: Get task
Worker1->>DoneQueue: Put result
Worker2->>TaskQueue: Get task
Worker2->>DoneQueue: Put result
MainProcess->>DoneQueue: Non-blocking get result
alt Worker exits unexpectedly
MainProcess->>Worker1: Detect exitcode != 0
MainProcess->>Worker1: Terminate
end
end
MainProcess->>Worker1: Terminate if alive
MainProcess->>Worker2: Terminate if alive
MainProcess->>Worker1: Join
MainProcess->>Worker2: Join
MainProcess->>TaskQueue: cancel_join_thread, close
MainProcess->>DoneQueue: cancel_join_thread, close
Class diagram for updated UDFDispatcher and related queue usageclassDiagram
class UDFDispatcher {
-task_queue: MultiprocessQueue | None
-done_queue: MultiprocessQueue | None
-ctx: multiprocess.get_context
+__init__(udf_info, buffer_size)
+run_udf_parallel(...)
}
class UDFWorker {
+__init__(catalog, udf, task_queue, done_queue, ...)
}
class DownloadCallback {
+__init__(queue: MultiprocessQueue)
}
class ProcessedCallback {
+__init__(name, queue: MultiprocessQueue)
}
UDFDispatcher "1" o-- "*" UDFWorker
UDFDispatcher "1" o-- "1" DownloadCallback
UDFDispatcher "1" o-- "1" ProcessedCallback
UDFDispatcher "1" --> "1" MultiprocessQueue : task_queue
UDFDispatcher "1" --> "1" MultiprocessQueue : done_queue
UDFWorker "1" --> "1" MultiprocessQueue : task_queue
UDFWorker "1" --> "1" MultiprocessQueue : done_queue
DownloadCallback "1" --> "1" MultiprocessQueue
ProcessedCallback "1" --> "1" MultiprocessQueue
File-Level Changes
Possibly linked issues
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
src/datachain/query/dispatch.py
Outdated
self.done_queue.join_thread() | ||
|
||
# Wait for workers to stop | ||
for p in pool: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see a reason from the to of my head to complicate it much tbh ... Let's see tests ...
9f25d25
to
ece96a2
Compare
Deploying datachain-documentation with
|
Latest commit: |
66cef22
|
Status: | ✅ Deploy successful! |
Preview URL: | https://3e3b42ce.datachain-documentation.pages.dev |
Branch Preview URL: | https://fix-parallel-udf-dispatcher.datachain-documentation.pages.dev |
0a5c5eb
to
6b4aa33
Compare
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #1384 +/- ##
==========================================
- Coverage 87.80% 87.69% -0.12%
==========================================
Files 159 159
Lines 14983 15013 +30
Branches 2158 2161 +3
==========================================
+ Hits 13156 13165 +9
- Misses 1330 1351 +21
Partials 497 497
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
2a51fcf
to
d14914b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey there - I've reviewed your changes - here's some feedback:
- Consider replacing the busy‐polling loop around
get_nowait
andsleep(0.01)
with a blockingget
call and a timeout—this will simplify the logic and reduce CPU spin. - The inline queue cleanup and state resets at the end of
run_udf_parallel
could be extracted into a dedicated helper to improve readability and ensure consistent teardown. - The 1-second barrier wait in the new parallel worker failure test may be too tight on slower CI agents—consider increasing or parameterizing that timeout to prevent intermittent flakes.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Consider replacing the busy‐polling loop around `get_nowait` and `sleep(0.01)` with a blocking `get` call and a timeout—this will simplify the logic and reduce CPU spin.
- The inline queue cleanup and state resets at the end of `run_udf_parallel` could be extracted into a dedicated helper to improve readability and ensure consistent teardown.
- The 1-second barrier wait in the new parallel worker failure test may be too tight on slower CI agents—consider increasing or parameterizing that timeout to prevent intermittent flakes.
## Individual Comments
### Comment 1
<location> `src/datachain/query/dispatch.py:142-143` </location>
<code_context>
udf: UDFAdapter = loads(self.udf_data)
# Ensure all registered DataModels have rebuilt schemas in worker processes.
ModelStore.rebuild_all()
+ assert self.task_queue is not None
+ assert self.done_queue is not None
return UDFWorker(
self.catalog,
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Using assert for runtime checks may be bypassed in optimized mode.
Since assertions can be disabled with the -O flag, replace them with explicit exception raising (e.g., ValueError) to ensure these checks are always enforced.
```suggestion
if self.task_queue is None:
raise ValueError("self.task_queue must not be None")
if self.done_queue is None:
raise ValueError("self.done_queue must not be None")
```
</issue_to_address>
### Comment 2
<location> `tests/func/test_udf.py:670-574` </location>
<code_context>
+ f"Expected exit code {expected_exit_code}, got: {error_message}"
+ )
+
+ if error_marker:
+ assert error_marker in captured.err, (
+ f"Expected '{error_marker}' in stderr for {failure_mode} mode. "
+ f"stderr output: {captured.err[:500]}"
+ )
+
+ assert "semaphore" not in captured.err
</code_context>
<issue_to_address>
**suggestion (testing):** Test does not cover the case where all workers complete successfully.
Please add a test case for successful completion to verify that normal execution does not trigger error handling or cleanup logic.
Suggested implementation:
```python
assert "semaphore" not in captured.err
def test_udf_all_workers_success(monkeypatch, capfd):
"""
Test that verifies all workers complete successfully and no error handling or cleanup logic is triggered.
"""
# Simulate successful worker completion
class DummyUDF:
def run(self):
return "success"
# Patch the UDF runner to use DummyUDF
monkeypatch.setattr("udf_module.UDF", DummyUDF)
# Run the UDF
try:
udf_instance = DummyUDF()
result = udf_instance.run()
except Exception as exc:
pytest.fail(f"Unexpected exception raised: {exc}")
captured = capfd.readouterr()
# Assert that the result is as expected
assert result == "success"
# Assert that no error messages are present in stderr
assert "UDF Execution Failed!" not in captured.err
assert "semaphore" not in captured.err
assert captured.err.strip() == ""
@pytest.mark.parametrize(
"cloud_type,version_aware",
[("s3", True)],
```
- Replace `"udf_module.UDF"` in the monkeypatch line with the actual import path of your UDF class if it differs.
- If your test setup requires more context (e.g., fixtures, configuration), adapt the test accordingly.
- Ensure the DummyUDF class matches the interface expected by your UDF runner.
</issue_to_address>
### Comment 3
<location> `tests/func/test_udf.py:597-598` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Avoid conditionals in tests. ([`no-conditionals-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-conditionals-in-tests))
<details><summary>Explanation</summary>Avoid complex code, like conditionals, in test functions.
Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals
Some ways to fix this:
* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.
> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.
Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>
### Comment 4
<location> `tests/func/test_udf.py:614-623` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Avoid loops in tests. ([`no-loop-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-loop-in-tests))
<details><summary>Explanation</summary>Avoid complex code, like loops, in test functions.
Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals
Some ways to fix this:
* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.
> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.
Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>
### Comment 5
<location> `tests/func/test_udf.py:621-622` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Avoid conditionals in tests. ([`no-conditionals-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-conditionals-in-tests))
<details><summary>Explanation</summary>Avoid complex code, like conditionals, in test functions.
Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals
Some ways to fix this:
* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.
> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.
Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>
### Comment 6
<location> `tests/func/test_udf.py:625-633` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Avoid conditionals in tests. ([`no-conditionals-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-conditionals-in-tests))
<details><summary>Explanation</summary>Avoid complex code, like conditionals, in test functions.
Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals
Some ways to fix this:
* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.
> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.
Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>
### Comment 7
<location> `tests/func/test_udf.py:626-627` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Avoid conditionals in tests. ([`no-conditionals-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-conditionals-in-tests))
<details><summary>Explanation</summary>Avoid complex code, like conditionals, in test functions.
Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals
Some ways to fix this:
* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.
> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.
Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>
### Comment 8
<location> `tests/func/test_udf.py:628-629` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Avoid conditionals in tests. ([`no-conditionals-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-conditionals-in-tests))
<details><summary>Explanation</summary>Avoid complex code, like conditionals, in test functions.
Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals
Some ways to fix this:
* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.
> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.
Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>
### Comment 9
<location> `tests/func/test_udf.py:630-631` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Avoid conditionals in tests. ([`no-conditionals-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-conditionals-in-tests))
<details><summary>Explanation</summary>Avoid complex code, like conditionals, in test functions.
Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals
Some ways to fix this:
* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.
> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.
Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>
### Comment 10
<location> `tests/func/test_udf.py:632-633` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Avoid conditionals in tests. ([`no-conditionals-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-conditionals-in-tests))
<details><summary>Explanation</summary>Avoid complex code, like conditionals, in test functions.
Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals
Some ways to fix this:
* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.
> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.
Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>
### Comment 11
<location> `tests/func/test_udf.py:670-674` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Avoid conditionals in tests. ([`no-conditionals-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-conditionals-in-tests))
<details><summary>Explanation</summary>Avoid complex code, like conditionals, in test functions.
Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals
Some ways to fix this:
* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.
> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.
Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
d14914b
to
a2bdf72
Compare
a2bdf72
to
66cef22
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR fixes hanging issues in UDF parallel mode by simplifying termination handling and removing complex completion tracking. The main fix switches from blocking to non-blocking queue operations with active monitoring of worker exit codes to detect failures immediately.
- Replaces blocking queue reads with non-blocking reads and exit code monitoring to prevent deadlocks
- Removes the complex
normal_completion
flag and simplifies cleanup by always terminating alive workers - Adds comprehensive test coverage for different worker failure scenarios
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.
File | Description |
---|---|
src/datachain/query/dispatch.py | Core fix implementing non-blocking queue reads, exit code monitoring, and simplified worker termination handling |
src/datachain/query/queue.py | Updates queue imports to use multiprocess.queues.Queue instead of standard library queue |
tests/func/test_udf.py | Adds comprehensive test for parallel worker failure scenarios with different exit modes |
Comments suppressed due to low confidence (1)
src/datachain/query/dispatch.py:1
- The f-string formatting is missing for the message variables. The variables
p.name
andexitcode
are not being interpolated into the string.
import contextlib
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
task_queue: MultiprocessQueue | None = None | ||
done_queue: MultiprocessQueue | None = None |
Copilot
AI
Oct 13, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider adding assertions in the run_udf_parallel
method to verify these queues are initialized before creating workers, as suggested in the PR description.
Copilot uses AI. Check for mistakes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me! 👍🔥
Fixes hanging of the UDF in parallel mode. Simple script to reproduce is below. A more advanced script to test this is here:
https://gist.github.com/shcheklein/c988ab49da95628fc7f4b871d4e033b6
TODO:
Basic script to reproduce
A bit more advanced script to test it in different
Summary by Sourcery
Simplify and fix termination handling in UDF parallel dispatcher to prevent hanging and ensure workers are cleaned up correctly.
Bug Fixes:
Enhancements:
Summary by Sourcery
Simplify and robustly handle parallel UDF termination by switching to non-blocking queue reads with exit code monitoring, ensuring timely worker shutdown and avoiding deadlocks.
Bug Fixes:
Enhancements:
Tests: