Merged
Commits
37 commits
d2ab2fc
update to 1.9 and use odc.geo; most tests passing
Apr 8, 2025
7209aeb
expect dataset sources as list
Apr 11, 2025
50904be
update test
Apr 11, 2025
b5de1d6
set XYSCALE=None explicitly to use odc-geo warp
Apr 16, 2025
cf1cf79
update reqs
Apr 17, 2025
06ac710
tests requirement with odc>1.9
Apr 17, 2025
95d4c2d
change time to timezone aware
May 29, 2025
fec3e81
relocate loading from odc-algo
May 30, 2025
149444c
delete cached stage periodically
Feb 21, 2025
f8136eb
rename docker image workflow yaml
Feb 21, 2025
9aed4f6
update basics with pass
Feb 28, 2025
27e344f
update tflite runtime name
Feb 28, 2025
1f973d6
add back stats and install newest numexpr
Mar 27, 2025
0828840
switch from ubuntu to debian
Apr 17, 2025
f368d44
remove odc-stats
Apr 17, 2025
5b78473
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 30, 2025
5807a19
fix plugin import
May 30, 2025
695b7cb
upgrade to odc>1.9
Jun 12, 2025
f7fca14
update integration test
Jun 12, 2025
9c25437
add indexing test (#191)
emmaai Mar 28, 2025
e86f22c
[pre-commit.ci] pre-commit autoupdate (#193)
pre-commit-ci[bot] May 12, 2025
b020172
update dependencies in test
Jun 13, 2025
02b0106
remove dask client from unit test
Jun 13, 2025
9451e14
update odc-algo hash
Jun 16, 2025
9a33abe
update env variable with datacube>1.9
Jun 16, 2025
332146b
remove pin on sqlalchemy
Jun 16, 2025
8194168
lower pin numexpr
Jun 16, 2025
92c45e6
update dependencies version
Jun 17, 2025
c3ba410
remove expr_eval
Jun 17, 2025
35ff595
install numexpr from pypi
Jun 17, 2025
385153e
fix typos
Jun 17, 2025
9bd0f48
update dependencies in test
Jun 17, 2025
e8d2f52
more typos
Jun 17, 2025
ae83fff
save and retrieve info on fused product from properties explicitly
Jun 18, 2025
b51a8a3
always some typos
Jun 18, 2025
3c91b5b
update typing to 3.10
Jun 18, 2025
2245438
unhacking lineage and metadata assemble
Jun 19, 2025
8 changes: 4 additions & 4 deletions .github/workflows/main.yml
@@ -135,10 +135,10 @@ jobs:
./tests/integration_test.sh

env:
DB_HOSTNAME: postgres
DB_USERNAME: opendatacube
DB_PASSWORD: opendatacubepassword
DB_DATABASE: opendatacube
ODC_DEFAULT_DB_HOSTNAME: postgres
ODC_DEFAULT_DB_USERNAME: opendatacube
ODC_DEFAULT_DB_PASSWORD: opendatacubepassword
ODC_DEFAULT_DB_DATABASE: opendatacube
AWS_NO_SIGN_REQUEST: true
AWS_DEFAULT_REGION: ap-southeast-2
AWS_REGION: ap-southeast-2
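For context on the renamed variables: datacube 1.9 reads its connection settings from ODC_<ENVIRONMENT>_DB_* environment variables, where the middle segment names the configuration environment ("default" here). A minimal sketch of wiring them up in Python, assuming a reachable postgres service; the values simply mirror the CI workflow above and are for illustration only.

import os
from datacube import Datacube

# datacube 1.9 config: ODC_<ENV>_DB_* variables configure the environment <ENV>;
# "default" is used when no environment is requested explicitly.
os.environ["ODC_DEFAULT_DB_HOSTNAME"] = "postgres"
os.environ["ODC_DEFAULT_DB_USERNAME"] = "opendatacube"
os.environ["ODC_DEFAULT_DB_PASSWORD"] = "opendatacubepassword"  # test credentials from the CI setup
os.environ["ODC_DEFAULT_DB_DATABASE"] = "opendatacube"

dc = Datacube()  # picks up the "default" environment from the variables above
print(dc.index)  # shows which index/database the connection resolved to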
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -3,7 +3,7 @@ ci:

repos:
- repo: https://github.com/adrienverge/yamllint.git
rev: v1.37.0
rev: v1.37.1
hooks:
- id: yamllint
args: ['-c', '.yamllint']
2 changes: 1 addition & 1 deletion README.md
@@ -334,6 +334,6 @@ The dockerfile for the docker image used in the ochastration, which can be used

## Integration test

The integration test for the summary products is located under [tests](./tests). Currently the test is performed on all the official summary products published by DEA. The "golden files" to test against are stored on the public accessible [S3 bucket](s3://dea-public-data-dev/stats-golden-files). The "golder files" should be achived but not deleted in the case that we decide to amend or upgrade any product. It will help with tracking the changes that we intend and alerting those that we do not.
The integration test for the summary products is located under [tests](./tests). Currently the test is performed on all the official summary products published by DEA. The "golden files" to test against are stored on the public accessible [S3 bucket](s3://dea-public-data-dev/stats-golden-files). The "golden files" should be archived but not deleted in the case that we decide to amend or upgrade any product. It will help with tracking the changes that we intend and alerting those that we do not.

The test is meant to be regressive, i.e., the new version of `odc-stats` need to pass the test on the last version of docker image. The new version of docker image needs to pass the test on the current released version of `odc-stats`.
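The golden-file check described above is, in effect, a tolerance-aware comparison of freshly computed rasters against the archived reference. A hypothetical sketch of that kind of comparison, not the actual test harness; the file names, formats, and tolerance are assumptions.

import xarray as xr
from xarray.testing import assert_allclose

# Hypothetical inputs: a newly produced summary tile and its archived "golden" counterpart.
new = xr.open_dataset("new_output.nc")
golden = xr.open_dataset("golden_reference.nc")

# Same set of bands, and values equal within a small tolerance; an intentional
# product change should fail here and prompt a deliberate golden-file update.
assert set(new.data_vars) == set(golden.data_vars)
assert_allclose(new, golden, rtol=1e-5)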
8 changes: 4 additions & 4 deletions docker/docker-compose.yml
@@ -12,10 +12,10 @@ services:
build:
context: .
environment:
- DB_HOSTNAME=postgres
- DB_USERNAME=opendatacube
- DB_PASSWORD=opendatacubepassword
- DB_DATABASE=opendatacube
- ODC_DEFAULT_DB_HOSTNAME=postgres
- ODC_DEFAULT_DB_USERNAME=opendatacube
- ODC_DEFAULT_DB_PASSWORD=opendatacubepassword
- ODC_DEFAULT_DB_DATABASE=opendatacube
- AWS_NO_SIGN_REQUEST=true
- STAC_API_URL=https://earth-search.aws.element84.com/v0/
- GDAL_HTTP_MAX_RETRY=5
4 changes: 2 additions & 2 deletions docker/env.yaml
@@ -48,7 +48,7 @@ dependencies:
- dask-image
- deepdiff
- defusedxml
- distributed<2024.11.0
- distributed>=2025.4
- docutils
- fiona
- Flask
@@ -120,7 +120,7 @@ dependencies:
- slicerator
- snuggs
- sortedcontainers
- SQLAlchemy<2.0
- SQLAlchemy
- structlog
- tblib
- text-unidecode
15 changes: 8 additions & 7 deletions docker/requirements.txt
@@ -1,17 +1,18 @@

--extra-index-url https://packages.dea.ga.gov.au/
datacube-ows<1.9
datacube[performance,s3]<1.9
eodatasets3<1.9
datacube-ows>=1.9
datacube[performance,s3]>=1.9.5
eodatasets3>1.9
hdstats==0.1.8.post1
numexpr @ git+https://github.com/pydata/numexpr@a99412e
odc-algo @ git+https://github.com/opendatacube/odc-algo@adb1856
numexpr>=2.11
odc-algo>=1.0.1
odc-apps-cloud>=0.2.2
# For testing
odc-apps-dc-tools>=0.2.12
odc-cloud>=0.2.5
odc-dscache>=0.2.3
odc-stac @ git+https://github.com/opendatacube/odc-stac@69bdf64
odc-dscache>=1.9
odc-geo>=0.5.0rc1
odc-stac>=0.4.0

# odc-stac is in PyPI
odc-stats[ows]
48 changes: 2 additions & 46 deletions odc/stats/_algebra.py
@@ -5,51 +5,7 @@
import dask.array as da
import xarray as xr
import numpy as np
import numexpr as ne
import functools
from dask.base import tokenize
from typing import Any, Dict, Optional
from odc.algo._dask import flatten_kv, unflatten_kv


def apply_numexpr_np(
expr: str,
data: Optional[Dict[str, Any]] = None,
dtype=None,
casting="safe",
order="K",
**params,
) -> np.ndarray:
"""
Apply numexpr to numpy arrays
"""

if data is None:
data = params
else:
data.update(params)

out = ne.evaluate(expr, local_dict=data, casting=casting, order=order)
if dtype is None:
return out
else:
return out.astype(dtype)


def expr_eval(expr, data, dtype="float32", name="expr_eval", **kwargs):
tk = tokenize(apply_numexpr_np, *flatten_kv(data))
op = functools.partial(
apply_numexpr_np, expr, dtype=dtype, casting="unsafe", order="K", **kwargs
)

return da.map_blocks(
lambda op, *data: op(unflatten_kv(data)),
op,
*flatten_kv(data),
name=f"{name}_{tk}",
dtype=dtype,
meta=np.array((), dtype=dtype),
)


def _median_by_ind(a):
@@ -76,7 +76,7 @@ def median_by_ind(xr_da, dim, dtype="float32", name="median_by_ind"):
meta=np.array((), dtype=dtype),
drop_axis=0,
)
coords = dict((dim, xr_da.coords[dim]) for dim in xr_da.dims[1:])
coords = {dim: xr_da.coords[dim] for dim in xr_da.dims[1:]}

return xr.DataArray(
res, dims=xr_da.dims[1:], coords=coords, attrs=xr_da.attrs.copy()
@@ -88,5 +88,5 @@ def median_ds(xr_ds, dim, dtype="float32", name="median_ds"):
for var, data in xr_ds.data_vars.items():
res[var] = median_by_ind(data, dim, dtype, name)
# pylint: disable=undefined-loop-variable
coords = dict((dim, xr_ds.coords[dim]) for dim in data.dims[1:])
coords = {dim: xr_ds.coords[dim] for dim in data.dims[1:]}
return xr.Dataset(res, coords=coords, attrs=xr_ds.attrs.copy())
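With the numexpr helpers (apply_numexpr_np, expr_eval) removed, the module keeps the dask-friendly median helpers shown above. A minimal usage sketch for median_ds, assuming the reduced dimension is the leading axis and is held in a single chunk; the data here is synthetic.

import dask.array as da
import numpy as np
import xarray as xr
from odc.stats._algebra import median_ds

# Synthetic 5-observation stack of two bands; keep the leading ("spec") axis in one
# chunk so each block sees every observation it reduces over.
shape, chunks = (5, 100, 100), (5, 50, 50)
bands = {
    name: xr.DataArray(da.random.random(shape, chunks=chunks), dims=("spec", "y", "x"))
    for name in ("red", "nir")
}
ds = xr.Dataset(bands, coords={"y": np.arange(100), "x": np.arange(100)})

result = median_ds(ds, "spec", dtype="float32")  # lazy; one 2-D median per band
print(result.compute())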
11 changes: 5 additions & 6 deletions odc/stats/_cli_common.py
@@ -2,7 +2,6 @@

import logging
import sys
from typing import List, Tuple
import click


@@ -12,7 +11,7 @@
from urllib.parse import urlparse


TileIdx_txy = Tuple[str, int, int] # pylint: disable=invalid-name
TileIdx_txy = tuple[str, int, int] # pylint: disable=invalid-name


def parse_task(s: str) -> TileIdx_txy:
@@ -27,8 +26,8 @@ def parse_task(s: str) -> TileIdx_txy:


def parse_all_tasks(
inputs: List[str], all_possible_tasks: List[TileIdx_txy]
) -> List[TileIdx_txy]:
inputs: list[str], all_possible_tasks: list[TileIdx_txy]
) -> list[TileIdx_txy]:
"""
Select a subset of all possible tasks given user input on cli.

@@ -43,7 +42,7 @@ def parse_all_tasks(
x+10/y-3/2019--P1Y
"""

out: List[TileIdx_txy] = []
out: list[TileIdx_txy] = []
full_set = set(all_possible_tasks)

for s in inputs:
@@ -68,7 +67,7 @@ def parse_all_tasks(
return out


def parse_resolution(s: str, separator: str = ",") -> Tuple[float, float]:
def parse_resolution(s: str, separator: str = ",") -> tuple[float, float]:
parts = [float(v) for v in split_and_check(s, separator, (1, 2))]

if len(parts) == 1:
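For orientation, the task-selection helpers above are driven with strings in the forms listed in the parse_all_tasks docstring. A hedged sketch of calling them directly; the tile tuples are made up and assume the (time, x, y) ordering implied by TileIdx_txy.

from odc.stats._cli_common import parse_all_tasks, parse_resolution

# A pretend universe of tasks: (time period, tile x, tile y).
all_tasks = [("2019--P1Y", 10, -3), ("2019--P1Y", 11, -3), ("2020--P1Y", 10, -3)]

# Select one task by the explicit form shown in the docstring, e.g. "x+10/y-3/2019--P1Y".
subset = parse_all_tasks(["x+10/y-3/2019--P1Y"], all_tasks)
print(subset)

# A comma-separated resolution string parses to a pair of floats.
print(parse_resolution("-30,30"))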
9 changes: 4 additions & 5 deletions odc/stats/_cli_publish_tasks.py
@@ -1,11 +1,10 @@
import json
import sys
from typing import List, Optional

import click
import fsspec
import toolz
from datacube.utils.geometry import Geometry
from odc.geo import Geometry
from odc.aws.queue import get_queue, publish_messages
from odc.dscache.tools.tiling import GRIDS
from odc.stats.model import TileIdx_txy
@@ -28,15 +27,15 @@ def get_geometry(geojson_file: str) -> Geometry:
)


def filter_tasks(tasks: List[TileIdx_txy], geometry: Geometry, grid_name: str):
def filter_tasks(tasks: list[TileIdx_txy], geometry: Geometry, grid_name: str):
for task in tasks:
task_geometry = GRIDS[grid_name].tile_geobox((task[1], task[2])).extent
if task_geometry.intersects(geometry):
yield task


def publish_tasks(
db: str, task_filter: str, geojson_filter: Optional[str], dryrun: bool, queue: str
db: str, task_filter: str, geojson_filter: str | None, dryrun: bool, queue: str
):
reader = TaskReader(db)
if len(task_filter) == 0:
@@ -67,7 +66,7 @@ def publish_tasks(

# We assume the db files are always be the S3 uri. If they are not, there is no need to use SQS queue to process.
messages = (
dict(Id=str(idx), MessageBody=json.dumps(render_sqs(tidx, db)))
{"Id": str(idx), "MessageBody": json.dumps(render_sqs(tidx, db))}
for idx, tidx in enumerate(tasks)
)

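filter_tasks above keeps only the tiles whose geobox extent intersects a supplied geometry before anything is published to SQS. A hedged sketch of driving it directly; the grid key, GeoJSON path, and tile indices are assumptions, not values taken from this repository.

from odc.dscache.tools.tiling import GRIDS
from odc.stats._cli_publish_tasks import filter_tasks, get_geometry

tasks = [("2019--P1Y", 10, -3), ("2019--P1Y", 42, 7)]  # (time, x, y) tile indices, made up
aoi = get_geometry("area_of_interest.geojson")         # hypothetical GeoJSON file
grid_name = "au-30"                                    # must be a key of GRIDS; assumed here

kept = list(filter_tasks(tasks, aoi, grid_name))
print(f"{len(kept)} of {len(tasks)} tasks intersect the area of interest")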
29 changes: 15 additions & 14 deletions odc/stats/_gjson.py
@@ -1,16 +1,17 @@
import math
from copy import deepcopy
import toolz
from typing import Tuple, Dict, Any
from typing import Any
from datetime import timedelta

from datacube.model import GridSpec
from datacube.utils.geometry import polygon_from_transform, Geometry
from odc.geo.gridspec import GridSpec
from odc.geo import Geometry, wh_
from odc.geo.geom import polygon_from_transform
from odc.dscache.tools import solar_offset
from .model import TileIdx_xy, TileIdx_txy


def gs_bounds(gs: GridSpec, tiles: Tuple[Tuple[int, int], Tuple[int, int]]) -> Geometry:
def gs_bounds(gs: GridSpec, tiles: tuple[tuple[int, int], tuple[int, int]]) -> Geometry:
"""
Compute Polygon for a selection of tiles.

@@ -20,23 +21,23 @@ def gs_bounds(gs: GridSpec, tiles: Tuple[Tuple[int, int], Tuple[int, int]]) -> G
X,Y ranges are inclusive on the left and exclusive on the right, same as numpy slicing.
"""
((x0, x1), (y0, y1)) = tiles
if gs.resolution[0] < 0:
if gs.resolution.y < 0:
gb = gs.tile_geobox((x0, y1 - 1))
else:
gb = gs.tile_geobox((x0, y0))

nx = (x1 - x0) * gb.shape[1]
ny = (y1 - y0) * gb.shape[0]
return polygon_from_transform(nx, ny, gb.affine, gb.crs)
nx = (x1 - x0) * gb.shape.x
ny = (y1 - y0) * gb.shape.y
return polygon_from_transform(wh_(nx, ny), gb.affine, gb.crs)


def timedelta_to_hours(td: timedelta) -> float:
return td.days * 24 + td.seconds / 3600


def compute_grid_info(
cells: Dict[TileIdx_xy, Any], resolution: float = math.inf, title_width: int = 0
) -> Dict[TileIdx_xy, Any]:
cells: dict[TileIdx_xy, Any], resolution: float = math.inf, title_width: int = 0
) -> dict[TileIdx_xy, Any]:
"""
Compute geojson feature for every cell in ``cells``.
Where ``cells`` is produced by ``bin_dataset_stream``
Expand Down Expand Up @@ -74,8 +75,8 @@ def compute_grid_info(


def gjson_from_tasks(
tasks: Dict[TileIdx_txy, Any], grid_info: Dict[TileIdx_xy, Any]
) -> Dict[str, Dict[str, Any]]:
tasks: dict[TileIdx_txy, Any], grid_info: dict[TileIdx_xy, Any]
) -> dict[str, dict[str, Any]]:
"""
Group tasks by time period and compute geosjon describing every tile covered by each time period.

@@ -95,14 +96,14 @@ def _get(idx):
dss = tasks[idx]
utc_offset = timedelta(hours=geo["properties"]["utc_offset"])

ndays = len(set((ds.time + utc_offset).date() for ds in dss))
ndays = len({(ds.time + utc_offset).date() for ds in dss})
geo["properties"]["total"] = len(dss)
geo["properties"]["days"] = ndays

return geo

def process(idxs):
return dict(type="FeatureCollection", features=[_get(idx) for idx in idxs])
return {"type": "FeatureCollection", "features": [_get(idx) for idx in idxs]}

return {
t: process(idxs)
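gs_bounds above turns an inclusive-exclusive tile range into a single polygon in the grid's CRS, now built with odc-geo's wh_ helper and the .x/.y accessors on the geobox shape. A small sketch, assuming the grid key exists in odc-dscache's GRIDS registry.

from datetime import timedelta

from odc.dscache.tools.tiling import GRIDS
from odc.stats._gjson import gs_bounds, timedelta_to_hours

gs = GRIDS["au-30"]  # assumed grid key; any odc.geo GridSpec would do
# x in [10, 12), y in [-4, -2): a 2x2 block of tiles, ranges exclusive on the right
poly = gs_bounds(gs, ((10, 12), (-4, -2)))
print(poly.crs, poly.boundingbox)

print(timedelta_to_hours(timedelta(days=1, seconds=7200)))  # 26.0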