diff --git a/.gitignore b/.gitignore index f22dc5ba739..4e42b15bc27 100644 --- a/.gitignore +++ b/.gitignore @@ -67,3 +67,41 @@ test/e2e/v1/*.log test/e2e/*.log # Exclude build artifacts bin/ + +# ---------------- Python SDK & tooling ---------------- +# Virtual environments +.venv/ +venv/ +env/ + +# Bytecode caches +__pycache__/ +*.py[cod] + +# Build artifacts / metadata +python/dfcache_sdk/.pytest_cache/ +.pytest_cache/ +*.egg-info/ +*.egg +dist/ +build/ + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Tool caches +.mypy_cache/ +.ruff_cache/ +.coverage +coverage.xml +htmlcov/ + +# Temporary files +*.swp +*.bak +*.tmp + +# Jupyter (if added later) +.ipynb_checkpoints/ + diff --git a/python/dfcache_sdk/README.md b/python/dfcache_sdk/README.md new file mode 100644 index 00000000000..92f8c033753 --- /dev/null +++ b/python/dfcache_sdk/README.md @@ -0,0 +1,60 @@ +# Dragonfly Dfcache Python SDK + +This is an initial, lightweight Python interface to Dragonfly's `dfcache` operations aimed at AI and data workflows. + +## Features +- Import a local file into Dragonfly P2P cache +- Export a cached file by content ID (CID) to a local path +- Stat (existence check) for a CID (optionally local-only) +- Delete a cached CID +- Health check convenience + +## Design +MVP implementation shells out to the existing `dfcache` CLI binary rather than using gRPC. This avoids proto generation overhead. A future iteration can switch to direct gRPC calls and piece streaming for advanced scenarios. 
+ +## Install +```bash +pip install -e python/dfcache_sdk +``` +Ensure the Dragonfly `dfcache` binary is built: +```bash +make build-dfcache +``` + +## Usage +```python +from dragonfly_dfcache import DfCacheClient, NotFoundError + +client = DfCacheClient() + +cid = "sha256:abcdef1234567890" # Example digest +source_file = "/data/model.bin" +export_path = "/tmp/model.bin" + +# Import +client.import_cache(cid=cid, path=source_file) + +# Stat +exists = client.stat(cid) +print("Exists?", exists) + +# Export +client.export(cid=cid, output=export_path) + +# Delete +client.delete(cid) +``` + +## Health +```python +client.check_health() # True if dfcache CLI and dfdaemon responsive +``` + +## Roadmap +- Replace CLI subprocess calls with gRPC client (requires Python stubs) +- Add streaming progress callbacks for export/import +- Optional async API using asyncio subprocess +- Support rate limiting and advanced flags + +## License +Apache 2.0 diff --git a/python/dfcache_sdk/pyproject.toml b/python/dfcache_sdk/pyproject.toml new file mode 100644 index 00000000000..809807508ca --- /dev/null +++ b/python/dfcache_sdk/pyproject.toml @@ -0,0 +1,38 @@ +[build-system] +requires = ["setuptools", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "dragonfly-dfcache-sdk" +version = "0.1.0" +description = "Unofficial Python SDK for Dragonfly dfcache/dfdaemon operations (stat, import, export, delete)." 
+readme = "README.md" +authors = [{name = "Dragonfly Authors"}] +license = {text = "Apache-2.0"} +keywords = ["dragonfly", "p2p", "cache", "dfcache", "dfdaemon"] +classifiers = [ + "Programming Language :: Python :: 3", + "License :: OSI Approved :: Apache Software License", + "Operating System :: OS Independent" +] +requires-python = ">=3.9" +dependencies = [ + "typing-extensions>=4.0.0" +] + +[project.optional-dependencies] +dev = ["pytest", "mypy", "ruff"] + +[tool.pytest.ini_options] +testpaths = ["python/dfcache_sdk/tests"] +pythonpath = ["python/dfcache_sdk/src"] + +[tool.setuptools.packages.find] +where = ["src"] + +[tool.mypy] +python_version = "3.11" +strict = true + +[tool.ruff] +line-length = 100 diff --git a/python/dfcache_sdk/src/dragonfly_dfcache/__init__.py b/python/dfcache_sdk/src/dragonfly_dfcache/__init__.py new file mode 100644 index 00000000000..2f7e09cb76e --- /dev/null +++ b/python/dfcache_sdk/src/dragonfly_dfcache/__init__.py @@ -0,0 +1,7 @@ +from .client import DfCacheClient, DfCacheError, NotFoundError + +__all__ = [ + "DfCacheClient", + "DfCacheError", + "NotFoundError", +] diff --git a/python/dfcache_sdk/src/dragonfly_dfcache/client.py b/python/dfcache_sdk/src/dragonfly_dfcache/client.py new file mode 100644 index 00000000000..9df4ebc63a2 --- /dev/null +++ b/python/dfcache_sdk/src/dragonfly_dfcache/client.py @@ -0,0 +1,114 @@ +from __future__ import annotations + +import os +import subprocess +import json +from typing import Optional + +from .errors import DfCacheError, NotFoundError, DaemonUnavailableError + +_D7Y_SCHEME = "d7y:/" # prefix used to build internal URL from cid + + +def _cid_to_url(cid: str) -> str: + from urllib.parse import quote + return f"{_D7Y_SCHEME}{quote(cid, safe='') }" + + +def _run_cmd(args: list[str], timeout: float) -> subprocess.CompletedProcess: + return subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=timeout) + + +class DfCacheClient: + """Thin wrapper invoking 
existing dfcache CLI for stat/import/export/delete. + + This avoids needing Python gRPC stubs initially; later we can switch to gRPC. + """ + + def __init__(self, binary: Optional[str] = None, timeout: float = 10.0) -> None: + self._binary = binary or self._detect_binary() + self._timeout = timeout + + def _detect_binary(self) -> str: + # Allow explicit override via env + env_bin = os.getenv("DRAGONFLY_DFCACHE_BINARY") + if env_bin and os.path.isfile(env_bin): + return env_bin + + # Search common build output directories (linux/darwin, amd64/arm64) + bin_dir = os.path.join(os.getcwd(), "bin") + candidates: list[str] = [] + if os.path.isdir(bin_dir): + for root, dirs, files in os.walk(bin_dir): + if "dfcache" in files: + candidates.append(os.path.join(root, "dfcache")) + # Fallback to PATH lookup + candidates.append("dfcache") + for c in candidates: + if os.path.isfile(c) and os.access(c, os.X_OK): + return c + raise DaemonUnavailableError( + "dfcache binary not found. Build it via 'make build-dfcache' or set DRAGONFLY_DFCACHE_BINARY." 
+        )
+
+    def stat(self, cid: str, tag: str = "", local_only: bool = False, timeout: Optional[float] = None) -> bool:
+        args = [self._binary, "stat", "-i", cid]
+        if tag:
+            args += ["-t", tag]
+        if local_only:
+            args += ["-l"]
+        cp = _run_cmd(args, timeout or self._timeout)
+        if cp.returncode == 0:
+            return True
+        if "not exist" in cp.stderr.lower() or "not exist" in cp.stdout.lower():
+            return False
+        # returncode is non-zero here (the zero case returned True above) and
+        # the output did not say "not exist", so surface the unexpected failure.
+        raise DfCacheError(f"stat failed: {cp.stderr.strip() or cp.stdout.strip()}")
+
+    def import_cache(self, cid: str, path: str, tag: str = "", timeout: Optional[float] = None) -> None:
+        if not os.path.isfile(path):
+            raise FileNotFoundError(path)
+        args = [self._binary, "import", "-i", cid, "-I", path]
+        if tag:
+            args += ["-t", tag]
+        cp = _run_cmd(args, timeout or self._timeout)
+        if cp.returncode != 0:
+            raise DfCacheError(f"import failed: {cp.stderr.strip() or cp.stdout.strip()}")
+
+    def export(self, cid: str, output: str, tag: str = "", local_only: bool = False, timeout: Optional[float] = None) -> None:
+        parent = os.path.dirname(os.path.abspath(output))
+        os.makedirs(parent, exist_ok=True)
+        args = [self._binary, "export", "-i", cid, "-O", output]
+        if tag:
+            args += ["-t", tag]
+        if local_only:
+            args += ["-l"]
+        cp = _run_cmd(args, timeout or self._timeout)
+        if cp.returncode != 0:
+            if "not exist" in cp.stderr.lower() or "not exist" in cp.stdout.lower():
+                raise NotFoundError(f"cache {cid} not found")
+            raise DfCacheError(f"export failed: {cp.stderr.strip() or cp.stdout.strip()}")
+
+    def delete(self, cid: str, tag: str = "", timeout: Optional[float] = None) -> None:
+        args = [self._binary, "delete", "-i", cid]
+        if tag:
+            args += ["-t", tag]
+        cp = _run_cmd(args, timeout or self._timeout)
+        if cp.returncode != 0 and "not exist" not in (cp.stderr + cp.stdout).lower():
+            raise DfCacheError(f"delete failed: {cp.stderr.strip() or cp.stdout.strip()}")
+
+    def check_health(self) -> bool:
+        try:
+            self.stat("health-check-cid")
+            return True
+        except (DfCacheError, OSError, subprocess.SubprocessError):
+            return False
+
+    def info_json(self) -> str:
+        data = {
+            "binary": self._binary,
+            "timeout": self._timeout,
+        }
+        return json.dumps(data)
+
diff --git a/python/dfcache_sdk/src/dragonfly_dfcache/errors.py b/python/dfcache_sdk/src/dragonfly_dfcache/errors.py
new file mode 100644
index 00000000000..c8a53540add
--- /dev/null
+++ b/python/dfcache_sdk/src/dragonfly_dfcache/errors.py
@@ -0,0 +1,10 @@
+class DfCacheError(Exception):
+    """Base exception for dfcache SDK."""
+
+
+class NotFoundError(DfCacheError):
+    """Raised when a cache entry does not exist (maps to os.ErrNotExist)."""
+
+
+class DaemonUnavailableError(DfCacheError):
+    """Raised when dfdaemon is not reachable or unhealthy."""
diff --git a/python/dfcache_sdk/src/dragonfly_dfcache/proto/dfdaemon_cache.proto b/python/dfcache_sdk/src/dragonfly_dfcache/proto/dfdaemon_cache.proto
new file mode 100644
index 00000000000..4e6b6568b5c
--- /dev/null
+++ b/python/dfcache_sdk/src/dragonfly_dfcache/proto/dfdaemon_cache.proto
@@ -0,0 +1,50 @@
+syntax = "proto3";
+package dfdaemon.v1;
+
+// Minimal subset of Dragonfly dfdaemon v1 API required for dfcache operations.
+// NOTE: This is a reduced, compatibility proto. For full capability use upstream d7y.io/api.
+ +message UrlMeta { + string tag = 1; +} + +message StatTaskRequest { + string url = 1; // d7y:/ + UrlMeta url_meta = 2; // tag differentiates tasks + bool local_only = 3; // only check local cache +} + +message ImportTaskRequest { + string type = 1; // Task type, "DfCache" + string url = 2; // d7y:/ + string path = 3; // source file path + UrlMeta url_meta = 4; // tag +} + +message ExportTaskRequest { + string url = 1; // d7y:/ + string output = 2; // destination file path + uint64 timeout = 3; // seconds + double limit = 4; // rate limit (bytes per second) - optional + UrlMeta url_meta = 5; // tag + int64 uid = 6; + int64 gid = 7; + bool local_only = 8; // only export from local cache +} + +message DeleteTaskRequest { + string url = 1; // d7y:/ + UrlMeta url_meta = 2; // tag +} + +message Empty {} + +// Error reporting simplified: success = empty response, not found -> gRPC status NOT_FOUND + +service Daemon { + rpc StatTask(StatTaskRequest) returns (Empty); + rpc ImportTask(ImportTaskRequest) returns (Empty); + rpc ExportTask(ExportTaskRequest) returns (Empty); + rpc DeleteTask(DeleteTaskRequest) returns (Empty); + // Health check: upstream exposes CheckHealth; we model it as StatTask on a reserved cid, or rely on channel connectivity. 
+} diff --git a/python/dfcache_sdk/tests/test_client_basic.py b/python/dfcache_sdk/tests/test_client_basic.py new file mode 100644 index 00000000000..862d6c54066 --- /dev/null +++ b/python/dfcache_sdk/tests/test_client_basic.py @@ -0,0 +1,79 @@ +import os +import tempfile +import shutil +import stat +import subprocess +import sys +from pathlib import Path + +import pytest + +from dragonfly_dfcache import DfCacheClient, NotFoundError + + +def make_fake_dfcache(tmp: Path) -> str: + fake = tmp / "dfcache" + fake.write_text( + """#!/usr/bin/env bash +cmd=$1; shift +case "$cmd" in + stat) + # simulate not-exist if CID contains 'missing' + if [[ "$@" == *missing* ]]; then + echo "not exist" >&2 + exit 1 + fi + exit 0 + ;; + import) + exit 0 + ;; + export) + # write output file path passed after -O + while [[ $# -gt 0 ]]; do + if [[ "$1" == "-O" ]]; then + shift; out=$1; break + fi + shift + done + mkdir -p "$(dirname "$out")" && echo ok > "$out" + exit 0 + ;; + delete) + exit 0 + ;; + *) + echo "unknown" >&2 + exit 2 + ;; + esac +""" + ) + st = os.stat(fake) + os.chmod(fake, st.st_mode | stat.S_IEXEC) + return str(fake) + + +def test_basic_flow(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + fake = make_fake_dfcache(tmp_path) + monkeypatch.setenv("DRAGONFLY_DFCACHE_BINARY", fake) + + c = DfCacheClient(timeout=2) + + # stat existing + assert c.stat("present-cid") is True + # stat missing + assert c.stat("missing-cid") is False + + # import requires file + src = tmp_path / "file.bin" + src.write_bytes(b"x") + c.import_cache("present-cid", str(src)) + + # export should create output + out = tmp_path / "out.bin" + c.export("present-cid", str(out)) + assert out.exists() + + # delete + c.delete("present-cid")