Merged
67 commits
de9e0de
added logic to skip applying chain if checkpoint exist
ilongin Sep 24, 2025
4f5f304
removed not needed code
ilongin Sep 24, 2025
a0ad1b8
adding prints
ilongin Sep 25, 2025
2aae374
removed prints
ilongin Sep 25, 2025
b1df328
added job manager
ilongin Sep 26, 2025
b9c2020
added tests for util function to get user code
ilongin Sep 26, 2025
d0a4129
using job manager and adding unit tests for it
ilongin Sep 29, 2025
f66dc53
refactor
ilongin Sep 29, 2025
0921aea
made reset checkpoints as default for now
ilongin Sep 30, 2025
ea34286
added job manager reset and refactoring test_checkpoints to use new j…
ilongin Oct 1, 2025
f19a055
refactoring, fixing tests
ilongin Oct 1, 2025
33a2bcd
adding job e2e tests
ilongin Oct 1, 2025
9988d8c
refactoring tests
ilongin Oct 1, 2025
fecdcae
Merge branch 'main' into ilongin/1361-local-checkpoints-usage
ilongin Oct 1, 2025
4ea1169
Merge branch 'main' into ilongin/1361-local-checkpoints-usage
ilongin Oct 2, 2025
b9d748a
fix
ilongin Oct 2, 2025
2f806ca
fixig tests
ilongin Oct 2, 2025
01d0711
fixing job manager tests for keyboard interruption
ilongin Oct 2, 2025
105b03b
fixing windows test
ilongin Oct 2, 2025
0efc106
added more elements to hash
ilongin Oct 2, 2025
5db18e5
fixing test
ilongin Oct 2, 2025
cbdcac2
removed JobManager and moved its logic to Session
ilongin Oct 3, 2025
dd2487d
Merge branch 'main' into ilongin/1361-local-checkpoints-usage
ilongin Oct 3, 2025
b26b150
mergint with main
ilongin Oct 3, 2025
09af752
removed saving query string in new job locally
ilongin Oct 3, 2025
f38fc70
merged with main
ilongin Oct 3, 2025
99afaeb
moved reset_job_state to test from Session
ilongin Oct 3, 2025
b896d3c
moved get_last_job_by_name to sqlite metastore
ilongin Oct 3, 2025
7d27879
Merge branch 'main' into ilongin/1361-local-checkpoints-usage
ilongin Oct 5, 2025
18aa7c9
moved job to Session class attributes to ensure same job per process,…
ilongin Oct 5, 2025
428e10d
making job name random in interactive runs
ilongin Oct 5, 2025
84eebc7
refactoring except_hook
ilongin Oct 5, 2025
0e09ce8
moved tests from test_datachain to test_job_management
ilongin Oct 6, 2025
a7ff56b
fixing issue with updating job state because of hook after db is cleaned
ilongin Oct 6, 2025
8c006c8
fixing typing
ilongin Oct 6, 2025
c4f1b34
merging with main
ilongin Oct 7, 2025
74a1106
fixing windows tests
ilongin Oct 7, 2025
b009a4a
more robust check if is script run
ilongin Oct 8, 2025
d3acdb0
Merge branch 'main' into ilongin/1361-local-checkpoints-usage
ilongin Oct 8, 2025
2e924f2
Merge branch 'main' into ilongin/1361-local-checkpoints-usage
ilongin Oct 9, 2025
fb9306b
using util function to clean hooks
ilongin Oct 9, 2025
57d4845
removed session arg from reset_session_job_state
ilongin Oct 9, 2025
813cbd5
added checkpoint test with parallel and fixed deregistering job hooks
ilongin Oct 9, 2025
6ecf6b0
Merge branch 'main' into ilongin/1361-local-checkpoints-usage
ilongin Oct 13, 2025
026996e
more granular exception check
ilongin Oct 13, 2025
b368d93
using better fixture
ilongin Oct 13, 2025
cedbabd
moved test_checkpoints_parallel to func tests
ilongin Oct 13, 2025
faee8ac
increasing number of rows in test
ilongin Oct 13, 2025
ee6d880
Merge branch 'main' into ilongin/1361-local-checkpoints-usage
ilongin Oct 14, 2025
354185b
removing logic of removing datasets on job failure
ilongin Oct 14, 2025
07d881e
moved function to abstract
ilongin Oct 15, 2025
ecf102f
Merge branch 'main' into ilongin/1361-local-checkpoints-usage
ilongin Oct 15, 2025
44e738c
fix test
ilongin Oct 15, 2025
76aba6c
adding docs and removing not needed abstract method
ilongin Oct 15, 2025
1fabb7a
adding checkpoint docs link
ilongin Oct 15, 2025
737a05a
Merge branch 'main' into ilongin/1361-local-checkpoints-usage
ilongin Oct 16, 2025
ef80ca1
fixing docs
ilongin Oct 16, 2025
38e2bfc
adding missing abstract method
ilongin Oct 16, 2025
1167983
Merge branch 'main' into ilongin/1361-local-checkpoints-usage
ilongin Oct 16, 2025
4237b88
fixing parsing parent job id
ilongin Oct 17, 2025
f374500
skipping hf tests
ilongin Oct 17, 2025
fdd533d
returning tests
ilongin Oct 17, 2025
c6f7266
Merge branch 'main' into ilongin/1361-local-checkpoints-usage
ilongin Oct 17, 2025
24c6b36
Merge branch 'main' into ilongin/1361-local-checkpoints-usage
ilongin Oct 18, 2025
77eafbc
fix test
ilongin Oct 18, 2025
380f36f
merged with main
ilongin Oct 19, 2025
855c841
Merge branch 'main' into ilongin/1361-local-checkpoints-usage
ilongin Oct 19, 2025
207 changes: 207 additions & 0 deletions docs/guide/checkpoints.md
Member

looks good! how much of this is AI generated?

Contributor Author

All of it. I just did maybe 5-6 prompts to make some things better, but even the first version would have been good enough, I think. So far I've found AI most useful for generating docs and for helping understand the cause of non-trivial bugs.

@@ -0,0 +1,207 @@
# Checkpoints

Checkpoints allow DataChain to automatically skip re-creating datasets that were successfully saved in previous script runs. When a script fails or is interrupted, you can re-run it and DataChain will resume from where it left off, reusing datasets that were already created.

**Note:** Checkpoints are currently available only for local script runs. Support for Studio is planned for future releases.

## How Checkpoints Work

When you run a Python script locally (e.g., `python my_script.py`), DataChain automatically:

1. **Creates a job** for the script execution, using the script's absolute path as the job name
2. **Tracks parent jobs** by finding the last job with the same script name
3. **Calculates hashes** for each dataset save operation based on the DataChain operations chain
4. **Creates checkpoints** after each successful `.save()` call, storing the hash
5. **Checks for existing checkpoints** on subsequent runs - if a matching checkpoint exists in the parent job, DataChain skips the save and reuses the existing dataset

This means that if your script creates multiple datasets and fails partway through, the next run will skip recreating the datasets that were already successfully saved.
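
Conceptually, each `.save()` boils down to a skip-or-recompute decision keyed by the chain hash. The snippet below is a minimal, self-contained model of that decision; the names and data structures are illustrative, not DataChain's internal API:

```python
# Minimal model of the skip-or-recompute decision; not DataChain's internal API.
parent_checkpoints = {"h1", "h2"}     # hashes recorded by the previous (parent) job
current_checkpoints: list[str] = []   # hashes recorded so far in this run

def save(chain_hash: str, compute):
    if chain_hash in parent_checkpoints:
        result = "reused existing dataset"   # matching checkpoint: skip the work
    else:
        result = compute()                   # no match: recompute and save
    current_checkpoints.append(chain_hash)   # record progress for the next run
    return result

save("h1", lambda: "computed stage 1")  # reused
save("h3", lambda: "computed stage 3")  # recomputed, because the hash changed
```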

## Example

Consider this script that processes data in multiple stages:

```python
import datachain as dc

# Stage 1: Load and filter data
filtered = (
    dc.read_csv("s3://mybucket/data.csv")
    .filter(dc.C("score") > 0.5)
    .save("filtered_data")
)

# Stage 2: Transform data
transformed = (
    filtered
    .map(value=lambda x: x * 2, output=float)
    .save("transformed_data")
)

# Stage 3: Aggregate results
result = (
    transformed
    .agg(
        total=lambda values: sum(values),
        partition_by="category",
    )
    .save("final_results")
)
```

**First run:** The script executes all three stages and creates three datasets: `filtered_data`, `transformed_data`, and `final_results`. If the script fails during Stage 3, only `filtered_data` and `transformed_data` are saved.

**Second run:** DataChain detects that `filtered_data` and `transformed_data` were already created in the parent job with matching hashes. It skips recreating them and proceeds directly to Stage 3, creating only `final_results`.

## When Checkpoints Are Used

Checkpoints are automatically used when:

- Running a Python script locally (e.g., `python my_script.py`)
- The script has been run before
- A dataset with the same name is being saved
- The chain hash matches a checkpoint from the parent job

Checkpoints are **not** used when (see the sketch after this list):

- Running code interactively (Python REPL, Jupyter notebooks)
- Running code as a module (e.g., `python -m mymodule`)
- The `DATACHAIN_CHECKPOINTS_RESET` environment variable is set (see below)
- Running on Studio (checkpoints support planned for future releases)
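
Taken together, the enablement check can be pictured as a small predicate. The helper below is a hypothetical sketch; names and exact rules are illustrative, not DataChain's code:

```python
# Hypothetical sketch of the enablement check; names and rules are illustrative.
import os
import sys

def checkpoints_enabled() -> bool:
    main = sys.modules.get("__main__")
    is_script_run = (
        getattr(main, "__file__", None) is not None  # not a REPL or notebook
        and getattr(main, "__spec__", None) is None   # not `python -m ...`
    )
    reset_requested = bool(os.environ.get("DATACHAIN_CHECKPOINTS_RESET"))
    return is_script_run and not reset_requested
```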

## Resetting Checkpoints

To ignore existing checkpoints and run your script from scratch, set the `DATACHAIN_CHECKPOINTS_RESET` environment variable:

```bash
export DATACHAIN_CHECKPOINTS_RESET=1
python my_script.py
```

Or set it inline:

```bash
DATACHAIN_CHECKPOINTS_RESET=1 python my_script.py
```

This forces DataChain to recreate all datasets, regardless of existing checkpoints.

## How Job Names Are Determined

DataChain uses different strategies for naming jobs depending on how the code is executed:

### Script Execution (Checkpoints Enabled)

When running `python my_script.py`, DataChain uses the **absolute path** to the script as the job name:

```
/home/user/projects/my_script.py
```

This allows DataChain to link runs of the same script together as parent-child jobs, enabling checkpoint lookup.

### Interactive or Module Execution (Checkpoints Disabled)

When running code interactively or as a module, DataChain uses a **unique UUID** as the job name:

```
a1b2c3d4-e5f6-7890-abcd-ef1234567890
```

This prevents unrelated executions from being linked together, but also means checkpoints cannot be used.
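
A rough sketch of this naming strategy (hypothetical helper, not DataChain's actual implementation):

```python
# Hypothetical sketch of job naming; not DataChain's actual implementation.
import os
import sys
import uuid

def job_name() -> str:
    main = sys.modules.get("__main__")
    script = getattr(main, "__file__", None)
    if script and getattr(main, "__spec__", None) is None:
        return os.path.abspath(script)  # plain script run: stable, repeatable name
    return str(uuid.uuid4())            # REPL, notebook, or `python -m ...`: unique name
```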

## How Checkpoint Hashes Are Calculated

For each `.save()` operation, DataChain calculates a hash based on:

1. The hash of the previous checkpoint in the current job (if any)
2. The hash of the current DataChain operations chain

This creates a chain of hashes that uniquely identifies each stage of data processing. On subsequent runs, DataChain matches these hashes against the parent job's checkpoints and skips recreating datasets where the hashes match.
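
The exact hash function is an internal detail, but the chaining can be pictured like this (SHA-256 and the string inputs are assumed purely for illustration):

```python
# Illustration of hash chaining; the real hash function and inputs are internal to DataChain.
import hashlib

def checkpoint_hash(prev_hash: str | None, chain_hash: str) -> str:
    return hashlib.sha256(((prev_hash or "") + chain_hash).encode()).hexdigest()

h1 = checkpoint_hash(None, "read_csv -> save('stage1')")
h2 = checkpoint_hash(h1, "read_dataset('stage1') -> filter -> save('stage2')")  # folds in h1
```

Because `h2` folds in `h1`, changing the first stage changes every hash downstream of it.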

### Hash Invalidation

**Checkpoints are automatically invalidated when you modify the chain.** Any change to the DataChain operations will result in a different hash, causing DataChain to skip the checkpoint and recompute the dataset.

Changes that invalidate checkpoints include:

- **Modifying filter conditions:** `.filter(dc.C("score") > 0.5)` → `.filter(dc.C("score") > 0.8)`
- **Changing map/gen/agg functions:** Any modification to UDF logic
- **Altering function parameters:** Changes to column names, output types, or other parameters
- **Adding or removing operations:** Inserting new `.filter()`, `.map()`, or other steps
- **Reordering operations:** Changing the sequence of transformations

### Example

```python
# First run - creates three checkpoints
dc.read_csv("data.csv").save("stage1") # Hash = H1

dc.read_dataset("stage1").filter(dc.C("x") > 5).save("stage2") # Hash = H2 = hash(H1 + pipeline_hash)

dc.read_dataset("stage2").select("name", "value").save("stage3") # Hash = H3 = hash(H2 + pipeline_hash)
```

**Second run (no changes):**
- All three hashes match → all three datasets are reused → no computation

**Second run (modified filter):**
```python
dc.read_csv("data.csv").save("stage1") # Hash = H1 matches ✓ → reused

dc.read_dataset("stage1").filter(dc.C("x") > 10).save("stage2") # Hash ≠ H2 ✗ → recomputed

dc.read_dataset("stage2").select("name", "value").save("stage3") # Hash ≠ H3 ✗ → recomputed
```

Because the filter changed, `stage2` has a different hash and must be recomputed. Since `stage3` depends on `stage2`, its hash also changes (because it includes H2 in the calculation), so it must be recomputed as well.

**Key insight:** Modifying any step in the chain invalidates that checkpoint and all subsequent checkpoints, because the hash chain is broken.

## Dataset Persistence

Starting with the checkpoints feature, datasets created during script execution persist even if the script fails or is interrupted. This is essential for checkpoint functionality, as it allows subsequent runs to reuse successfully created datasets.

If you need to clean up datasets from failed runs, you can use:

```python
import datachain as dc

# Remove a specific dataset
dc.delete_dataset("dataset_name")

# List all datasets to see what's available
for ds in dc.datasets():
    print(ds.name)
```

## Limitations

- **Local only:** Checkpoints currently work only for local script runs. Studio support is planned.
- **Script-based:** Code must be run as a script (not interactively or as a module).
- **Hash-based matching:** Any change to the chain will create a different hash, preventing checkpoint reuse.
- **Same script path:** The script must be run from the same absolute path for parent job linking to work.

## Future Plans

### Studio Support

Support for checkpoints on Studio is planned for future releases, which will enable checkpoint functionality for collaborative workflows and cloud-based data processing.

### UDF-Level Checkpoints

Currently, checkpoints are created only when datasets are saved using `.save()`. This means that if a script fails during a long-running UDF operation (like `.map()`, `.gen()`, or `.agg()`), the entire UDF computation must be rerun on the next execution.

Future versions will support **UDF-level checkpoints**, creating checkpoints after each UDF step in the chain. This will provide much more granular recovery:

```python
# Future behavior with UDF-level checkpoints
result = (
    dc.read_csv("data.csv")
    .map(heavy_computation_1)  # Checkpoint created after this UDF
    .map(heavy_computation_2)  # Checkpoint created after this UDF
    .map(heavy_computation_3)  # Checkpoint created after this UDF
    .save("result")
)
```

If the script fails during `heavy_computation_3`, the next run will skip re-executing `heavy_computation_1` and `heavy_computation_2`, resuming only the work that wasn't completed.
1 change: 1 addition & 0 deletions docs/guide/index.md
@@ -10,6 +10,7 @@ Welcome to the DataChain User Guide! This section provides comprehensive documen
- [Data Processing Overview](./processing.md) - Discover DataChain's specialized data processing features.
- [Delta Processing](./delta.md) - Incremental data processing to efficiently handle large datasets that change over time.
- [Error Handling and Retries](./retry.md) - Learn how to handle processing errors and selectively reprocess problematic records.
- [Checkpoints](./checkpoints.md) - Automatically resume script execution from where it left off after failures.
- [Environment Variables](./env.md) - Configure DataChain's behavior using environment variables.
- [Namespaces](./namespaces.md) - Learn more about namespaces and projects.
- [Local DB Migrations](./namespaces.md) - Learn how to handle local DB migrations after upgrading datachain.
1 change: 1 addition & 0 deletions mkdocs.yml
@@ -114,6 +114,7 @@ nav:
- Overview: guide/processing.md
- Delta Processing: guide/delta.md
- Errors Handling and Retries: guide/retry.md
- Checkpoints: guide/checkpoints.md
- Environment Variables: guide/env.md
- Namespaces: guide/namespaces.md
- Local DB Migrations: guide/db_migrations.md
2 changes: 2 additions & 0 deletions src/datachain/catalog/catalog.py
@@ -793,6 +793,7 @@ def create_dataset(
description: str | None = None,
attrs: list[str] | None = None,
update_version: str | None = "patch",
job_id: str | None = None,
) -> "DatasetRecord":
"""
Creates new dataset of a specific version.
@@ -866,6 +867,7 @@ create_rows_table=create_rows,
create_rows_table=create_rows,
columns=columns,
uuid=uuid,
job_id=job_id,
)

def create_new_dataset_version(
16 changes: 16 additions & 0 deletions src/datachain/data_storage/metastore.py
@@ -448,6 +448,10 @@ def set_job_status(
def get_job_status(self, job_id: str) -> JobStatus | None:
"""Returns the status of the given job."""

@abstractmethod
def get_last_job_by_name(self, name: str, conn=None) -> "Job | None":
"""Returns the last job with the given name, ordered by created_at."""

#
# Checkpoints
#
@@ -1685,6 +1689,18 @@ def list_jobs_by_ids(self, ids: list[str], conn=None) -> Iterator["Job"]:
query = self._jobs_query().where(self._jobs.c.id.in_(ids))
yield from self._parse_jobs(self.db.execute(query, conn=conn))

def get_last_job_by_name(self, name: str, conn=None) -> "Job | None":
query = (
self._jobs_query()
.where(self._jobs.c.name == name)
.order_by(self._jobs.c.created_at.desc())
.limit(1)
)
results = list(self.db.execute(query, conn=conn))
if not results:
return None
return self._parse_job(results[0])

def create_job(
self,
name: str,
2 changes: 1 addition & 1 deletion src/datachain/job.py
@@ -56,5 +56,5 @@ def parse(
python_version,
error_message,
error_stack,
parent_job_id,
str(parent_job_id) if parent_job_id else None,
)
27 changes: 10 additions & 17 deletions src/datachain/lib/dc/datachain.py
@@ -27,7 +27,6 @@
from datachain.dataset import DatasetRecord
from datachain.delta import delta_disabled
from datachain.error import (
JobNotFoundError,
ProjectCreateNotAllowedError,
ProjectNotFoundError,
)
@@ -627,6 +626,9 @@ def save( # type: ignore[override]
self._validate_version(version)
self._validate_update_version(update_version)

# get existing job if running in SaaS, or create a new one if running locally
job = self.session.get_or_create_job()

namespace_name, project_name, name = catalog.get_full_dataset_name(
name,
namespace_name=self._settings.namespace,
@@ -635,7 +637,7 @@ def save( # type: ignore[override]
project = self._get_or_create_project(namespace_name, project_name)

# Checkpoint handling
job, _hash, result = self._resolve_checkpoint(name, project, kwargs)
_hash, result = self._resolve_checkpoint(name, project, job, kwargs)

# Schema preparation
schema = self.signals_schema.clone_without_sys_signals().serialize()
@@ -655,13 +657,12 @@ def save( # type: ignore[override]
attrs=attrs,
feature_schema=schema,
update_version=update_version,
job_id=job.id,
**kwargs,
)
)

if job:
catalog.metastore.create_checkpoint(job.id, _hash) # type: ignore[arg-type]

catalog.metastore.create_checkpoint(job.id, _hash) # type: ignore[arg-type]
return result

def _validate_version(self, version: str | None) -> None:
@@ -690,23 +691,15 @@ def _resolve_checkpoint(
self,
name: str,
project: Project,
job: Job,
kwargs: dict,
) -> tuple[Job | None, str | None, "DataChain | None"]:
) -> tuple[str, "DataChain | None"]:
"""Check if checkpoint exists and return cached dataset if possible."""
from .datasets import read_dataset

metastore = self.session.catalog.metastore

job_id = os.getenv("DATACHAIN_JOB_ID")
checkpoints_reset = env2bool("DATACHAIN_CHECKPOINTS_RESET", undefined=True)

if not job_id:
return None, None, None

job = metastore.get_job(job_id)
if not job:
raise JobNotFoundError(f"Job with id {job_id} not found")

_hash = self._calculate_job_hash(job.id)

if (
Expand All @@ -718,9 +711,9 @@ def _resolve_checkpoint(
chain = read_dataset(
name, namespace=project.namespace.name, project=project.name, **kwargs
)
return job, _hash, chain
return _hash, chain

return job, _hash, None
return _hash, None

def _handle_delta(
self,
2 changes: 0 additions & 2 deletions src/datachain/lib/dc/records.py
@@ -78,8 +78,6 @@ def read_records(
),
)

session.add_dataset_version(dsr, dsr.latest_version)

if isinstance(to_insert, dict):
to_insert = [to_insert]
elif not to_insert:
4 changes: 0 additions & 4 deletions src/datachain/query/dataset.py
@@ -1927,10 +1927,6 @@ def save(
)
version = version or dataset.latest_version

self.session.add_dataset_version(
dataset=dataset, version=version, listing=kwargs.get("listing", False)
)

dr = self.catalog.warehouse.dataset_rows(dataset)

self.catalog.warehouse.copy_table(dr.get_table(), query.select())