Skip to content

Commit 241a019

Browse files
author
dsamaey
committed
Issue #747 add chunked downloading (get bytes 0-0 based)
1 parent 8082083 commit 241a019

File tree

2 files changed

+82
-41
lines changed

2 files changed

+82
-41
lines changed

openeo/rest/job.py

+49-37
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,11 @@
33
import datetime
44
import json
55
import logging
6+
import re
67
import time
78
import typing
8-
import urllib.error
99
from pathlib import Path
1010
from typing import Dict, List, Optional, Union
11-
from urllib.error import HTTPError
1211

1312
import requests
1413
import shutil
@@ -35,8 +34,6 @@
3534
from openeo.rest.models.logs import LogEntry, log_level_name, normalize_log_level
3635
from openeo.util import ensure_dir
3736

38-
MAX_RETRIES_DOWNLOAD = 3
39-
4037
if typing.TYPE_CHECKING:
4138
# Imports for type checking only (circular import issue at runtime).
4239
from openeo.rest.connection import Connection
@@ -45,7 +42,8 @@
4542

4643

4744
DEFAULT_JOB_RESULTS_FILENAME = "job-results.json"
48-
45+
MAX_RETRIES_PER_CHUNK = 3
46+
RETRIABLE_STATUSCODES = [408, 429, 500, 501, 502, 503, 504]
4947

