Implement checkpoints usage in local #1367
base: main
Conversation
Reviewer's Guide

This PR introduces a local job lifecycle by implementing a central JobManager service that creates, links, and finalizes DataChain jobs for plain Python script runs, and wires it into the checkpoint and save logic so that local runs behave like SaaS-managed jobs.

Sequence diagram for local script job lifecycle with JobManager:

```mermaid
sequenceDiagram
    actor User
    participant Script as "Python Script"
    participant JobManager
    participant Session
    participant Metastore
    participant Catalog
    User->>Script: Run script (python my_script.py)
    Script->>JobManager: get_or_create(session)
    JobManager->>Metastore: get_last_job_by_name(script_path)
    Metastore-->>JobManager: return last job (if any)
    JobManager->>Metastore: create_job(name=script_path, parent_job_id=last_job.id)
    Metastore-->>JobManager: return new job id
    JobManager->>Metastore: get_job(new_job_id)
    Metastore-->>JobManager: return new job
    JobManager-->>Script: return job
    Script->>Catalog: create_dataset(..., job_id=job.id)
    Catalog->>Metastore: create_checkpoint(job.id, hash)
    Note over JobManager: On exit, JobManager finalizes job status
```
Class diagram for new JobManager and Job lifecycle changes:

```mermaid
classDiagram
    class JobManager {
        +Job job
        +JobStatus status
        +bool owned
        +get_or_create(session)
        +reset()
        +finalize_success(session)
        +finalize_failure(session, exc_type, exc_value, tb)
    }
    class Job {
        +str id
        +str name
        +str query
        +JobStatus status
        +str parent_job_id
    }
    JobManager --> Job : manages
    JobManager --> "Session" : uses
    Job --> "Job" : parent_job_id (optional)
    class Metastore {
        +get_job(job_id)
        +get_last_job_by_name(name)
        +create_job(...)
        +set_job_status(job_id, status, ...)
    }
    JobManager --> Metastore : interacts
    class DataChain {
        +save(...)
        +_resolve_checkpoint(...)
    }
    DataChain --> JobManager : uses
    DataChain --> Job : uses
    Metastore --> Job : creates/queries
    class Catalog {
        +create_dataset(..., job_id)
    }
    DataChain --> Catalog : uses
    Catalog --> Job : links via job_id
```
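The lifecycle in the diagrams above can be sketched with a minimal in-memory stand-in. All class and method names here mirror the diagrams, but the bodies are illustrative, not datachain's actual implementation:

```python
import uuid
from dataclasses import dataclass
from typing import Optional


@dataclass
class Job:
    id: str
    name: str
    query: str
    status: str = "RUNNING"
    parent_job_id: Optional[str] = None


class FakeMetastore:
    """In-memory stand-in for the jobs table in the metastore."""

    def __init__(self):
        self._jobs = {}

    def get_last_job_by_name(self, name):
        matches = [j for j in self._jobs.values() if j.name == name]
        return matches[-1] if matches else None  # insertion order == creation order

    def create_job(self, name, query, parent_job_id=None):
        job = Job(id=str(uuid.uuid4()), name=name, query=query,
                  parent_job_id=parent_job_id)
        self._jobs[job.id] = job
        return job.id

    def get_job(self, job_id):
        return self._jobs[job_id]

    def set_job_status(self, job_id, status):
        self._jobs[job_id].status = status


class JobManager:
    """One job per process; sequential runs of the same script are chained."""

    def __init__(self):
        self.job: Optional[Job] = None

    def get_or_create(self, metastore, script_path, query=""):
        if self.job is not None:
            return self.job  # reuse the job already created for this process
        last = metastore.get_last_job_by_name(script_path)
        job_id = metastore.create_job(
            name=script_path,
            query=query,
            parent_job_id=last.id if last else None,  # link to the previous run
        )
        self.job = metastore.get_job(job_id)
        return self.job

    def finalize_success(self, metastore):
        metastore.set_job_status(self.job.id, "COMPLETE")
```

Two sequential "runs" then produce a parent/child pair, which is the behavior the checkpoint logic relies on.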
Deploying datachain-documentation with Cloudflare Pages

Latest commit: c6f7266
Status: ✅ Deploy successful!
Preview URL: https://aa3112dc.datachain-documentation.pages.dev
Branch Preview URL: https://ilongin-1361-local-checkpoin.datachain-documentation.pages.dev
Hey there - I've reviewed your changes - here's some feedback:
- Move atexit and sys.excepthook registration to a class‐level singleton (instead of per-instance) so hooks aren’t registered multiple times when multiple JobManager instances are created.
- Wrap the existing sys.excepthook instead of clobbering it—store the prior hook and invoke it after finalize_failure to avoid losing upstream exception handling.
- Consider making get_user_script_source more robust (e.g. handle module runs via `-m` or scripts without `.py` extensions) so that source detection works in more launch scenarios.
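The second bullet ("wrap the existing sys.excepthook instead of clobbering it") can be implemented along these lines. This is a sketch of the pattern, not the PR's actual code; the callback name `on_failure` stands in for something like `finalize_failure`:

```python
import sys


class ExcepthookWrapper:
    """Chain a failure callback in front of whatever excepthook was
    installed before us, instead of replacing it outright."""

    def __init__(self, on_failure):
        self._on_failure = on_failure
        self._previous = None

    def install(self):
        self._previous = sys.excepthook  # remember the upstream hook
        sys.excepthook = self._hook

    def uninstall(self):
        if self._previous is not None:
            sys.excepthook = self._previous
            self._previous = None

    def _hook(self, exc_type, exc_value, tb):
        try:
            self._on_failure(exc_type, exc_value, tb)  # e.g. mark job FAILED
        finally:
            # Always delegate so upstream exception handling still runs.
            (self._previous or sys.__excepthook__)(exc_type, exc_value, tb)
```

This way a framework or test runner that installed its own hook before JobManager still sees the exception after the job is finalized.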
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Move atexit and sys.excepthook registration to a class‐level singleton (instead of per-instance) so hooks aren’t registered multiple times when multiple JobManager instances are created.
- Wrap the existing sys.excepthook instead of clobbering it—store the prior hook and invoke it after finalize_failure to avoid losing upstream exception handling.
- Consider making get_user_script_source more robust (e.g. handle module runs via `-m` or scripts without `.py` extensions) so that source detection works in more launch scenarios.
## Individual Comments
### Comment 1
<location> `tests/unit/test_job_manager.py:102-109` </location>
<code_context>
+ assert job2.parent_job_id == job1.id
+
+
+def test_get_or_create_fallback_query(test_session, patch_argv):
+ # Patch to return None for script source
+ with patch("datachain.job.get_user_script_source", return_value=None):
+ jm = JobManager()
+ job = jm.get_or_create(test_session)
+
+ assert job.query.startswith("python ")
+ assert job.name.endswith("script.py")
+
+
</code_context>
<issue_to_address>
**suggestion (testing):** Consider testing fallback behavior for interactive sessions.
Please add a test case for when sys.argv is empty or not a script, to verify correct fallback query and job name behavior.
```suggestion
def test_get_or_create_fallback_query(test_session, patch_argv):
# Patch to return None for script source
with patch("datachain.job.get_user_script_source", return_value=None):
jm = JobManager()
job = jm.get_or_create(test_session)
assert job.query.startswith("python ")
assert job.name.endswith("script.py")
def test_get_or_create_interactive_fallback(test_session, monkeypatch):
# Simulate interactive session: sys.argv is empty
monkeypatch.setattr("sys.argv", [])
with patch("datachain.job.get_user_script_source", return_value=None):
jm = JobManager()
job = jm.get_or_create(test_session)
# Check fallback query and job name for interactive session
assert job.query.startswith("python ")
assert job.name == "interactive"
```
</issue_to_address>
### Comment 2
<location> `tests/conftest.py:1080-1089` </location>
<code_context>
return compressed_parquet_data(dog_entries("1.0.0"), src_uri)
+
+
+@pytest.fixture(autouse=True)
+def disable_jobmanager_hooks(request):
+ # Run the test
+ yield
+
+ # After test finishes: remove only JobManager hooks, unless explicitly requested
+ if "use_jobmanager_hooks" not in request.keywords:
+ for fn in list(JobManager._hook_refs):
+ try:
+ atexit.unregister(fn)
+ except Exception as e: # noqa: BLE001
+ print(f"Failed to unregister atexit hook: {e}")
+ JobManager._hook_refs.clear()
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding a test that explicitly enables JobManager hooks.
Since hooks are disabled by default, adding a test with the 'use_jobmanager_hooks' marker would help confirm that hook registration and execution work correctly when enabled.
</issue_to_address>
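The behavior such a marker-enabled test would verify (hooks get registered and can be unregistered) can be sketched with a minimal stand-in for JobManager's class-level hook bookkeeping; `_hook_refs` is assumed from the fixture above, everything else is illustrative:

```python
import atexit


class ManagerWithHooks:
    """Minimal stand-in for JobManager's hook registration. A shared
    class-level list is intentional here: it mimics _hook_refs, which the
    test fixture inspects and clears across instances."""

    _hook_refs = []

    def start(self):
        def _finalize():
            pass  # would finalize the job status on interpreter exit

        atexit.register(_finalize)
        type(self)._hook_refs.append(_finalize)

    @classmethod
    def clear_hooks(cls):
        # Mirrors what the autouse conftest fixture does after each test.
        for fn in list(cls._hook_refs):
            atexit.unregister(fn)
        cls._hook_refs.clear()
```

A test marked `@pytest.mark.use_jobmanager_hooks` could then assert that `_hook_refs` is non-empty after `start()` and clean up manually, since the autouse fixture skips cleanup for marked tests.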
### Comment 3
<location> `src/datachain/data_storage/metastore.py:1610-1612` </location>
<code_context>
def get_last_job_by_name(self, name: str, conn=None) -> Optional["Job"]:
query = (
self._jobs_query()
.where(self._jobs.c.name == name)
.order_by(self._jobs.c.created_at.desc())
.limit(1)
)
results = list(self.db.execute(query, conn=conn))
if not results:
return None
return self._parse_job(results[0])
</code_context>
<issue_to_address>
**suggestion (code-quality):** We've found these issues:
- Lift code into else after jump in control flow ([`reintroduce-else`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/reintroduce-else/))
- Replace if statement with if expression ([`assign-if-exp`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/assign-if-exp/))
```suggestion
return None if not results else self._parse_job(results[0])
```
</issue_to_address>
### Comment 4
<location> `tests/unit/test_job_manager.py:126-129` </location>
<code_context>
def test_finalize_failure_delegates_to_sys_excepthook(
test_session, patch_argv, patch_user_script
):
jm = JobManager()
job = jm.get_or_create(test_session)
called = {}
def fake_excepthook(exc_type, exc_value, tb):
called["exc"] = (exc_type, str(exc_value))
sys.__excepthook__, old_hook = fake_excepthook, sys.__excepthook__
try:
try:
raise ValueError("bad stuff")
except ValueError as e:
jm.finalize_failure(test_session, type(e), e, e.__traceback__)
finally:
sys.__excepthook__ = old_hook
assert "bad stuff" in called["exc"][1]
db_job = test_session.catalog.metastore.get_job(job.id)
assert db_job.status == JobStatus.FAILED
</code_context>
<issue_to_address>
**suggestion (code-quality):** Merge nested try-statement into a single try ([`flatten-nested-try`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/flatten-nested-try/))
```suggestion
raise ValueError("bad stuff")
except ValueError as e:
jm.finalize_failure(test_session, type(e), e, e.__traceback__)
```
</issue_to_address>
Force-pushed from 2b570d9 to 0921aea (compare)
Codecov Report

❌ Patch coverage is
Additional details and impacted files:

```diff
@@ Coverage Diff @@
##             main    #1367      +/-   ##
==========================================
+ Coverage   87.73%   87.95%   +0.22%
==========================================
  Files         160      160
  Lines       15122    14958     -164
  Branches     2171     2128      -43
==========================================
- Hits        13267    13157     -110
+ Misses       1356     1298      -58
- Partials      499      503       +4
```

Flags with carried forward coverage won't be shown.
Hey there - I've reviewed your changes - here's some feedback:
- Consider moving the atexit and excepthook registration to a class‐level guard so hooks are only registered once per process, avoiding duplicate registrations when multiple JobManager instances are created.
- The abstract metastore interface now requires get_last_job_by_name; please ensure all supported backend implementations (not just SQLite) override this method to avoid runtime errors in other environments.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Consider moving the atexit and excepthook registration to a class‐level guard so hooks are only registered once per process, avoiding duplicate registrations when multiple JobManager instances are created.
- The abstract metastore interface now requires get_last_job_by_name; please ensure all supported backend implementations (not just SQLite) override this method to avoid runtime errors in other environments.
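The second point can be illustrated: declaring `get_last_job_by_name` as an abstract method makes a backend that forgets to override it fail at instantiation time rather than deep inside job creation. A minimal sketch (class names hypothetical, simplified signatures):

```python
from abc import ABC, abstractmethod
from typing import Optional


class AbstractMetastore(ABC):
    @abstractmethod
    def get_last_job_by_name(self, name: str) -> Optional[object]:
        """Every backend must provide this, or local job creation breaks."""


class SQLiteBackend(AbstractMetastore):
    def __init__(self):
        self._jobs_by_name = {}

    def get_last_job_by_name(self, name):
        return self._jobs_by_name.get(name)


class IncompleteBackend(AbstractMetastore):
    # Missing override: with @abstractmethod, instantiating this class
    # raises TypeError immediately, surfacing the gap in CI.
    pass
```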
## Individual Comments
### Comment 1
<location> `src/datachain/lib/udf.py:165` </location>
<code_context>
inputs and outputs.
"""
parts = [
- hash_callable(self._func),
+ hash_callable(self._func) if self._func else "",
self.params.hash() if self.params else "",
self.output.hash(),
</code_context>
<issue_to_address>
**question (bug_risk):** Returning an empty string for missing _func may affect hash uniqueness.
Using an empty string may cause hash collisions for UDFs without a function. Consider using a distinct marker to preserve hash uniqueness.
</issue_to_address>
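A sentinel-based fix along the lines of this suggestion could look as follows. The marker value and the simplified `hash_callable` are illustrative, not datachain's actual implementation:

```python
import hashlib

# Distinct marker: cannot collide with a real sha256 hex digest,
# and is distinguishable from other empty-string hash parts.
_NO_FUNC_MARKER = "<no-func>"


def hash_callable(func) -> str:
    # Simplified stand-in: hash the function's name plus its bytecode.
    code = getattr(func, "__code__", None)
    payload = (func.__name__ + (code.co_code.hex() if code else "")).encode()
    return hashlib.sha256(payload).hexdigest()


def udf_hash_part(func) -> str:
    # The reviewed line was: hash_callable(self._func) if self._func else ""
    # Using a marker instead of "" keeps "no function" unique in the hash.
    return hash_callable(func) if func else _NO_FUNC_MARKER
```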
### Comment 2
<location> `tests/func/test_datachain.py:1670-1672` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Avoid conditionals in tests. ([`no-conditionals-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-conditionals-in-tests))
<details><summary>Explanation</summary>Avoid complex code, like conditionals, in test functions.
Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals
Some ways to fix this:
* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.
> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.
Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>
### Comment 3
<location> `tests/test_job_manager_e2e.py:9` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Don't import test modules. ([`dont-import-test-modules`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/dont-import-test-modules))
<details><summary>Explanation</summary>Don't import test modules.
Tests should be self-contained and don't depend on each other.
If a helper function is used by multiple tests,
define it in a helper module,
instead of importing one test from the other.
</details>
</issue_to_address>
### Comment 4
<location> `tests/test_job_manager_e2e.py:73-76` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Avoid loops in tests. ([`no-loop-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-loop-in-tests))
<details><summary>Explanation</summary>Avoid complex code, like loops, in test functions.
Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals
Some ways to fix this:
* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.
> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.
Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>
### Comment 5
<location> `tests/unit/lib/test_checkpoints.py:48-51` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Avoid conditionals in tests. ([`no-conditionals-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-conditionals-in-tests))
<details><summary>Explanation</summary>Avoid complex code, like conditionals, in test functions.
Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals
Some ways to fix this:
* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.
> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.
Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>
### Comment 6
<location> `tests/unit/lib/test_checkpoints.py:66-70` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Avoid conditionals in tests. ([`no-conditionals-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-conditionals-in-tests))
<details><summary>Explanation</summary>Avoid complex code, like conditionals, in test functions.
Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals
Some ways to fix this:
* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.
> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.
Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>
### Comment 7
<location> `src/datachain/data_storage/metastore.py:1610-1612` </location>
<code_context>
def get_last_job_by_name(self, name: str, conn=None) -> Optional["Job"]:
query = (
self._jobs_query()
.where(self._jobs.c.name == name)
.order_by(self._jobs.c.created_at.desc())
.limit(1)
)
results = list(self.db.execute(query, conn=conn))
if not results:
return None
return self._parse_job(results[0])
</code_context>
<issue_to_address>
**suggestion (code-quality):** We've found these issues:
- Lift code into else after jump in control flow ([`reintroduce-else`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/reintroduce-else/))
- Replace if statement with if expression ([`assign-if-exp`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/assign-if-exp/))
```suggestion
return None if not results else self._parse_job(results[0])
```
</issue_to_address>
### Comment 8
<location> `tests/unit/test_job_manager.py:128-131` </location>
<code_context>
def test_finalize_failure_delegates_to_previous_excepthook(
test_session, patch_argv, patch_user_script
):
jm = JobManager()
job = jm.get_or_create(test_session)
called = {}
def fake_excepthook(exc_type, exc_value, tb):
called["exc"] = (exc_type, str(exc_value))
# Replace the previous hook that JobManager saved
old_hook = jm._previous_excepthook
jm._previous_excepthook = fake_excepthook
try:
try:
raise ValueError("bad stuff")
except ValueError as e:
jm.finalize_failure(test_session, type(e), e, e.__traceback__)
finally:
jm._previous_excepthook = old_hook
assert "bad stuff" in called["exc"][1]
db_job = test_session.catalog.metastore.get_job(job.id)
assert db_job.status == JobStatus.FAILED
</code_context>
<issue_to_address>
**suggestion (code-quality):** Merge nested try-statement into a single try ([`flatten-nested-try`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/flatten-nested-try/))
```suggestion
raise ValueError("bad stuff")
except ValueError as e:
jm.finalize_failure(test_session, type(e), e, e.__traceback__)
```
</issue_to_address>
tests/unit/lib/test_checkpoints.py (outdated)

```python
monkeypatch.setenv("DATACHAIN_CHECKPOINTS_RESET", str(False))

chain = dc.read_dataset("nums", session=test_session).settings(parallel=True)
```
At what point does it create a job? At first save? Can we remove / or add a test where save happens after a UDF - probably not a big change ... just to make sure ...
Does persist btw create a JOB? Can I have a job w/o save, with a mapper that I would like to restart?
> at what point does it create a job? at first save?

Correct.

> can we remove / or add a test where save happens after a UDF ... just to make sure ...

Not sure I understand. In these tests save is actually happening after UDF.

> does persist btw create a JOB?

Persist doesn't create a job. The job is created in the actual `DataChain.save()` function before running steps (before persist).

> can I have a job w/o save with a mapper that I would like to restart?

If there is no `.save()` there is no `Job` created and therefore no checkpoints.
What if we have a script with a UDF and `show()`? We don't want to save anything at all ... but we also want to reuse results as we iterate.
It feels like quite a limitation, and even wrong design tbh, that a job is actually running but no `Job` is created ... and the `Job` is created in save, and only if we have save.
You are right, checkpoints / job should be created in that scenario as well, but I would do that in a follow-up issue when implementing UDF checkpoints, as it is not important here.
Regarding the place where the job should be created - it should probably be before applying steps from DatasetQuery, as that's where the actual "job" starts.
kk, let's reconsider this in the next step
@ilongin there are a few other comments, let's please review them
```diff
 catalog = test_session.catalog

-dc.read_values(num=[1, 2, 3], session=test_session).save("nums")
+dc.read_values(num=list(range(1000)), session=test_session).save("nums")
```
Is it better to patch the value of the variable that defines the batch size or something?
Enables checkpoint usage with just running `python my_job.py`.

For that to work a couple of changes were required:

- A `Job` instance is now created for every dataset, regardless of whether it's created with just `python my_script.py` or from SaaS. When we create a dataset from SaaS, SaaS takes care of the job lifecycle (creating, updating state at the end etc.), but on a local run it's managed in `datachain` itself. A job instance in a local run has the full path to the script as its name, so 2 sequential runs of the same script mean the next job created is a child of the previous job / run.
- `JobManager` was added to be a single place which we can ask to get a `Job` for the current process and which updates the job state at the end in a local env. If the `DATACHAIN_JOB_ID` env is present, it means we manage the job's lifecycle from outside and that job is just returned from the manager; otherwise a new job is created. `JobManager` is used in `DataChain.save()` and in current checkpoints tests.
- Tests for `JobManager` and e2e tests for checking if a job is present when running a local script.

Summary by Sourcery
Centralize and standardize job lifecycle management in DataChain by adopting JobManager for both SaaS and local runs, integrating job IDs into save and checkpoint workflows, and bolstering coverage with new unit and end-to-end tests.
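The `DATACHAIN_JOB_ID` branching the description mentions can be sketched as follows; the function and parameter names are illustrative, only the environment variable name comes from the PR:

```python
import os


def resolve_job(metastore, script_path, create_local_job):
    """If DATACHAIN_JOB_ID is set, the job lifecycle is managed externally
    (e.g. by SaaS) and we just look the job up; otherwise datachain owns
    the lifecycle and creates a new local job."""
    job_id = os.environ.get("DATACHAIN_JOB_ID")
    if job_id:
        return metastore.get_job(job_id)
    return create_local_job(metastore, script_path)
```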