Implement checkpoints usage in local #1367
Merged

Changes from all commits (67 commits)
All commits are by ilongin:

de9e0de  added logic to skip applying chain if checkpoint exist
4f5f304  removed not needed code
a0ad1b8  adding prints
2aae374  removed prints
b1df328  added job manager
b9c2020  added tests for util function to get user code
d0a4129  using job manager and adding unit tests for it
f66dc53  refactor
0921aea  made reset checkpoints as default for now
ea34286  added job manager reset and refactoring test_checkpoints to use new j…
f19a055  refactoring, fixing tests
33a2bcd  adding job e2e tests
9988d8c  refactoring tests
fecdcae  Merge branch 'main' into ilongin/1361-local-checkpoints-usage
4ea1169  Merge branch 'main' into ilongin/1361-local-checkpoints-usage
b9d748a  fix
2f806ca  fixig tests
01d0711  fixing job manager tests for keyboard interruption
105b03b  fixing windows test
0efc106  added more elements to hash
5db18e5  fixing test
cbdcac2  removed JobManager and moved its logic to Session
dd2487d  Merge branch 'main' into ilongin/1361-local-checkpoints-usage
b26b150  mergint with main
09af752  removed saving query string in new job locally
f38fc70  merged with main
99afaeb  moved reset_job_state to test from Session
b896d3c  moved get_last_job_by_name to sqlite metastore
7d27879  Merge branch 'main' into ilongin/1361-local-checkpoints-usage
18aa7c9  moved job to Session class attributes to ensure same job per process,…
428e10d  making job name random in interactive runs
84eebc7  refactoring except_hook
0e09ce8  moved tests from test_datachain to test_job_management
a7ff56b  fixing issue with updating job state because of hook after db is cleaned
8c006c8  fixing typing
c4f1b34  merging with main
74a1106  fixing windows tests
b009a4a  more robust check if is script run
d3acdb0  Merge branch 'main' into ilongin/1361-local-checkpoints-usage
2e924f2  Merge branch 'main' into ilongin/1361-local-checkpoints-usage
fb9306b  using util function to clean hooks
57d4845  removed session arg from reset_session_job_state
813cbd5  added checkpoint test with parallel and fixed deregistering job hooks
6ecf6b0  Merge branch 'main' into ilongin/1361-local-checkpoints-usage
026996e  more granular exception check
b368d93  using better fixture
cedbabd  moved test_checkpoints_parallel to func tests
faee8ac  increasing number of rows in test
ee6d880  Merge branch 'main' into ilongin/1361-local-checkpoints-usage
354185b  removing logic of removing datasets on job failure
07d881e  moved function to abstract
ecf102f  Merge branch 'main' into ilongin/1361-local-checkpoints-usage
44e738c  fix test
76aba6c  adding docs and removing not needed abstract method
1fabb7a  adding checkpoint docs link
737a05a  Merge branch 'main' into ilongin/1361-local-checkpoints-usage
ef80ca1  fixing docs
38e2bfc  adding missing abstract method
1167983  Merge branch 'main' into ilongin/1361-local-checkpoints-usage
4237b88  fixing parsing parent job id
f374500  skipping hf tests
fdd533d  returning tests
c6f7266  Merge branch 'main' into ilongin/1361-local-checkpoints-usage
24c6b36  Merge branch 'main' into ilongin/1361-local-checkpoints-usage
77eafbc  fix test
380f36f  merged with main
855c841  Merge branch 'main' into ilongin/1361-local-checkpoints-usage
New file added by this PR (207 lines):
# Checkpoints

Checkpoints allow DataChain to automatically skip re-creating datasets that were successfully saved in previous script runs. When a script fails or is interrupted, you can re-run it and DataChain will resume from where it left off, reusing datasets that were already created.

**Note:** Checkpoints are currently available only for local script runs. Support for Studio is planned for future releases.

## How Checkpoints Work

When you run a Python script locally (e.g., `python my_script.py`), DataChain automatically:

1. **Creates a job** for the script execution, using the script's absolute path as the job name
2. **Tracks parent jobs** by finding the last job with the same script name
3. **Calculates hashes** for each dataset save operation based on the DataChain operations chain
4. **Creates checkpoints** after each successful `.save()` call, storing the hash
5. **Checks for existing checkpoints** on subsequent runs - if a matching checkpoint exists in the parent job, DataChain skips the save and reuses the existing dataset

This means that if your script creates multiple datasets and fails partway through, the next run will skip recreating the datasets that were already successfully saved.
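The decision DataChain makes at each `.save()` call can be pictured with a small sketch. This is simplified illustrative code, not DataChain's actual internals: the names `saved_checkpoints`, `save_or_reuse`, and `compute_and_save` are hypothetical, and the real logic lives in DataChain's session and metastore layers.

```python
# A minimal sketch of the per-save checkpoint decision (hypothetical names).
saved_checkpoints: dict[str, str] = {}  # chain hash -> dataset name, from the parent job


def save_or_reuse(chain_hash: str, dataset_name: str, compute_and_save) -> str:
    """Skip recomputation when the parent job already completed this stage."""
    if chain_hash in saved_checkpoints:
        # Matching checkpoint found: reuse the dataset created by a previous run.
        return saved_checkpoints[chain_hash]

    # No checkpoint: run the chain, persist the dataset, and record a checkpoint
    # so the next run can skip this stage.
    name = compute_and_save(dataset_name)
    saved_checkpoints[chain_hash] = name
    return name
```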
## Example

Consider this script that processes data in multiple stages:

```python
import datachain as dc

# Stage 1: Load and filter data
filtered = (
    dc.read_csv("s3://mybucket/data.csv")
    .filter(dc.C("score") > 0.5)
    .save("filtered_data")
)

# Stage 2: Transform data
transformed = (
    filtered
    .map(value=lambda x: x * 2, output=float)
    .save("transformed_data")
)

# Stage 3: Aggregate results
result = (
    transformed
    .agg(
        total=lambda values: sum(values),
        partition_by="category",
    )
    .save("final_results")
)
```

**First run:** The script executes all three stages and creates three datasets: `filtered_data`, `transformed_data`, and `final_results`. If the script fails during Stage 3, only `filtered_data` and `transformed_data` are saved.

**Second run:** DataChain detects that `filtered_data` and `transformed_data` were already created in the parent job with matching hashes. It skips recreating them and proceeds directly to Stage 3, creating only `final_results`.
## When Checkpoints Are Used

Checkpoints are automatically used when:

- Running a Python script locally (e.g., `python my_script.py`)
- The script has been run before
- A dataset with the same name is being saved
- The chain hash matches a checkpoint from the parent job

Checkpoints are **not** used when:

- Running code interactively (Python REPL, Jupyter notebooks)
- Running code as a module (e.g., `python -m mymodule`)
- The `DATACHAIN_CHECKPOINTS_RESET` environment variable is set (see below)
- Running on Studio (checkpoints support planned for future releases)
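Whether a run counts as a "script run" is detected from how the Python process was started. The check below is a generic illustration of how this can be done in Python, not necessarily the exact check DataChain performs:

```python
import __main__


def is_script_run() -> bool:
    """Rough check: was this process started as `python some_script.py`?"""
    # Interactive sessions (REPL, Jupyter) usually have no __main__.__file__.
    if not hasattr(__main__, "__file__"):
        return False
    # `python -m package.module` sets __main__.__spec__ to the module's spec,
    # while `python some_script.py` leaves it as None.
    if getattr(__main__, "__spec__", None) is not None:
        return False
    return True
```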
## Resetting Checkpoints

To ignore existing checkpoints and run your script from scratch, set the `DATACHAIN_CHECKPOINTS_RESET` environment variable:

```bash
export DATACHAIN_CHECKPOINTS_RESET=1
python my_script.py
```

Or set it inline:

```bash
DATACHAIN_CHECKPOINTS_RESET=1 python my_script.py
```

This forces DataChain to recreate all datasets, regardless of existing checkpoints.
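If you prefer to control this from Python (for example, from a small wrapper script) rather than from the shell, you can set the variable in the environment of a child process. This assumes only that the variable is present in the process environment when the script runs:

```python
import os
import subprocess
import sys

# Run the script once with checkpoints reset, without changing the parent shell's environment.
env = {**os.environ, "DATACHAIN_CHECKPOINTS_RESET": "1"}
subprocess.run([sys.executable, "my_script.py"], env=env, check=True)
```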
## How Job Names Are Determined

DataChain uses different strategies for naming jobs depending on how the code is executed:

### Script Execution (Checkpoints Enabled)

When running `python my_script.py`, DataChain uses the **absolute path** to the script as the job name:

```
/home/user/projects/my_script.py
```

This allows DataChain to link runs of the same script together as parent-child jobs, enabling checkpoint lookup.
### Interactive or Module Execution (Checkpoints Disabled)

When running code interactively or as a module, DataChain uses a **unique UUID** as the job name:

```
a1b2c3d4-e5f6-7890-abcd-ef1234567890
```

This prevents unrelated executions from being linked together, but also means checkpoints cannot be used.
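The two naming strategies correspond roughly to the sketch below. It is shown only as an illustration; DataChain's actual implementation may differ in details:

```python
import sys
import uuid
from pathlib import Path


def job_name(is_script: bool) -> str:
    if is_script:
        # Script run: a stable name (the script's absolute path) lets runs of the
        # same script be linked as parent/child jobs, enabling checkpoint lookup.
        return str(Path(sys.argv[0]).resolve())
    # Interactive or module run: a random name, so unrelated executions are
    # never linked together (and checkpoints are effectively disabled).
    return str(uuid.uuid4())
```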
## How Checkpoint Hashes Are Calculated

For each `.save()` operation, DataChain calculates a hash based on:

1. The hash of the previous checkpoint in the current job (if any)
2. The hash of the current DataChain operations chain

This creates a chain of hashes that uniquely identifies each stage of data processing. On subsequent runs, DataChain matches these hashes against the parent job's checkpoints and skips recreating datasets where the hashes match.
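The chaining idea can be illustrated with a few lines of Python. The exact hash function and the exact serialization of the operations chain are DataChain implementation details; the sketch below only shows how each stage's hash depends on the previous one:

```python
import hashlib


def checkpoint_hash(previous_checkpoint_hash: str, chain_repr: str) -> str:
    """Link each stage's hash to the previous one, forming a hash chain."""
    data = (previous_checkpoint_hash + chain_repr).encode()
    return hashlib.sha256(data).hexdigest()


# Three stages: each hash depends on the one before it.
h1 = checkpoint_hash("", "read_csv('data.csv')")
h2 = checkpoint_hash(h1, "read_dataset('stage1').filter(x > 5)")
h3 = checkpoint_hash(h2, "read_dataset('stage2').select('name', 'value')")

# Changing stage 2 changes h2, and therefore h3 as well: every stage after
# the modified one is invalidated.
```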
### Hash Invalidation

**Checkpoints are automatically invalidated when you modify the chain.** Any change to the DataChain operations will result in a different hash, causing DataChain to skip the checkpoint and recompute the dataset.

Changes that invalidate checkpoints include:

- **Modifying filter conditions:** `.filter(dc.C("score") > 0.5)` → `.filter(dc.C("score") > 0.8)`
- **Changing map/gen/agg functions:** Any modification to UDF logic
- **Altering function parameters:** Changes to column names, output types, or other parameters
- **Adding or removing operations:** Inserting new `.filter()`, `.map()`, or other steps
- **Reordering operations:** Changing the sequence of transformations

### Example

```python
# First run - creates three checkpoints
dc.read_csv("data.csv").save("stage1")  # Hash = H1

dc.read_dataset("stage1").filter(dc.C("x") > 5).save("stage2")  # Hash = H2 = hash(H1 + pipeline_hash)

dc.read_dataset("stage2").select("name", "value").save("stage3")  # Hash = H3 = hash(H2 + pipeline_hash)
```

**Second run (no changes):**

- All three hashes match → all three datasets are reused → no computation

**Second run (modified filter):**

```python
dc.read_csv("data.csv").save("stage1")  # Hash = H1 matches ✓ → reused

dc.read_dataset("stage1").filter(dc.C("x") > 10).save("stage2")  # Hash ≠ H2 ✗ → recomputed

dc.read_dataset("stage2").select("name", "value").save("stage3")  # Hash ≠ H3 ✗ → recomputed
```

Because the filter changed, `stage2` has a different hash and must be recomputed. Since `stage3` depends on `stage2`, its hash also changes (because it includes H2 in the calculation), so it must be recomputed as well.

**Key insight:** Modifying any step in the chain invalidates that checkpoint and all subsequent checkpoints, because the hash chain is broken.
## Dataset Persistence

Starting with the checkpoints feature, datasets created during script execution persist even if the script fails or is interrupted. This is essential for checkpoint functionality, as it allows subsequent runs to reuse successfully created datasets.

If you need to clean up datasets from failed runs, you can use:

```python
import datachain as dc

# Remove a specific dataset
dc.delete_dataset("dataset_name")

# List all datasets to see what's available
for ds in dc.datasets():
    print(ds.name)
```
## Limitations

- **Local only:** Checkpoints currently work only for local script runs. Studio support is planned.
- **Script-based:** Code must be run as a script (not interactively or as a module).
- **Hash-based matching:** Any change to the chain will create a different hash, preventing checkpoint reuse.
- **Same script path:** The script must be run from the same absolute path for parent job linking to work.
## Future Plans

### Studio Support

Support for checkpoints on Studio is planned for future releases, which will enable checkpoint functionality for collaborative workflows and cloud-based data processing.

### UDF-Level Checkpoints

Currently, checkpoints are created only when datasets are saved using `.save()`. This means that if a script fails during a long-running UDF operation (like `.map()`, `.gen()`, or `.agg()`), the entire UDF computation must be rerun on the next execution.

Future versions will support **UDF-level checkpoints**, creating checkpoints after each UDF step in the chain. This will provide much more granular recovery:

```python
# Future behavior with UDF-level checkpoints
result = (
    dc.read_csv("data.csv")
    .map(heavy_computation_1)  # Checkpoint created after this UDF
    .map(heavy_computation_2)  # Checkpoint created after this UDF
    .map(heavy_computation_3)  # Checkpoint created after this UDF
    .save("result")
)
```

If the script fails during `heavy_computation_3`, the next run will skip re-executing `heavy_computation_1` and `heavy_computation_2`, resuming only the work that wasn't completed.
looks good! how much of this is AI generated?
All. I just did maybe 5-6 prompts to make some things better, but even the first version would be good enough, I think... For now, I found AI to be most useful in generating docs and helping understand the cause of non-trivial bugs.