5048
class BatchJob:
5149
"""
@@ -407,40 +405,9 @@ def download(
407405
target = target / self.name
408406
ensure_dir(target.parent)
409407
logger.info("Downloading Job result asset {n!r} from {h!s} to {t!s}".format(n=self.name, h=self.href, t=target))
410-
self._download_chunked(target, chunk_size)
408+
_download_chunked(self.href, target, chunk_size)
411409
return target
412410

413-
def _download_chunked(self, target: Path, chunk_size: int):
414-
file_size = None
415-
try:
416-
head = requests.head(self.href, stream=True)
417-
if head.ok:
418-
file_size = int(head.headers['Content-Length'])
419-
else:
420-
head.raise_for_status()
421-
with target.open('wb') as f:
422-
for from_byte_index in range(0, file_size, chunk_size):
423-
to_byte_index = min(from_byte_index + chunk_size - 1, file_size - 1)
424-
tries_left = MAX_RETRIES_DOWNLOAD
425-
while tries_left > 0:
426-
try:
427-
range_headers = {"Range": f"bytes={from_byte_index}-{to_byte_index}"}
428-
with requests.get(self.href, headers=range_headers, stream=True) as r:
429-
if r.ok:
430-
shutil.copyfileobj(r.raw, f)
431-
break
432-
else:
433-
r.raise_for_status()
434-
except requests.exceptions.HTTPError as error:
435-
tries_left -= 1
436-
if tries_left < 1:
437-
raise error
438-
else:
439-
logger.warning(f"Failed to retrieve chunk {from_byte_index}-{to_byte_index} from {self.href} (status {error.response.status_code}) - retrying")
440-
continue
441-
except requests.exceptions.HTTPError as http_error:
442-
raise OpenEoApiPlainError(message=f"Failed to download {self.href}", http_status_code=http_error.response.status_code, error_message=http_error.response.text)
443-
444411
def _get_response(self, stream=True) -> requests.Response:
445412
return self.job.connection.get(self.href, stream=stream)
446413

@@ -457,6 +424,51 @@ def load_bytes(self) -> bytes:
457424
# TODO: more `load` methods e.g.: load GTiff asset directly as numpy array
458425

459426

427+
def _download_chunked(url: str, target: Path, chunk_size: int):
428+
try:
429+
file_size = _determine_content_length(url)
430+
with target.open('wb') as f:
431+
for from_byte_index in range(0, file_size, chunk_size):
432+
to_byte_index = min(from_byte_index + chunk_size - 1, file_size - 1)
433+
tries_left = MAX_RETRIES_PER_CHUNK
434+
while tries_left > 0:
435+
try:
436+
range_headers = {"Range": f"bytes={from_byte_index}-{to_byte_index}"}
437+
with requests.get(url, headers=range_headers, stream=True) as r:
438+
if r.ok:
439+
shutil.copyfileobj(r.raw, f)
440+
break
441+
else:
442+
r.raise_for_status()
443+
except requests.exceptions.HTTPError as error:
444+
tries_left -= 1
445+
if tries_left > 0 and error.response.status_code in RETRIABLE_STATUSCODES:
446+
logger.warning(f"Failed to retrieve chunk {from_byte_index}-{to_byte_index} from {url} (status {error.response.status_code}) - retrying")
447+
continue
448+
else:
449+
raise error
450+
except requests.exceptions.HTTPError as http_error:
451+
raise OpenEoApiPlainError(message=f"Failed to download {url}", http_status_code=http_error.response.status_code, error_message=http_error.response.text)
452+
453+
454+
def _determine_content_length(url: str) -> int:
455+
range_0_0_response = requests.get(url, headers={"Range": f"bytes=0-0"})
456+
if range_0_0_response.status_code == 206:
457+
content_range_header = range_0_0_response.headers.get("Content-Range")
458+
match = re.match(r"^bytes \d+-\d+/(\d+)$", content_range_header)
459+
if match:
460+
return int(match.group(1))
461+
462+
content_range_prefix = "bytes 0-0/"
463+
if content_range_header.startswith(content_range_prefix):
464+
return int(content_range_header[len(content_range_prefix):])
465+
head = requests.head(url, stream=True)
466+
if head.ok:
467+
return int(head.headers['Content-Length'])
468+
else:
469+
head.raise_for_status()
470+
471+
460472
class MultipleAssetException(OpenEoClientException):
461473
pass
462474

tests/rest/test_job.py

+33-4
Original file line numberDiff line numberDiff line change
@@ -543,7 +543,7 @@ def job_with_1_asset(con100, requests_mock, tmp_path) -> BatchJob:
543543
return job
544544

545545
@pytest.fixture
546-
def job_with_chunked_asset(con100, requests_mock, tmp_path) -> BatchJob:
546+
def job_with_chunked_asset_using_head(con100, requests_mock, tmp_path) -> BatchJob:
547547
requests_mock.get(API_URL + "/jobs/jj1/results", json={"assets": {
548548
"1.tiff": {"href": API_URL + "/dl/jjr1.tiff", "type": "image/tiff; application=geotiff"},
549549
}})
@@ -554,8 +554,29 @@ def job_with_chunked_asset(con100, requests_mock, tmp_path) -> BatchJob:
554554
from_bytes = r
555555
to_bytes = min(r + chunk_size, len(TIFF_CONTENT)) - 1
556556
# fail the 1st time, serve the content chunk the 2nd time
557+
requests_mock.get(API_URL + "/dl/jjr1.tiff", request_headers={"Range": "bytes=0-0"},
558+
response_list=[{"status_code": 404, "text": "Not found"}])
557559
requests_mock.get(API_URL + "/dl/jjr1.tiff", request_headers={"Range": f"bytes={from_bytes}-{to_bytes}"},
558-
response_list = [{"status_code": 500, "text": "Server error"}, {"content": TIFF_CONTENT[from_bytes:to_bytes+1]}])
560+
response_list = [{"status_code": 500, "text": "Server error"},
561+
{"status_code": 206, "content": TIFF_CONTENT[from_bytes:to_bytes+1]}])
562+
job = BatchJob("jj1", connection=con100)
563+
return job
564+
565+
@pytest.fixture
566+
def job_with_chunked_asset_using_get_0_0(con100, requests_mock, tmp_path) -> BatchJob:
567+
requests_mock.get(API_URL + "/jobs/jj1/results", json={"assets": {
568+
"1.tiff": {"href": API_URL + "/dl/jjr1.tiff", "type": "image/tiff; application=geotiff"},
569+
}})
570+
requests_mock.get(API_URL + "/dl/jjr1.tiff", request_headers={"Range": "bytes=0-0"},
571+
response_list=[{"status_code": 206, "text": "", "headers": {"Content-Range": f"bytes 0-0/{len(TIFF_CONTENT)}"}}])
572+
chunk_size = 1000
573+
for r in range(0, len(TIFF_CONTENT), chunk_size):
574+
from_bytes = r
575+
to_bytes = min(r + chunk_size, len(TIFF_CONTENT)) - 1
576+
# fail the 1st time, serve the content chunk the 2nd time
577+
requests_mock.get(API_URL + "/dl/jjr1.tiff", request_headers={"Range": f"bytes={from_bytes}-{to_bytes}"},
578+
response_list = [{"status_code": 408, "text": "Server error"},
579+
{"status_code": 206, "content": TIFF_CONTENT[from_bytes:to_bytes+1]}])
559580
job = BatchJob("jj1", connection=con100)
560581
return job
561582

@@ -595,8 +616,16 @@ def test_get_results_download_file(job_with_1_asset: BatchJob, tmp_path):
595616
with target.open("rb") as f:
596617
assert f.read() == TIFF_CONTENT
597618

598-
def test_get_results_download_chunked_file(job_with_chunked_asset: BatchJob, tmp_path):
599-
job = job_with_chunked_asset
619+
def test_get_results_download_chunked_file_using_get_0_0(job_with_chunked_asset_using_get_0_0: BatchJob, tmp_path):
620+
job = job_with_chunked_asset_using_get_0_0
621+
target = tmp_path / "result.tiff"
622+
res = job.get_results().download_file(target, chunk_size=1000)
623+
assert res == target
624+
with target.open("rb") as f:
625+
assert f.read() == TIFF_CONTENT
626+
627+
def test_get_results_download_chunked_file_using_head(job_with_chunked_asset_using_head: BatchJob, tmp_path):
628+
job = job_with_chunked_asset_using_head
600629
target = tmp_path / "result.tiff"
601630
res = job.get_results().download_file(target, chunk_size=1000)
602631
assert res == target

0 commit comments

Comments
 (0)