Skip to content
Open
Show file tree
Hide file tree
Changes from 49 commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
de9e0de
added logic to skip applying chain if checkpoint exists
ilongin Sep 24, 2025
4f5f304
removed not needed code
ilongin Sep 24, 2025
a0ad1b8
adding prints
ilongin Sep 25, 2025
2aae374
removed prints
ilongin Sep 25, 2025
b1df328
added job manager
ilongin Sep 26, 2025
b9c2020
added tests for util function to get user code
ilongin Sep 26, 2025
d0a4129
using job manager and adding unit tests for it
ilongin Sep 29, 2025
f66dc53
refactor
ilongin Sep 29, 2025
0921aea
made reset checkpoints as default for now
ilongin Sep 30, 2025
ea34286
added job manager reset and refactoring test_checkpoints to use new j…
ilongin Oct 1, 2025
f19a055
refactoring, fixing tests
ilongin Oct 1, 2025
33a2bcd
adding job e2e tests
ilongin Oct 1, 2025
9988d8c
refactoring tests
ilongin Oct 1, 2025
fecdcae
Merge branch 'main' into ilongin/1361-local-checkpoints-usage
ilongin Oct 1, 2025
4ea1169
Merge branch 'main' into ilongin/1361-local-checkpoints-usage
ilongin Oct 2, 2025
b9d748a
fix
ilongin Oct 2, 2025
2f806ca
fixing tests
ilongin Oct 2, 2025
01d0711
fixing job manager tests for keyboard interruption
ilongin Oct 2, 2025
105b03b
fixing windows test
ilongin Oct 2, 2025
0efc106
added more elements to hash
ilongin Oct 2, 2025
5db18e5
fixing test
ilongin Oct 2, 2025
cbdcac2
removed JobManager and moved its logic to Session
ilongin Oct 3, 2025
dd2487d
Merge branch 'main' into ilongin/1361-local-checkpoints-usage
ilongin Oct 3, 2025
b26b150
merging with main
ilongin Oct 3, 2025
09af752
removed saving query string in new job locally
ilongin Oct 3, 2025
f38fc70
merged with main
ilongin Oct 3, 2025
99afaeb
moved reset_job_state to test from Session
ilongin Oct 3, 2025
b896d3c
moved get_last_job_by_name to sqlite metastore
ilongin Oct 3, 2025
7d27879
Merge branch 'main' into ilongin/1361-local-checkpoints-usage
ilongin Oct 5, 2025
18aa7c9
moved job to Session class attributes to ensure same job per process,…
ilongin Oct 5, 2025
428e10d
making job name random in interactive runs
ilongin Oct 5, 2025
84eebc7
refactoring except_hook
ilongin Oct 5, 2025
0e09ce8
moved tests from test_datachain to test_job_management
ilongin Oct 6, 2025
a7ff56b
fixing issue with updating job state because of hook after db is cleaned
ilongin Oct 6, 2025
8c006c8
fixing typing
ilongin Oct 6, 2025
c4f1b34
merging with main
ilongin Oct 7, 2025
74a1106
fixing windows tests
ilongin Oct 7, 2025
b009a4a
more robust check if is script run
ilongin Oct 8, 2025
d3acdb0
Merge branch 'main' into ilongin/1361-local-checkpoints-usage
ilongin Oct 8, 2025
2e924f2
Merge branch 'main' into ilongin/1361-local-checkpoints-usage
ilongin Oct 9, 2025
fb9306b
using util function to clean hooks
ilongin Oct 9, 2025
57d4845
removed session arg from reset_session_job_state
ilongin Oct 9, 2025
813cbd5
added checkpoint test with parallel and fixed deregistering job hooks
ilongin Oct 9, 2025
6ecf6b0
Merge branch 'main' into ilongin/1361-local-checkpoints-usage
ilongin Oct 13, 2025
026996e
more granular exception check
ilongin Oct 13, 2025
b368d93
using better fixture
ilongin Oct 13, 2025
cedbabd
moved test_checkpoints_parallel to func tests
ilongin Oct 13, 2025
faee8ac
increasing number of rows in test
ilongin Oct 13, 2025
ee6d880
Merge branch 'main' into ilongin/1361-local-checkpoints-usage
ilongin Oct 14, 2025
354185b
removing logic of removing datasets on job failure
ilongin Oct 14, 2025
07d881e
moved function to abstract
ilongin Oct 15, 2025
ecf102f
Merge branch 'main' into ilongin/1361-local-checkpoints-usage
ilongin Oct 15, 2025
44e738c
fix test
ilongin Oct 15, 2025
76aba6c
adding docs and removing not needed abstract method
ilongin Oct 15, 2025
1fabb7a
adding checkpoint docs link
ilongin Oct 15, 2025
737a05a
Merge branch 'main' into ilongin/1361-local-checkpoints-usage
ilongin Oct 16, 2025
ef80ca1
fixing docs
ilongin Oct 16, 2025
38e2bfc
adding missing abstract method
ilongin Oct 16, 2025
1167983
Merge branch 'main' into ilongin/1361-local-checkpoints-usage
ilongin Oct 16, 2025
4237b88
fixing parsing parent job id
ilongin Oct 17, 2025
f374500
skipping hf tests
ilongin Oct 17, 2025
fdd533d
returning tests
ilongin Oct 17, 2025
c6f7266
Merge branch 'main' into ilongin/1361-local-checkpoints-usage
ilongin Oct 17, 2025
24c6b36
Merge branch 'main' into ilongin/1361-local-checkpoints-usage
ilongin Oct 18, 2025
77eafbc
fix test
ilongin Oct 18, 2025
380f36f
merged with main
ilongin Oct 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/datachain/catalog/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -792,6 +792,7 @@ def create_dataset(
description: str | None = None,
attrs: list[str] | None = None,
update_version: str | None = "patch",
job_id: str | None = None,
) -> "DatasetRecord":
"""
Creates new dataset of a specific version.
Expand Down Expand Up @@ -865,6 +866,7 @@ def create_dataset(
create_rows_table=create_rows,
columns=columns,
uuid=uuid,
job_id=job_id,
)

def create_new_dataset_version(
Expand Down
6 changes: 6 additions & 0 deletions src/datachain/data_storage/metastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,12 @@ def create_job(
Returns the job id.
"""

