diff --git a/CHANGELOG.md b/CHANGELOG.md index d0075f2ad..66ba25578 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `sar_backscatter`: try to retrieve coefficient options from backend ([#693](https://github.com/Open-EO/openeo-python-client/issues/693)) - Improve error message when OIDC provider is unavailable ([#751](https://github.com/Open-EO/openeo-python-client/issues/751)) - Added `on_response_headers` argument to `DataCube.download()` and related to handle (e.g. `print`) the response headers ([#560](https://github.com/Open-EO/openeo-python-client/issues/560)) +- Added more robust ranged download for large job result files (if supported by the server) ([#747](https://github.com/Open-EO/openeo-python-client/issues/747)) ### Changed @@ -40,7 +41,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `STACAPIJobDatabase.get_by_status()` now always returns a `pandas.DataFrame` with an index compatible with `MultiBackendJobManager`. ([#707](https://github.com/Open-EO/openeo-python-client/issues/707)) - ## [0.39.1] - 2025-02-26 ### Fixed diff --git a/openeo/rest/__init__.py b/openeo/rest/__init__.py index fbe9e2011..37b3a8170 100644 --- a/openeo/rest/__init__.py +++ b/openeo/rest/__init__.py @@ -4,7 +4,7 @@ # TODO: get from config file DEFAULT_DOWNLOAD_CHUNK_SIZE = 10_000_000 # 10MB - +DEFAULT_DOWNLOAD_RANGE_SIZE = 500_000_000 # 500MB DEFAULT_JOB_STATUS_POLL_INTERVAL_MAX = 60 DEFAULT_JOB_STATUS_POLL_CONNECTION_RETRY_INTERVAL = 30 diff --git a/openeo/rest/_connection.py b/openeo/rest/_connection.py index dfb2fc7cc..e2e1be3cb 100644 --- a/openeo/rest/_connection.py +++ b/openeo/rest/_connection.py @@ -194,6 +194,24 @@ def get( """ return self.request("get", path=path, params=params, stream=stream, auth=auth, **kwargs) + def head( + self, + path: str, + *, + params: Optional[dict] = None, + auth: Optional[AuthBase] = None, + **kwargs, + ) -> Response: + """ + Do HEAD request to REST API. + + :param path: API path (without root url) + :param params: Additional query parameters + :param auth: optional custom authentication to use instead of the default one + :return: response: Response + """ + return self.request("head", path=path, params=params, auth=auth, **kwargs) + def post(self, path: str, json: Optional[dict] = None, **kwargs) -> Response: """ Do POST request to REST API. 
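Note: the new `Connection.head()` wrapper above mirrors the existing `get()`/`post()` helpers and is what the ranged-download logic in `openeo/rest/job.py` uses to probe an asset URL before downloading. Below is a minimal sketch of the probing it enables; the backend URL and job/asset path are placeholders, not part of this changeset:

```python
import openeo

# Placeholder backend URL and asset path, purely illustrative.
connection = openeo.connect("https://openeo.example.test")
head = connection.head("/jobs/j-1234/results/result.data")

# A backend that supports ranged downloads advertises it with these headers;
# this is the same check the new _download_to_file() performs before going ranged.
supports_ranges = (
    head.ok
    and head.headers.get("Accept-Ranges") == "bytes"
    and "Content-Length" in head.headers
)
print(supports_ranges, head.headers.get("Content-Length"))
```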
diff --git a/openeo/rest/_testing.py b/openeo/rest/_testing.py index f857b133c..a88eb1177 100644 --- a/openeo/rest/_testing.py +++ b/openeo/rest/_testing.py @@ -103,6 +103,10 @@ def __init__( requests_mock.delete( re.compile(connection.build_url(r"/jobs/(job-\d+)/results$")), json=self._handle_delete_job_results ) + requests_mock.head( + re.compile(connection.build_url("/jobs/(.*?)/results/result.data$")), + headers={"Content-Length": "666"} + ) requests_mock.get( re.compile(connection.build_url("/jobs/(.*?)/results/result.data$")), content=self._handle_get_job_result_asset, diff --git a/openeo/rest/job.py b/openeo/rest/job.py index ad3117753..a76c7bac7 100644 --- a/openeo/rest/job.py +++ b/openeo/rest/job.py @@ -11,15 +11,11 @@ import requests from openeo.internal.documentation import openeo_endpoint -from openeo.internal.jupyter import ( - VisualDict, - VisualList, - render_component, - render_error, -) +from openeo.internal.jupyter import VisualDict, render_component, render_error from openeo.internal.warnings import deprecated, legacy_alias from openeo.rest import ( DEFAULT_DOWNLOAD_CHUNK_SIZE, + DEFAULT_DOWNLOAD_RANGE_SIZE, DEFAULT_JOB_STATUS_POLL_CONNECTION_RETRY_INTERVAL, DEFAULT_JOB_STATUS_POLL_INTERVAL_MAX, DEFAULT_JOB_STATUS_POLL_SOFT_ERROR_MAX, @@ -29,7 +25,7 @@ OpenEoClientException, ) from openeo.rest.models.general import LogsResponse -from openeo.rest.models.logs import LogEntry, log_level_name, normalize_log_level +from openeo.rest.models.logs import log_level_name from openeo.util import ensure_dir if typing.TYPE_CHECKING: @@ -40,7 +36,8 @@ DEFAULT_JOB_RESULTS_FILENAME = "job-results.json" - +MAX_RETRIES_PER_RANGE = 3 +RETRIABLE_STATUSCODES = [408, 429, 500, 501, 502, 503, 504] class BatchJob: """ @@ -387,7 +384,7 @@ def __repr__(self): ) def download( - self, target: Optional[Union[Path, str]] = None, *, chunk_size: int = DEFAULT_DOWNLOAD_CHUNK_SIZE + self, target: Optional[Union[Path, str]] = None, *, chunk_size: int = DEFAULT_DOWNLOAD_CHUNK_SIZE, range_size: int=DEFAULT_DOWNLOAD_RANGE_SIZE ) -> Path: """ Download asset to given location @@ -402,10 +399,7 @@ def download( target = target / self.name ensure_dir(target.parent) logger.info("Downloading Job result asset {n!r} from {h!s} to {t!s}".format(n=self.name, h=self.href, t=target)) - response = self._get_response(stream=True) - with target.open("wb") as f: - for block in response.iter_content(chunk_size=chunk_size): - f.write(block) + self._download_to_file(url=self.href, target=target, chunk_size=chunk_size, range_size=range_size) return target def _get_response(self, stream=True) -> requests.Response: @@ -424,6 +418,45 @@ def load_bytes(self) -> bytes: # TODO: more `load` methods e.g.: load GTiff asset directly as numpy array + def _download_to_file(self, url: str, target: Path, *, chunk_size: int=DEFAULT_DOWNLOAD_CHUNK_SIZE, range_size: int=DEFAULT_DOWNLOAD_RANGE_SIZE): + head = self.job.connection.head(url, stream=True) + if head.ok and head.headers.get("Accept-Ranges") == "bytes" and 'Content-Length' in head.headers: + file_size = int(head.headers['Content-Length']) + self._download_ranged(url=url, target=target, file_size=file_size, chunk_size=chunk_size, range_size=range_size) + else: + self._download_all_at_once(url=url, target=target, chunk_size=chunk_size) + + + def _download_ranged(self, url: str, target: Path, file_size: int, *, chunk_size: int=DEFAULT_DOWNLOAD_CHUNK_SIZE, range_size: int=DEFAULT_DOWNLOAD_RANGE_SIZE): + with target.open('wb') as f: + for from_byte_index in range(0, file_size, range_size): 
+ to_byte_index = min(from_byte_index + range_size - 1, file_size - 1) + tries_left = MAX_RETRIES_PER_RANGE + while tries_left > 0: + try: + range_headers = {"Range": f"bytes={from_byte_index}-{to_byte_index}"} + with self.job.connection.get(path=url, headers=range_headers, stream=True) as r: + r.raise_for_status() + for block in r.iter_content(chunk_size=chunk_size): + f.write(block) + break + except OpenEoApiPlainError as error: + tries_left -= 1 + if tries_left > 0 and error.http_status_code in RETRIABLE_STATUSCODES: + logger.warning(f"Failed to retrieve chunk {from_byte_index}-{to_byte_index} from {url} (status {error.http_status_code}) - retrying") + continue + else: + raise error + + + def _download_all_at_once(self, url: str, target: Path, *, chunk_size: int=DEFAULT_DOWNLOAD_CHUNK_SIZE): + with self.job.connection.get(path=url, stream=True) as r: + r.raise_for_status() + with target.open("wb") as f: + for block in r.iter_content(chunk_size=chunk_size): + f.write(block) + + class MultipleAssetException(OpenEoClientException): pass @@ -501,7 +534,7 @@ def get_asset(self, name: str = None) -> ResultAsset: "No asset {n!r} in: {a}".format(n=name, a=[a.name for a in assets]) ) - def download_file(self, target: Union[Path, str] = None, name: str = None) -> Path: + def download_file(self, target: Union[Path, str] = None, name: str = None, *, chunk_size=DEFAULT_DOWNLOAD_CHUNK_SIZE, range_size: int=DEFAULT_DOWNLOAD_RANGE_SIZE) -> Path: """ Download single asset. Can be used when there is only one asset in the :py:class:`JobResults`, or when the desired asset name is given explicitly. @@ -513,12 +546,12 @@ def download_file(self, target: Union[Path, str] = None, name: str = None) -> Pa :return: path of downloaded asset """ try: - return self.get_asset(name=name).download(target=target) + return self.get_asset(name=name).download(target=target, chunk_size=chunk_size, range_size=range_size) except MultipleAssetException: raise OpenEoClientException( "Can not use `download_file` with multiple assets. Use `download_files` instead.") - def download_files(self, target: Union[Path, str] = None, include_stac_metadata: bool = True) -> List[Path]: + def download_files(self, target: Union[Path, str] = None, include_stac_metadata: bool = True, chunk_size=DEFAULT_DOWNLOAD_CHUNK_SIZE) -> List[Path]: """ Download all assets to given folder. @@ -531,7 +564,7 @@ def download_files(self, target: Union[Path, str] = None, include_stac_metadata: raise OpenEoClientException(f"Target argument {target} exists but isn't a folder.") ensure_dir(target) - downloaded = [a.download(target) for a in self.get_assets()] + downloaded = [a.download(target, chunk_size=chunk_size) for a in self.get_assets()] if include_stac_metadata: # TODO #184: convention for metadata file name? 
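Note: a short usage sketch of the new `range_size` parameter introduced above; the backend URL, authentication flow and job id are placeholders:

```python
import openeo

# Placeholders: backend URL, authentication and job id are illustrative only.
connection = openeo.connect("https://openeo.example.test").authenticate_oidc()
job = connection.job("j-1234")

# When the HEAD probe reports "Accept-Ranges: bytes" and a Content-Length,
# the asset is fetched in 100 MB ranges, each retried up to
# MAX_RETRIES_PER_RANGE times on retriable status codes; otherwise the
# download falls back to a single streaming GET.
path = job.get_results().download_file("result.tiff", range_size=100_000_000)
print(path)
```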
diff --git a/tests/rest/test_job.py b/tests/rest/test_job.py index dc51e4653..bea48d717 100644 --- a/tests/rest/test_job.py +++ b/tests/rest/test_job.py @@ -18,9 +18,7 @@ API_URL = "https://oeo.test" -TIFF_CONTENT = b'T1f7D6t6l0l' * 1000 - - +TIFF_CONTENT = b'T1f7D6t6l0l' * 10000 @pytest.fixture def con100(requests_mock): @@ -74,7 +72,9 @@ def test_execute_batch(con100, requests_mock, tmpdir): } }, ) + requests_mock.head(API_URL + "/jobs/f00ba5/files/output.tiff", headers={"Content-Length": str(len("tiffdata"))}) requests_mock.get(API_URL + "/jobs/f00ba5/files/output.tiff", text="tiffdata") + requests_mock.get(API_URL + "/jobs/f00ba5/logs", json={'logs': []}) path = tmpdir.join("tmp.tiff") @@ -231,6 +231,7 @@ def test_execute_batch_with_soft_errors(con100, requests_mock, tmpdir, error_res } }, ) + requests_mock.head(API_URL + "/jobs/f00ba5/files/output.tiff", headers={"Content-Length": str(len("tiffdata"))}) requests_mock.get(API_URL + "/jobs/f00ba5/files/output.tiff", text="tiffdata") requests_mock.get(API_URL + "/jobs/f00ba5/logs", json={'logs': []}) @@ -536,11 +537,57 @@ def job_with_1_asset(con100, requests_mock, tmp_path) -> BatchJob: requests_mock.get(API_URL + "/jobs/jj1/results", json={"assets": { "1.tiff": {"href": API_URL + "/dl/jjr1.tiff", "type": "image/tiff; application=geotiff"}, }}) + requests_mock.head(API_URL + "/dl/jjr1.tiff", headers={"Content-Length": f"{len(TIFF_CONTENT)}"}) requests_mock.get(API_URL + "/dl/jjr1.tiff", content=TIFF_CONTENT) + + job = BatchJob("jj1", connection=con100) + return job + +@pytest.fixture +def job_with_chunked_asset_using_head(con100, requests_mock, tmp_path) -> BatchJob: + def handle_content(request, context): + range = request.headers.get("Range") + assert range + search = re.search(r"bytes=(\d+)-(\d+)", range) + assert search + from_bytes = int(search.group(1)) + to_bytes = int(search.group(2)) + assert from_bytes < to_bytes + return TIFF_CONTENT[from_bytes : to_bytes + 1] + + requests_mock.get( + API_URL + "/jobs/jj1/results", + json={ + "assets": { + "1.tiff": {"href": API_URL + "/dl/jjr1.tiff", "type": "image/tiff; application=geotiff"}, + } + }, + ) + requests_mock.head( + API_URL + "/dl/jjr1.tiff", headers={"Content-Length": f"{len(TIFF_CONTENT)}", "Accept-Ranges": "bytes"} + ) + requests_mock.get(API_URL + "/dl/jjr1.tiff", content=handle_content) job = BatchJob("jj1", connection=con100) return job +@pytest.fixture +def job_with_chunked_asset_using_head_old(con100, requests_mock, tmp_path) -> BatchJob: + requests_mock.get(API_URL + "/jobs/jj1/results", json={"assets": { + "1.tiff": {"href": API_URL + "/dl/jjr1.tiff", "type": "image/tiff; application=geotiff"}, + }}) + requests_mock.head(API_URL + "/dl/jjr1.tiff", headers={"Content-Length": f"{len(TIFF_CONTENT)}", "Accept-Ranges": "bytes"}) + + chunk_size = 1000 + for r in range(0, len(TIFF_CONTENT), chunk_size): + from_bytes = r + to_bytes = min(r + chunk_size, len(TIFF_CONTENT)) - 1 + requests_mock.get(API_URL + "/dl/jjr1.tiff", request_headers={"Range": f"bytes={from_bytes}-{to_bytes}"}, + response_list = [{"status_code": 500, "text": "Server error"}, + {"status_code": 206, "content": TIFF_CONTENT[from_bytes:to_bytes+1]}]) + job = BatchJob("jj1", connection=con100) + return job + @pytest.fixture def job_with_2_assets(con100, requests_mock, tmp_path) -> BatchJob: requests_mock.get(API_URL + "/jobs/jj2/results", json={ @@ -551,8 +598,11 @@ def job_with_2_assets(con100, requests_mock, tmp_path) -> BatchJob: "2.tiff": {"href": API_URL + "/dl/jjr2.tiff", "type": "image/tiff; 
application=geotiff"}, } }) + requests_mock.head(API_URL + "/dl/jjr1.tiff", headers={"Content-Length": f"{len(TIFF_CONTENT)}"}) requests_mock.get(API_URL + "/dl/jjr1.tiff", content=TIFF_CONTENT) + requests_mock.head(API_URL + "/dl/jjr2.tiff", headers={"Content-Length": f"{len(TIFF_CONTENT)}"}) requests_mock.get(API_URL + "/dl/jjr2.tiff", content=TIFF_CONTENT) + job = BatchJob("jj2", connection=con100) return job @@ -574,6 +624,13 @@ def test_get_results_download_file(job_with_1_asset: BatchJob, tmp_path): with target.open("rb") as f: assert f.read() == TIFF_CONTENT +def test_get_results_download_file_ranged(job_with_chunked_asset_using_head: BatchJob, tmp_path): + job = job_with_chunked_asset_using_head + target = tmp_path / "result.tiff" + res = job.get_results().download_file(target, range_size=1000) + assert res == target + with target.open("rb") as f: + assert f.read() == TIFF_CONTENT def test_download_result_folder(job_with_1_asset: BatchJob, tmp_path): job = job_with_1_asset @@ -714,6 +771,7 @@ def test_get_results_download_files_include_stac_metadata( def test_result_asset_download_file(con100, requests_mock, tmp_path): href = API_URL + "/dl/jjr1.tiff" + requests_mock.head(href, headers={"Content-Length": f"{len(TIFF_CONTENT)}"}) requests_mock.get(href, content=TIFF_CONTENT) job = BatchJob("jj", connection=con100) @@ -729,6 +787,7 @@ def test_result_asset_download_file(con100, requests_mock, tmp_path): def test_result_asset_download_file_error(con100, requests_mock, tmp_path): href = API_URL + "/dl/jjr1.tiff" + requests_mock.head(href, status_code=500, text="Nope!") requests_mock.get(href, status_code=500, text="Nope!") job = BatchJob("jj", connection=con100) @@ -743,6 +802,7 @@ def test_result_asset_download_file_error(con100, requests_mock, tmp_path): def test_result_asset_download_folder(con100, requests_mock, tmp_path): href = API_URL + "/dl/jjr1.tiff" + requests_mock.head(href, headers={"Content-Length": f"{len(TIFF_CONTENT)}"}) requests_mock.get(href, content=TIFF_CONTENT) job = BatchJob("jj", connection=con100) @@ -770,6 +830,7 @@ def test_result_asset_load_json(con100, requests_mock): def test_result_asset_load_bytes(con100, requests_mock): href = API_URL + "/dl/jjr1.tiff" + requests_mock.head(href, headers={"Content-Length": f"{len(TIFF_CONTENT)}"}) requests_mock.get(href, content=TIFF_CONTENT) job = BatchJob("jj", connection=con100) @@ -797,6 +858,7 @@ def download_tiff(request, context): return TIFF_CONTENT requests_mock.get(API_URL + "/jobs/jj1/results", json=get_results) + requests_mock.head("https://evilcorp.test/dl/jjr1.tiff", headers={"Content-Length": "666"}) requests_mock.get("https://evilcorp.test/dl/jjr1.tiff", content=download_tiff) con100.authenticate_basic("john", "j0hn")