diff --git a/docs/guides/transactions.md b/docs/guides/transactions.md index 4676e1e7..2cc67e1b 100644 --- a/docs/guides/transactions.md +++ b/docs/guides/transactions.md @@ -20,9 +20,9 @@ from lakefs_spec import LakeFSFileSystem fs = LakeFSFileSystem() with fs.transaction("repo", "main") as tx: - fs.put_file("train-data.txt", f"repo/{tx.branch.id}/train-data.txt") + fs.put_file("train-data.txt", "train-data.txt") tx.commit(message="Add training data") - fs.put_file("test-data.txt", f"repo/{tx.branch.id}/test-data.txt") + fs.put_file("test-data.txt", "test-data.txt") sha = tx.commit(message="Add test data") tx.tag(sha, name="My train-test split") ``` @@ -35,6 +35,13 @@ The full list of supported lakeFS versioning operations (by default, these opera * [`rev_parse`](../reference/lakefs_spec/transaction.md#lakefs_spec.transaction.LakeFSTransaction.rev_parse), for parsing revisions like branch/tag names and SHA fragments into full commit SHAs. * [`tag`](../reference/lakefs_spec/transaction.md#lakefs_spec.transaction.LakeFSTransaction.tag), for creating a tag pointing to a commit. +## Limitations of transactions + +Transactions are scoped to a single repository and branch only, equal to those given to the `fs.transaction()` context manager. +When uploading files in a transaction via `fs.put()` or `fs.put_file()`, you **must** give all remote paths as file names. +If you use a fully qualified URI, leading repository and branch names will be interpreted as subdirectories, which will be created on upload. +No warnings or errors will be thrown, so be sure to double-check your paths in all transaction scopes. 
def make_uri(self, path: str | os.PathLike[str]) -> str:
    """
    Qualify a remote path with the repository and branch of the active transaction.

    Outside of a transaction, the stringified input is returned unchanged.
    Inside a transaction, the path is prefixed with ``<repository>/<branch>/``
    of the transaction, unless it already lives under exactly that prefix.

    Parameters
    ----------
    path: str | os.PathLike[str]
        A remote path, either relative to the transaction branch or already
        prefixed with the transaction's repository and branch.

    Returns
    -------
    str
        The stringified path, prefixed with the transaction's repository and
        branch if a transaction is active.
    """
    spath = stringify_path(path)
    if not self._intrans:
        return spath

    prefix = "/".join([self.transaction.repository, self.transaction.branch.id])
    # Only treat the input as already qualified if the prefix ends on a path
    # segment boundary. A bare ``startswith(prefix)`` would also match sibling
    # names such as ``repo/main-v2/...`` for the prefix ``repo/main``.
    if spath == prefix or spath.startswith(prefix + "/"):
        return spath

    # NB: A path that is fully qualified with a *different* repository or branch
    # is silently treated as a nested path below the transaction branch.
    # In general, it's impossible to distinguish between a fully qualified path
    # and a normal nested path, so at most, we could split off the first segment
    # of the input and check it against existing repositories.
    return prefix + "/" + spath
""" - path = stringify_path(path) + path = self.make_uri(path) repository, ref, resource = parse(path) try: reference = lakefs.Reference(repository, ref, client=self.client) @@ -261,8 +275,8 @@ def cp_file( ValueError When attempting to copy objects between repositories. """ - path1 = stringify_path(path1) - path2 = stringify_path(path2) + path1 = self.make_uri(path1) + path2 = self.make_uri(path2) if path1 == path2: return @@ -306,7 +320,7 @@ def get_file( **kwargs: Any Additional keyword arguments passed to ``AbstractFileSystem.open()``. """ - rpath = stringify_path(rpath) + rpath = self.make_uri(rpath) lpath = stringify_path(lpath) if precheck and Path(lpath).is_file(): @@ -343,7 +357,7 @@ def info(self, path: str | os.PathLike[str], **kwargs: Any) -> dict[str, Any]: FileNotFoundError If the ``path`` refers to a non-file path that does not exist in the repository. """ - path = stringify_path(path) + path = self.make_uri(path) repository, ref, resource = parse(path) # first, try with `stat_object` in case of a file. # the condition below checks edge cases of resources that cannot be files. @@ -481,6 +495,7 @@ def ls( A list of all objects' metadata under the given remote path if ``detail=True``, or alternatively only their names if ``detail=False``. """ path = self._strip_protocol(path) + path = self.make_uri(path) repository, ref, prefix = parse(path) recursive = kwargs.pop("recursive", False) @@ -557,6 +572,7 @@ def ls( # To make recursive ls behave identical to the non-recursive case, # add back virtual `directory` entries, which are only returned by # the lakeFS API when querying non-recursively. 
+ # TODO: Fix this up for intrans here = self._strip_protocol(path).rstrip("/") subdirs = {parent for o in info if (parent := self._parent(o["name"])) != here} for subdir in subdirs: @@ -623,7 +639,7 @@ def open( if mode not in {"r", "rb", "w", "wb", "x", "xb"}: raise NotImplementedError(f"unsupported mode {mode!r}") - path = stringify_path(path) + path = self.make_uri(path) repo, ref, resource = parse(path) if mode.startswith("r"): @@ -682,7 +698,7 @@ def put_file( Additional keyword arguments to pass to ``LakeFSFileSystem.open()``. """ lpath = stringify_path(lpath) - rpath = stringify_path(rpath) + rpath = self.make_uri(rpath) if precheck and Path(lpath).is_file(): remote_checksum = self.checksum(rpath) @@ -730,7 +746,7 @@ def rm( possible. """ - path = stringify_path(path) + path = self.make_uri(path) repository, ref, prefix = parse(path) with self.wrapped_api_call(rpath=path): diff --git a/tests/test_put_file.py b/tests/test_put_file.py index 9c85ddf3..1f6249ec 100644 --- a/tests/test_put_file.py +++ b/tests/test_put_file.py @@ -21,7 +21,7 @@ def test_no_change_postcommit( message = f"Add file {random_file.name}" with fs.transaction(repository, temp_branch) as tx: - fs.put(lpath, f"{repository.id}/{tx.branch.id}/{random_file.name}") + fs.put(lpath, random_file.name) tx.commit(message=message) commits = list(temp_branch.log(max_amount=2)) @@ -31,8 +31,8 @@ def test_no_change_postcommit( # put the same file again, this time the diff is empty with fs.transaction(repository, temp_branch) as tx: - fs.put(lpath, f"{repository.id}/{tx.branch.id}/{random_file.name}", precheck=False) - tx.commit(message=f"Add file {random_file.name}") + fs.put(lpath, random_file.name, precheck=False) + tx.commit(message=message) # check that no other commit has happened. 
assert temp_branch.head.get_commit() == current_head diff --git a/tests/test_rm.py b/tests/test_rm.py index 8026ff91..8e98c017 100644 --- a/tests/test_rm.py +++ b/tests/test_rm.py @@ -28,7 +28,7 @@ def test_rm_with_transaction( message = "Remove file README.md" with fs.transaction(repository, temp_branch, automerge=True) as tx: - fs.rm(f"{repository.id}/{tx.branch.id}/README.md") + fs.rm("README.md") tx.commit(message=message) commits = list(temp_branch.log(max_amount=2)) diff --git a/tests/test_transactions.py b/tests/test_transactions.py index 032c61a2..61fb1ebb 100644 --- a/tests/test_transactions.py +++ b/tests/test_transactions.py @@ -17,12 +17,11 @@ def test_transaction_commit( random_file = random_file_factory.make() lpath = str(random_file) - rpath = f"{repository.id}/{temp_branch.id}/{random_file.name}" message = f"Add file {random_file.name}" with fs.transaction(repository, temp_branch) as tx: - fs.put_file(lpath, f"{repository.id}/{tx.branch.id}/{random_file.name}") + fs.put_file(lpath, random_file.name) assert len(tx.files) == 1 # sha is a placeholder for the actual SHA created on transaction completion. sha = tx.commit(message=message) @@ -65,7 +64,7 @@ def test_transaction_merge( tbname = tx.branch.id lpath = str(random_file) # stage a file on the transaction branch... - fs.put_file(lpath, f"{repository.id}/{tx.branch.id}/{random_file.name}") + fs.put_file(lpath, random_file.name) # ... commit it with the above message tx.commit(message=message) # ... and merge it into temp_branch. 
@@ -90,7 +89,7 @@ def test_transaction_revert( message = f"Add file {random_file.name}" with fs.transaction(repository, temp_branch, automerge=True) as tx: - fs.put_file(lpath, f"{repository.id}/{tx.branch.id}/{random_file.name}") + fs.put_file(lpath, random_file.name) tx.commit(message=message) revert_commit = tx.revert(temp_branch, temp_branch.head) @@ -113,7 +112,7 @@ def test_transaction_failure( try: with fs.transaction(repository, temp_branch) as tx: - fs.put_file(lpath, f"{repository.id}/{tx.branch.id}/{random_file.name}") + fs.put_file(lpath, random_file.name) tx.commit(message=message) raise RuntimeError("something went wrong") except RuntimeError: @@ -159,8 +158,8 @@ def test_warn_uncommitted_changes( lpath = str(random_file) with pytest.warns(match="uncommitted changes.*lost"): - with fs.transaction(repository, temp_branch) as tx: - fs.put_file(lpath, f"{repository.id}/{tx.branch.id}/{random_file.name}") + with fs.transaction(repository, temp_branch): + fs.put_file(lpath, random_file.name) def test_warn_uncommitted_changes_on_persisted_branch( @@ -175,4 +174,4 @@ def test_warn_uncommitted_changes_on_persisted_branch( with pytest.warns(match="uncommitted changes(?:(?!lost).)*$"): with fs.transaction(repository, temp_branch, delete="never") as tx: - fs.put_file(lpath, f"{repository.id}/{tx.branch.id}/{random_file.name}") + fs.put_file(lpath, random_file.name)