Skip to content

refactor: Interpret all rpaths as resource names in transactions #318

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions docs/guides/transactions.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ from lakefs_spec import LakeFSFileSystem
fs = LakeFSFileSystem()

with fs.transaction("repo", "main") as tx:
fs.put_file("train-data.txt", f"repo/{tx.branch.id}/train-data.txt")
fs.put_file("train-data.txt", "train-data.txt")
tx.commit(message="Add training data")
fs.put_file("test-data.txt", f"repo/{tx.branch.id}/test-data.txt")
fs.put_file("test-data.txt", "test-data.txt")
sha = tx.commit(message="Add test data")
tx.tag(sha, name="My train-test split")
```
Expand All @@ -35,6 +35,13 @@ The full list of supported lakeFS versioning operations (by default, these opera
* [`rev_parse`](../reference/lakefs_spec/transaction.md#lakefs_spec.transaction.LakeFSTransaction.rev_parse), for parsing revisions like branch/tag names and SHA fragments into full commit SHAs.
* [`tag`](../reference/lakefs_spec/transaction.md#lakefs_spec.transaction.LakeFSTransaction.tag), for creating a tag pointing to a commit.

## Limitations of transactions

Transactions are scoped to a single repository and branch only, equal to those given to the `fs.transaction()` context manager.
When uploading files in a transaction via `fs.put()` or `fs.put_file()`, you **must** give all remote paths as file names.
If you use a fully qualified URI, leading repository and branch names will be interpreted as subdirectories, which will be created on upload.
No warnings or errors will be thrown, so be sure to double-check your paths in all transaction scopes.

## Lifecycle of ephemeral transaction branches

You can control the lifecycle for a transaction branch with the `delete` argument:
Expand All @@ -56,7 +63,7 @@ from lakefs_spec import LakeFSFileSystem
fs = LakeFSFileSystem()

with fs.transaction("repo", "main", delete="onsuccess") as tx:
fs.put_file("my-file.txt", f"repo/{tx.branch.id}/my-file.txt")
fs.put_file("my-file.txt", "my-file.txt")
tx.commit(message="Add my-file.txt")
raise ValueError("oops!")
```
Expand Down
34 changes: 25 additions & 9 deletions src/lakefs_spec/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,20 @@ def wrapped_api_call(
except ServerException as e:
raise translate_lakefs_error(e, rpath=rpath, message=message, set_cause=set_cause)

def make_uri(self, path: str | os.PathLike[str]) -> str:
spath = stringify_path(path)
# NB: this fails silently if the input path is already fully qualified.
# However, in general, it's impossible to distinguish between a
# fully qualified path and a normal nested path, so at most, we
# could split off the first segment of the input and check it against existing
# repositories.
if self._intrans:
prefix = "/".join([self.transaction.repository, self.transaction.branch.id])
if spath.startswith(prefix):
return spath
return prefix + "/" + spath
return spath

def checksum(self, path: str | os.PathLike[str]) -> str | None:
"""
Get a remote lakeFS file object's checksum.
Expand All @@ -186,7 +200,7 @@ def checksum(self, path: str | os.PathLike[str]) -> str | None:
str | None
The remote file's checksum, or ``None`` if ``path`` points to a directory or does not exist.
"""
path = stringify_path(path)
path = self.make_uri(path)
try:
return self.info(path).get("checksum")
except FileNotFoundError:
Expand Down Expand Up @@ -218,7 +232,7 @@ def exists(self, path: str | os.PathLike[str], **kwargs: Any) -> bool:
PermissionError
If the user does not have sufficient permissions to query object existence.
"""
path = stringify_path(path)
path = self.make_uri(path)
repository, ref, resource = parse(path)
try:
reference = lakefs.Reference(repository, ref, client=self.client)
Expand Down Expand Up @@ -261,8 +275,8 @@ def cp_file(
ValueError
When attempting to copy objects between repositories.
"""
path1 = stringify_path(path1)
path2 = stringify_path(path2)
path1 = self.make_uri(path1)
path2 = self.make_uri(path2)
if path1 == path2:
return

Expand Down Expand Up @@ -306,7 +320,7 @@ def get_file(
**kwargs: Any
Additional keyword arguments passed to ``AbstractFileSystem.open()``.
"""
rpath = stringify_path(rpath)
rpath = self.make_uri(rpath)
lpath = stringify_path(lpath)

if precheck and Path(lpath).is_file():
Expand Down Expand Up @@ -343,7 +357,7 @@ def info(self, path: str | os.PathLike[str], **kwargs: Any) -> dict[str, Any]:
FileNotFoundError
If the ``path`` refers to a non-file path that does not exist in the repository.
"""
path = stringify_path(path)
path = self.make_uri(path)
repository, ref, resource = parse(path)
# first, try with `stat_object` in case of a file.
# the condition below checks edge cases of resources that cannot be files.
Expand Down Expand Up @@ -481,6 +495,7 @@ def ls(
A list of all objects' metadata under the given remote path if ``detail=True``, or alternatively only their names if ``detail=False``.
"""
path = self._strip_protocol(path)
path = self.make_uri(path)
repository, ref, prefix = parse(path)

recursive = kwargs.pop("recursive", False)
Expand Down Expand Up @@ -557,6 +572,7 @@ def ls(
# To make recursive ls behave identical to the non-recursive case,
# add back virtual `directory` entries, which are only returned by
# the lakeFS API when querying non-recursively.
# TODO: Fix this up for intrans
here = self._strip_protocol(path).rstrip("/")
subdirs = {parent for o in info if (parent := self._parent(o["name"])) != here}
for subdir in subdirs:
Expand Down Expand Up @@ -623,7 +639,7 @@ def open(
if mode not in {"r", "rb", "w", "wb", "x", "xb"}:
raise NotImplementedError(f"unsupported mode {mode!r}")

path = stringify_path(path)
path = self.make_uri(path)
repo, ref, resource = parse(path)

if mode.startswith("r"):
Expand Down Expand Up @@ -682,7 +698,7 @@ def put_file(
Additional keyword arguments to pass to ``LakeFSFileSystem.open()``.
"""
lpath = stringify_path(lpath)
rpath = stringify_path(rpath)
rpath = self.make_uri(rpath)

if precheck and Path(lpath).is_file():
remote_checksum = self.checksum(rpath)
Expand Down Expand Up @@ -730,7 +746,7 @@ def rm(
possible.
"""

path = stringify_path(path)
path = self.make_uri(path)
repository, ref, prefix = parse(path)

with self.wrapped_api_call(rpath=path):
Expand Down
6 changes: 3 additions & 3 deletions tests/test_put_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def test_no_change_postcommit(
message = f"Add file {random_file.name}"

with fs.transaction(repository, temp_branch) as tx:
fs.put(lpath, f"{repository.id}/{tx.branch.id}/{random_file.name}")
fs.put(lpath, random_file.name)
tx.commit(message=message)

commits = list(temp_branch.log(max_amount=2))
Expand All @@ -31,8 +31,8 @@ def test_no_change_postcommit(

# put the same file again, this time the diff is empty
with fs.transaction(repository, temp_branch) as tx:
fs.put(lpath, f"{repository.id}/{tx.branch.id}/{random_file.name}", precheck=False)
tx.commit(message=f"Add file {random_file.name}")
fs.put(lpath, random_file.name, precheck=False)
tx.commit(message=message)

# check that no other commit has happened.
assert temp_branch.head.get_commit() == current_head
Expand Down
2 changes: 1 addition & 1 deletion tests/test_rm.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def test_rm_with_transaction(
message = "Remove file README.md"

with fs.transaction(repository, temp_branch, automerge=True) as tx:
fs.rm(f"{repository.id}/{tx.branch.id}/README.md")
fs.rm("README.md")
tx.commit(message=message)

commits = list(temp_branch.log(max_amount=2))
Expand Down
15 changes: 7 additions & 8 deletions tests/test_transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@ def test_transaction_commit(
random_file = random_file_factory.make()

lpath = str(random_file)
rpath = f"{repository.id}/{temp_branch.id}/{random_file.name}"

message = f"Add file {random_file.name}"

with fs.transaction(repository, temp_branch) as tx:
fs.put_file(lpath, f"{repository.id}/{tx.branch.id}/{random_file.name}")
fs.put_file(lpath, random_file.name)
assert len(tx.files) == 1
# sha is a placeholder for the actual SHA created on transaction completion.
sha = tx.commit(message=message)
Expand Down Expand Up @@ -65,7 +64,7 @@ def test_transaction_merge(
tbname = tx.branch.id
lpath = str(random_file)
# stage a file on the transaction branch...
fs.put_file(lpath, f"{repository.id}/{tx.branch.id}/{random_file.name}")
fs.put_file(lpath, random_file.name)
# ... commit it with the above message
tx.commit(message=message)
# ... and merge it into temp_branch.
Expand All @@ -90,7 +89,7 @@ def test_transaction_revert(
message = f"Add file {random_file.name}"

with fs.transaction(repository, temp_branch, automerge=True) as tx:
fs.put_file(lpath, f"{repository.id}/{tx.branch.id}/{random_file.name}")
fs.put_file(lpath, random_file.name)
tx.commit(message=message)
revert_commit = tx.revert(temp_branch, temp_branch.head)

Expand All @@ -113,7 +112,7 @@ def test_transaction_failure(

try:
with fs.transaction(repository, temp_branch) as tx:
fs.put_file(lpath, f"{repository.id}/{tx.branch.id}/{random_file.name}")
fs.put_file(lpath, random_file.name)
tx.commit(message=message)
raise RuntimeError("something went wrong")
except RuntimeError:
Expand Down Expand Up @@ -159,8 +158,8 @@ def test_warn_uncommitted_changes(
lpath = str(random_file)

with pytest.warns(match="uncommitted changes.*lost"):
with fs.transaction(repository, temp_branch) as tx:
fs.put_file(lpath, f"{repository.id}/{tx.branch.id}/{random_file.name}")
with fs.transaction(repository, temp_branch):
fs.put_file(lpath, random_file.name)


def test_warn_uncommitted_changes_on_persisted_branch(
Expand All @@ -175,4 +174,4 @@ def test_warn_uncommitted_changes_on_persisted_branch(

with pytest.warns(match="uncommitted changes(?:(?!lost).)*$"):
with fs.transaction(repository, temp_branch, delete="never") as tx:
fs.put_file(lpath, f"{repository.id}/{tx.branch.id}/{random_file.name}")
fs.put_file(lpath, random_file.name)