Skip to content

Commit 3fccb65

Browse files
authored
Merge pull request #759 from Open-EO/747-robustranged-download-support
747 robustranged download support
2 parents c6bc1c9 + 2816c98 commit 3fccb65

File tree

6 files changed

+139
-22
lines changed

6 files changed

+139
-22
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
3131
- `sar_backscatter`: try to retrieve coefficient options from backend ([#693](https://github.com/Open-EO/openeo-python-client/issues/693))
3232
- Improve error message when OIDC provider is unavailable ([#751](https://github.com/Open-EO/openeo-python-client/issues/751))
3333
- Added `on_response_headers` argument to `DataCube.download()` and related to handle (e.g. `print`) the response headers ([#560](https://github.com/Open-EO/openeo-python-client/issues/560))
34+
- Added more robust ranged download for large job result files (if supported by the server) ([#747](https://github.com/Open-EO/openeo-python-client/issues/747))
3435

3536
### Changed
3637

@@ -40,7 +41,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
4041

4142
- `STACAPIJobDatabase.get_by_status()` now always returns a `pandas.DataFrame` with an index compatible with `MultiBackendJobManager`. ([#707](https://github.com/Open-EO/openeo-python-client/issues/707))
4243

43-
4444
## [0.39.1] - 2025-02-26
4545

4646
### Fixed

openeo/rest/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
# TODO: get from config file
66
DEFAULT_DOWNLOAD_CHUNK_SIZE = 10_000_000 # 10MB
7-
7+
DEFAULT_DOWNLOAD_RANGE_SIZE = 500_000_000 # 500MB
88

99
DEFAULT_JOB_STATUS_POLL_INTERVAL_MAX = 60
1010
DEFAULT_JOB_STATUS_POLL_CONNECTION_RETRY_INTERVAL = 30

openeo/rest/_connection.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,24 @@ def get(
194194
"""
195195
return self.request("get", path=path, params=params, stream=stream, auth=auth, **kwargs)
196196

197+
def head(
198+
self,
199+
path: str,
200+
*,
201+
params: Optional[dict] = None,
202+
auth: Optional[AuthBase] = None,
203+
**kwargs,
204+
) -> Response:
205+
"""
206+
Do HEAD request to REST API.
207+
208+
:param path: API path (without root url)
209+
:param params: Additional query parameters
210+
:param auth: optional custom authentication to use instead of the default one
211+
:return: response: Response
212+
"""
213+
return self.request("head", path=path, params=params, auth=auth, **kwargs)
214+
197215
def post(self, path: str, json: Optional[dict] = None, **kwargs) -> Response:
198216
"""
199217
Do POST request to REST API.

openeo/rest/_testing.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,10 @@ def __init__(
103103
requests_mock.delete(
104104
re.compile(connection.build_url(r"/jobs/(job-\d+)/results$")), json=self._handle_delete_job_results
105105
)
106+
requests_mock.head(
107+
re.compile(connection.build_url("/jobs/(.*?)/results/result.data$")),
108+
headers={"Content-Length": "666"}
109+
)
106110
requests_mock.get(
107111
re.compile(connection.build_url("/jobs/(.*?)/results/result.data$")),
108112
content=self._handle_get_job_result_asset,

openeo/rest/job.py

Lines changed: 50 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,11 @@
1111
import requests
1212

1313
from openeo.internal.documentation import openeo_endpoint
14-
from openeo.internal.jupyter import (
15-
VisualDict,
16-
VisualList,
17-
render_component,
18-
render_error,
19-
)
14+
from openeo.internal.jupyter import VisualDict, render_component, render_error
2015
from openeo.internal.warnings import deprecated, legacy_alias
2116
from openeo.rest import (
2217
DEFAULT_DOWNLOAD_CHUNK_SIZE,
18+
DEFAULT_DOWNLOAD_RANGE_SIZE,
2319
DEFAULT_JOB_STATUS_POLL_CONNECTION_RETRY_INTERVAL,
2420
DEFAULT_JOB_STATUS_POLL_INTERVAL_MAX,
2521
DEFAULT_JOB_STATUS_POLL_SOFT_ERROR_MAX,
@@ -29,7 +25,7 @@
2925
OpenEoClientException,
3026
)
3127
from openeo.rest.models.general import LogsResponse
32-
from openeo.rest.models.logs import LogEntry, log_level_name, normalize_log_level
28+
from openeo.rest.models.logs import log_level_name
3329
from openeo.util import ensure_dir
3430

3531
if typing.TYPE_CHECKING:
@@ -40,7 +36,8 @@
4036

4137

4238
DEFAULT_JOB_RESULTS_FILENAME = "job-results.json"
43-
39+
MAX_RETRIES_PER_RANGE = 3
40+
RETRIABLE_STATUSCODES = [408, 429, 500, 501, 502, 503, 504]
4441

4542
class BatchJob:
4643
"""
@@ -387,7 +384,7 @@ def __repr__(self):
387384
)
388385

389386
def download(
390-
self, target: Optional[Union[Path, str]] = None, *, chunk_size: int = DEFAULT_DOWNLOAD_CHUNK_SIZE
387+
self, target: Optional[Union[Path, str]] = None, *, chunk_size: int = DEFAULT_DOWNLOAD_CHUNK_SIZE, range_size: int=DEFAULT_DOWNLOAD_RANGE_SIZE
391388
) -> Path:
392389
"""
393390
Download asset to given location
@@ -402,10 +399,7 @@ def download(
402399
target = target / self.name
403400
ensure_dir(target.parent)
404401
logger.info("Downloading Job result asset {n!r} from {h!s} to {t!s}".format(n=self.name, h=self.href, t=target))
405-
response = self._get_response(stream=True)
406-
with target.open("wb") as f:
407-
for block in response.iter_content(chunk_size=chunk_size):
408-
f.write(block)
402+
self._download_to_file(url=self.href, target=target, chunk_size=chunk_size, range_size=range_size)
409403
return target
410404

411405
def _get_response(self, stream=True) -> requests.Response:
@@ -424,6 +418,45 @@ def load_bytes(self) -> bytes:
424418
# TODO: more `load` methods e.g.: load GTiff asset directly as numpy array
425419

426420

421+
def _download_to_file(self, url: str, target: Path, *, chunk_size: int=DEFAULT_DOWNLOAD_CHUNK_SIZE, range_size: int=DEFAULT_DOWNLOAD_RANGE_SIZE):
422+
head = self.job.connection.head(url, stream=True)
423+
if head.ok and head.headers.get("Accept-Ranges") == "bytes" and 'Content-Length' in head.headers:
424+
file_size = int(head.headers['Content-Length'])
425+
self._download_ranged(url=url, target=target, file_size=file_size, chunk_size=chunk_size, range_size=range_size)
426+
else:
427+
self._download_all_at_once(url=url, target=target, chunk_size=chunk_size)
428+
429+
430+
def _download_ranged(self, url: str, target: Path, file_size: int, *, chunk_size: int=DEFAULT_DOWNLOAD_CHUNK_SIZE, range_size: int=DEFAULT_DOWNLOAD_RANGE_SIZE):
431+
with target.open('wb') as f:
432+
for from_byte_index in range(0, file_size, range_size):
433+
to_byte_index = min(from_byte_index + range_size - 1, file_size - 1)
434+
tries_left = MAX_RETRIES_PER_RANGE
435+
while tries_left > 0:
436+
try:
437+
range_headers = {"Range": f"bytes={from_byte_index}-{to_byte_index}"}
438+
with self.job.connection.get(path=url, headers=range_headers, stream=True) as r:
439+
r.raise_for_status()
440+
for block in r.iter_content(chunk_size=chunk_size):
441+
f.write(block)
442+
break
443+
except OpenEoApiPlainError as error:
444+
tries_left -= 1
445+
if tries_left > 0 and error.http_status_code in RETRIABLE_STATUSCODES:
446+
logger.warning(f"Failed to retrieve chunk {from_byte_index}-{to_byte_index} from {url} (status {error.http_status_code}) - retrying")
447+
continue
448+
else:
449+
raise error
450+
451+
452+
def _download_all_at_once(self, url: str, target: Path, *, chunk_size: int=DEFAULT_DOWNLOAD_CHUNK_SIZE):
453+
with self.job.connection.get(path=url, stream=True) as r:
454+
r.raise_for_status()
455+
with target.open("wb") as f:
456+
for block in r.iter_content(chunk_size=chunk_size):
457+
f.write(block)
458+
459+
427460
class MultipleAssetException(OpenEoClientException):
428461
pass
429462

@@ -501,7 +534,7 @@ def get_asset(self, name: str = None) -> ResultAsset:
501534
"No asset {n!r} in: {a}".format(n=name, a=[a.name for a in assets])
502535
)
503536

504-
def download_file(self, target: Union[Path, str] = None, name: str = None) -> Path:
537+
def download_file(self, target: Union[Path, str] = None, name: str = None, *, chunk_size=DEFAULT_DOWNLOAD_CHUNK_SIZE, range_size: int=DEFAULT_DOWNLOAD_RANGE_SIZE) -> Path:
505538
"""
506539
Download single asset. Can be used when there is only one asset in the
507540
:py:class:`JobResults`, or when the desired asset name is given explicitly.
@@ -513,12 +546,12 @@ def download_file(self, target: Union[Path, str] = None, name: str = None) -> Pa
513546
:return: path of downloaded asset
514547
"""
515548
try:
516-
return self.get_asset(name=name).download(target=target)
549+
return self.get_asset(name=name).download(target=target, chunk_size=chunk_size, range_size=range_size)
517550
except MultipleAssetException:
518551
raise OpenEoClientException(
519552
"Can not use `download_file` with multiple assets. Use `download_files` instead.")
520553

521-
def download_files(self, target: Union[Path, str] = None, include_stac_metadata: bool = True) -> List[Path]:
554+
def download_files(self, target: Union[Path, str] = None, include_stac_metadata: bool = True, chunk_size=DEFAULT_DOWNLOAD_CHUNK_SIZE) -> List[Path]:
522555
"""
523556
Download all assets to given folder.
524557
@@ -531,7 +564,7 @@ def download_files(self, target: Union[Path, str] = None, include_stac_metadata:
531564
raise OpenEoClientException(f"Target argument {target} exists but isn't a folder.")
532565
ensure_dir(target)
533566

534-
downloaded = [a.download(target) for a in self.get_assets()]
567+
downloaded = [a.download(target, chunk_size=chunk_size) for a in self.get_assets()]
535568

536569
if include_stac_metadata:
537570
# TODO #184: convention for metadata file name?

tests/rest/test_job.py

Lines changed: 65 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,7 @@
1818

1919
API_URL = "https://oeo.test"
2020

21-
TIFF_CONTENT = b'T1f7D6t6l0l' * 1000
22-
23-
21+
TIFF_CONTENT = b'T1f7D6t6l0l' * 10000
2422

2523
@pytest.fixture
2624
def con100(requests_mock):
@@ -74,7 +72,9 @@ def test_execute_batch(con100, requests_mock, tmpdir):
7472
}
7573
},
7674
)
75+
requests_mock.head(API_URL + "/jobs/f00ba5/files/output.tiff", headers={"Content-Length": str(len("tiffdata"))})
7776
requests_mock.get(API_URL + "/jobs/f00ba5/files/output.tiff", text="tiffdata")
77+
7878
requests_mock.get(API_URL + "/jobs/f00ba5/logs", json={'logs': []})
7979

8080
path = tmpdir.join("tmp.tiff")
@@ -231,6 +231,7 @@ def test_execute_batch_with_soft_errors(con100, requests_mock, tmpdir, error_res
231231
}
232232
},
233233
)
234+
requests_mock.head(API_URL + "/jobs/f00ba5/files/output.tiff", headers={"Content-Length": str(len("tiffdata"))})
234235
requests_mock.get(API_URL + "/jobs/f00ba5/files/output.tiff", text="tiffdata")
235236
requests_mock.get(API_URL + "/jobs/f00ba5/logs", json={'logs': []})
236237

@@ -536,11 +537,57 @@ def job_with_1_asset(con100, requests_mock, tmp_path) -> BatchJob:
536537
requests_mock.get(API_URL + "/jobs/jj1/results", json={"assets": {
537538
"1.tiff": {"href": API_URL + "/dl/jjr1.tiff", "type": "image/tiff; application=geotiff"},
538539
}})
540+
requests_mock.head(API_URL + "/dl/jjr1.tiff", headers={"Content-Length": f"{len(TIFF_CONTENT)}"})
539541
requests_mock.get(API_URL + "/dl/jjr1.tiff", content=TIFF_CONTENT)
542+
543+
job = BatchJob("jj1", connection=con100)
544+
return job
545+
546+
@pytest.fixture
547+
def job_with_chunked_asset_using_head(con100, requests_mock, tmp_path) -> BatchJob:
548+
def handle_content(request, context):
549+
range = request.headers.get("Range")
550+
assert range
551+
search = re.search(r"bytes=(\d+)-(\d+)", range)
552+
assert search
553+
from_bytes = int(search.group(1))
554+
to_bytes = int(search.group(2))
555+
assert from_bytes < to_bytes
556+
return TIFF_CONTENT[from_bytes : to_bytes + 1]
557+
558+
requests_mock.get(
559+
API_URL + "/jobs/jj1/results",
560+
json={
561+
"assets": {
562+
"1.tiff": {"href": API_URL + "/dl/jjr1.tiff", "type": "image/tiff; application=geotiff"},
563+
}
564+
},
565+
)
566+
requests_mock.head(
567+
API_URL + "/dl/jjr1.tiff", headers={"Content-Length": f"{len(TIFF_CONTENT)}", "Accept-Ranges": "bytes"}
568+
)
569+
requests_mock.get(API_URL + "/dl/jjr1.tiff", content=handle_content)
540570
job = BatchJob("jj1", connection=con100)
541571
return job
542572

543573

574+
@pytest.fixture
575+
def job_with_chunked_asset_using_head_old(con100, requests_mock, tmp_path) -> BatchJob:
576+
requests_mock.get(API_URL + "/jobs/jj1/results", json={"assets": {
577+
"1.tiff": {"href": API_URL + "/dl/jjr1.tiff", "type": "image/tiff; application=geotiff"},
578+
}})
579+
requests_mock.head(API_URL + "/dl/jjr1.tiff", headers={"Content-Length": f"{len(TIFF_CONTENT)}", "Accept-Ranges": "bytes"})
580+
581+
chunk_size = 1000
582+
for r in range(0, len(TIFF_CONTENT), chunk_size):
583+
from_bytes = r
584+
to_bytes = min(r + chunk_size, len(TIFF_CONTENT)) - 1
585+
requests_mock.get(API_URL + "/dl/jjr1.tiff", request_headers={"Range": f"bytes={from_bytes}-{to_bytes}"},
586+
response_list = [{"status_code": 500, "text": "Server error"},
587+
{"status_code": 206, "content": TIFF_CONTENT[from_bytes:to_bytes+1]}])
588+
job = BatchJob("jj1", connection=con100)
589+
return job
590+
544591
@pytest.fixture
545592
def job_with_2_assets(con100, requests_mock, tmp_path) -> BatchJob:
546593
requests_mock.get(API_URL + "/jobs/jj2/results", json={
@@ -551,8 +598,11 @@ def job_with_2_assets(con100, requests_mock, tmp_path) -> BatchJob:
551598
"2.tiff": {"href": API_URL + "/dl/jjr2.tiff", "type": "image/tiff; application=geotiff"},
552599
}
553600
})
601+
requests_mock.head(API_URL + "/dl/jjr1.tiff", headers={"Content-Length": f"{len(TIFF_CONTENT)}"})
554602
requests_mock.get(API_URL + "/dl/jjr1.tiff", content=TIFF_CONTENT)
603+
requests_mock.head(API_URL + "/dl/jjr2.tiff", headers={"Content-Length": f"{len(TIFF_CONTENT)}"})
555604
requests_mock.get(API_URL + "/dl/jjr2.tiff", content=TIFF_CONTENT)
605+
556606
job = BatchJob("jj2", connection=con100)
557607
return job
558608

@@ -574,6 +624,13 @@ def test_get_results_download_file(job_with_1_asset: BatchJob, tmp_path):
574624
with target.open("rb") as f:
575625
assert f.read() == TIFF_CONTENT
576626

627+
def test_get_results_download_file_ranged(job_with_chunked_asset_using_head: BatchJob, tmp_path):
628+
job = job_with_chunked_asset_using_head
629+
target = tmp_path / "result.tiff"
630+
res = job.get_results().download_file(target, range_size=1000)
631+
assert res == target
632+
with target.open("rb") as f:
633+
assert f.read() == TIFF_CONTENT
577634

578635
def test_download_result_folder(job_with_1_asset: BatchJob, tmp_path):
579636
job = job_with_1_asset
@@ -714,6 +771,7 @@ def test_get_results_download_files_include_stac_metadata(
714771

715772
def test_result_asset_download_file(con100, requests_mock, tmp_path):
716773
href = API_URL + "/dl/jjr1.tiff"
774+
requests_mock.head(href, headers={"Content-Length": f"{len(TIFF_CONTENT)}"})
717775
requests_mock.get(href, content=TIFF_CONTENT)
718776

719777
job = BatchJob("jj", connection=con100)
@@ -729,6 +787,7 @@ def test_result_asset_download_file(con100, requests_mock, tmp_path):
729787

730788
def test_result_asset_download_file_error(con100, requests_mock, tmp_path):
731789
href = API_URL + "/dl/jjr1.tiff"
790+
requests_mock.head(href, status_code=500, text="Nope!")
732791
requests_mock.get(href, status_code=500, text="Nope!")
733792

734793
job = BatchJob("jj", connection=con100)
@@ -743,6 +802,7 @@ def test_result_asset_download_file_error(con100, requests_mock, tmp_path):
743802

744803
def test_result_asset_download_folder(con100, requests_mock, tmp_path):
745804
href = API_URL + "/dl/jjr1.tiff"
805+
requests_mock.head(href, headers={"Content-Length": f"{len(TIFF_CONTENT)}"})
746806
requests_mock.get(href, content=TIFF_CONTENT)
747807

748808
job = BatchJob("jj", connection=con100)
@@ -770,6 +830,7 @@ def test_result_asset_load_json(con100, requests_mock):
770830

771831
def test_result_asset_load_bytes(con100, requests_mock):
772832
href = API_URL + "/dl/jjr1.tiff"
833+
requests_mock.head(href, headers={"Content-Length": f"{len(TIFF_CONTENT)}"})
773834
requests_mock.get(href, content=TIFF_CONTENT)
774835

775836
job = BatchJob("jj", connection=con100)
@@ -797,6 +858,7 @@ def download_tiff(request, context):
797858
return TIFF_CONTENT
798859

799860
requests_mock.get(API_URL + "/jobs/jj1/results", json=get_results)
861+
requests_mock.head("https://evilcorp.test/dl/jjr1.tiff", headers={"Content-Length": "666"})
800862
requests_mock.get("https://evilcorp.test/dl/jjr1.tiff", content=download_tiff)
801863

802864
con100.authenticate_basic("john", "j0hn")

0 commit comments

Comments
 (0)