|
| 1 | +""" |
| 2 | +S3 key sanitization utilities for ensuring AWS S3 compatibility. |
| 3 | +
|
| 4 | +This module provides utilities for sanitizing file names to be compatible with |
| 5 | +AWS S3 object key naming guidelines while ensuring uniqueness when significant |
| 6 | +sanitization occurs. |
| 7 | +
|
| 8 | +Reference: https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-keys.html |
| 9 | +""" |
| 10 | + |
| 11 | +import hashlib |
| 12 | +import re |
| 13 | +import urllib.parse |
| 14 | +from re import Match |
| 15 | + |
# Constants for S3 key generation
HASH_LENGTH = 64  # SHA256 hex digest length
HASH_SEPARATOR_LENGTH = 1  # Length of the underscore separator placed before the hash
# Total space consumed by the "_<sha256 hex>" suffix appended to long key names
HASH_WITH_SEPARATOR_LENGTH = HASH_LENGTH + HASH_SEPARATOR_LENGTH
| 20 | + |
| 21 | + |
def _encode_special_char(match: Match[str]) -> str:
    """Percent-encode the single matched character (no characters are safe)."""
    character = match.group()
    return urllib.parse.quote(character, safe="")
| 25 | + |
| 26 | + |
def _append_uniqueness_hash(sanitized: str, original_name: str) -> str:
    """Append a short SHA-256 hash of *original_name* to *sanitized*.

    Keeps a short (<= 10 chars) file extension, if present, at the end so the
    result still looks like "name_<hash8>.ext".
    """
    name_hash = hashlib.sha256(original_name.encode("utf-8")).hexdigest()[:8]
    # Preserve the extension only when it is short enough to plausibly be one.
    if "." in sanitized and len(sanitized.split(".")[-1]) <= 10:
        stem, extension = sanitized.rsplit(".", 1)
        return f"{stem}_{name_hash}.{extension}"
    return f"{sanitized}_{name_hash}"


def sanitize_s3_key_name(file_name: str) -> str:
    """
    Sanitize file name to be S3-compatible according to AWS guidelines.

    This method:
    1. Replaces characters AWS recommends avoiding with underscores
    2. URL-encodes characters that might require special handling
    3. URL-encodes non-ASCII characters
    4. Strips leading/trailing periods (relative-path / download issues)
    5. Adds a short hash of the original name when significant sanitization
       occurred, to keep heavily rewritten names unique

    Args:
        file_name: The original file name to sanitize

    Returns:
        A sanitized file name that is S3-compatible

    Reference: https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-keys.html
    """
    if not file_name:
        return "unnamed_file"

    # Compiled once per call; each pattern is used both to rewrite the name
    # and (below) to count occurrences in the original name.
    # Characters AWS recommends avoiding completely -> replaced by "_".
    avoid_re = re.compile(r'[\\{}^%`\[\]"<>#|~]')
    # Characters that are allowed but might require special handling -> URL-encoded.
    special_re = re.compile(r"[&$@=;:+,?\s]")

    sanitized = avoid_re.sub("_", file_name)
    sanitized = special_re.sub(
        lambda m: urllib.parse.quote(m.group(0), safe=""), sanitized
    )

    # Non-ASCII (Unicode) characters are URL-encoded while preserving the
    # characters S3 treats as safe.
    needs_unicode_encoding = not sanitized.isascii()
    if needs_unicode_encoding:
        sanitized = urllib.parse.quote(
            sanitized,
            safe="ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.()!*",
        )

    # Leading periods look like relative paths; trailing periods can break
    # downloads on some clients.
    sanitized = sanitized.strip(".")

    # If sanitization removed everything, fall back to a default name.
    if not sanitized:
        sanitized = "sanitized_file"

    # Heuristics for "significant" sanitization: many avoided characters,
    # Unicode encoding, large expansion from encoding, or many special chars.
    significant_changes = (
        len(avoid_re.findall(file_name)) > 3
        or needs_unicode_encoding
        or len(sanitized) > len(file_name) * 2
        or len(special_re.findall(file_name)) > 5
    )

    if significant_changes:
        # Hash the ORIGINAL name so the suffix is stable for a given input.
        sanitized = _append_uniqueness_hash(sanitized, file_name)

    return sanitized
| 115 | + |
| 116 | + |
def generate_s3_key(
    file_name: str, prefix: str, tenant_id: str, max_key_length: int = 1024
) -> str:
    """
    Generate a complete S3 key ("{prefix}/{tenant_id}/{file_name}") for a file.

    The file name is sanitized first. If the resulting key would exceed
    ``max_key_length``, the file-name component is replaced by a (possibly
    truncated) readable prefix plus the full SHA-256 hex digest of the
    ORIGINAL file name, so equal inputs always map to the same key.

    Args:
        file_name: The original file name
        prefix: S3 key prefix (e.g., 'onyx-files')
        tenant_id: Tenant identifier
        max_key_length: Maximum allowed S3 key length (default: 1024, the AWS limit)

    Returns:
        A complete S3 key that fits within the length limit
    """
    # Strip slashes so joining the parts with "/" cannot double up slashes.
    prefix_clean = prefix.strip("/")
    tenant_clean = tenant_id.strip("/")

    # Sanitize the file name first
    sanitized_file_name = sanitize_s3_key_name(file_name)

    # Budget left for the file-name component of "{prefix}/{tenant}/{name}".
    prefix_and_tenant_parts = [prefix_clean, tenant_clean]
    prefix_and_tenant = "/".join(prefix_and_tenant_parts) + "/"
    max_file_name_length = max_key_length - len(prefix_and_tenant)

    # <= (not <): a name that fills the budget exactly still yields a key of
    # precisely max_key_length, which S3 allows.
    if len(sanitized_file_name) <= max_file_name_length:
        return "/".join(prefix_and_tenant_parts + [sanitized_file_name])

    # For very long file names, use a hash-based approach to ensure uniqueness.
    # Hash the ORIGINAL file name so the key is stable across calls.
    file_hash = hashlib.sha256(file_name.encode("utf-8")).hexdigest()

    # Reserve space for "_" + 64-char hash; the remainder keeps readability.
    readable_part_max_length = max(0, max_file_name_length - HASH_WITH_SEPARATOR_LENGTH)

    if readable_part_max_length > 0:
        readable_part = sanitized_file_name[:readable_part_max_length]
        truncated_name = f"{readable_part}_{file_hash}"
    else:
        # NOTE(review): if max_file_name_length < HASH_LENGTH the bare hash
        # still exceeds the budget; unreachable with the default 1024 limit
        # and realistic prefixes — confirm before allowing tiny limits.
        truncated_name = file_hash

    return "/".join(prefix_and_tenant_parts + [truncated_name])