Skip to content

Commit 8082083

Browse files
author
dsamaey
committed
Issue #747 add chunked downloading
1 parent 2ecd159 commit 8082083

File tree

3 files changed

+96
-17
lines changed

3 files changed

+96
-17
lines changed

openeo/rest/_testing.py

+4
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,10 @@ def __init__(
103103
requests_mock.delete(
104104
re.compile(connection.build_url(r"/jobs/(job-\d+)/results$")), json=self._handle_delete_job_results
105105
)
106+
requests_mock.head(
107+
re.compile(connection.build_url("/jobs/(.*?)/results/result.data$")),
108+
headers={"Content-Length": "666"}
109+
)
106110
requests_mock.get(
107111
re.compile(connection.build_url("/jobs/(.*?)/results/result.data$")),
108112
content=self._handle_get_job_result_asset,

openeo/rest/job.py

+41-8
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,13 @@
55
import logging
66
import time
77
import typing
8+
import urllib.error
89
from pathlib import Path
910
from typing import Dict, List, Optional, Union
11+
from urllib.error import HTTPError
1012

1113
import requests
14+
import shutil
1215

1316
from openeo.internal.documentation import openeo_endpoint
1417
from openeo.internal.jupyter import (
@@ -32,6 +35,8 @@
3235
from openeo.rest.models.logs import LogEntry, log_level_name, normalize_log_level
3336
from openeo.util import ensure_dir
3437

38+
MAX_RETRIES_DOWNLOAD = 3
39+
3540
if typing.TYPE_CHECKING:
3641
# Imports for type checking only (circular import issue at runtime).
3742
from openeo.rest.connection import Connection
@@ -402,12 +407,40 @@ def download(
402407
target = target / self.name
403408
ensure_dir(target.parent)
404409
logger.info("Downloading Job result asset {n!r} from {h!s} to {t!s}".format(n=self.name, h=self.href, t=target))
405-
response = self._get_response(stream=True)
406-
with target.open("wb") as f:
407-
for block in response.iter_content(chunk_size=chunk_size):
408-
f.write(block)
410+
self._download_chunked(target, chunk_size)
409411
return target
410412

413+
def _download_chunked(self, target: Path, chunk_size: int):
414+
file_size = None
415+
try:
416+
head = requests.head(self.href, stream=True)
417+
if head.ok:
418+
file_size = int(head.headers['Content-Length'])
419+
else:
420+
head.raise_for_status()
421+
with target.open('wb') as f:
422+
for from_byte_index in range(0, file_size, chunk_size):
423+
to_byte_index = min(from_byte_index + chunk_size - 1, file_size - 1)
424+
tries_left = MAX_RETRIES_DOWNLOAD
425+
while tries_left > 0:
426+
try:
427+
range_headers = {"Range": f"bytes={from_byte_index}-{to_byte_index}"}
428+
with requests.get(self.href, headers=range_headers, stream=True) as r:
429+
if r.ok:
430+
shutil.copyfileobj(r.raw, f)
431+
break
432+
else:
433+
r.raise_for_status()
434+
except requests.exceptions.HTTPError as error:
435+
tries_left -= 1
436+
if tries_left < 1:
437+
raise error
438+
else:
439+
logger.warning(f"Failed to retrieve chunk {from_byte_index}-{to_byte_index} from {self.href} (status {error.response.status_code}) - retrying")
440+
continue
441+
except requests.exceptions.HTTPError as http_error:
442+
raise OpenEoApiPlainError(message=f"Failed to download {self.href}", http_status_code=http_error.response.status_code, error_message=http_error.response.text)
443+
411444
def _get_response(self, stream=True) -> requests.Response:
412445
return self.job.connection.get(self.href, stream=stream)
413446

@@ -501,7 +534,7 @@ def get_asset(self, name: str = None) -> ResultAsset:
501534
"No asset {n!r} in: {a}".format(n=name, a=[a.name for a in assets])
502535
)
503536

504-
def download_file(self, target: Union[Path, str] = None, name: str = None) -> Path:
537+
def download_file(self, target: Union[Path, str] = None, name: str = None, chunk_size=DEFAULT_DOWNLOAD_CHUNK_SIZE) -> Path:
505538
"""
506539
Download single asset. Can be used when there is only one asset in the
507540
:py:class:`JobResults`, or when the desired asset name is given explicitly.
@@ -513,12 +546,12 @@ def download_file(self, target: Union[Path, str] = None, name: str = None) -> Pa
513546
:return: path of downloaded asset
514547
"""
515548
try:
516-
return self.get_asset(name=name).download(target=target)
549+
return self.get_asset(name=name).download(target=target, chunk_size=chunk_size)
517550
except MultipleAssetException:
518551
raise OpenEoClientException(
519552
"Can not use `download_file` with multiple assets. Use `download_files` instead.")
520553

521-
def download_files(self, target: Union[Path, str] = None, include_stac_metadata: bool = True) -> List[Path]:
554+
def download_files(self, target: Union[Path, str] = None, include_stac_metadata: bool = True, chunk_size=DEFAULT_DOWNLOAD_CHUNK_SIZE) -> List[Path]:
522555
"""
523556
Download all assets to given folder.
524557
@@ -531,7 +564,7 @@ def download_files(self, target: Union[Path, str] = None, include_stac_metadata:
531564
raise OpenEoClientException(f"Target argument {target} exists but isn't a folder.")
532565
ensure_dir(target)
533566

534-
downloaded = [a.download(target) for a in self.get_assets()]
567+
downloaded = [a.download(target, chunk_size=chunk_size) for a in self.get_assets()]
535568

536569
if include_stac_metadata:
537570
# TODO #184: convention for metadata file name?

tests/rest/test_job.py

+51-9
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import inspect
12
import itertools
23
import json
34
import logging
@@ -11,16 +12,14 @@
1112

1213
import openeo
1314
import openeo.rest.job
14-
from openeo.rest import JobFailedException, OpenEoApiPlainError, OpenEoClientException
15+
from openeo.rest import JobFailedException, OpenEoApiPlainError, OpenEoClientException, DEFAULT_DOWNLOAD_CHUNK_SIZE
1516
from openeo.rest.job import BatchJob, ResultAsset
1617
from openeo.rest.models.general import Link
1718
from openeo.rest.models.logs import LogEntry
1819

1920
API_URL = "https://oeo.test"
2021

21-
TIFF_CONTENT = b'T1f7D6t6l0l' * 1000
22-
23-
22+
TIFF_CONTENT = b'T1f7D6t6l0l' * 10000
2423

2524
@pytest.fixture
2625
def con100(requests_mock):
@@ -74,7 +73,7 @@ def test_execute_batch(con100, requests_mock, tmpdir):
7473
}
7574
},
7675
)
77-
requests_mock.get(API_URL + "/jobs/f00ba5/files/output.tiff", text="tiffdata")
76+
_mock_get_head_content(requests_mock, API_URL + "/jobs/f00ba5/files/output.tiff", "tiffdata")
7877
requests_mock.get(API_URL + "/jobs/f00ba5/logs", json={'logs': []})
7978

8079
path = tmpdir.join("tmp.tiff")
@@ -231,7 +230,8 @@ def test_execute_batch_with_soft_errors(con100, requests_mock, tmpdir, error_res
231230
}
232231
},
233232
)
234-
requests_mock.get(API_URL + "/jobs/f00ba5/files/output.tiff", text="tiffdata")
233+
_mock_get_head_content(requests_mock, API_URL + "/jobs/f00ba5/files/output.tiff", "tiffdata")
234+
# requests_mock.get(API_URL + "/jobs/f00ba5/files/output.tiff", text="tiffdata")
235235
requests_mock.get(API_URL + "/jobs/f00ba5/logs", json={'logs': []})
236236

237237
path = tmpdir.join("tmp.tiff")
@@ -536,10 +536,28 @@ def job_with_1_asset(con100, requests_mock, tmp_path) -> BatchJob:
536536
requests_mock.get(API_URL + "/jobs/jj1/results", json={"assets": {
537537
"1.tiff": {"href": API_URL + "/dl/jjr1.tiff", "type": "image/tiff; application=geotiff"},
538538
}})
539+
requests_mock.head(API_URL + "/dl/jjr1.tiff", headers={"Content-Length": f"{len(TIFF_CONTENT)}"})
539540
requests_mock.get(API_URL + "/dl/jjr1.tiff", content=TIFF_CONTENT)
541+
540542
job = BatchJob("jj1", connection=con100)
541543
return job
542544

545+
@pytest.fixture
546+
def job_with_chunked_asset(con100, requests_mock, tmp_path) -> BatchJob:
547+
requests_mock.get(API_URL + "/jobs/jj1/results", json={"assets": {
548+
"1.tiff": {"href": API_URL + "/dl/jjr1.tiff", "type": "image/tiff; application=geotiff"},
549+
}})
550+
requests_mock.head(API_URL + "/dl/jjr1.tiff", headers={"Content-Length": f"{len(TIFF_CONTENT)}"})
551+
552+
chunk_size = 1000
553+
for r in range(0, len(TIFF_CONTENT), chunk_size):
554+
from_bytes = r
555+
to_bytes = min(r + chunk_size, len(TIFF_CONTENT)) - 1
556+
# fail the 1st time, serve the content chunk the 2nd time
557+
requests_mock.get(API_URL + "/dl/jjr1.tiff", request_headers={"Range": f"bytes={from_bytes}-{to_bytes}"},
558+
response_list = [{"status_code": 500, "text": "Server error"}, {"content": TIFF_CONTENT[from_bytes:to_bytes+1]}])
559+
job = BatchJob("jj1", connection=con100)
560+
return job
543561

544562
@pytest.fixture
545563
def job_with_2_assets(con100, requests_mock, tmp_path) -> BatchJob:
@@ -551,8 +569,11 @@ def job_with_2_assets(con100, requests_mock, tmp_path) -> BatchJob:
551569
"2.tiff": {"href": API_URL + "/dl/jjr2.tiff", "type": "image/tiff; application=geotiff"},
552570
}
553571
})
572+
requests_mock.head(API_URL + "/dl/jjr1.tiff", headers={"Content-Length": f"{len(TIFF_CONTENT)}"})
554573
requests_mock.get(API_URL + "/dl/jjr1.tiff", content=TIFF_CONTENT)
574+
requests_mock.head(API_URL + "/dl/jjr2.tiff", headers={"Content-Length": f"{len(TIFF_CONTENT)}"})
555575
requests_mock.get(API_URL + "/dl/jjr2.tiff", content=TIFF_CONTENT)
576+
556577
job = BatchJob("jj2", connection=con100)
557578
return job
558579

@@ -574,6 +595,13 @@ def test_get_results_download_file(job_with_1_asset: BatchJob, tmp_path):
574595
with target.open("rb") as f:
575596
assert f.read() == TIFF_CONTENT
576597

598+
def test_get_results_download_chunked_file(job_with_chunked_asset: BatchJob, tmp_path):
599+
job = job_with_chunked_asset
600+
target = tmp_path / "result.tiff"
601+
res = job.get_results().download_file(target, chunk_size=1000)
602+
assert res == target
603+
with target.open("rb") as f:
604+
assert f.read() == TIFF_CONTENT
577605

578606
def test_download_result_folder(job_with_1_asset: BatchJob, tmp_path):
579607
job = job_with_1_asset
@@ -714,7 +742,7 @@ def test_get_results_download_files_include_stac_metadata(
714742

715743
def test_result_asset_download_file(con100, requests_mock, tmp_path):
716744
href = API_URL + "/dl/jjr1.tiff"
717-
requests_mock.get(href, content=TIFF_CONTENT)
745+
_mock_get_head_content(requests_mock, href, TIFF_CONTENT)
718746

719747
job = BatchJob("jj", connection=con100)
720748
asset = ResultAsset(job, name="1.tiff", href=href, metadata={'type': 'image/tiff; application=geotiff'})
@@ -729,6 +757,7 @@ def test_result_asset_download_file(con100, requests_mock, tmp_path):
729757

730758
def test_result_asset_download_file_error(con100, requests_mock, tmp_path):
731759
href = API_URL + "/dl/jjr1.tiff"
760+
requests_mock.head(href, status_code=500, text="Nope!")
732761
requests_mock.get(href, status_code=500, text="Nope!")
733762

734763
job = BatchJob("jj", connection=con100)
@@ -743,7 +772,7 @@ def test_result_asset_download_file_error(con100, requests_mock, tmp_path):
743772

744773
def test_result_asset_download_folder(con100, requests_mock, tmp_path):
745774
href = API_URL + "/dl/jjr1.tiff"
746-
requests_mock.get(href, content=TIFF_CONTENT)
775+
_mock_get_head_content(requests_mock, href, TIFF_CONTENT)
747776

748777
job = BatchJob("jj", connection=con100)
749778
asset = ResultAsset(job, name="1.tiff", href=href, metadata={"type": "image/tiff; application=geotiff"})
@@ -770,7 +799,7 @@ def test_result_asset_load_json(con100, requests_mock):
770799

771800
def test_result_asset_load_bytes(con100, requests_mock):
772801
href = API_URL + "/dl/jjr1.tiff"
773-
requests_mock.get(href, content=TIFF_CONTENT)
802+
_mock_get_head_content(requests_mock, href, TIFF_CONTENT)
774803

775804
job = BatchJob("jj", connection=con100)
776805
asset = ResultAsset(job, name="out.tiff", href=href, metadata={"type": "image/tiff; application=geotiff"})
@@ -797,6 +826,7 @@ def download_tiff(request, context):
797826
return TIFF_CONTENT
798827

799828
requests_mock.get(API_URL + "/jobs/jj1/results", json=get_results)
829+
requests_mock.head("https://evilcorp.test/dl/jjr1.tiff", headers={"Content-Length": "666"})
800830
requests_mock.get("https://evilcorp.test/dl/jjr1.tiff", content=download_tiff)
801831

802832
con100.authenticate_basic("john", "j0hn")
@@ -880,3 +910,15 @@ def get_jobs(request, context):
880910
assert jobs.links == [Link(rel="next", href="https://oeo.test/jobs?limit=2&offset=2")]
881911
assert jobs.ext_federation_missing() == ["oeob"]
882912
assert "Partial job listing: missing federation components: ['oeob']." in caplog.text
913+
914+
915+
def _mock_get_head_content(requests_mock, url: str, content):
916+
if callable(content):
917+
requests_mock.head(url, headers={"Content-Length": "666"})
918+
requests_mock.get(url, content=content)
919+
elif type(content) == str:
920+
requests_mock.head(url, headers={"Content-Length": f"{len(content)}"})
921+
requests_mock.get(url, text=content)
922+
else:
923+
requests_mock.head(url, headers={"Content-Length": f"{len(content)}"})
924+
requests_mock.get(url, content=content)

0 commit comments

Comments
 (0)