Skip to content

Commit 446edb0

Browse files
author
dsamaey
committed
Issue #747 refactored job result download code
1 parent 241a019 commit 446edb0

File tree

4 files changed

+46
-67
lines changed

4 files changed

+46
-67
lines changed

CHANGELOG.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1212
- `sar_backscatter`: try to retrieve coefficient options from backend ([#693](https://github.com/Open-EO/openeo-python-client/issues/693))
1313
- Improve error message when OIDC provider is unavailable ([#751](https://github.com/Open-EO/openeo-python-client/issues/751))
1414
- Added `on_response_headers` argument to `DataCube.download()` and related to handle (e.g. `print`) the response headers ([#560](https://github.com/Open-EO/openeo-python-client/issues/560))
15+
- Added more robust download for large job result files (if supported by the server)
1516

1617
### Changed
1718

@@ -21,7 +22,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2122

2223
- `STACAPIJobDatabase.get_by_status()` now always returns a `pandas.DataFrame` with an index compatible with `MultiBackendJobManager`. ([#707](https://github.com/Open-EO/openeo-python-client/issues/707))
2324

24-
2525
## [0.39.1] - 2025-02-26
2626

2727
### Fixed

openeo/rest/_connection.py

+18
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,24 @@ def get(
194194
"""
195195
return self.request("get", path=path, params=params, stream=stream, auth=auth, **kwargs)
196196

197+
def head(
198+
self,
199+
path: str,
200+
*,
201+
params: Optional[dict] = None,
202+
auth: Optional[AuthBase] = None,
203+
**kwargs,
204+
) -> Response:
205+
"""
206+
Do HEAD request to REST API.
207+
208+
:param path: API path (without root url)
209+
:param params: Additional query parameters
210+
:param auth: optional custom authentication to use instead of the default one
211+
:return: response: Response
212+
"""
213+
return self.request("get", path=path, params=params, auth=auth, **kwargs)
214+
197215
def post(self, path: str, json: Optional[dict] = None, **kwargs) -> Response:
198216
"""
199217
Do POST request to REST API.

openeo/rest/job.py

+26-36
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import datetime
44
import json
55
import logging
6-
import re
76
import time
87
import typing
98
from pathlib import Path
@@ -15,7 +14,6 @@
1514
from openeo.internal.documentation import openeo_endpoint
1615
from openeo.internal.jupyter import (
1716
VisualDict,
18-
VisualList,
1917
render_component,
2018
render_error,
2119
)
@@ -31,7 +29,7 @@
3129
OpenEoClientException,
3230
)
3331
from openeo.rest.models.general import LogsResponse
34-
from openeo.rest.models.logs import LogEntry, log_level_name, normalize_log_level
32+
from openeo.rest.models.logs import log_level_name
3533
from openeo.util import ensure_dir
3634

3735
if typing.TYPE_CHECKING:
@@ -405,7 +403,7 @@ def download(
405403
target = target / self.name
406404
ensure_dir(target.parent)
407405
logger.info("Downloading Job result asset {n!r} from {h!s} to {t!s}".format(n=self.name, h=self.href, t=target))
408-
_download_chunked(self.href, target, chunk_size)
406+
self._download_to_file(url=self.href, target=target, chunk_size=chunk_size)
409407
return target
410408

411409
def _get_response(self, stream=True) -> requests.Response:
@@ -424,49 +422,41 @@ def load_bytes(self) -> bytes:
424422
# TODO: more `load` methods e.g.: load GTiff asset directly as numpy array
425423

426424

427-
def _download_chunked(url: str, target: Path, chunk_size: int):
428-
try:
429-
file_size = _determine_content_length(url)
425+
def _download_to_file(self, url: str, target: Path, chunk_size: int):
426+
head = requests.head(url, stream=True)
427+
if head.ok and 'Accept-Ranges' in head.headers and 'bytes' in head.headers['Accept-Ranges']:
428+
file_size = int(head.headers['Content-Length'])
429+
self._download_chunked(url=url, target=target, file_size=file_size, chunk_size=chunk_size)
430+
else:
431+
self._download_unchunked(url=url, target=target)
432+
433+
434+
def _download_chunked(self, url: str, target: Path, file_size: int, chunk_size: int):
430435
with target.open('wb') as f:
431436
for from_byte_index in range(0, file_size, chunk_size):
432437
to_byte_index = min(from_byte_index + chunk_size - 1, file_size - 1)
433438
tries_left = MAX_RETRIES_PER_CHUNK
434439
while tries_left > 0:
435440
try:
436441
range_headers = {"Range": f"bytes={from_byte_index}-{to_byte_index}"}
437-
with requests.get(url, headers=range_headers, stream=True) as r:
438-
if r.ok:
439-
shutil.copyfileobj(r.raw, f)
440-
break
441-
else:
442-
r.raise_for_status()
443-
except requests.exceptions.HTTPError as error:
442+
with self.job.connection.get(path=url, headers=range_headers, stream=True) as r:
443+
r.raise_for_status()
444+
shutil.copyfileobj(r.raw, f)
445+
break
446+
except OpenEoApiPlainError as error:
444447
tries_left -= 1
445-
if tries_left > 0 and error.response.status_code in RETRIABLE_STATUSCODES:
446-
logger.warning(f"Failed to retrieve chunk {from_byte_index}-{to_byte_index} from {url} (status {error.response.status_code}) - retrying")
448+
if tries_left > 0 and error.http_status_code in RETRIABLE_STATUSCODES:
449+
logger.warning(f"Failed to retrieve chunk {from_byte_index}-{to_byte_index} from {url} (status {error.http_status_code}) - retrying")
447450
continue
448451
else:
449452
raise error
450-
except requests.exceptions.HTTPError as http_error:
451-
raise OpenEoApiPlainError(message=f"Failed to download {url}", http_status_code=http_error.response.status_code, error_message=http_error.response.text)
452-
453-
454-
def _determine_content_length(url: str) -> int:
455-
range_0_0_response = requests.get(url, headers={"Range": f"bytes=0-0"})
456-
if range_0_0_response.status_code == 206:
457-
content_range_header = range_0_0_response.headers.get("Content-Range")
458-
match = re.match(r"^bytes \d+-\d+/(\d+)$", content_range_header)
459-
if match:
460-
return int(match.group(1))
461-
462-
content_range_prefix = "bytes 0-0/"
463-
if content_range_header.startswith(content_range_prefix):
464-
return int(content_range_header[len(content_range_prefix):])
465-
head = requests.head(url, stream=True)
466-
if head.ok:
467-
return int(head.headers['Content-Length'])
468-
else:
469-
head.raise_for_status()
453+
454+
455+
def _download_unchunked(self, url: str, target: Path):
456+
with self.job.connection.get(path=url, stream=True) as r:
457+
r.raise_for_status()
458+
with target.open("wb") as f:
459+
shutil.copyfileobj(r.raw, f)
470460

471461

472462
class MultipleAssetException(OpenEoClientException):

tests/rest/test_job.py

+1-30
Original file line numberDiff line numberDiff line change
@@ -547,39 +547,18 @@ def job_with_chunked_asset_using_head(con100, requests_mock, tmp_path) -> BatchJ
547547
requests_mock.get(API_URL + "/jobs/jj1/results", json={"assets": {
548548
"1.tiff": {"href": API_URL + "/dl/jjr1.tiff", "type": "image/tiff; application=geotiff"},
549549
}})
550-
requests_mock.head(API_URL + "/dl/jjr1.tiff", headers={"Content-Length": f"{len(TIFF_CONTENT)}"})
550+
requests_mock.head(API_URL + "/dl/jjr1.tiff", headers={"Content-Length": f"{len(TIFF_CONTENT)}", "Accept-Ranges": "bytes"})
551551

552552
chunk_size = 1000
553553
for r in range(0, len(TIFF_CONTENT), chunk_size):
554554
from_bytes = r
555555
to_bytes = min(r + chunk_size, len(TIFF_CONTENT)) - 1
556-
# fail the 1st time, serve the content chunk the 2nd time
557-
requests_mock.get(API_URL + "/dl/jjr1.tiff", request_headers={"Range": "bytes=0-0"},
558-
response_list=[{"status_code": 404, "text": "Not found"}])
559556
requests_mock.get(API_URL + "/dl/jjr1.tiff", request_headers={"Range": f"bytes={from_bytes}-{to_bytes}"},
560557
response_list = [{"status_code": 500, "text": "Server error"},
561558
{"status_code": 206, "content": TIFF_CONTENT[from_bytes:to_bytes+1]}])
562559
job = BatchJob("jj1", connection=con100)
563560
return job
564561

565-
@pytest.fixture
566-
def job_with_chunked_asset_using_get_0_0(con100, requests_mock, tmp_path) -> BatchJob:
567-
requests_mock.get(API_URL + "/jobs/jj1/results", json={"assets": {
568-
"1.tiff": {"href": API_URL + "/dl/jjr1.tiff", "type": "image/tiff; application=geotiff"},
569-
}})
570-
requests_mock.get(API_URL + "/dl/jjr1.tiff", request_headers={"Range": "bytes=0-0"},
571-
response_list=[{"status_code": 206, "text": "", "headers": {"Content-Range": f"bytes 0-0/{len(TIFF_CONTENT)}"}}])
572-
chunk_size = 1000
573-
for r in range(0, len(TIFF_CONTENT), chunk_size):
574-
from_bytes = r
575-
to_bytes = min(r + chunk_size, len(TIFF_CONTENT)) - 1
576-
# fail the 1st time, serve the content chunk the 2nd time
577-
requests_mock.get(API_URL + "/dl/jjr1.tiff", request_headers={"Range": f"bytes={from_bytes}-{to_bytes}"},
578-
response_list = [{"status_code": 408, "text": "Server error"},
579-
{"status_code": 206, "content": TIFF_CONTENT[from_bytes:to_bytes+1]}])
580-
job = BatchJob("jj1", connection=con100)
581-
return job
582-
583562
@pytest.fixture
584563
def job_with_2_assets(con100, requests_mock, tmp_path) -> BatchJob:
585564
requests_mock.get(API_URL + "/jobs/jj2/results", json={
@@ -616,14 +595,6 @@ def test_get_results_download_file(job_with_1_asset: BatchJob, tmp_path):
616595
with target.open("rb") as f:
617596
assert f.read() == TIFF_CONTENT
618597

619-
def test_get_results_download_chunked_file_using_get_0_0(job_with_chunked_asset_using_get_0_0: BatchJob, tmp_path):
620-
job = job_with_chunked_asset_using_get_0_0
621-
target = tmp_path / "result.tiff"
622-
res = job.get_results().download_file(target, chunk_size=1000)
623-
assert res == target
624-
with target.open("rb") as f:
625-
assert f.read() == TIFF_CONTENT
626-
627598
def test_get_results_download_chunked_file_using_head(job_with_chunked_asset_using_head: BatchJob, tmp_path):
628599
job = job_with_chunked_asset_using_head
629600
target = tmp_path / "result.tiff"

0 commit comments

Comments
 (0)