Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `sar_backscatter`: try to retrieve coefficient options from backend ([#693](https://github.com/Open-EO/openeo-python-client/issues/693))
- Improve error message when OIDC provider is unavailable ([#751](https://github.com/Open-EO/openeo-python-client/issues/751))
- Added `on_response_headers` argument to `DataCube.download()` and related to handle (e.g. `print`) the response headers ([#560](https://github.com/Open-EO/openeo-python-client/issues/560))
- Added more robust download for large job result files: chunked download through HTTP range requests, with retries per chunk (if supported by the server)

### Changed

Expand All @@ -32,7 +33,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- `STACAPIJobDatabase.get_by_status()` now always returns a `pandas.DataFrame` with an index compatible with `MultiBackendJobManager`. ([#707](https://github.com/Open-EO/openeo-python-client/issues/707))


## [0.39.1] - 2025-02-26

### Fixed
Expand Down
18 changes: 18 additions & 0 deletions openeo/rest/_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,24 @@ def get(
"""
return self.request("get", path=path, params=params, stream=stream, auth=auth, **kwargs)

def head(
    self,
    path: str,
    *,
    params: Optional[dict] = None,
    auth: Optional[AuthBase] = None,
    **kwargs,
) -> Response:
    """
    Send a HEAD request to the REST API.

    :param path: API path (without root url)
    :param params: query parameters to add to the request (optional)
    :param auth: custom authentication to use for this request instead of the default one (optional)
    :param kwargs: extra keyword arguments, handed over as-is to :py:meth:`request`
    :return: the response to the HEAD request
    """
    # Collect everything in a single mapping: the explicit arguments can not clash
    # with `kwargs`, as Python already binds them to the named parameters.
    request_kwargs = {**kwargs, "path": path, "params": params, "auth": auth}
    return self.request("head", **request_kwargs)

def post(self, path: str, json: Optional[dict] = None, **kwargs) -> Response:
"""
Do POST request to REST API.
Expand Down
4 changes: 4 additions & 0 deletions openeo/rest/_testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ def __init__(
requests_mock.delete(
re.compile(connection.build_url(r"/jobs/(job-\d+)/results$")), json=self._handle_delete_job_results
)
requests_mock.head(
re.compile(connection.build_url("/jobs/(.*?)/results/result.data$")),
headers={"Content-Length": "666"}
)
requests_mock.get(
re.compile(connection.build_url("/jobs/(.*?)/results/result.data$")),
content=self._handle_get_job_result_asset,
Expand Down
63 changes: 47 additions & 16 deletions openeo/rest/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import datetime
import json
import logging
import shutil
import time
import typing
from pathlib import Path
Expand All @@ -11,12 +12,7 @@
import requests

from openeo.internal.documentation import openeo_endpoint
from openeo.internal.jupyter import (
VisualDict,
VisualList,
render_component,
render_error,
)
from openeo.internal.jupyter import VisualDict, render_component, render_error
from openeo.internal.warnings import deprecated, legacy_alias
from openeo.rest import (
DEFAULT_DOWNLOAD_CHUNK_SIZE,
Expand All @@ -29,7 +25,7 @@
OpenEoClientException,
)
from openeo.rest.models.general import LogsResponse
from openeo.rest.models.logs import LogEntry, log_level_name, normalize_log_level
from openeo.rest.models.logs import log_level_name
from openeo.util import ensure_dir

if typing.TYPE_CHECKING:
Expand All @@ -40,7 +36,8 @@


DEFAULT_JOB_RESULTS_FILENAME = "job-results.json"

# Maximum number of attempts (the first try included) to download a single chunk
# of a job result asset before giving up.
MAX_RETRIES_PER_CHUNK = 3
# HTTP status codes of a failed chunk request for which the request is attempted again.
RETRIABLE_STATUSCODES = [408, 429, 500, 501, 502, 503, 504]

class BatchJob:
"""
Expand Down Expand Up @@ -402,10 +399,7 @@ def download(
target = target / self.name
ensure_dir(target.parent)
logger.info("Downloading Job result asset {n!r} from {h!s} to {t!s}".format(n=self.name, h=self.href, t=target))
response = self._get_response(stream=True)
with target.open("wb") as f:
for block in response.iter_content(chunk_size=chunk_size):
f.write(block)
self._download_to_file(url=self.href, target=target, chunk_size=chunk_size)
return target

def _get_response(self, stream=True) -> requests.Response:
Expand All @@ -424,6 +418,43 @@ def load_bytes(self) -> bytes:
# TODO: more `load` methods e.g.: load GTiff asset directly as numpy array


def _download_to_file(self, url: str, target: Path, chunk_size: int):
    """
    Download the resource at ``url`` to ``target``, in chunks when the server supports it.

    The server is probed with a HEAD request first. When it advertises byte range support
    (``Accept-Ranges: bytes``) and reports a usable, non-zero ``Content-Length``,
    the download is done chunk by chunk through range requests.
    In all other cases the resource is fetched with a single streaming GET request.

    :param url: URL of the resource to download
    :param target: path of the file to write
    :param chunk_size: number of bytes to request per chunk (only used for chunked download)
    """
    # A HEAD response has no body, so there is nothing to stream.
    head = self.job.connection.head(url)

    file_size = None
    if head.ok and "bytes" in head.headers.get("Accept-Ranges", ""):
        try:
            file_size = int(head.headers["Content-Length"])
        except (KeyError, ValueError):
            # Missing or malformed size: chunk boundaries can not be computed.
            file_size = None

    # A zero (or negative) size is not trusted either: a plain GET gives the same result
    # for a file that really is empty and still works when the HEAD response misreports the size.
    if file_size is not None and file_size > 0:
        self._download_chunked(url=url, target=target, file_size=file_size, chunk_size=chunk_size)
    else:
        self._download_unchunked(url=url, target=target)


def _download_chunked(self, url: str, target: Path, file_size: int, chunk_size: int):
    """
    Download the resource at ``url`` to ``target`` through consecutive HTTP range requests.

    Each chunk is requested with a ``Range: bytes=<from>-<to>`` header and appended to the target file.
    A chunk request that fails with one of the ``RETRIABLE_STATUSCODES`` is attempted again,
    up to ``MAX_RETRIES_PER_CHUNK`` attempts in total per chunk.

    :param url: URL of the resource to download
    :param target: path of the file to write (overwritten if it exists)
    :param file_size: total size of the resource in bytes
    :param chunk_size: number of bytes to request per chunk, must be positive
    :raises ValueError: when ``chunk_size`` is not positive
    :raises OpenEoApiPlainError: when a chunk request fails with a non-retriable status code,
        or keeps failing after the maximum number of attempts
    """
    # Validate before touching the target file: a negative chunk size would otherwise
    # silently produce an empty file, and zero would only fail after truncating it.
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be a positive number of bytes, but got {chunk_size!r}")

    with target.open("wb") as f:
        for from_byte_index in range(0, file_size, chunk_size):
            # Range bounds are inclusive; the last chunk is clipped to the end of the file.
            to_byte_index = min(from_byte_index + chunk_size - 1, file_size - 1)
            range_headers = {"Range": f"bytes={from_byte_index}-{to_byte_index}"}
            tries_left = MAX_RETRIES_PER_CHUNK
            while tries_left > 0:
                try:
                    with self.job.connection.get(path=url, headers=range_headers, stream=True) as r:
                        r.raise_for_status()
                        shutil.copyfileobj(r.raw, f)
                except OpenEoApiPlainError as error:
                    tries_left -= 1
                    if tries_left <= 0 or error.http_status_code not in RETRIABLE_STATUSCODES:
                        # Out of attempts or not worth retrying: propagate with the original traceback.
                        raise
                    logger.warning(
                        "Failed to retrieve chunk %s-%s from %s (status %s) - retrying",
                        from_byte_index,
                        to_byte_index,
                        url,
                        error.http_status_code,
                    )
                else:
                    # Chunk written successfully: move on to the next one.
                    break


def _download_unchunked(self, url: str, target: Path):
    """
    Download the resource at ``url`` to ``target`` with a single streaming GET request.

    :param url: URL of the resource to download
    :param target: path of the file to write (overwritten if it exists)
    """
    with self.job.connection.get(path=url, stream=True) as r:
        r.raise_for_status()
        # `Response.raw` yields the bytes exactly as sent over the wire.
        # Ask the underlying stream to undo any Content-Encoding (gzip/deflate),
        # so the stored file is the actual asset and not its compressed transfer form.
        r.raw.decode_content = True
        with target.open("wb") as f:
            shutil.copyfileobj(r.raw, f)


class MultipleAssetException(OpenEoClientException):
pass

Expand Down Expand Up @@ -501,7 +532,7 @@ def get_asset(self, name: str = None) -> ResultAsset:
"No asset {n!r} in: {a}".format(n=name, a=[a.name for a in assets])
)

def download_file(self, target: Union[Path, str] = None, name: str = None) -> Path:
def download_file(self, target: Union[Path, str] = None, name: str = None, chunk_size=DEFAULT_DOWNLOAD_CHUNK_SIZE) -> Path:
"""
Download single asset. Can be used when there is only one asset in the
:py:class:`JobResults`, or when the desired asset name is given explicitly.
Expand All @@ -513,12 +544,12 @@ def download_file(self, target: Union[Path, str] = None, name: str = None) -> Pa
:return: path of downloaded asset
"""
try:
return self.get_asset(name=name).download(target=target)
return self.get_asset(name=name).download(target=target, chunk_size=chunk_size)
except MultipleAssetException:
raise OpenEoClientException(
"Can not use `download_file` with multiple assets. Use `download_files` instead.")

def download_files(self, target: Union[Path, str] = None, include_stac_metadata: bool = True) -> List[Path]:
def download_files(self, target: Union[Path, str] = None, include_stac_metadata: bool = True, chunk_size=DEFAULT_DOWNLOAD_CHUNK_SIZE) -> List[Path]:
"""
Download all assets to given folder.

Expand All @@ -531,7 +562,7 @@ def download_files(self, target: Union[Path, str] = None, include_stac_metadata:
raise OpenEoClientException(f"Target argument {target} exists but isn't a folder.")
ensure_dir(target)

downloaded = [a.download(target) for a in self.get_assets()]
downloaded = [a.download(target, chunk_size=chunk_size) for a in self.get_assets()]

if include_stac_metadata:
# TODO #184: convention for metadata file name?
Expand Down
65 changes: 56 additions & 9 deletions tests/rest/test_job.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import inspect
import itertools
import json
import logging
Expand All @@ -11,16 +12,19 @@

import openeo
import openeo.rest.job
from openeo.rest import JobFailedException, OpenEoApiPlainError, OpenEoClientException
from openeo.rest import (
DEFAULT_DOWNLOAD_CHUNK_SIZE,
JobFailedException,
OpenEoApiPlainError,
OpenEoClientException,
)
from openeo.rest.job import BatchJob, ResultAsset
from openeo.rest.models.general import Link
from openeo.rest.models.logs import LogEntry

API_URL = "https://oeo.test"

TIFF_CONTENT = b'T1f7D6t6l0l' * 1000


TIFF_CONTENT = b'T1f7D6t6l0l' * 10000

@pytest.fixture
def con100(requests_mock):
Expand Down Expand Up @@ -74,7 +78,7 @@ def test_execute_batch(con100, requests_mock, tmpdir):
}
},
)
requests_mock.get(API_URL + "/jobs/f00ba5/files/output.tiff", text="tiffdata")
_mock_get_head_content(requests_mock, API_URL + "/jobs/f00ba5/files/output.tiff", "tiffdata")
requests_mock.get(API_URL + "/jobs/f00ba5/logs", json={'logs': []})

path = tmpdir.join("tmp.tiff")
Expand Down Expand Up @@ -231,7 +235,8 @@ def test_execute_batch_with_soft_errors(con100, requests_mock, tmpdir, error_res
}
},
)
requests_mock.get(API_URL + "/jobs/f00ba5/files/output.tiff", text="tiffdata")
_mock_get_head_content(requests_mock, API_URL + "/jobs/f00ba5/files/output.tiff", "tiffdata")
# requests_mock.get(API_URL + "/jobs/f00ba5/files/output.tiff", text="tiffdata")
requests_mock.get(API_URL + "/jobs/f00ba5/logs", json={'logs': []})

path = tmpdir.join("tmp.tiff")
Expand Down Expand Up @@ -536,10 +541,28 @@ def job_with_1_asset(con100, requests_mock, tmp_path) -> BatchJob:
requests_mock.get(API_URL + "/jobs/jj1/results", json={"assets": {
"1.tiff": {"href": API_URL + "/dl/jjr1.tiff", "type": "image/tiff; application=geotiff"},
}})
requests_mock.head(API_URL + "/dl/jjr1.tiff", headers={"Content-Length": f"{len(TIFF_CONTENT)}"})
requests_mock.get(API_URL + "/dl/jjr1.tiff", content=TIFF_CONTENT)

job = BatchJob("jj1", connection=con100)
return job

@pytest.fixture
def job_with_chunked_asset_using_head(con100, requests_mock, tmp_path) -> BatchJob:
    """
    Batch job "jj1" with a single asset "1.tiff" that is served through byte range requests.

    The HEAD response of the asset advertises ``Accept-Ranges: bytes`` and the size of ``TIFF_CONTENT``.
    For each 1000 byte range of the content, a GET mock is registered
    with a 500 response followed by a 206 response carrying that slice of the content.
    """
    asset_url = API_URL + "/dl/jjr1.tiff"
    total_size = len(TIFF_CONTENT)

    requests_mock.get(
        API_URL + "/jobs/jj1/results",
        json={"assets": {"1.tiff": {"href": asset_url, "type": "image/tiff; application=geotiff"}}},
    )
    requests_mock.head(
        asset_url,
        headers={"Content-Length": f"{total_size}", "Accept-Ranges": "bytes"},
    )

    chunk_size = 1000
    for first_byte in range(0, total_size, chunk_size):
        # Inclusive end of the range, clipped to the last byte of the content.
        last_byte = min(first_byte + chunk_size, total_size) - 1
        requests_mock.get(
            asset_url,
            request_headers={"Range": f"bytes={first_byte}-{last_byte}"},
            response_list=[
                {"status_code": 500, "text": "Server error"},
                {"status_code": 206, "content": TIFF_CONTENT[first_byte : last_byte + 1]},
            ],
        )

    return BatchJob("jj1", connection=con100)

@pytest.fixture
def job_with_2_assets(con100, requests_mock, tmp_path) -> BatchJob:
Expand All @@ -551,8 +574,11 @@ def job_with_2_assets(con100, requests_mock, tmp_path) -> BatchJob:
"2.tiff": {"href": API_URL + "/dl/jjr2.tiff", "type": "image/tiff; application=geotiff"},
}
})
requests_mock.head(API_URL + "/dl/jjr1.tiff", headers={"Content-Length": f"{len(TIFF_CONTENT)}"})
requests_mock.get(API_URL + "/dl/jjr1.tiff", content=TIFF_CONTENT)
requests_mock.head(API_URL + "/dl/jjr2.tiff", headers={"Content-Length": f"{len(TIFF_CONTENT)}"})
requests_mock.get(API_URL + "/dl/jjr2.tiff", content=TIFF_CONTENT)

job = BatchJob("jj2", connection=con100)
return job

Expand All @@ -574,6 +600,13 @@ def test_get_results_download_file(job_with_1_asset: BatchJob, tmp_path):
with target.open("rb") as f:
assert f.read() == TIFF_CONTENT

def test_get_results_download_chunked_file_using_head(job_with_chunked_asset_using_head: BatchJob, tmp_path):
    """Downloading with a chunk size of 1000 bytes must reproduce the full asset content."""
    destination = tmp_path / "result.tiff"
    results = job_with_chunked_asset_using_head.get_results()

    downloaded = results.download_file(destination, chunk_size=1000)

    assert downloaded == destination
    assert destination.read_bytes() == TIFF_CONTENT

def test_download_result_folder(job_with_1_asset: BatchJob, tmp_path):
job = job_with_1_asset
Expand Down Expand Up @@ -714,7 +747,7 @@ def test_get_results_download_files_include_stac_metadata(

def test_result_asset_download_file(con100, requests_mock, tmp_path):
href = API_URL + "/dl/jjr1.tiff"
requests_mock.get(href, content=TIFF_CONTENT)
_mock_get_head_content(requests_mock, href, TIFF_CONTENT)

job = BatchJob("jj", connection=con100)
asset = ResultAsset(job, name="1.tiff", href=href, metadata={'type': 'image/tiff; application=geotiff'})
Expand All @@ -729,6 +762,7 @@ def test_result_asset_download_file(con100, requests_mock, tmp_path):

def test_result_asset_download_file_error(con100, requests_mock, tmp_path):
href = API_URL + "/dl/jjr1.tiff"
requests_mock.head(href, status_code=500, text="Nope!")
requests_mock.get(href, status_code=500, text="Nope!")

job = BatchJob("jj", connection=con100)
Expand All @@ -743,7 +777,7 @@ def test_result_asset_download_file_error(con100, requests_mock, tmp_path):

def test_result_asset_download_folder(con100, requests_mock, tmp_path):
href = API_URL + "/dl/jjr1.tiff"
requests_mock.get(href, content=TIFF_CONTENT)
_mock_get_head_content(requests_mock, href, TIFF_CONTENT)

job = BatchJob("jj", connection=con100)
asset = ResultAsset(job, name="1.tiff", href=href, metadata={"type": "image/tiff; application=geotiff"})
Expand All @@ -770,7 +804,7 @@ def test_result_asset_load_json(con100, requests_mock):

def test_result_asset_load_bytes(con100, requests_mock):
href = API_URL + "/dl/jjr1.tiff"
requests_mock.get(href, content=TIFF_CONTENT)
_mock_get_head_content(requests_mock, href, TIFF_CONTENT)

job = BatchJob("jj", connection=con100)
asset = ResultAsset(job, name="out.tiff", href=href, metadata={"type": "image/tiff; application=geotiff"})
Expand All @@ -797,6 +831,7 @@ def download_tiff(request, context):
return TIFF_CONTENT

requests_mock.get(API_URL + "/jobs/jj1/results", json=get_results)
requests_mock.head("https://evilcorp.test/dl/jjr1.tiff", headers={"Content-Length": "666"})
requests_mock.get("https://evilcorp.test/dl/jjr1.tiff", content=download_tiff)

con100.authenticate_basic("john", "j0hn")
Expand Down Expand Up @@ -880,3 +915,15 @@ def get_jobs(request, context):
assert jobs.links == [Link(rel="next", href="https://oeo.test/jobs?limit=2&offset=2")]
assert jobs.ext_federation_missing() == ["oeob"]
assert "Partial job listing: missing federation components: ['oeob']." in caplog.text


def _mock_get_head_content(requests_mock, url: str, content):
    """
    Register matching HEAD and GET mocks for ``url``.

    :param requests_mock: the ``requests_mock`` mocker to register the mocks on
    :param url: URL to mock
    :param content: what the GET request should return: a ``str`` (served as text),
        a callback (served as dynamic content), or anything else with a length,
        e.g. ``bytes`` (served as raw content)
    """
    if callable(content):
        # The size of dynamically generated content is not known upfront: use a dummy value.
        content_length = "666"
        get_kwargs = {"content": content}
    elif isinstance(content, str):
        # Content-Length counts bytes on the wire, not characters.
        # NOTE(review): assumes the text is served UTF-8 encoded - confirm against requests_mock.
        content_length = str(len(content.encode("utf-8")))
        get_kwargs = {"text": content}
    else:
        content_length = str(len(content))
        get_kwargs = {"content": content}

    requests_mock.head(url, headers={"Content-Length": content_length})
    requests_mock.get(url, **get_kwargs)