From a01fdbff2fa479479a6402de3e48a1acd0a448cb Mon Sep 17 00:00:00 2001 From: Nicholas Junge Date: Fri, 28 Feb 2025 15:29:37 +0100 Subject: [PATCH 1/3] refactor: Take all rpaths as resource names only in transactions Proof of concept for a single repo-and-branch-scoped transaction. This ties a transaction to a single repo and branch by taking all file and directory names by resource only instead of a full URI. Naturally, this has the subtle side effect that given full URIs are silently understood as nested paths, and uploaded to the transaction branch without loud errors or warnings. A section was added to the transaction docs that details this behavior, but it might be safer to check the input path against existing repos and branches. --- docs/guides/transactions.md | 13 ++++++++++--- src/lakefs_spec/spec.py | 3 +++ src/lakefs_spec/transaction.py | 10 ++++++++++ tests/test_put_file.py | 4 ++-- tests/test_transactions.py | 15 +++++++-------- 5 files changed, 32 insertions(+), 13 deletions(-) diff --git a/docs/guides/transactions.md b/docs/guides/transactions.md index 4676e1e7..2cc67e1b 100644 --- a/docs/guides/transactions.md +++ b/docs/guides/transactions.md @@ -20,9 +20,9 @@ from lakefs_spec import LakeFSFileSystem fs = LakeFSFileSystem() with fs.transaction("repo", "main") as tx: - fs.put_file("train-data.txt", f"repo/{tx.branch.id}/train-data.txt") + fs.put_file("train-data.txt", "train-data.txt") tx.commit(message="Add training data") - fs.put_file("test-data.txt", f"repo/{tx.branch.id}/test-data.txt") + fs.put_file("test-data.txt", "test-data.txt") sha = tx.commit(message="Add test data") tx.tag(sha, name="My train-test split") ``` @@ -35,6 +35,13 @@ The full list of supported lakeFS versioning operations (by default, these opera * [`rev_parse`](../reference/lakefs_spec/transaction.md#lakefs_spec.transaction.LakeFSTransaction.rev_parse), for parsing revisions like branch/tag names and SHA fragments into full commit SHAs. * [`tag`](../reference/lakefs_spec/transaction.md#lakefs_spec.transaction.LakeFSTransaction.tag), for creating a tag pointing to a commit. +## Limitations of transactions + +Transactions are scoped to a single repository and branch only, equal to those given to the `fs.transaction()` context manager. +When uploading files in a transaction via `fs.put()` or `fs.put_file()`, you **must** give all remote paths as file names. +If you use a fully qualified URI, leading repository and branch names will be interpreted as subdirectories, which will be created on upload. +No warnings or errors will be thrown, so be sure to double-check your paths in all transaction scopes. + ## Lifecycle of ephemeral transaction branches You can control the lifecycle for a transaction branch with the `delete` argument: @@ -56,7 +63,7 @@ from lakefs_spec import LakeFSFileSystem fs = LakeFSFileSystem() with fs.transaction("repo", "main", delete="onsuccess") as tx: - fs.put_file("my-file.txt", f"repo/{tx.branch.id}/my-file.txt") + fs.put_file("my-file.txt", "my-file.txt") tx.commit(message="Add my-file.txt") raise ValueError("oops!") ``` diff --git a/src/lakefs_spec/spec.py b/src/lakefs_spec/spec.py index 18666c22..2b13c271 100644 --- a/src/lakefs_spec/spec.py +++ b/src/lakefs_spec/spec.py @@ -684,6 +684,9 @@ def put_file( lpath = stringify_path(lpath) rpath = stringify_path(rpath) + if self._intrans: + rpath = self.transaction.make_uri(rpath) + if precheck and Path(lpath).is_file(): remote_checksum = self.checksum(rpath) local_checksum = md5_checksum(lpath, blocksize=self.blocksize) diff --git a/src/lakefs_spec/transaction.py b/src/lakefs_spec/transaction.py index 52fa85dd..78583f3e 100644 --- a/src/lakefs_spec/transaction.py +++ b/src/lakefs_spec/transaction.py @@ -3,6 +3,7 @@ """ import logging +import os import random import string import warnings @@ -145,6 +146,15 @@ def __exit__(self, exc_type, exc_val, exc_tb): if self.delete == "always" or (success and self.delete == "onsuccess"): self._ephemeral_branch.delete() + def make_uri(self, path: str | os.PathLike[str]) -> str: + spath = str(path) + # NB: this fails silently if the input path is already fully qualified. + # However, in general, it's impossible to distinguish between a + # fully qualified path and a normal nested path, so at most, we + # could split off the first segment of the input and check it against existing + # repositories. + return "/".join([self.repository, self.branch.id, spath]) + @property def branch(self): return self._ephemeral_branch diff --git a/tests/test_put_file.py b/tests/test_put_file.py index 9c85ddf3..d16f5400 100644 --- a/tests/test_put_file.py +++ b/tests/test_put_file.py @@ -21,7 +21,7 @@ def test_no_change_postcommit( message = f"Add file {random_file.name}" with fs.transaction(repository, temp_branch) as tx: - fs.put(lpath, f"{repository.id}/{tx.branch.id}/{random_file.name}") + fs.put(lpath, random_file.name) tx.commit(message=message) commits = list(temp_branch.log(max_amount=2)) @@ -31,7 +31,7 @@ def test_no_change_postcommit( # put the same file again, this time the diff is empty with fs.transaction(repository, temp_branch) as tx: - fs.put(lpath, f"{repository.id}/{tx.branch.id}/{random_file.name}", precheck=False) + fs.put(lpath, random_file.name, precheck=False) tx.commit(message=f"Add file {random_file.name}") # check that no other commit has happened. diff --git a/tests/test_transactions.py b/tests/test_transactions.py index 032c61a2..61fb1ebb 100644 --- a/tests/test_transactions.py +++ b/tests/test_transactions.py @@ -17,12 +17,11 @@ def test_transaction_commit( random_file = random_file_factory.make() lpath = str(random_file) - rpath = f"{repository.id}/{temp_branch.id}/{random_file.name}" message = f"Add file {random_file.name}" with fs.transaction(repository, temp_branch) as tx: - fs.put_file(lpath, f"{repository.id}/{tx.branch.id}/{random_file.name}") + fs.put_file(lpath, random_file.name) assert len(tx.files) == 1 # sha is a placeholder for the actual SHA created on transaction completion. sha = tx.commit(message=message) @@ -65,7 +64,7 @@ def test_transaction_merge( tbname = tx.branch.id lpath = str(random_file) # stage a file on the transaction branch... - fs.put_file(lpath, f"{repository.id}/{tx.branch.id}/{random_file.name}") + fs.put_file(lpath, random_file.name) # ... commit it with the above message tx.commit(message=message) # ... and merge it into temp_branch. @@ -90,7 +89,7 @@ def test_transaction_revert( message = f"Add file {random_file.name}" with fs.transaction(repository, temp_branch, automerge=True) as tx: - fs.put_file(lpath, f"{repository.id}/{tx.branch.id}/{random_file.name}") + fs.put_file(lpath, random_file.name) tx.commit(message=message) revert_commit = tx.revert(temp_branch, temp_branch.head) @@ -113,7 +112,7 @@ def test_transaction_failure( try: with fs.transaction(repository, temp_branch) as tx: - fs.put_file(lpath, f"{repository.id}/{tx.branch.id}/{random_file.name}") + fs.put_file(lpath, random_file.name) tx.commit(message=message) raise RuntimeError("something went wrong") except RuntimeError: @@ -159,8 +158,8 @@ def test_warn_uncommitted_changes( lpath = str(random_file) with pytest.warns(match="uncommitted changes.*lost"): - with fs.transaction(repository, temp_branch) as tx: - fs.put_file(lpath, f"{repository.id}/{tx.branch.id}/{random_file.name}") + with fs.transaction(repository, temp_branch): + fs.put_file(lpath, random_file.name) def test_warn_uncommitted_changes_on_persisted_branch( @@ -175,4 +174,4 @@ def test_warn_uncommitted_changes_on_persisted_branch( with pytest.warns(match="uncommitted changes(?:(?!lost).)*$"): with fs.transaction(repository, temp_branch, delete="never") as tx: - fs.put_file(lpath, f"{repository.id}/{tx.branch.id}/{random_file.name}") + fs.put_file(lpath, random_file.name) From 6e18a1492dccd30b0ce010b54ebbaf4f6a255046 Mon Sep 17 00:00:00 2001 From: Nicholas Junge Date: Thu, 13 Mar 2025 17:21:02 +0100 Subject: [PATCH 2/3] refactor: Put `make_uri()` method onto filesystem This way it can return paths as they are when we are not in a transaction, or when we are in a transaction, but repo and ref have already been prepended. --- src/lakefs_spec/spec.py | 37 +++++++++++++++++++++++----------- src/lakefs_spec/transaction.py | 10 --------- 2 files changed, 25 insertions(+), 22 deletions(-) diff --git a/src/lakefs_spec/spec.py b/src/lakefs_spec/spec.py index 2b13c271..8669d595 100644 --- a/src/lakefs_spec/spec.py +++ b/src/lakefs_spec/spec.py @@ -170,6 +170,20 @@ def wrapped_api_call( except ServerException as e: raise translate_lakefs_error(e, rpath=rpath, message=message, set_cause=set_cause) + def make_uri(self, path: str | os.PathLike[str]) -> str: + spath = stringify_path(path) + # NB: this fails silently if the input path is already fully qualified. + # However, in general, it's impossible to distinguish between a + # fully qualified path and a normal nested path, so at most, we + # could split off the first segment of the input and check it against existing + # repositories. + if self._intrans: + prefix = "/".join([self.transaction.repository, self.transaction.branch.id]) + if spath.startswith(prefix): + return spath + return prefix + "/" + spath + return spath + def checksum(self, path: str | os.PathLike[str]) -> str | None: """ Get a remote lakeFS file object's checksum. @@ -186,7 +200,7 @@ def checksum(self, path: str | os.PathLike[str]) -> str | None: str | None The remote file's checksum, or ``None`` if ``path`` points to a directory or does not exist. """ - path = stringify_path(path) + path = self.make_uri(path) try: return self.info(path).get("checksum") except FileNotFoundError: @@ -218,7 +232,7 @@ def exists(self, path: str | os.PathLike[str], **kwargs: Any) -> bool: PermissionError If the user does not have sufficient permissions to query object existence. """ - path = stringify_path(path) + path = self.make_uri(path) repository, ref, resource = parse(path) try: reference = lakefs.Reference(repository, ref, client=self.client) @@ -261,8 +275,8 @@ def cp_file( ValueError When attempting to copy objects between repositories. """ - path1 = stringify_path(path1) - path2 = stringify_path(path2) + path1 = self.make_uri(path1) + path2 = self.make_uri(path2) if path1 == path2: return @@ -306,7 +320,7 @@ def get_file( **kwargs: Any Additional keyword arguments passed to ``AbstractFileSystem.open()``. """ - rpath = stringify_path(rpath) + rpath = self.make_uri(rpath) lpath = stringify_path(lpath) if precheck and Path(lpath).is_file(): @@ -343,7 +357,7 @@ def info(self, path: str | os.PathLike[str], **kwargs: Any) -> dict[str, Any]: FileNotFoundError If the ``path`` refers to a non-file path that does not exist in the repository. """ - path = stringify_path(path) + path = self.make_uri(path) repository, ref, resource = parse(path) # first, try with `stat_object` in case of a file. # the condition below checks edge cases of resources that cannot be files. @@ -481,6 +495,7 @@ def ls( A list of all objects' metadata under the given remote path if ``detail=True``, or alternatively only their names if ``detail=False``. """ path = self._strip_protocol(path) + path = self.make_uri(path) repository, ref, prefix = parse(path) recursive = kwargs.pop("recursive", False) @@ -557,6 +572,7 @@ def ls( # To make recursive ls behave identical to the non-recursive case, # add back virtual `directory` entries, which are only returned by # the lakeFS API when querying non-recursively. + # TODO: Fix this up for intrans here = self._strip_protocol(path).rstrip("/") subdirs = {parent for o in info if (parent := self._parent(o["name"])) != here} for subdir in subdirs: @@ -623,7 +639,7 @@ def open( if mode not in {"r", "rb", "w", "wb", "x", "xb"}: raise NotImplementedError(f"unsupported mode {mode!r}") - path = stringify_path(path) + path = self.make_uri(path) repo, ref, resource = parse(path) if mode.startswith("r"): @@ -682,10 +698,7 @@ def put_file( Additional keyword arguments to pass to ``LakeFSFileSystem.open()``. """ lpath = stringify_path(lpath) - rpath = stringify_path(rpath) - - if self._intrans: - rpath = self.transaction.make_uri(rpath) + rpath = self.make_uri(rpath) if precheck and Path(lpath).is_file(): remote_checksum = self.checksum(rpath) @@ -733,7 +746,7 @@ def rm( possible. """ - path = stringify_path(path) + path = self.make_uri(path) repository, ref, prefix = parse(path) with self.wrapped_api_call(rpath=path): diff --git a/src/lakefs_spec/transaction.py b/src/lakefs_spec/transaction.py index 78583f3e..52fa85dd 100644 --- a/src/lakefs_spec/transaction.py +++ b/src/lakefs_spec/transaction.py @@ -3,7 +3,6 @@ """ import logging -import os import random import string import warnings @@ -146,15 +145,6 @@ def __exit__(self, exc_type, exc_val, exc_tb): if self.delete == "always" or (success and self.delete == "onsuccess"): self._ephemeral_branch.delete() - def make_uri(self, path: str | os.PathLike[str]) -> str: - spath = str(path) - # NB: this fails silently if the input path is already fully qualified. - # However, in general, it's impossible to distinguish between a - # fully qualified path and a normal nested path, so at most, we - # could split off the first segment of the input and check it against existing - # repositories. - return "/".join([self.repository, self.branch.id, spath]) - @property def branch(self): return self._ephemeral_branch From c988579cc0bf8bd7b02ca341a83076090a22c9f9 Mon Sep 17 00:00:00 2001 From: Nicholas Junge Date: Thu, 13 Mar 2025 17:22:14 +0100 Subject: [PATCH 3/3] Tests: Fix remaining transaction usages Some leftover fully qualified paths in transactions, which came to light when other filesystem APIs were migrated to the new URI maker scheme. --- tests/test_put_file.py | 2 +- tests/test_rm.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_put_file.py b/tests/test_put_file.py index d16f5400..1f6249ec 100644 --- a/tests/test_put_file.py +++ b/tests/test_put_file.py @@ -32,7 +32,7 @@ def test_no_change_postcommit( # put the same file again, this time the diff is empty with fs.transaction(repository, temp_branch) as tx: fs.put(lpath, random_file.name, precheck=False) - tx.commit(message=f"Add file {random_file.name}") + tx.commit(message=message) # check that no other commit has happened. assert temp_branch.head.get_commit() == current_head diff --git a/tests/test_rm.py b/tests/test_rm.py index 8026ff91..8e98c017 100644 --- a/tests/test_rm.py +++ b/tests/test_rm.py @@ -28,7 +28,7 @@ def test_rm_with_transaction( message = "Remove file README.md" with fs.transaction(repository, temp_branch, automerge=True) as tx: - fs.rm(f"{repository.id}/{tx.branch.id}/README.md") + fs.rm("README.md") tx.commit(message=message) commits = list(temp_branch.log(max_amount=2))