
Commit 0e6d2f3

fix: feed files are not been extracted (#1303)
1 parent fb797f9 commit 0e6d2f3

7 files changed: 141 additions, 27 deletions

.gitignore

Lines changed: 4 additions & 1 deletion

@@ -77,4 +77,7 @@ coverage_reports
 tf.plan
 
 # CSV generation output files
-functions-python/**/*.csv
+functions-python/**/*.csv
+
+# Local emulators
+.cloudstorage

functions-python/batch_process_dataset/.coveragerc

Lines changed: 1 addition & 0 deletions

@@ -5,6 +5,7 @@ omit =
     */database_gen/*
     */dataset_service/*
     */shared/*
+    */scripts/*
 
 [report]
 exclude_lines =

functions-python/batch_process_dataset/requirements_dev.txt

Lines changed: 2 additions & 1 deletion

@@ -2,4 +2,5 @@ Faker
 pytest~=7.4.3
 urllib3-mock
 requests-mock
-python-dotenv~=1.0.0
+python-dotenv~=1.0.0
+gcp-storage-emulator
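
The new gcp-storage-emulator dependency lets the upload path be exercised locally without real GCP credentials. A minimal sketch of wiring the emulator into a unittest fixture, assuming the same create_server API and STORAGE_EMULATOR_HOST variable used by the verifier script added in this commit; host, port, and bucket name are illustrative:

import os
import unittest

from gcp_storage_emulator.server import create_server
from google.cloud import storage


class StorageEmulatorTestCase(unittest.TestCase):
    """Example fixture: run the in-process GCS emulator for the duration of the tests."""

    @classmethod
    def setUpClass(cls):
        # Point google-cloud-storage at the emulator instead of the real service.
        os.environ["STORAGE_EMULATOR_HOST"] = "http://localhost:9023"
        cls.server = create_server(
            host="localhost", port=9023, in_memory=True, default_bucket="test-bucket"
        )
        cls.server.start()

    @classmethod
    def tearDownClass(cls):
        cls.server.stop()

    def test_blob_round_trip(self):
        # Upload and read back a blob against the emulator's default bucket.
        bucket = storage.Client().get_bucket("test-bucket")
        blob = bucket.blob("latest.zip")
        blob.upload_from_string(b"dummy payload")
        self.assertEqual(blob.download_as_bytes(), b"dummy payload")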

functions-python/batch_process_dataset/src/main.py

Lines changed: 30 additions & 17 deletions

@@ -128,18 +128,15 @@ def download_content(self, temporary_file_path):
             logger=self.logger,
         )
         is_zip = zipfile.is_zipfile(temporary_file_path)
-        if is_zip:
-            extracted_file_path = os.path.join(
-                temporary_file_path.split(".")[0], "extracted"
-            )
-            with zipfile.ZipFile(temporary_file_path, "r") as zip_ref:
-                zip_ref.extractall(os.path.dirname(extracted_file_path))
-            # List all files in the extracted directory
-            extracted_files = os.listdir(os.path.dirname(extracted_file_path))
-            self.logger.info(f"Extracted files: {extracted_files}")
         return file_hash, is_zip
 
-    def upload_file_to_storage(self, source_file_path, dataset_stable_id):
+    def upload_file_to_storage(
+        self,
+        source_file_path,
+        dataset_stable_id,
+        extracted_files_path,
+        public=True,
+    ):
         """
         Uploads a file to the GCP bucket
         """
@@ -153,12 +150,12 @@ def upload_file_to_storage(self, source_file_path, dataset_stable_id):
         blob = bucket.blob(target_path)
         with open(source_file_path, "rb") as file:
             blob.upload_from_file(file)
-        blob.make_public()
+        if public:
+            blob.make_public()
 
         base_path, _ = os.path.splitext(source_file_path)
-        extracted_files_path = os.path.join(base_path, "extracted")
         extracted_files: List[Gtfsfile] = []
-        if not os.path.exists(extracted_files_path):
+        if not extracted_files_path or not os.path.exists(extracted_files_path):
             self.logger.warning(
                 f"Extracted files path {extracted_files_path} does not exist."
             )
@@ -170,7 +167,8 @@ def upload_file_to_storage(self, source_file_path, dataset_stable_id):
                 f"{self.feed_stable_id}/{dataset_stable_id}/extracted/{file_name}"
             )
             file_blob.upload_from_filename(file_path)
-            file_blob.make_public()
+            if public:
+                file_blob.make_public()
             self.logger.info(
                 f"Uploaded extracted file {file_name} to {file_blob.public_url}"
             )
@@ -183,7 +181,7 @@ def upload_file_to_storage(self, source_file_path, dataset_stable_id):
             )
         return blob, extracted_files
 
-    def upload_dataset(self) -> DatasetFile or None:
+    def upload_dataset(self, public=True) -> DatasetFile or None:
         """
         Uploads a dataset to a GCP bucket as <feed_stable_id>/latest.zip and
         <feed_stable_id>/<feed_stable_id>-<upload_datetime>.zip
@@ -203,12 +201,12 @@ def upload_dataset(self) -> DatasetFile or None:
             self.logger.info(
                 f"[{self.feed_stable_id}] File hash is {file_sha256_hash}."
             )
-
            if self.latest_hash != file_sha256_hash:
                 self.logger.info(
                     f"[{self.feed_stable_id}] Dataset has changed (hash {self.latest_hash}"
                     f"-> {file_sha256_hash}). Uploading new version."
                 )
+                extracted_files_path = self.unzip_files(temp_file_path)
                 self.logger.info(
                     f"Creating file {self.feed_stable_id}/latest.zip in bucket {self.bucket_name}"
                 )
@@ -224,7 +222,10 @@ def upload_dataset(self) -> DatasetFile or None:
                     f" in bucket {self.bucket_name}"
                 )
                 _, extracted_files = self.upload_file_to_storage(
-                    temp_file_path, dataset_stable_id
+                    temp_file_path,
+                    dataset_stable_id,
+                    extracted_files_path,
+                    public=public,
                 )
 
                 return DatasetFile(
@@ -246,6 +247,18 @@ def upload_dataset(self) -> DatasetFile or None:
             os.remove(temp_file_path)
         return None
 
+    def unzip_files(self, temp_file_path):
+        extracted_files_path = os.path.join(temp_file_path.split(".")[0], "extracted")
+        self.logger.info(f"Unzipping files to {extracted_files_path}")
+        # Create the directory for extracted files if it does not exist
+        os.makedirs(extracted_files_path, exist_ok=True)
+        with zipfile.ZipFile(temp_file_path, "r") as zip_ref:
+            zip_ref.extractall(path=extracted_files_path)
+        # List all files in the extracted directory
+        extracted_files = os.listdir(extracted_files_path)
+        self.logger.info(f"Extracted files: {extracted_files}")
+        return extracted_files_path
+
     def generate_temp_filename(self):
         """
         Generates a temporary filename
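
The net effect of this change: extraction now happens once in upload_dataset via the new unzip_files helper, and the resulting directory is passed explicitly into upload_file_to_storage instead of being re-derived from the zip path. Previously the directory the zip was extracted into and the "extracted" directory later checked in upload_file_to_storage did not match, so the per-file uploads were skipped. A minimal standalone sketch of the same extract-then-list flow, independent of the project code; the zip path below is hypothetical:

import os
import zipfile


def unzip_to_extracted_dir(zip_path: str) -> str:
    """Extract zip_path into '<zip stem>/extracted' and return that directory."""
    extracted_dir = os.path.join(zip_path.split(".")[0], "extracted")
    os.makedirs(extracted_dir, exist_ok=True)  # create target directory if missing
    with zipfile.ZipFile(zip_path, "r") as zip_ref:
        zip_ref.extractall(path=extracted_dir)
    return extracted_dir


# Example: list the files that would later be uploaded individually under
# <feed_stable_id>/<dataset_stable_id>/extracted/.
extracted = unzip_to_extracted_dir("/tmp/feed.zip")  # hypothetical local path
for file_name in os.listdir(extracted):
    print(file_name)
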
Lines changed: 84 additions & 0 deletions (new file)

@@ -0,0 +1,84 @@
+import logging
+import os
+
+from main import DatasetProcessor
+from gcp_storage_emulator.server import create_server
+
+HOST = "localhost"
+PORT = 9023
+BUCKET_NAME = "verifier"
+PRODUCER_URL = "https://example.com/dataset.zip"  # Replace with actual producer URL
+
+
+def verify_download_content(producer_url: str):
+    """
+    Verifies the download_content is able to retrieve the file
+    This is useful to simulate the download code locally and test issues related with user-agent and downloaded content.
+    Not supported authenticated feeds currently.
+    """
+    logging.info("Verifying downloaded content... (not implemented)")
+
+    logging.info(f"Producer URL: {producer_url}")
+
+    processor = DatasetProcessor(
+        producer_url=producer_url,
+        feed_id=None,
+        feed_stable_id=None,
+        execution_id=None,
+        latest_hash=None,
+        bucket_name=None,
+        authentication_type=0,
+        api_key_parameter_name=None,
+        public_hosted_datasets_url=None,
+    )
+    tempfile = processor.generate_temp_filename()
+    logging.info(f"Temp filename: {tempfile}")
+    file_hash, is_zip = processor.download_content(tempfile)
+    logging.info(f"File hash: {file_hash}")
+
+
+def verify_upload_dataset(producer_url: str):
+    """
+    Verifies the upload_dataset is able to upload the dataset to the GCP storage emulator.
+    This is useful to simulate the upload code locally and test issues related with user-agent and uploaded content.
+    This function also tests the DatasetProcessor class methods for generating a temporary filename
+    and uploading the dataset.
+    :param producer_url:
+    :return:
+    """
+    processor = DatasetProcessor(
+        producer_url=producer_url,
+        feed_id="feed_id",
+        feed_stable_id="feed_stable_id",
+        execution_id=None,
+        latest_hash="123",
+        bucket_name=BUCKET_NAME,
+        authentication_type=0,
+        api_key_parameter_name=None,
+        public_hosted_datasets_url=None,
+    )
+    tempfile = processor.generate_temp_filename()
+    logging.info(f"Temp filename: {tempfile}")
+    dataset_file = processor.upload_dataset(public=False)
+    logging.info(f"Dataset File: {dataset_file}")
+
+
+if __name__ == "__main__":
+    logging.basicConfig(level=logging.INFO)
+    # Replace with actual producer URL
+    try:
+        os.environ["STORAGE_EMULATOR_HOST"] = f"http://{HOST}:{PORT}"
+        server = create_server(
+            host=HOST, port=PORT, in_memory=False, default_bucket=BUCKET_NAME
+        )
+        server.start()
+
+        verify_download_content(producer_url=PRODUCER_URL)
+        logging.info("Download content verification completed successfully.")
+        verify_upload_dataset(producer_url=PRODUCER_URL)
+        verify_upload_dataset(producer_url=PRODUCER_URL)
+    except Exception as e:
+        logging.error(f"Error verifying download content: {e}")
+    finally:
+        server.stop()
+        logging.info("Verification completed.")

functions-python/batch_process_dataset/tests/test_batch_process_dataset_main.py

Lines changed: 7 additions & 2 deletions

@@ -46,8 +46,9 @@ def create_cloud_event(mock_data):
 class TestDatasetProcessor(unittest.TestCase):
     @patch("main.DatasetProcessor.upload_file_to_storage")
     @patch("main.DatasetProcessor.download_content")
+    @patch("main.DatasetProcessor.unzip_files")
     def test_upload_dataset_diff_hash(
-        self, mock_download_url_content, upload_file_to_storage
+        self, mock_unzip_files, mock_download_url_content, upload_file_to_storage
     ):
         """
         Test upload_dataset method of DatasetProcessor class with different hash from the latest one
@@ -57,6 +58,7 @@ def test_upload_dataset_diff_hash(
         mock_blob.path = public_url
         upload_file_to_storage.return_value = mock_blob, []
         mock_download_url_content.return_value = file_hash, True
+        mock_unzip_files.return_value = [mock_blob, mock_blob]
 
         processor = DatasetProcessor(
             public_url,
@@ -178,6 +180,7 @@ def test_upload_dataset_download_exception(
     def test_upload_file_to_storage(self):
         bucket_name = "test-bucket"
         source_file_path = "path/to/source/file"
+        extracted_file_path = "path/to/source/file"
 
         mock_blob = Mock()
         mock_blob.public_url = public_url
@@ -204,7 +207,9 @@ def test_upload_file_to_storage(self):
             test_hosted_public_url,
         )
         dataset_id = faker.Faker().uuid4()
-        result, _ = processor.upload_file_to_storage(source_file_path, dataset_id)
+        result, _ = processor.upload_file_to_storage(
+            source_file_path, dataset_id, extracted_file_path
+        )
         self.assertEqual(result.public_url, public_url)
         mock_client.get_bucket.assert_called_with(bucket_name)
         mock_bucket.blob.assert_called_with(
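
One detail worth noting in the updated test: unittest.mock applies stacked @patch decorators bottom-up, so the mock for the decorator closest to the function (unzip_files here) is the first argument after self. A tiny sketch of that ordering rule, using os.path functions purely as stand-in patch targets:

import os
import unittest
from unittest.mock import patch


class PatchOrderExample(unittest.TestCase):
    @patch("os.path.exists")  # outermost decorator -> last mock argument
    @patch("os.path.isfile")  # innermost decorator -> first mock argument
    def test_order(self, mock_isfile, mock_exists):
        mock_isfile.return_value = True
        mock_exists.return_value = False
        self.assertTrue(os.path.isfile("anything"))
        self.assertFalse(os.path.exists("anything"))


if __name__ == "__main__":
    unittest.main()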

functions-python/helpers/logger.py

Lines changed: 13 additions & 6 deletions

@@ -42,6 +42,7 @@ def filter(self, record):
 
 
 lock = threading.Lock()
+lock_logger = threading.Lock()
 _logging_initialized = False
 
 
@@ -73,9 +74,15 @@ def get_logger(name: str, stable_id: str = None):
     If stable_id is provided, the StableIdFilter is added.
     This method can be called multiple times for the same logger name without creating a side effect.
     """
-    logger = logging.getLogger(name)
-    if stable_id and not any(
-        isinstance(log_filter, StableIdFilter) for log_filter in logger.filters
-    ):
-        logger.addFilter(StableIdFilter(stable_id))
-    return logger
+    with lock_logger:
+        # Create the logger with the provided name to avoid retuning the same logger instance
+        logger = (
+            logging.getLogger(f"{name}_{stable_id}")
+            if stable_id
+            else logging.getLogger(name)
+        )
+        if stable_id and not any(
+            isinstance(log_filter, StableIdFilter) for log_filter in logger.filters
+        ):
+            logger.addFilter(StableIdFilter(stable_id))
+        return logger
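
With this change, get_logger derives a distinct logger name per stable_id (f"{name}_{stable_id}"), so a StableIdFilter added for one feed no longer leaks onto loggers used for other feeds, and the new lock serializes concurrent calls. A small sketch of the naming idea using the standard logging module directly; the filter class below is a stand-in for illustration, not the project's StableIdFilter:

import logging


class PrefixFilter(logging.Filter):
    """Stand-in per-feed filter: prefixes every message with an id."""

    def __init__(self, stable_id: str):
        super().__init__()
        self.stable_id = stable_id

    def filter(self, record: logging.LogRecord) -> bool:
        record.msg = f"[{self.stable_id}] {record.msg}"
        return True


logging.basicConfig(level=logging.INFO)

# Separate names -> separate logger instances -> independent filters.
logger_a = logging.getLogger("batch_process_dataset_feed-a")
logger_b = logging.getLogger("batch_process_dataset_feed-b")
logger_a.addFilter(PrefixFilter("feed-a"))
logger_b.addFilter(PrefixFilter("feed-b"))

logger_a.info("downloading dataset")  # logged as "[feed-a] downloading dataset"
logger_b.info("downloading dataset")  # logged as "[feed-b] downloading dataset"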
