Skip to content

Commit f86c7e1

Browse files
committed
Split download en zip_files util
Signed-off-by: Bram Stoeller <bram.stoeller@alliander.com>
1 parent 08a80fa commit f86c7e1

File tree

2 files changed

+194
-0
lines changed

2 files changed

+194
-0
lines changed
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
# SPDX-FileCopyrightText: 2022 Contributors to the Power Grid Model IO project <dynamic.grid.calculation@alliander.com>
2+
#
3+
# SPDX-License-Identifier: MPL-2.0
4+
"""
5+
Helper function to extract zip files
6+
7+
csv_dir_path = extract(Path("/tmp/1-complete_data-mixed-all-0-sw.zip"))
8+
9+
This extracts the files, in a folder which corresponds to the zip file name ("/tmp/1-complete_data-mixed-all-0-sw/" in
10+
our example), and it returns the path to that directory. By default, a FileExistsError is raised if a non-empty
10+
destination file already exists; pass skip_if_exists=True to skip such files instead.
12+
13+
"""
14+
15+
import zipfile
16+
from pathlib import Path
17+
from typing import Optional
18+
19+
import structlog
20+
from tqdm import tqdm
21+
22+
_log = structlog.get_logger(__name__)
23+
24+
25+
def extract(src_file_path: Path, dst_dir_path: Optional[Path] = None, skip_if_exists: bool = False) -> Path:
    """
    Extract a .zip file and return the destination dir

    Args:
        src_file_path: The .zip file to extract.
        dst_dir_path: An optional destination path. If none is given, the src_file_path without .zip extension is used.
        skip_if_exists: Skip existing files, otherwise raise an exception when a file exists.

    Returns: The (resolved) path where the files are extracted. If the destination dir contains nothing but a single
        directory that has the same name as the zip file (without extension), the path to that directory is returned.

    Raises:
        ValueError: If src_file_path does not have a .zip extension.
        NotADirectoryError: If dst_dir_path exists, but is not a directory.
        FileExistsError: If a non-empty destination file already exists and skip_if_exists is False.
    """
    if src_file_path.suffix.lower() != ".zip":
        raise ValueError(f"Only files with .zip extension are supported, got {src_file_path.name}")

    if dst_dir_path is None:
        dst_dir_path = src_file_path.with_suffix("")

    log = _log.bind(src_file_path=src_file_path, dst_dir_path=dst_dir_path)

    if dst_dir_path.exists() and not dst_dir_path.is_dir():
        raise NotADirectoryError(f"Destination dir {dst_dir_path} exists and is not a directory")

    # Create the destination directory
    dst_dir_path.mkdir(parents=True, exist_ok=True)

    # Extract per file, so we can show a progress bar
    with zipfile.ZipFile(src_file_path, "r") as zip_file:
        file_list = zip_file.namelist()
        for file_path in tqdm(desc="Extracting", iterable=file_list, total=len(file_list), unit="file", leave=True):
            dst_file_path = dst_dir_path / file_path

            # Directory entries are never "non-empty files": an existing directory usually reports a size > 0,
            # which would wrongly trigger the exists-check below. Re-creating a directory is harmless, so
            # directory entries are always handed to zip_file.extract().
            is_dir_entry = zip_file.getinfo(file_path).is_dir()

            if not is_dir_entry and dst_file_path.exists() and dst_file_path.stat().st_size > 0:
                if skip_if_exists:
                    log.debug("Skip file extraction, destination file exists", dst_file_path=dst_file_path)
                    continue
                raise FileExistsError(f"Destination file {dst_file_path} exists and is not empty")
            zip_file.extract(member=file_path, path=dst_dir_path)

    # Zip files often contain a single directory with the same name as the zip file.
    # In that case, return the path to that directory instead of the root dir.
    # Looking at the first two entries is enough to know whether the root holds exactly one item.
    root_items = dst_dir_path.iterdir()
    first_item = next(root_items, None)
    second_item = next(root_items, None)
    only_item = first_item if second_item is None else None
    if only_item is not None and only_item.is_dir() and only_item.name == src_file_path.stem:
        dst_dir_path = only_item

    return dst_dir_path.resolve()

tests/unit/utils/test_zip_files.py

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
# SPDX-FileCopyrightText: 2022 Contributors to the Power Grid Model IO project <dynamic.grid.calculation@alliander.com>
2+
#
3+
# SPDX-License-Identifier: MPL-2.0
4+
import shutil
5+
import tempfile
6+
from pathlib import Path
7+
from unittest.mock import MagicMock, patch
8+
9+
import pytest
10+
import structlog.testing
11+
12+
from power_grid_model_io.utils.zip_files import extract
13+
14+
from ...utils import MockTqdm, assert_log_exists
15+
16+
# Directory with the zip archives that serve as input for the tests in this module
DATA_DIR = Path(__file__).parents[2].joinpath("data", "zip_files")
# The tests below expect this archive to contain a single root folder "foo" holding foo.txt
ZIP1 = DATA_DIR.joinpath("foo.zip")
# The tests below expect this archive to contain foo.txt and folder/bar.txt
ZIP2 = DATA_DIR.joinpath("foo-bar.zip")
19+
20+
21+
@pytest.fixture()
def temp_dir():
    """Yield the resolved path of a temporary directory, which is removed again after the test."""
    with tempfile.TemporaryDirectory() as tmp:
        # extract() returns a resolved path; resolve here as well, so path comparisons in the tests also hold
        # when the temporary directory is located behind a symlink.
        yield Path(tmp).resolve()