@abstractmethod
def get_last_job_by_name(self, name: str, conn=None) -> Job | None:
    """
    Returns the most recently created Job with the given name, if one exists.

    Args:
        name: Job name to look up (for script runs this is the absolute
            script path used when the job was created).
        conn: Optional open database connection/transaction to reuse;
            when None, the implementation uses its own connection.

    Returns:
        The newest matching ``Job`` (by creation time), or ``None`` when
        no job with that name exists.
    """

@abstractmethod
def get_job(self, job_id: str) -> Job | None:
"""Returns the job with the given ID."""
Expand Down
13 changes: 13 additions & 0 deletions src/datachain/data_storage/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from sqlalchemy.sql.elements import ColumnElement
from sqlalchemy.types import TypeEngine

from datachain.job import Job
from datachain.lib.file import File


Expand Down Expand Up @@ -552,6 +553,18 @@ def _jobs_insert(self) -> "Insert":
def _checkpoints_insert(self) -> "Insert":
    # SQLite-dialect INSERT builder for the checkpoints table (supports
    # e.g. ON CONFLICT clauses, unlike the generic insert()).
    return sqlite.insert(self._checkpoints)

def get_last_job_by_name(self, name: str, conn=None) -> "Job | None":
    """
    Return the newest job with the given name, or None when none exists.

    Orders matching rows by ``created_at`` descending and fetches at most
    one, so only the most recently created job is materialized.
    """
    newest_first = (
        self._jobs_query()
        .where(self._jobs.c.name == name)
        .order_by(self._jobs.c.created_at.desc())
        .limit(1)
    )
    for row in self.db.execute(newest_first, conn=conn):
        return self._parse_job(row)
    return None

