Skip to content
3 changes: 2 additions & 1 deletion cubedash/_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,9 @@ def get_time_summary(
year: Optional[int] = None,
month: Optional[int] = None,
day: Optional[int] = None,
region: Optional[str] = None,
) -> Optional[TimePeriodOverview]:
return STORE.get(product_name, year, month, day)
return STORE.get(product_name, year, month, day, region)


@cache.memoize(timeout=60)
Expand Down
10 changes: 5 additions & 5 deletions cubedash/_pages.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ def region_page(
selected_summary,
year_selector_summary,
time_selector_summary,
) = _load_product(product_name, year, month, day)
) = _load_product(product_name, year, month, day, region_code)

region_info = _model.STORE.get_product_region_info(product_name)
if not region_info:
Expand Down Expand Up @@ -394,7 +394,7 @@ def timeline_page(product_name: str):


def _load_product(
product_name, year, month, day
product_name, year, month, day, region: str = None
) -> Tuple[
DatasetType,
ProductSummary,
Expand All @@ -410,9 +410,9 @@ def _load_product(
abort(404, f"Unknown product {product_name!r}")

product_summary = _model.get_product_summary(product_name)
time_summary = _model.get_time_summary(product_name, year, month, day)
year_selector_summary = _model.get_time_summary(product_name, None, None, None)
time_selector_summary = _model.get_time_summary(product_name, year, None, None)
time_summary = _model.get_time_summary(product_name, year, month, day, region)
year_selector_summary = _model.get_time_summary(product_name, None, None, None, region)
time_selector_summary = _model.get_time_summary(product_name, year, None, None, region)
return (
product,
product_summary,
Expand Down
1 change: 1 addition & 0 deletions cubedash/summary/_extents.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import sys
import uuid
from dataclasses import dataclass

from datetime import date, datetime
from pathlib import Path
from typing import Dict, Generator, Iterable, List, Optional
Expand Down
1 change: 1 addition & 0 deletions cubedash/summary/_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ def add_periods(
for p in periods:
timeline_counter.update(p.timeline_dataset_counts)
period = p.timeline_period

timeline_counter, period = cls._group_counter_if_needed(
timeline_counter, period
)
Expand Down
5 changes: 4 additions & 1 deletion cubedash/summary/_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,10 @@
Column("crses", postgres.ARRAY(String)),
# Size of this dataset in bytes, if the product includes it.
Column("size_bytes", BigInteger),
PrimaryKeyConstraint("product_ref", "start_day", "period_type"),
Column("regions_hash", String),
PrimaryKeyConstraint(
"product_ref", "start_day", "period_type", "regions_hash",
),
CheckConstraint(
r"array_length(timeline_dataset_start_days, 1) = "
r"array_length(timeline_dataset_counts, 1)",
Expand Down
147 changes: 136 additions & 11 deletions cubedash/summary/_stores.py
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,7 @@ def get(
year: Optional[int] = None,
month: Optional[int] = None,
day: Optional[int] = None,
region_code: Optional[str] = None,
) -> Optional[TimePeriodOverview]:
period, start_day = TimePeriodOverview.flat_period_representation(
year, month, day
Expand All @@ -771,15 +772,56 @@ def get(
if not product:
return None

res = self._engine.execute(
select([TIME_OVERVIEW]).where(
and_(
TIME_OVERVIEW.c.product_ref == product.id_,
TIME_OVERVIEW.c.start_day == start_day,
TIME_OVERVIEW.c.period_type == period,
)
if region_code and year:
return self._summariser.calculate_summary(
product_name,
year_month_day=(year, month, day),
product_refresh_time=datetime.now(),
region_code=region_code,
)
).fetchone()

if region_code:
res = self._engine.execute(
select([TIME_OVERVIEW]).where(
and_(
TIME_OVERVIEW.c.product_ref == product.id_,
TIME_OVERVIEW.c.start_day == start_day,
TIME_OVERVIEW.c.period_type == period,
TIME_OVERVIEW.c.regions.contains([region_code]),
func.cardinality(TIME_OVERVIEW.c.regions) == 1,
)
)
).fetchone()
else:
if self.get_product_all_regions(product.name, period, start_day):
"""
if the product contains region
"""
res = self._engine.execute(
select([TIME_OVERVIEW]).where(
and_(
TIME_OVERVIEW.c.product_ref == product.id_,
TIME_OVERVIEW.c.start_day == start_day,
TIME_OVERVIEW.c.period_type == period,
func.cardinality(TIME_OVERVIEW.c.regions) == len(
self.get_product_all_regions(product.name, period, start_day)
),
)
)
).fetchone()
else:
"""
if the product doesnt contain region
"""
res = self._engine.execute(
select([TIME_OVERVIEW]).where(
and_(
TIME_OVERVIEW.c.product_ref == product.id_,
TIME_OVERVIEW.c.start_day == start_day,
TIME_OVERVIEW.c.period_type == period,
)
).order_by(TIME_OVERVIEW.c.generation_time.desc())
).fetchone()

if not res:
return None
Expand Down Expand Up @@ -834,6 +876,13 @@ def get_dataset_type(self, name) -> DatasetType:
return d
raise KeyError(f"Unknown dataset type {name!r}")

@ttl_cache(ttl=DEFAULT_TTL)
def get_dataset_type_return_none(self, name) -> Optional[DatasetType]:
    """
    Find the dataset type (product) with the given name.

    Unlike get_dataset_type(), this returns None instead of raising
    KeyError when no product with that name exists — callers use it
    to probe for optional products.
    """
    # Linear scan is fine: all_dataset_types() is itself ttl-cached,
    # and this method's result is cached too.
    return next(
        (d for d in self.all_dataset_types() if d.name == name),
        None,
    )

@ttl_cache(ttl=DEFAULT_TTL)
def _dataset_type_by_id(self, id_) -> DatasetType:
for d in self.all_dataset_types():
Expand Down Expand Up @@ -1034,24 +1083,34 @@ def _put(
log.info("product.put")
product = self._product(summary.product_name)
period, start_day = summary.as_flat_period()
region_values, _ = _counter_key_vals(summary.region_dataset_counts)

row = _summary_to_row(summary)

import hashlib
import json
ret = self._engine.execute(
postgres.insert(TIME_OVERVIEW)
.returning(TIME_OVERVIEW.c.generation_time)
.on_conflict_do_update(
index_elements=["product_ref", "start_day", "period_type"],
index_elements=[
"product_ref", "start_day", "period_type",
"regions_hash"
],
set_=row,
where=and_(
TIME_OVERVIEW.c.product_ref == product.id_,
TIME_OVERVIEW.c.start_day == start_day,
TIME_OVERVIEW.c.period_type == period,
TIME_OVERVIEW.c.regions == region_values,
),
)
.values(
product_ref=product.id_, start_day=start_day, period_type=period, **row
product_ref=product.id_, start_day=start_day, period_type=period,
regions_hash=hashlib.sha224(json.dumps(region_values).encode("utf-8")).hexdigest(), **row
)
)

[gen_time] = ret.fetchone()
summary.summary_gen_time = gen_time

Expand Down Expand Up @@ -1313,13 +1372,15 @@ def _recalculate_period(
year: Optional[int] = None,
month: Optional[int] = None,
product_refresh_time: datetime = None,
region_code: str = None,
) -> TimePeriodOverview:
"""Recalculate the given period and store it in the DB"""
if year and month:
summary = self._summariser.calculate_summary(
product.name,
year_month_day=(year, month, None),
product_refresh_time=product_refresh_time,
region_code=region_code,
)
elif year:
summary = TimePeriodOverview.add_periods(
Expand All @@ -1329,19 +1390,34 @@ def _recalculate_period(
# Product. Does it have data?
elif product.dataset_count > 0:
summary = TimePeriodOverview.add_periods(
self.get(product.name, year_, None, None)
self.get(product.name, year_, None, None, region_code=None)
for year_ in range(
product.time_earliest.astimezone(timezone).year,
product.time_latest.astimezone(timezone).year + 1
)
)

if self.get_product_all_regions(product_name=product.name):
for region in self.get_product_all_regions(product_name=product.name):
region_summary = TimePeriodOverview.add_periods(
self.get(product.name, year_, None, None, region_code=region)
for year_ in range(
product.time_earliest.astimezone(timezone).year,
product.time_latest.astimezone(timezone).year + 1
)
)
region_summary.product_refresh_time = product_refresh_time
region_summary.period_tuple = (product.name, year, month, None)
self._put(region_summary)

else:
summary = TimePeriodOverview.empty(product.name)

summary.product_refresh_time = product_refresh_time
summary.period_tuple = (product.name, year, month, None)

self._put(summary)

for listener in self._update_listeners:
listener(
product_name=product.name,
Expand Down Expand Up @@ -1642,6 +1718,55 @@ def _region_summaries(self, product_name: str) -> Dict[str, RegionSummary]:
if geom is not None
}

def get_product_all_regions(
    self, product_name: str, period_type: str = None, start_day=None
) -> Optional[List[str]]:
    """
    Return the region codes that have data for the given product.

    When a concrete period is requested (period_type other than 'all'
    together with a start_day), only regions containing datasets whose
    center_time falls inside that period are returned.

    Returns None when the product doesn't exist, and an empty list when
    it exists but has no matching regions.
    """
    dt = self.get_dataset_type_return_none(product_name)
    if dt is None:
        return None

    if period_type != "all" and start_day:
        # Restrict to datasets acquired within the requested period.
        year, month, day = TimePeriodOverview.from_flat_period_representation(
            period_type, start_day
        )
        time = _utils.as_time_range(year, month, day)

        # NOTE(review): timezone is hard-coded — assumes datasets are
        # indexed in Australian local time; confirm for other deployments.
        begin_time = time.begin.replace(tzinfo=tz.gettz("Australia/Darwin"))
        end_time = time.end.replace(tzinfo=tz.gettz("Australia/Darwin"))
        rows = self._engine.execute(
            select([DATASET_SPATIAL.c.region_code])
            .where(
                and_(
                    func.tstzrange(
                        begin_time, end_time, "[]", type_=TSTZRANGE
                    ).contains(DATASET_SPATIAL.c.center_time),
                    DATASET_SPATIAL.c.dataset_type_ref == dt.id,
                )
            )
            .distinct()
        )
    else:
        # No period filter: use the precomputed region table.
        rows = self._engine.execute(
            select([REGION.c.region_code])
            .where(REGION.c.dataset_type_ref == dt.id)
            .order_by(REGION.c.region_code)
        )

    # The previous `if not rows: return None` was dead code: a SQLAlchemy
    # result object is always truthy. An empty result yields [].
    return [row["region_code"] for row in rows]

def get_product_region_info(self, product_name: str) -> RegionInfo:
return RegionInfo.for_product(
dataset_type=self.get_dataset_type(product_name),
Expand Down
53 changes: 36 additions & 17 deletions cubedash/summary/_summarise.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def calculate_summary(
product_name: str,
year_month_day: Tuple[Optional[int], Optional[int], Optional[int]],
product_refresh_time: datetime,
region_code: str = None,
) -> TimePeriodOverview:
"""
Create a summary of the given product/time range.
Expand All @@ -64,7 +65,7 @@ def calculate_summary(
log = self.log.bind(product_name=product_name, time=time)
log.debug("summary.query")

begin_time, end_time, where_clause = self._where(product_name, time)
begin_time, end_time, where_clause = self._where(product_name, time, region_code)
select_by_srid = (
select(
(
Expand Down Expand Up @@ -208,25 +209,43 @@ def _with_default_tz(self, d: datetime) -> datetime:
return d

def _where(
    self, product_name: str, time: Range, region: str = None,
) -> Tuple[datetime, datetime, ColumnElement]:
    """
    Build the dataset-filter clause for a summary query.

    Returns (begin_time, end_time, where_clause): the timezone-normalised
    bounds of *time*, and a clause matching datasets of *product_name*
    whose center_time lies in that range, optionally restricted to a
    single *region* code.
    """
    begin_time = self._with_default_tz(time.begin)
    end_time = self._with_default_tz(time.end)

    # Build the clause incrementally rather than duplicating the whole
    # expression in region / no-region branches.
    clauses = [
        # Datasets whose acquisition time falls inside the period
        # (inclusive at both ends).
        func.tstzrange(begin_time, end_time, "[]", type_=TSTZRANGE).contains(
            DATASET_SPATIAL.c.center_time
        ),
        # ... belonging to the requested product.
        DATASET_SPATIAL.c.dataset_type_ref
        == _scalar_subquery(
            select([ODC_DATASET_TYPE.c.id]).where(
                ODC_DATASET_TYPE.c.name == product_name
            )
        ),
    ]
    if region:
        clauses.append(DATASET_SPATIAL.c.region_code == region)
    clauses.append(
        # Skip datasets with invalid footprints; a NULL footprint is fine.
        or_(
            func.st_isvalid(DATASET_SPATIAL.c.footprint).is_(True),
            func.st_isvalid(DATASET_SPATIAL.c.footprint).is_(None),
        )
    )
    return begin_time, end_time, and_(*clauses)

@lru_cache() # noqa: B019
Expand Down
Loading