Skip to content

Commit 3511109

Browse files
SpacemanPaul, phaesler, pindge, and Tisham Dhar
authored
MV search redux (#378)
* Basic SQLAlchemy interface to space_time_view * Search returns unioned extent. * Search returns ODC Datasets. * Use MVs for dataset search. * Delete obsolete file * only return bits of unioned extent inside out query. * Interact with Postgres with EPSG:4326 * Use Count and Extent modes in GetMap. * Pylint is just doing its job. * Unit test MVSelectOpts. * Make unit tests more fun. * Test mv_index.py * Remove legacy dataset search code. Instant coverage boost! * Grant select permission to public. * Handle null extents. * More readable refactor of previous bug fix. * I guess I was right the first time. * Remove support for blocking materialised view updates. * Oops - unfinished refactor. * Update database documentation. * Use MVSelectOpts.COUNT in WCS GetCoverage requests. * Embarassing typo in last commit. Co-authored-by: phaesler <paul.haesler@data61.csiro.au> Co-authored-by: Pin Jin <pindge.j@gmail.com> Co-authored-by: Tisham Dhar <tisham.dhar@ga.gov.au>
1 parent f102aa2 commit 3511109

16 files changed

+339
-105
lines changed

datacube_ows/band_mapper.py

Lines changed: 0 additions & 6 deletions
This file was deleted.

datacube_ows/data.py

Lines changed: 41 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
GetFeatureInfoParameters, solar_correct_data, collapse_datasets_to_times
2525
from datacube_ows.ogc_utils import local_solar_date_range, dataset_center_time, ConfigException, tz_for_geometry, \
2626
solar_date, year_date_range, month_date_range
27-
27+
from datacube_ows.mv_index import MVSelectOpts, mv_search_datasets
2828
from datacube_ows.utils import log_call, group_by_statistical
2929

3030
import logging
@@ -62,51 +62,37 @@ def needed_bands(self):
6262
return self._needed_bands
6363

6464
@log_call
65-
def datasets(self, index, mask=False, all_time=False, point=None):
65+
def n_datasets(self, index, all_time=False, point=None):
66+
return self.datasets(index,
67+
all_time=all_time, point=point,
68+
mode=MVSelectOpts.COUNT)
69+
@log_call
70+
def datasets(self, index,
71+
mask=False, all_time=False, point=None,
72+
mode=MVSelectOpts.DATASETS):
6673
# Return datasets as a time-grouped xarray DataArray. (or None if no datasets)
6774
# No PQ product, so no PQ datasets.
6875
if not self._product.pq_name and mask:
6976
return None
7077

71-
if self._product.multi_product:
72-
prod_name = self._product.pq_names if mask and self._product.pq_name else self._product.product_names
73-
query_args = {
74-
"geopolygon": self._geobox.extent
75-
}
78+
if point:
79+
geom = point
7680
else:
77-
prod_name = self._product.pq_name if mask and self._product.pq_name else self._product.product_name
78-
query_args = {
79-
"product": prod_name,
80-
"geopolygon": self._geobox.extent
81-
}
82-
if all_time or (mask and self._product.pq_ignore_time):
83-
all_datasets = self._dataset_query(index, prod_name, query_args)
81+
geom = self._geobox.extent
82+
83+
if all_time:
84+
times = None
8485
else:
85-
all_datasets = []
86-
for th in self._times:
87-
query_args["time"] = th
88-
all_datasets.extend(self._dataset_query(index, prod_name, query_args))
89-
return datacube.Datacube.group_datasets(all_datasets, self.group_by)
90-
91-
def _dataset_query(self, index, prod_name, query_args):
92-
# ODC Dataset Query
93-
if self._product.multi_product:
94-
queries = []
95-
for pn in prod_name:
96-
query_args["product"] = pn
97-
queries.append(datacube.api.query.Query(**query_args))
98-
_LOG.debug("query start %s", datetime.now().time())
99-
datasets = []
100-
for q in queries:
101-
datasets.extend(index.datasets.search_eager(**q.search_terms))
102-
_LOG.debug("query stop %s", datetime.now().time())
86+
times = self._times
87+
result = mv_search_datasets(index, mode,
88+
layer=self._product,
89+
times=times,
90+
geom=geom,
91+
mask=mask)
92+
if mode == MVSelectOpts.DATASETS:
93+
return datacube.Datacube.group_datasets(result, self.group_by)
10394
else:
104-
query = datacube.api.query.Query(**query_args)
105-
_LOG.debug("query start %s", datetime.now().time())
106-
datasets = index.datasets.search_eager(**query.search_terms)
107-
_LOG.debug("query stop %s", datetime.now().time())
108-
109-
return datasets
95+
return result
11096

11197
@log_call
11298
def data(self, datasets, mask=False, manual_merge=False, skip_corrections=False, **kwargs):
@@ -236,38 +222,26 @@ def get_map(args):
236222
raise WMSException("Database connectivity failure")
237223
# Tiling.
238224
stacker = DataStacker(params.product, params.geobox, params.times, params.resampling, style=params.style)
239-
datasets = stacker.datasets(dc.index)
240-
n_datasets = datasets_in_xarray(datasets)
241225
zoomed_out = params.zf < params.product.min_zoom
242-
too_many_datasets = (params.product.max_datasets_wms > 0
243-
and n_datasets > params.product.max_datasets_wms
244-
)
245-
if n_datasets == 0:
226+
too_many_datasets = False
227+
if not zoomed_out:
228+
n_datasets = stacker.datasets(dc.index, mode=MVSelectOpts.COUNT)
229+
too_many_datasets = (params.product.max_datasets_wms > 0
230+
and n_datasets > params.product.max_datasets_wms
231+
)
232+
if too_many_datasets or zoomed_out:
233+
extent = stacker.datasets(dc.index, mode=MVSelectOpts.EXTENT)
234+
if extent is None:
235+
body = _write_empty(params.geobox)
236+
else:
237+
body = _write_polygon(
238+
params.geobox,
239+
extent,
240+
params.product.zoom_fill)
241+
elif n_datasets == 0:
246242
body = _write_empty(params.geobox)
247-
elif too_many_datasets:
248-
body = _write_polygon(
249-
params.geobox,
250-
params.geobox.extent,
251-
params.product.zoom_fill)
252-
elif zoomed_out:
253-
# Zoomed out to far to properly render data.
254-
# Construct a polygon which is the union of the extents of the matching datasets.
255-
extent = None
256-
extent_crs = None
257-
for dt in datasets.time.values:
258-
tds = datasets.sel(time=dt)
259-
for ds in tds.values.item():
260-
if extent:
261-
new_extent = bbox_to_geom(ds.extent.boundingbox, ds.extent.crs)
262-
if new_extent.crs != extent_crs:
263-
new_extent = new_extent.to_crs(extent_crs)
264-
extent = extent.union(new_extent)
265-
else:
266-
extent = bbox_to_geom(ds.extent.boundingbox, ds.extent.crs)
267-
extent_crs = extent.crs
268-
extent = extent.to_crs(params.crs)
269-
body = _write_polygon(params.geobox, extent, params.product.zoom_fill)
270243
else:
244+
datasets = stacker.datasets(dc.index)
271245
_LOG.debug("load start %s %s", datetime.now().time(), args["requestid"])
272246
data = stacker.data(datasets,
273247
manual_merge=params.product.data_manual_merge,

datacube_ows/mv_index.py

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
from enum import Enum
2+
import json
3+
4+
from geoalchemy2 import Geometry
5+
from datacube.utils.geometry import Geometry as ODCGeom
6+
from sqlalchemy import MetaData, Table, select, or_, Column, SMALLINT, text
7+
from psycopg2.extras import DateTimeTZRange
8+
from sqlalchemy.dialects.postgresql import UUID, TSTZRANGE
9+
from sqlalchemy.sql.functions import count, func
10+
11+
def get_sqlalc_engine(index):
    """Return the SQLAlchemy engine underlying an ODC index's database."""
    # pylint: disable=protected-access
    db = index._db
    return db._engine
14+
15+
def get_st_view(meta):
    """
    Construct a SQLAlchemy Table describing the space_time_view
    materialised view: one row per dataset, carrying the dataset id,
    its product (dataset_type) reference, and its spatial and temporal
    extents.
    """
    return Table(
        'space_time_view', meta,
        Column('id', UUID()),
        Column('dataset_type_ref', SMALLINT()),
        Column('spatial_extent',
               Geometry(from_text='ST_GeomFromGeoJSON', name='geometry')),
        Column('temporal_extent', TSTZRANGE()),
    )


# Shared metadata/table objects, built once at import time.
_meta = MetaData()
st_view = get_st_view(_meta)
24+
25+
class MVSelectOpts(Enum):
    """
    Enum for mv_search_datasets sel parameter.

    ALL: return all columns, select *, as result set
    IDS: return list of database ids only.
    DATASETS: return list of ODC dataset objects
    COUNT: return a count of matching datasets
    EXTENT: return full extent of query as a Geometry
    """
    ALL = 0
    IDS = 1
    COUNT = 2
    EXTENT = 3
    DATASETS = 4

    def sel(self, stv):
        """
        Return the SELECT column list for this mode.

        :param stv: The space_time_view SQLAlchemy Table object.
        :return: A list of selectable column expressions.
        """
        if self == MVSelectOpts.ALL:
            return [stv]
        if self in (MVSelectOpts.IDS, MVSelectOpts.DATASETS):
            # Both modes select ids only; DATASETS hydrates them afterwards.
            return [stv.c.id]
        if self == MVSelectOpts.COUNT:
            return [count(stv.c.id)]
        if self == MVSelectOpts.EXTENT:
            return [text("ST_AsGeoJSON(ST_Union(spatial_extent))")]
        # Unreachable for any defined member.  Raise rather than assert so
        # the check is not stripped when running under `python -O`.
        raise AssertionError("Unhandled selection mode: %s" % self)
51+
52+
def mv_search_datasets(index,
                       sel=MVSelectOpts.IDS,
                       times=None,
                       layer=None,
                       geom=None,
                       mask=False):
    """
    Perform a dataset query via the space_time_view

    :param layer: A ows_configuration.OWSNamedLayer object (single or multiproduct; required)
    :param index: A datacube index (required)

    :param sel: Selection mode - a MVSelectOpts enum. Defaults to IDS.
    :param times: A list of pairs of datetimes (with time zone), or None for no time filter.
    :param geom: A datacube.utils.geometry.Geometry object, or None for no spatial filter.
    :param mask: Bool, if true use the flags product of layer

    :raises ValueError: If no layer is supplied.
    :return: See MVSelectOpts doc
    """
    # Validate arguments before touching the database.
    if layer is None:
        raise ValueError("Must filter by product/layer")
    if mask:
        prod_ids = [p.id for p in layer.pq_products]
    else:
        prod_ids = [p.id for p in layer.products]

    stv = st_view
    s = select(sel.sel(stv)).where(stv.c.dataset_type_ref.in_(prod_ids))
    if times is not None:
        # Match datasets whose temporal extent overlaps (&&) ANY of the
        # requested time ranges.
        s = s.where(
            or_(
                *[
                    stv.c.temporal_extent.op("&&")(DateTimeTZRange(*t))
                    for t in times
                ]
            )
        )
    orig_crs = None
    if geom is not None:
        # The materialised view stores extents in EPSG:4326, so reproject
        # the query geometry first.  Remember the original CRS so EXTENT
        # results can be converted back for the caller.
        orig_crs = geom.crs
        if str(geom.crs) != "EPSG:4326":
            geom = geom.to_crs("EPSG:4326")
        geom_js = json.dumps(geom.json)
        s = s.where(stv.c.spatial_extent.intersects(geom_js))
    # print(s) # Print SQL Statement
    engine = get_sqlalc_engine(index)
    conn = engine.connect()
    try:
        if sel == MVSelectOpts.ALL:
            # The caller consumes the raw result set, which needs the
            # connection to remain open; the result proxy releases it when
            # fully read or explicitly closed.
            return conn.execute(s)
        if sel == MVSelectOpts.IDS:
            return [r[0] for r in conn.execute(s)]
        if sel == MVSelectOpts.DATASETS:
            return index.datasets.bulk_get(
                [r[0] for r in conn.execute(s)]
            )
        if sel in (MVSelectOpts.COUNT, MVSelectOpts.EXTENT):
            # Both modes return a single row.
            for r in conn.execute(s):
                if sel == MVSelectOpts.COUNT:
                    return r[0]
                # EXTENT mode: the row holds the unioned extent as GeoJSON.
                geojson = r[0]
                if geojson is None:
                    # No matching datasets, so no extent.
                    return None
                uniongeom = ODCGeom(json.loads(geojson), crs="EPSG:4326")
                if geom:
                    # Only return the parts of the unioned extent that lie
                    # inside the query geometry.
                    intersect = uniongeom.intersection(geom)
                    if orig_crs and orig_crs != "EPSG:4326":
                        intersect = intersect.to_crs(orig_crs)
                else:
                    intersect = uniongeom
                return intersect
        # Unreachable for any defined MVSelectOpts member.  Raise rather
        # than assert so the check survives `python -O`.
        raise AssertionError("Unhandled selection mode: %s" % sel)
    finally:
        if sel != MVSelectOpts.ALL:
            # Don't leak the connection.  (In ALL mode it is handed to the
            # caller via the live result proxy, so it must stay open.)
            conn.close()
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
-- Refreshing TIME materialized view
1+
-- Refreshing TIME materialized view (Blocking)
22

33
REFRESH MATERIALIZED VIEW time_view
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
-- Refreshing SPACE materialized view
1+
-- Refreshing SPACE materialized view (blocking)
22

33
REFRESH MATERIALIZED VIEW space_view
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
-- Refreshing combined SPACE-TIME materialized view (concurrently)
2+
3+
REFRESH MATERIALIZED VIEW CONCURRENTLY space_time_view

datacube_ows/sql/extent_views/refresh/004_refresh_spacetime_requires_blocking.sql

Lines changed: 0 additions & 3 deletions
This file was deleted.

datacube_ows/update_ranges.py

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
@click.command()
1616
@click.option("--views", is_flag=True, default=False, help="Refresh the ODC spatio-temporal materialised views.")
17-
@click.option("--blocking/--no-blocking", is_flag=True, default=False, help="In conjunction with --views, controls whether the materialised view refresh is blocking or concurrent. (defaults to concurrent)")
17+
@click.option("--blocking/--no-blocking", is_flag=True, default=False, help="Obsolete")
1818
@click.option("--schema", is_flag=True, default=False, help="Create or update the OWS database schema, including the spatio-temporal materialised views.")
1919
@click.option("--role", default=None, help="Role to grant database permissions to")
2020
@click.option("--summary", is_flag=True, default=False, help="Treat any named ODC products with no corresponding configured OWS Layer as summary products" )
@@ -107,8 +107,11 @@ def main(layers, blocking,
107107
print("Done")
108108
return 0
109109
elif views:
110+
if blocking:
111+
print("WARNING: The 'blocking' flag is "
112+
"no longer supported and will be ignored.")
110113
print("Refreshing materialised views...")
111-
refresh_views(dc, blocking)
114+
refresh_views(dc)
112115
print("Done")
113116
return 0
114117

@@ -137,14 +140,8 @@ def create_views(dc):
137140
run_sql(dc, "extent_views/create", database=dbname)
138141

139142

140-
def refresh_views(dc, blocking):
141-
if blocking:
142-
blocking = ""
143-
print("N.B. Blocking refresh. Spacetime view will be locked until completion.")
144-
else:
145-
blocking = "CONCURRENTLY"
146-
print("N.B. Concurrent refresh. Refresh will not take place immediately.")
147-
run_sql(dc, "extent_views/refresh", blocking=blocking)
143+
def refresh_views(dc):
144+
run_sql(dc, "extent_views/refresh")
148145

149146

150147
def create_schema(dc, role):

datacube_ows/wcs1_utils.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from datacube_ows.ogc_exceptions import WCS1Exception
1717
from datacube_ows.ogc_utils import ProductLayerException
1818
from datacube_ows.ows_configuration import get_config
19+
from datacube_ows.mv_index import MVSelectOpts
1920

2021
class WCS1GetCoverageRequest():
2122
version = Version(1,0,0)
@@ -291,9 +292,9 @@ def get_coverage_data(req):
291292
req.geobox,
292293
req.times,
293294
bands=req.bands)
294-
datasets = stacker.datasets(dc.index)
295-
if not datasets:
296-
# TODO: Return an empty coverage file with full metadata?
295+
n_datasets = stacker.datasets(dc.index, mode=MVSelectOpts.COUNT)
296+
if n_datasets == 0:
297+
# Return an empty coverage file with full metadata?
297298
cfg = get_config()
298299
x_range = (req.minx, req.maxx)
299300
y_range = (req.miny, req.maxy)
@@ -336,12 +337,11 @@ def get_coverage_data(req):
336337

337338
return data
338339

339-
n_datasets = datasets_in_xarray(datasets)
340340
if req.product.max_datasets_wcs > 0 and n_datasets > req.product.max_datasets_wcs:
341341
raise WCS1Exception("This request processes too much data to be served in a reasonable amount of time."
342342
"Please reduce the bounds of your request and try again."
343343
"(max: %d, this request requires: %d)" % (req.product.max_datasets_wcs, n_datasets))
344-
344+
datasets = stacker.datasets(index=dc.index)
345345
stacker = DataStacker(req.product,
346346
req.geobox,
347347
req.times,

0 commit comments

Comments (0)