25+
26+
27+
# Patch tqdm where extract() looks it up: in the zip_files module, not in the download module
@patch("power_grid_model_io.utils.zip_files.tqdm")
def test_extract(mock_tqdm: MagicMock, temp_dir: Path):
    """Extracting into an explicit destination dir returns that dir and creates all archive members."""
    # Arrange
    src_file_path = temp_dir / "compressed.zip"
    dst_dir_path = temp_dir / "extracted"
    shutil.copyfile(ZIP2, src_file_path)
    mock_tqdm.side_effect = MockTqdm

    # Act
    extract_dir_path = extract(src_file_path=src_file_path, dst_dir_path=dst_dir_path)

    # Assert
    # extract() returns a resolved path, so compare against the resolved destination
    assert extract_dir_path == dst_dir_path.resolve()
    assert (dst_dir_path / "foo.txt").is_file()
    assert (dst_dir_path / "folder/bar.txt").is_file()
42+
43+
44+
# Patch tqdm where extract() looks it up: in the zip_files module, not in the download module
@patch("power_grid_model_io.utils.zip_files.tqdm")
def test_extract__auto_dir(mock_tqdm: MagicMock, temp_dir: Path):
    """Without an explicit destination, the archive is extracted next to it, in a dir named after the zip file."""
    # Arrange
    src_file_path = temp_dir / "compressed.zip"
    shutil.copyfile(ZIP2, src_file_path)
    mock_tqdm.side_effect = MockTqdm

    # Act
    extract_dir_path = extract(src_file_path=src_file_path)

    # Assert
    # extract() returns a resolved path, so compare against the resolved destination
    assert extract_dir_path == (temp_dir / "compressed").resolve()
    assert (temp_dir / "compressed" / "foo.txt").is_file()
    assert (temp_dir / "compressed" / "folder" / "bar.txt").is_file()
58+
59+
60+
def test_extract__invalid_file_extension():
    """A source file without .zip extension is rejected with a ValueError."""
    # Arrange
    not_a_zip_path = Path("/tmp/dir/tempfile.download")
    expected_message = r"Only files with \.zip extension are supported, got tempfile\.download"

    # Act / Assert
    with pytest.raises(ValueError, match=expected_message):
        extract(src_file_path=not_a_zip_path)
64+
65+
66+
# Patch tqdm where extract() looks it up: in the zip_files module, not in the download module
@patch("power_grid_model_io.utils.zip_files.tqdm")
def test_extract__file_exists(mock_tqdm: MagicMock, temp_dir: Path):
    """A non-empty destination file makes extract() raise a FileExistsError by default."""
    # Arrange
    src_file_path = temp_dir / "compressed.zip"
    dst_dir_path = temp_dir / "extracted"
    shutil.copyfile(ZIP2, src_file_path)
    mock_tqdm.side_effect = MockTqdm

    # Pre-create a non-empty file (one byte) at the location of one of the archive members
    dst_dir_path.mkdir()
    with open(dst_dir_path / "foo.txt", "wb") as fp:
        fp.write(b"\0")

    # Act / Assert
    with pytest.raises(FileExistsError, match=r"Destination file .*foo\.txt exists and is not empty"):
        extract(src_file_path=src_file_path, dst_dir_path=dst_dir_path)
81+
82+
83+
# Patch tqdm where extract() looks it up: in the zip_files module, not in the download module
@patch("power_grid_model_io.utils.zip_files.tqdm")
def test_extract__skip_if_exists(mock_tqdm: MagicMock, temp_dir: Path):
    """With skip_if_exists=True, an existing non-empty file is left untouched and the skip is logged."""
    # Arrange
    src_file_path = temp_dir / "compressed.zip"
    dst_dir_path = temp_dir / "compressed"
    shutil.copyfile(ZIP2, src_file_path)
    mock_tqdm.side_effect = MockTqdm

    # Pre-create a non-empty file (one byte) at the location of one of the archive members
    dst_dir_path.mkdir()
    with open(dst_dir_path / "foo.txt", "wb") as fp:
        fp.write(b"\0")

    # Act / Assert
    with structlog.testing.capture_logs() as capture:
        extract(src_file_path=src_file_path, dst_dir_path=dst_dir_path, skip_if_exists=True)
        assert_log_exists(
            capture, "debug", "Skip file extraction, destination file exists", dst_file_path=dst_dir_path / "foo.txt"
        )

    # The existing file must not have been overwritten, while the other archive member is still extracted
    assert (dst_dir_path / "foo.txt").read_bytes() == b"\0"
    assert (dst_dir_path / "folder" / "bar.txt").is_file()
101+
102+
103+
# Patch tqdm where extract() looks it up: in the zip_files module, not in the download module
@patch("power_grid_model_io.utils.zip_files.tqdm")
def test_extract__return_subdir_path(mock_tqdm: MagicMock, temp_dir: Path):
    """If the archive only holds one folder named after the zip file, the path to that folder is returned."""
    # Arrange
    src_file_path = temp_dir / "foo.zip"
    shutil.copyfile(ZIP1, src_file_path)
    mock_tqdm.side_effect = MockTqdm

    # Act
    extract_dir_path = extract(src_file_path=src_file_path)

    # Assert
    # extract() returns a resolved path, so compare against the resolved destination
    assert extract_dir_path == (temp_dir / "foo" / "foo").resolve()
    assert (temp_dir / "foo" / "foo" / "foo.txt").is_file()

0 commit comments

Comments
 (0)