Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions metaflow/plugins/datatools/s3/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
get_timestamp,
TRANSIENT_RETRY_START_LINE,
TRANSIENT_RETRY_LINE_CONTENT,
CONSECUTIVE_SLASHES_REGEX,
)

if TYPE_CHECKING:
Expand Down Expand Up @@ -550,15 +551,19 @@ def __init__(
else:
prefix = os.path.join(prefix, run.parent.id, run.id)

self._s3root = "s3://%s" % os.path.join(bucket, prefix.strip("/"))
s3root_raw = "s3://%s" % os.path.join(bucket, prefix.strip("/"))
# Normalize the path by collapsing consecutive slashes
self._s3root = CONSECUTIVE_SLASHES_REGEX.sub("/", s3root_raw).rstrip("/")
elif s3root:
# 2. use an explicit S3 prefix
parsed = urlparse(to_unicode(s3root))
if parsed.scheme != "s3":
raise MetaflowS3URLException(
"s3root needs to be an S3 URL prefixed with s3://."
)
self._s3root = s3root.rstrip("/")
# Normalize the path by collapsing consecutive slashes
normalized = CONSECUTIVE_SLASHES_REGEX.sub("/", s3root)
self._s3root = normalized.rstrip("/")
else:
# 3. use the client only with full URLs
self._s3root = None
Expand Down
7 changes: 5 additions & 2 deletions metaflow/plugins/datatools/s3/s3op.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
get_timestamp,
TRANSIENT_RETRY_LINE_CONTENT,
TRANSIENT_RETRY_START_LINE,
CONSECUTIVE_SLASHES_REGEX,
)
import metaflow.tracing as tracing
from metaflow.metaflow_config import (
Expand Down Expand Up @@ -623,7 +624,7 @@ def list_prefix(self, prefix_url, delimiter=""):
and len(key_path) > len(normalized_prefix)
):
continue
url = url_base + key_path
url = CONSECUTIVE_SLASHES_REGEX.sub("/", url_base + key_path)
urlobj = S3Url(
url=url,
bucket=prefix_url.bucket,
Expand All @@ -635,7 +636,9 @@ def list_prefix(self, prefix_url, delimiter=""):
if "CommonPrefixes" in page:
# we get CommonPrefixes if Delimiter is a non-empty string
for key in page.get("CommonPrefixes", []):
url = url_base + key["Prefix"]
url = CONSECUTIVE_SLASHES_REGEX.sub(
"/", url_base + key["Prefix"]
)
urlobj = S3Url(
url=url,
bucket=prefix_url.bucket,
Expand Down
5 changes: 5 additions & 0 deletions metaflow/plugins/datatools/s3/s3util.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import print_function
from datetime import datetime
import random
import re
import time
import sys
import os
Expand All @@ -19,6 +20,10 @@
TRANSIENT_RETRY_LINE_CONTENT = "<none>"
TRANSIENT_RETRY_START_LINE = "### RETRY INPUTS ###"

# Compiled regex for normalizing consecutive slashes in S3 paths
# Matches two or more consecutive slashes not preceded by a colon (to preserve s3://)
CONSECUTIVE_SLASHES_REGEX = re.compile(r"(?<!:)//+")


def get_s3_client(s3_role_arn=None, s3_session_vars=None, s3_client_params=None):
from metaflow.plugins.aws.aws_client import get_aws_client
Expand Down
5 changes: 4 additions & 1 deletion test/data/s3/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -982,10 +982,13 @@ def test_list_recursive_sibling_prefix_filtering(inject_failure_rate):
) as s3:
objects = s3.list_recursive()

# Use the normalized s3root from the S3 object for accurate path extraction
test_prefix_path = f"{s3_setup._s3root}/{test_prefix}/"

found_relative_paths = []
for obj in objects:
# Get path relative to our test prefix
relative_path = obj.url.replace(f"{base_s3root}/{test_prefix}/", "")
relative_path = obj.url.replace(test_prefix_path, "")
found_relative_paths.append(relative_path)

expected_under_log = ["log/test.txt"]
Expand Down
Loading