#
# Namespaces
#
Expand Down
27 changes: 10 additions & 17 deletions src/datachain/lib/dc/datachain.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
from datachain.dataset import DatasetRecord
from datachain.delta import delta_disabled
from datachain.error import (
JobNotFoundError,
ProjectCreateNotAllowedError,
ProjectNotFoundError,
)
Expand Down Expand Up @@ -627,6 +626,9 @@ def save( # type: ignore[override]
self._validate_version(version)
self._validate_update_version(update_version)

# get existing job if running in SaaS, or creating new one if running locally
job = self.session.get_or_create_job()

namespace_name, project_name, name = catalog.get_full_dataset_name(
name,
namespace_name=self._settings.namespace,
Expand All @@ -635,7 +637,7 @@ def save( # type: ignore[override]
project = self._get_or_create_project(namespace_name, project_name)

# Checkpoint handling
job, _hash, result = self._resolve_checkpoint(name, project, kwargs)
_hash, result = self._resolve_checkpoint(name, project, job, kwargs)

# Schema preparation
schema = self.signals_schema.clone_without_sys_signals().serialize()
Expand All @@ -655,13 +657,12 @@ def save( # type: ignore[override]
attrs=attrs,
feature_schema=schema,
update_version=update_version,
job_id=job.id,
**kwargs,
)
)

if job:
catalog.metastore.create_checkpoint(job.id, _hash) # type: ignore[arg-type]

catalog.metastore.create_checkpoint(job.id, _hash) # type: ignore[arg-type]
return result

def _validate_version(self, version: str | None) -> None:
Expand Down Expand Up @@ -690,23 +691,15 @@ def _resolve_checkpoint(
self,
name: str,
project: Project,
job: Job,
kwargs: dict,
) -> tuple[Job | None, str | None, "DataChain | None"]:
) -> tuple[str, "DataChain | None"]:
"""Check if checkpoint exists and return cached dataset if possible."""
from .datasets import read_dataset

metastore = self.session.catalog.metastore

job_id = os.getenv("DATACHAIN_JOB_ID")
checkpoints_reset = env2bool("DATACHAIN_CHECKPOINTS_RESET", undefined=True)

if not job_id:
return None, None, None

job = metastore.get_job(job_id)
if not job:
raise JobNotFoundError(f"Job with id {job_id} not found")

_hash = self._calculate_job_hash(job.id)

if (
Expand All @@ -718,9 +711,9 @@ def _resolve_checkpoint(
chain = read_dataset(
name, namespace=project.namespace.name, project=project.name, **kwargs
)
return job, _hash, chain
return _hash, chain

return job, _hash, None
return _hash, None

def _handle_delta(
self,
Expand Down
164 changes: 162 additions & 2 deletions src/datachain/query/session.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,38 @@
import atexit
import gc
import logging
import os
import re
import sys
import traceback
from collections.abc import Callable
from typing import TYPE_CHECKING, ClassVar
from uuid import uuid4

from datachain.catalog import get_catalog
from datachain.error import TableMissingError
from datachain.data_storage import JobQueryType, JobStatus
from datachain.error import JobNotFoundError, TableMissingError

if TYPE_CHECKING:
from datachain.catalog import Catalog
from datachain.dataset import DatasetRecord
from datachain.job import Job

logger = logging.getLogger("datachain")


def is_script_run() -> bool:
    """
    Return True when this process was started as a plain Python script,
    e.g. ``python my_script.py``.

    Returns False for interactive sessions, ``python -c`` one-liners and
    module runs, i.e. when ``sys.argv[0]`` is missing, empty, or one of
    the special placeholder values ("-c", "-m", "ipython").
    """
    try:
        entry_point = sys.argv[0]
    except (IndexError, AttributeError):
        # sys.argv may be absent or empty in embedded interpreters
        return False
    if not entry_point:
        return False
    return entry_point not in ("-c", "-m", "ipython")


class Session:
"""
Session is a context that keeps track of temporary DataChain datasets for a proper
Expand Down Expand Up @@ -43,6 +60,13 @@ class Session:
SESSION_CONTEXTS: ClassVar[list["Session"]] = []
ORIGINAL_EXCEPT_HOOK = None

# Job management - class-level to ensure one job per process
_CURRENT_JOB: ClassVar["Job | None"] = None
_JOB_STATUS: ClassVar[JobStatus | None] = None
_OWNS_JOB: ClassVar[bool | None] = None
_JOB_HOOKS_REGISTERED: ClassVar[bool] = False
_JOB_FINALIZE_HOOK: ClassVar[Callable[[], None] | None] = None

DATASET_PREFIX = "session_"
GLOBAL_SESSION_NAME = "global"
SESSION_UUID_LEN = 6
Expand Down Expand Up @@ -94,6 +118,115 @@ def add_dataset_version(
) -> None:
self.dataset_versions.append((dataset, version, listing))

def get_or_create_job(self) -> "Job":
    """
    Get or create a Job for this process.

    Returns:
        Job: The active Job instance.

    Behavior:
        - If a job already exists, it is returned.
        - If ``DATACHAIN_JOB_ID`` is set, the corresponding job is fetched.
        - Otherwise, a new job is created:
            * Name = absolute path to the Python script.
            * Query = empty string.
            * Parent = last job with the same name, if available.
            * Status = "running".
          Exit hooks are registered to finalize the job.

    Raises:
        JobNotFoundError: when ``DATACHAIN_JOB_ID`` is set but no job with
            that id exists in the metastore.

    Note:
        Job is shared across all Session instances to ensure one job per process.
    """
    # Fast path: a job was already resolved earlier in this process.
    if Session._CURRENT_JOB:
        return Session._CURRENT_JOB

    if env_job_id := os.getenv("DATACHAIN_JOB_ID"):
        # SaaS run: just fetch existing job; its lifecycle (status updates)
        # is managed by the external runner, not by this process.
        Session._CURRENT_JOB = self.catalog.metastore.get_job(env_job_id)
        if not Session._CURRENT_JOB:
            raise JobNotFoundError(
                f"Job {env_job_id} from DATACHAIN_JOB_ID env not found"
            )
        Session._OWNS_JOB = False
    else:
        # Local run: create new job
        if is_script_run():
            script = os.path.abspath(sys.argv[0])
        else:
            # Interactive session or module run - use unique name to avoid
            # linking unrelated sessions
            script = str(uuid4())
        python_version = f"{sys.version_info.major}.{sys.version_info.minor}"

        # try to find the parent job (previous run of the same script)
        parent = self.catalog.metastore.get_last_job_by_name(script)

        job_id = self.catalog.metastore.create_job(
            name=script,
            query="",
            query_type=JobQueryType.PYTHON,
            status=JobStatus.RUNNING,
            python_version=python_version,
            parent_job_id=parent.id if parent else None,
        )
        Session._CURRENT_JOB = self.catalog.metastore.get_job(job_id)
        Session._OWNS_JOB = True
    # NOTE(review): status is tracked even for non-owned (SaaS) jobs; the
    # finalizers check _OWNS_JOB before writing, so this looks harmless —
    # confirm intended.
    Session._JOB_STATUS = JobStatus.RUNNING

    # register cleanup hooks only once per process
    if not Session._JOB_HOOKS_REGISTERED:

        def _finalize_success_hook() -> None:
            # Closure captures `self`; runs at interpreter exit to mark a
            # still-running owned job as completed.
            self._finalize_job_success()

        # Keep a reference so the hook can be atexit.unregister()-ed later.
        Session._JOB_FINALIZE_HOOK = _finalize_success_hook
        atexit.register(Session._JOB_FINALIZE_HOOK)
        Session._JOB_HOOKS_REGISTERED = True

    assert Session._CURRENT_JOB is not None
    return Session._CURRENT_JOB

def _finalize_job_success(self):
    """Mark the current job as completed, if this process owns a running job."""
    job = Session._CURRENT_JOB
    if not job or not Session._OWNS_JOB:
        return
    if Session._JOB_STATUS != JobStatus.RUNNING:
        # Already finalized (failed/canceled/completed) - nothing to do.
        return
    self.catalog.metastore.set_job_status(job.id, JobStatus.COMPLETE)
    Session._JOB_STATUS = JobStatus.COMPLETE

def _finalize_job_as_canceled(self):
    """Mark the current job as canceled, if this process owns a running job."""
    job = Session._CURRENT_JOB
    owns_running = (
        job is not None
        and Session._OWNS_JOB
        and Session._JOB_STATUS == JobStatus.RUNNING
    )
    if not owns_running:
        return
    self.catalog.metastore.set_job_status(job.id, JobStatus.CANCELED)
    Session._JOB_STATUS = JobStatus.CANCELED

def _finalize_job_as_failed(self, exc_type, exc_value, tb):
    """Mark the current job as failed, recording the error message and stack."""
    job = Session._CURRENT_JOB
    if not job or not Session._OWNS_JOB:
        return
    if Session._JOB_STATUS != JobStatus.RUNNING:
        # Already finalized by another path - keep the first outcome.
        return
    stack = "".join(traceback.format_exception(exc_type, exc_value, tb))
    self.catalog.metastore.set_job_status(
        job.id,
        JobStatus.FAILED,
        error_message=str(exc_value),
        error_stack=stack,
    )
    Session._JOB_STATUS = JobStatus.FAILED

def generate_temp_dataset_name(self) -> str:
return self.get_temp_prefix() + uuid4().hex[: self.TEMP_TABLE_UUID_LEN]

Expand Down Expand Up @@ -173,19 +306,46 @@ def get(

@staticmethod
def except_hook(exc_type, exc_value, exc_traceback):
    """
    Process-level ``sys.excepthook`` replacement.

    Finalizes the current job (canceled on KeyboardInterrupt, failed
    otherwise), tears down the global session context exactly once, runs
    global cleanup, then delegates to the original excepthook. Exits with
    code 130 (128 + SIGINT) on KeyboardInterrupt.

    Fix: the session context's ``__exit__`` was previously invoked
    unconditionally AND again inside the guard, finalizing twice; it is now
    called only once, guarded by ``GLOBAL_SESSION_CTX`` being set.
    """
    if Session.GLOBAL_SESSION_CTX:
        # Handle KeyboardInterrupt specially - mark as canceled and exit with
        # signal code
        if exc_type is KeyboardInterrupt:
            Session.GLOBAL_SESSION_CTX._finalize_job_as_canceled()
        else:
            Session.GLOBAL_SESSION_CTX._finalize_job_as_failed(
                exc_type, exc_value, exc_traceback
            )
        Session.GLOBAL_SESSION_CTX.__exit__(exc_type, exc_value, exc_traceback)

    Session._global_cleanup()

    # Always delegate to original hook if it exists
    if Session.ORIGINAL_EXCEPT_HOOK:
        Session.ORIGINAL_EXCEPT_HOOK(exc_type, exc_value, exc_traceback)

    if exc_type is KeyboardInterrupt:
        # Exit with SIGINT signal code (128 + 2 = 130, or -2 in subprocess terms)
        sys.exit(130)

@classmethod
def cleanup_for_tests(cls):
if cls.GLOBAL_SESSION_CTX is not None:
cls.GLOBAL_SESSION_CTX.__exit__(None, None, None)
cls.GLOBAL_SESSION_CTX = None
atexit.unregister(cls._global_cleanup)

# Reset job-related class variables
if cls._JOB_FINALIZE_HOOK:
try:
atexit.unregister(cls._JOB_FINALIZE_HOOK)
except ValueError:
pass # Hook was not registered
cls._CURRENT_JOB = None
cls._JOB_STATUS = None
cls._OWNS_JOB = None
cls._JOB_HOOKS_REGISTERED = False
cls._JOB_FINALIZE_HOOK = None

if cls.ORIGINAL_EXCEPT_HOOK:
sys.excepthook = cls.ORIGINAL_EXCEPT_HOOK

Expand Down
Loading
Loading