
Conversation


@ilongin ilongin commented Sep 29, 2025

Enables checkpoint usage when running a script with plain python my_job.py.
A couple of changes were required for that to work:

  • We now create a Job instance for every dataset, regardless of whether it's created with plain python my_script.py or from SaaS. When a dataset is created from SaaS, SaaS takes care of the job lifecycle (creating it, updating its state at the end, etc.), but on a local run it's managed by datachain itself. A Job instance in a local run is named after the full path to the script, so across two sequential runs of the same script, the second job is created as a child of the previous job / run.
  • JobManager was added as the single place we can ask for the Job of the current process; it also updates the job state at the end in a local environment. If the DATACHAIN_JOB_ID env variable is present, the job's lifecycle is managed externally and that job is simply returned from the manager; otherwise a new job is created.
  • JobManager is used in DataChain.save() and in the current checkpoint tests.
  • New tests were added for JobManager, plus e2e tests checking that a job is present when running a local script.
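
Conceptually, the local flow described above can be sketched as follows. This is a minimal illustration, not the actual datachain code: Job, FakeMetastore, and the method signatures here are simplified stand-ins for the real classes mentioned in this PR.

```python
import os
import sys
import uuid


class Job:
    """Minimal stand-in for datachain's Job record."""

    def __init__(self, name, query, parent_job_id=None):
        self.id = str(uuid.uuid4())
        self.name = name
        self.query = query
        self.parent_job_id = parent_job_id


class FakeMetastore:
    """In-memory job store standing in for the real metastore."""

    def __init__(self):
        self.jobs = {}

    def get_job(self, job_id):
        return self.jobs[job_id]

    def get_last_job_by_name(self, name):
        # dict preserves insertion order, so the last match is the newest job
        matches = [j for j in self.jobs.values() if j.name == name]
        return matches[-1] if matches else None

    def create_job(self, name, query, parent_job_id=None):
        job = Job(name, query, parent_job_id)
        self.jobs[job.id] = job
        return job


class JobManager:
    """One place to ask for the Job of the current process.

    If DATACHAIN_JOB_ID is set, the lifecycle is managed externally
    (SaaS) and that job is just returned; otherwise a new local job is
    created, named after the script path and parented to the previous
    run of the same script.
    """

    def __init__(self, metastore):
        self.metastore = metastore
        self.job = None

    def get_or_create(self):
        if self.job is not None:
            return self.job
        external_id = os.environ.get("DATACHAIN_JOB_ID")
        if external_id:
            self.job = self.metastore.get_job(external_id)
            return self.job
        name = (
            os.path.abspath(sys.argv[0])
            if sys.argv and sys.argv[0]
            else "interactive"
        )
        last = self.metastore.get_last_job_by_name(name)
        self.job = self.metastore.create_job(
            name=name,
            query=f"python {name}",
            parent_job_id=last.id if last else None,
        )
        return self.job
```

Two sequential get_or_create calls against the same store then produce a parent-child chain, mirroring the "next job is a child of the previous run" behavior described above.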

Summary by Sourcery

Centralize and standardize job lifecycle management in DataChain by adopting JobManager for both SaaS and local runs, integrating job IDs into save and checkpoint workflows, and bolstering coverage with new unit and end-to-end tests.

New Features:

  • Introduce JobManager to orchestrate DataChain job creation, parent-child linking, and lifecycle management for local script runs
  • Always create and attach a Job instance for each save call, using DATACHAIN_JOB_ID for SaaS or local script path as name and linking sequential runs

Enhancements:

  • Integrate JobManager into DataChain.save and checkpoint resolution to propagate job IDs into dataset versions and checkpoints
  • Add metastore API get_last_job_by_name and utility get_user_script_source for reading the user script as job query

Tests:

  • Add unit tests for JobManager covering job reuse, creation, finalization, parent linking, reset behavior, and exception hook delegation
  • Extend checkpoint unit tests to exercise both SaaS and local job modes
  • Add tests for get_user_script_source and hash consistency for various read methods
  • Introduce end-to-end tests verifying single-job creation per script and failure handling with job status updates
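
The end-to-end tests mentioned above run scripts as real python my_script.py processes. A hedged sketch of that harness style (generic helper names; the actual tests live in tests/test_job_manager_e2e.py and additionally query the metastore):

```python
import subprocess
import sys
import tempfile
import textwrap


def run_script(source: str) -> subprocess.CompletedProcess:
    """Run a snippet as its own `python my_script.py` process, the way
    the e2e tests exercise local job creation."""
    with tempfile.NamedTemporaryFile(
        "w", suffix=".py", delete=False
    ) as f:
        f.write(textwrap.dedent(source))
        path = f.name
    return subprocess.run(
        [sys.executable, path], capture_output=True, text=True
    )


# A passing script exits 0; the real tests would additionally assert that
# exactly one job with a COMPLETE status was recorded in the metastore,
# and that a failing script leaves the job in a FAILED state.
ok = run_script("print('saved')")
failed = run_script("raise ValueError('boom')")
```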

@ilongin ilongin marked this pull request as draft September 29, 2025 22:05

sourcery-ai bot commented Sep 29, 2025

Reviewer's Guide

This PR introduces a local job lifecycle by implementing a central JobManager service that creates, links, and finalizes DataChain jobs for plain Python script runs, and wires it into the checkpoint and save logic so that local runs behave like SaaS-managed jobs.

Sequence diagram for local script job lifecycle with JobManager

sequenceDiagram
    actor User
    participant Script as Python Script
    participant JobManager
    participant Session
    participant Metastore
    participant Catalog

    User->>Script: Run script (python my_script.py)
    Script->>JobManager: get_or_create(session)
    JobManager->>Metastore: get_last_job_by_name(script_path)
    Metastore-->>JobManager: return last job (if any)
    JobManager->>Metastore: create_job(name=script_path, parent_job_id=last_job.id)
    Metastore-->>JobManager: return new job id
    JobManager->>Metastore: get_job(new_job_id)
    Metastore-->>JobManager: return new job
    JobManager-->>Script: return job
    Script->>Catalog: create_dataset(..., job_id=job.id)
    Catalog->>Metastore: create_checkpoint(job.id, hash)
    Note over JobManager: On exit, JobManager finalizes job status

Class diagram for new JobManager and Job lifecycle changes

classDiagram
    class JobManager {
        +Job job
        +JobStatus status
        +bool owned
        +get_or_create(session)
        +reset()
        +finalize_success(session)
        +finalize_failure(session, exc_type, exc_value, tb)
    }
    class Job {
        +str id
        +str name
        +str query
        +JobStatus status
        +str parent_job_id
    }
    JobManager --> Job : manages
    JobManager --> Session : uses
    Job --> Job : parent_job_id (optional)
    class Metastore {
        +get_job(job_id)
        +get_last_job_by_name(name)
        +create_job(...)
        +set_job_status(job_id, status, ...)
    }
    JobManager --> Metastore : interacts
    class DataChain {
        +save(...)
        +_resolve_checkpoint(...)
    }
    DataChain --> JobManager : uses
    DataChain --> Job : uses
    Metastore --> Job : creates/queries
    class Catalog {
        +create_dataset(..., job_id)
    }
    DataChain --> Catalog : uses
    Catalog --> Job : links via job_id

File-Level Changes

Add JobManager to manage local and SaaS job lifecycles (src/datachain/job.py)
  • Defined JobManager class with get_or_create, finalize_success/failure, reset and hook registration
  • Registered atexit and excepthook handlers for automatic completion/failure
  • Exposed a singleton instance job_manager
Integrate JobManager into DataChain save and checkpoint resolution (src/datachain/lib/dc/datachain.py)
  • Call job_manager.get_or_create in save() to obtain the current Job
  • Refactored the _resolve_checkpoint signature to accept Job and always create a checkpoint with job.id
  • Passed job_id into create_dataset operations
Extend metastore to support parent job lookups (src/datachain/data_storage/metastore.py)
  • Added abstract get_last_job_by_name to the interface
  • Implemented get_last_job_by_name to retrieve the most recent job by name
Support script source capture for job queries (src/datachain/utils.py, src/datachain/job.py)
  • Implemented get_user_script_source to read the .py file from sys.argv
  • Updated job creation logic to use the script source, falling back to the command line
Allow associating job_id with new dataset versions (src/datachain/catalog/catalog.py)
  • Added job_id parameter to the create_dataset and create_new_dataset_version signatures
  • Propagated job_id when creating dataset records
Expand and refactor tests for JobManager and checkpoint behavior (tests/unit/lib/test_checkpoints.py, tests/unit/test_job_manager.py, tests/test_job_manager_e2e.py, tests/unit/test_utils.py, tests/unit/test_datachain_hash.py, tests/func/test_datachain.py, tests/conftest.py)
  • Refactored existing checkpoint tests to use job_manager and a reset fixture
  • Added unit tests for JobManager lifecycle, hooks, and idempotency
  • Created end-to-end tests for job creation and failure scenarios
  • Added tests for get_user_script_source and updated datachain_hash tests

Possibly linked issues

  • #ISSUE-123: The PR implements JobManager to manage local job lifecycles, naming jobs by script paths, and linking re-runs, fulfilling the issue's core needs.
  • #Skip chains if checkpoint exists (Checkpoints Phase 1): The PR introduces a JobManager to handle job creation and lifecycle for local script runs, which is essential for the checkpointing mechanism described in the issue to function.



cloudflare-workers-and-pages bot commented Sep 29, 2025

Deploying datachain-documentation with Cloudflare Pages

Latest commit: c6f7266
Status: ✅  Deploy successful!
Preview URL: https://aa3112dc.datachain-documentation.pages.dev
Branch Preview URL: https://ilongin-1361-local-checkpoin.datachain-documentation.pages.dev



@sourcery-ai sourcery-ai bot left a comment


Hey there - I've reviewed your changes - here's some feedback:

  • Move atexit and sys.excepthook registration to a class‐level singleton (instead of per-instance) so hooks aren’t registered multiple times when multiple JobManager instances are created.
  • Wrap the existing sys.excepthook instead of clobbering it—store the prior hook and invoke it after finalize_failure to avoid losing upstream exception handling.
  • Consider making get_user_script_source more robust (e.g. handle module runs via -m or scripts without .py extensions) so that source detection works in more launch scenarios.
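
The second suggestion above (wrapping rather than clobbering sys.excepthook) can be illustrated with a small standalone sketch. The names here are hypothetical, not the actual JobManager code:

```python
import sys


class HookWrapper:
    """Chain a failure callback onto whatever excepthook was installed
    before, instead of replacing it outright."""

    def __init__(self, on_failure):
        self.on_failure = on_failure
        self._previous = None

    def install(self):
        self._previous = sys.excepthook  # remember the prior hook
        sys.excepthook = self._hook

    def _hook(self, exc_type, exc_value, tb):
        try:
            # e.g. mark the job FAILED in the metastore
            self.on_failure(exc_type, exc_value, tb)
        finally:
            # delegate to the previous hook so upstream handling survives
            self._previous(exc_type, exc_value, tb)

    def uninstall(self):
        sys.excepthook = self._previous
```

With this shape, installing two wrappers (or a wrapper over a framework's own hook) runs every handler in turn rather than silently dropping all but the last one.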
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- Move atexit and sys.excepthook registration to a class‐level singleton (instead of per-instance) so hooks aren’t registered multiple times when multiple JobManager instances are created.
- Wrap the existing sys.excepthook instead of clobbering it—store the prior hook and invoke it after finalize_failure to avoid losing upstream exception handling.
- Consider making get_user_script_source more robust (e.g. handle module runs via `-m` or scripts without `.py` extensions) so that source detection works in more launch scenarios.

## Individual Comments

### Comment 1
<location> `tests/unit/test_job_manager.py:102-109` </location>
<code_context>
+    assert job2.parent_job_id == job1.id
+
+
+def test_get_or_create_fallback_query(test_session, patch_argv):
+    # Patch to return None for script source
+    with patch("datachain.job.get_user_script_source", return_value=None):
+        jm = JobManager()
+        job = jm.get_or_create(test_session)
+
+    assert job.query.startswith("python ")
+    assert job.name.endswith("script.py")
+
+
</code_context>

<issue_to_address>
**suggestion (testing):** Consider testing fallback behavior for interactive sessions.

Please add a test case for when sys.argv is empty or not a script, to verify correct fallback query and job name behavior.

```suggestion
def test_get_or_create_fallback_query(test_session, patch_argv):
    # Patch to return None for script source
    with patch("datachain.job.get_user_script_source", return_value=None):
        jm = JobManager()
        job = jm.get_or_create(test_session)

    assert job.query.startswith("python ")
    assert job.name.endswith("script.py")


def test_get_or_create_interactive_fallback(test_session, monkeypatch):
    # Simulate interactive session: sys.argv is empty
    monkeypatch.setattr("sys.argv", [])
    with patch("datachain.job.get_user_script_source", return_value=None):
        jm = JobManager()
        job = jm.get_or_create(test_session)

    # Check fallback query and job name for interactive session
    assert job.query.startswith("python ")
    assert job.name == "interactive"
```
</issue_to_address>

### Comment 2
<location> `tests/conftest.py:1080-1089` </location>
<code_context>
     return compressed_parquet_data(dog_entries("1.0.0"), src_uri)
+
+
+@pytest.fixture(autouse=True)
+def disable_jobmanager_hooks(request):
+    # Run the test
+    yield
+
+    # After test finishes: remove only JobManager hooks, unless explicitly requested
+    if "use_jobmanager_hooks" not in request.keywords:
+        for fn in list(JobManager._hook_refs):
+            try:
+                atexit.unregister(fn)
+            except Exception as e:  # noqa: BLE001
+                print(f"Failed to unregister atexit hook: {e}")
+        JobManager._hook_refs.clear()
</code_context>

<issue_to_address>
**suggestion (testing):** Consider adding a test that explicitly enables JobManager hooks.

Since hooks are disabled by default, adding a test with the 'use_jobmanager_hooks' marker would help confirm that hook registration and execution work correctly when enabled.
</issue_to_address>

### Comment 3
<location> `src/datachain/data_storage/metastore.py:1610-1612` </location>
<code_context>
    def get_last_job_by_name(self, name: str, conn=None) -> Optional["Job"]:
        query = (
            self._jobs_query()
            .where(self._jobs.c.name == name)
            .order_by(self._jobs.c.created_at.desc())
            .limit(1)
        )
        results = list(self.db.execute(query, conn=conn))
        if not results:
            return None
        return self._parse_job(results[0])

</code_context>

<issue_to_address>
**suggestion (code-quality):** We've found these issues:

- Lift code into else after jump in control flow ([`reintroduce-else`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/reintroduce-else/))
- Replace if statement with if expression ([`assign-if-exp`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/assign-if-exp/))

```suggestion
        return None if not results else self._parse_job(results[0])
```
</issue_to_address>

### Comment 4
<location> `tests/unit/test_job_manager.py:126-129` </location>
<code_context>
def test_finalize_failure_delegates_to_sys_excepthook(
    test_session, patch_argv, patch_user_script
):
    jm = JobManager()
    job = jm.get_or_create(test_session)

    called = {}

    def fake_excepthook(exc_type, exc_value, tb):
        called["exc"] = (exc_type, str(exc_value))

    sys.__excepthook__, old_hook = fake_excepthook, sys.__excepthook__

    try:
        try:
            raise ValueError("bad stuff")
        except ValueError as e:
            jm.finalize_failure(test_session, type(e), e, e.__traceback__)
    finally:
        sys.__excepthook__ = old_hook

    assert "bad stuff" in called["exc"][1]
    db_job = test_session.catalog.metastore.get_job(job.id)
    assert db_job.status == JobStatus.FAILED

</code_context>

<issue_to_address>
**suggestion (code-quality):** Merge nested try-statement into a single try ([`flatten-nested-try`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/flatten-nested-try/))

```suggestion
        raise ValueError("bad stuff")
    except ValueError as e:
        jm.finalize_failure(test_session, type(e), e, e.__traceback__)
```
</issue_to_address>


@ilongin ilongin linked an issue Sep 29, 2025 that may be closed by this pull request
Base automatically changed from ilongin/1350-checkpoints-skip-chains to main September 30, 2025 20:32
@ilongin ilongin force-pushed the ilongin/1361-local-checkpoints-usage branch from 2b570d9 to 0921aea Compare September 30, 2025 22:30

codecov bot commented Oct 1, 2025

Codecov Report

❌ Patch coverage is 83.33333% with 14 lines in your changes missing coverage. Please review.
✅ Project coverage is 87.95%. Comparing base (54c03fe) to head (c6f7266).

Files with missing lines | Patch % | Lines
src/datachain/query/session.py | 80.28% | 7 Missing and 7 partials ⚠️
Additional details and impacted files


@@            Coverage Diff             @@
##             main    #1367      +/-   ##
==========================================
+ Coverage   87.73%   87.95%   +0.22%     
==========================================
  Files         160      160              
  Lines       15122    14958     -164     
  Branches     2171     2128      -43     
==========================================
- Hits        13267    13157     -110     
+ Misses       1356     1298      -58     
- Partials      499      503       +4     
Flag | Coverage | Δ
datachain | 87.91% <83.33%> | (+0.22%) ⬆️

Flags with carried forward coverage won't be shown.

Files with missing lines | Coverage | Δ
src/datachain/catalog/catalog.py | 83.62% <ø> | (+0.23%) ⬆️
src/datachain/data_storage/metastore.py | 94.13% <100.00%> | (+0.06%) ⬆️
src/datachain/job.py | 100.00% <ø> | (ø)
src/datachain/lib/dc/datachain.py | 91.84% <100.00%> | (-0.07%) ⬇️
src/datachain/lib/dc/records.py | 100.00% <ø> | (ø)
src/datachain/query/dataset.py | 93.39% <ø> | (-0.01%) ⬇️
src/datachain/query/session.py | 87.73% <80.28%> | (+2.82%) ⬆️

... and 1 file with indirect coverage changes


@ilongin ilongin marked this pull request as ready for review October 2, 2025 08:27

@sourcery-ai sourcery-ai bot left a comment


Hey there - I've reviewed your changes - here's some feedback:

  • Consider moving the atexit and excepthook registration to a class‐level guard so hooks are only registered once per process, avoiding duplicate registrations when multiple JobManager instances are created.
  • The abstract metastore interface now requires get_last_job_by_name; please ensure all supported backend implementations (not just SQLite) override this method to avoid runtime errors in other environments.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- Consider moving the atexit and excepthook registration to a class‐level guard so hooks are only registered once per process, avoiding duplicate registrations when multiple JobManager instances are created.
- The abstract metastore interface now requires get_last_job_by_name; please ensure all supported backend implementations (not just SQLite) override this method to avoid runtime errors in other environments.

## Individual Comments

### Comment 1
<location> `src/datachain/lib/udf.py:165` </location>
<code_context>
         inputs and outputs.
         """
         parts = [
-            hash_callable(self._func),
+            hash_callable(self._func) if self._func else "",
             self.params.hash() if self.params else "",
             self.output.hash(),
</code_context>

<issue_to_address>
**question (bug_risk):** Returning an empty string for missing _func may affect hash uniqueness.

Using an empty string may cause hash collisions for UDFs without a function. Consider using a distinct marker to preserve hash uniqueness.
</issue_to_address>
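
The collision risk the comment raises can be shown with a small sketch. The sentinel name and the sha256-of-bytecode hashing here are illustrative stand-ins, not datachain's actual hash_callable implementation:

```python
import hashlib

# A distinct sentinel cannot collide with any real function hash; the
# exact string "<no-func>" is an arbitrary choice.
NO_FUNC_MARKER = "<no-func>"


def udf_hash_part(func):
    """Return the hash component for a UDF's function, or a sentinel
    when the UDF has no function at all."""
    if func is None:
        return NO_FUNC_MARKER
    # Stand-in for hash_callable: hash the function's compiled bytecode.
    return hashlib.sha256(func.__code__.co_code).hexdigest()
```

With an empty string instead of the sentinel, a missing function would hash the same as any other empty component in the parts list, which is exactly the collision the reviewer is worried about.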

### Comment 2
<location> `tests/func/test_datachain.py:1670-1672` </location>
<code_context>

</code_context>

<issue_to_address>
**issue (code-quality):** Avoid conditionals in tests. ([`no-conditionals-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-conditionals-in-tests))

<details><summary>Explanation</summary>Avoid complex code, like conditionals, in test functions.

Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals

Some ways to fix this:

* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.

> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.

Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>

### Comment 3
<location> `tests/test_job_manager_e2e.py:9` </location>
<code_context>

</code_context>

<issue_to_address>
**issue (code-quality):** Don't import test modules. ([`dont-import-test-modules`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/dont-import-test-modules))

<details><summary>Explanation</summary>Don't import test modules.

Tests should be self-contained and don't depend on each other.

If a helper function is used by multiple tests,
define it in a helper module,
instead of importing one test from the other.
</details>
</issue_to_address>

### Comment 4
<location> `tests/test_job_manager_e2e.py:73-76` </location>
<code_context>

</code_context>

<issue_to_address>
**issue (code-quality):** Avoid loops in tests. ([`no-loop-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-loop-in-tests))

<details><summary>Explanation</summary>Avoid complex code, like loops, in test functions.

Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals

Some ways to fix this:

* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.

> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.

Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>

### Comment 5
<location> `tests/unit/lib/test_checkpoints.py:48-51` </location>
<code_context>

</code_context>

<issue_to_address>
**issue (code-quality):** Avoid conditionals in tests. ([`no-conditionals-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-conditionals-in-tests))

<details><summary>Explanation</summary>Avoid complex code, like conditionals, in test functions.

Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals

Some ways to fix this:

* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.

> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.

Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>

### Comment 6
<location> `tests/unit/lib/test_checkpoints.py:66-70` </location>
<code_context>

</code_context>

<issue_to_address>
**issue (code-quality):** Avoid conditionals in tests. ([`no-conditionals-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-conditionals-in-tests))

<details><summary>Explanation</summary>Avoid complex code, like conditionals, in test functions.

Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals

Some ways to fix this:

* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.

> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.

Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>

### Comment 7
<location> `src/datachain/data_storage/metastore.py:1610-1612` </location>
<code_context>
    def get_last_job_by_name(self, name: str, conn=None) -> Optional["Job"]:
        query = (
            self._jobs_query()
            .where(self._jobs.c.name == name)
            .order_by(self._jobs.c.created_at.desc())
            .limit(1)
        )
        results = list(self.db.execute(query, conn=conn))
        if not results:
            return None
        return self._parse_job(results[0])

</code_context>

<issue_to_address>
**suggestion (code-quality):** We've found these issues:

- Lift code into else after jump in control flow ([`reintroduce-else`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/reintroduce-else/))
- Replace if statement with if expression ([`assign-if-exp`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/assign-if-exp/))

```suggestion
        return None if not results else self._parse_job(results[0])
```
</issue_to_address>

### Comment 8
<location> `tests/unit/test_job_manager.py:128-131` </location>
<code_context>
def test_finalize_failure_delegates_to_previous_excepthook(
    test_session, patch_argv, patch_user_script
):
    jm = JobManager()
    job = jm.get_or_create(test_session)

    called = {}

    def fake_excepthook(exc_type, exc_value, tb):
        called["exc"] = (exc_type, str(exc_value))

    # Replace the previous hook that JobManager saved
    old_hook = jm._previous_excepthook
    jm._previous_excepthook = fake_excepthook

    try:
        try:
            raise ValueError("bad stuff")
        except ValueError as e:
            jm.finalize_failure(test_session, type(e), e, e.__traceback__)
    finally:
        jm._previous_excepthook = old_hook

    assert "bad stuff" in called["exc"][1]
    db_job = test_session.catalog.metastore.get_job(job.id)
    assert db_job.status == JobStatus.FAILED

</code_context>

<issue_to_address>
**suggestion (code-quality):** Merge nested try-statement into a single try ([`flatten-nested-try`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/flatten-nested-try/))

```suggestion
        raise ValueError("bad stuff")
    except ValueError as e:
        jm.finalize_failure(test_session, type(e), e, e.__traceback__)
```
</issue_to_address>


inputs and outputs.
"""
parts = [
hash_callable(self._func),


question (bug_risk): Returning an empty string for missing _func may affect hash uniqueness.

Using an empty string may cause hash collisions for UDFs without a function. Consider using a distinct marker to preserve hash uniqueness.


monkeypatch.setenv("DATACHAIN_CHECKPOINTS_RESET", str(False))

chain = dc.read_dataset("nums", session=test_session).settings(parallel=True)
Member


At what point does it create a job? At first save? Can we remove, or add, a test where save happens after a UDF - probably not a big change ... just to make sure ...

Does persist btw create a JOB? Can I have a job w/o save, with a mapper, that I would like to restart?

Contributor Author


at what point it create a job? at first save?

Correct.

can we remove / or a add a test where save is happening after UDF - probably not a big of a change ... just to make sure ...

Not sure I understand. In these tests save is actually happening after UDF.

does persist btw create a JOB?

Persist doesn't create a job. The job is created in the actual DataChain.save() function, before running steps (before persist).

can I have a job w/o save with a mapper that I would like to restart ?

If there is no .save(), no Job is created and therefore no checkpoints.

Member


What if we have a script with a UDF and show()? We don't want to save anything at all ... but we also want to reuse results as we iterate.

It feels like quite a limitation, and even wrong design tbh, that a job is actually running but no JOB is created ... and a JOB is created in save, and only if we have a save.

Contributor Author


You are right, checkpoints / a job should be created in that scenario as well, but I would do that in a follow-up issue when implementing UDF checkpoints, as it is not important here.

Regarding the place where the job should be created - it should probably be before applying steps from DatasetQuery, as that's where the actual "job" starts.

Member


kk, let's reconsider this in the next step

@shcheklein
Member

@ilongin there are a few other comments, let's please review them.

catalog = test_session.catalog

dc.read_values(num=[1, 2, 3], session=test_session).save("nums")
dc.read_values(num=list(range(1000)), session=test_session).save("nums")
Member


is it better to patch the value of the variable that defines the batch size or something?


Development

Successfully merging this pull request may close these issues.

Add ability to utilize checkpoints from local run
