Skip to content
143 changes: 143 additions & 0 deletions openeo/extra/job_management/_job_splitting.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import abc
import math
from typing import Dict, List, NamedTuple, Optional, Union

import shapely
from shapely.geometry import MultiPolygon, Polygon

from openeo.util import normalize_crs


class JobSplittingFailure(Exception):
pass


class _BoundingBox(NamedTuple):
"""Simple NamedTuple container for a bounding box"""

# TODO: this should be moved to more general utility module, and/or merged with existing BBoxDict

west: float
south: float
east: float
north: float
crs: int = 4326

@classmethod
def from_dict(cls, d: Dict) -> "_BoundingBox":
"""Create a bounding box from a dictionary"""
if d.get("crs") is not None:
d["crs"] = normalize_crs(d["crs"])
return cls(**{k: d[k] for k in cls._fields if k not in cls._field_defaults or k in d})

@classmethod
def from_polygon(cls, polygon: Union[MultiPolygon, Polygon], crs: Optional[int] = None) -> "_BoundingBox":
"""Create a bounding box from a shapely Polygon or MultiPolygon"""
crs = normalize_crs(crs)
return cls(*polygon.bounds, crs=4326 if crs is None else crs)

def as_dict(self) -> Dict:
return self._asdict()

def as_polygon(self) -> Polygon:
"""Get bounding box as a shapely Polygon"""
return shapely.geometry.box(minx=self.west, miny=self.south, maxx=self.east, maxy=self.north)


class _TileGridInterface(metaclass=abc.ABCMeta):
"""Interface for tile grid classes"""

@abc.abstractmethod
# TODO: is it intentional that this method returns a list of non-multi polygons even if the input can be multi-polygon?
# TODO: typehint states that geometry can be a dict too, but that is very liberal, it's probably just about bounding box kind of dicts?
def get_tiles(self, geometry: Union[Dict, MultiPolygon, Polygon]) -> List[Polygon]:
"""Calculate tiles to cover given bounding box"""
...


class _SizeBasedTileGrid(_TileGridInterface):
"""
Specification of a tile grid, parsed from a size and a projection.
The size is in m for UTM projections or degrees for WGS84.
"""

def __init__(self, *, epsg: int, size: float):
# TODO: normalize_crs does not necessarily return an int (could also be a WKT2 string, or even None), but further logic seems to assume it's an int
self.epsg = normalize_crs(epsg)
self.size = size

@classmethod
def from_size_projection(cls, *, size: float, projection: str) -> "_SizeBasedTileGrid":
"""Create a tile grid from size and projection"""
# TODO: the constructor also does normalize_crs, so this factory looks like overkill at the moment
return cls(epsg=normalize_crs(projection), size=size)

def _epsg_is_meters(self) -> bool:
"""Check if the projection unit is in meters. (EPSG:3857 or UTM)"""
# TODO: this is a bit misleading: this code just checks some EPSG ranges (UTM and 3857) and calls all the rest to be not in meters.
# It would be better to raise an exception on unknown EPSG codes than claiming they're not in meter
return 32601 <= self.epsg <= 32660 or 32701 <= self.epsg <= 32760 or self.epsg == 3857

@staticmethod
def _split_bounding_box(to_cover: _BoundingBox, x_offset: float, tile_size: float) -> List[Polygon]:
"""
Split a bounding box into tiles of given size and projection.
:param to_cover: bounding box dict with keys "west", "south", "east", "north", "crs"
:param x_offset: offset to apply to the west and east coordinates
:param tile_size: size of tiles in unit of measure of the projection
:return: list of tiles (polygons)
"""
xmin = int(math.floor((to_cover.west - x_offset) / tile_size))
xmax = int(math.ceil((to_cover.east - x_offset) / tile_size)) - 1
ymin = int(math.floor(to_cover.south / tile_size))
ymax = int(math.ceil(to_cover.north / tile_size)) - 1

tiles = []
for x in range(xmin, xmax + 1):
for y in range(ymin, ymax + 1):
tiles.append(
_BoundingBox(
west=max(x * tile_size + x_offset, to_cover.west),
south=max(y * tile_size, to_cover.south),
east=min((x + 1) * tile_size + x_offset, to_cover.east),
north=min((y + 1) * tile_size, to_cover.north),
).as_polygon()
)

return tiles

def get_tiles(self, geometry: Union[Dict, MultiPolygon, Polygon]) -> List[Polygon]:
if isinstance(geometry, dict):
bbox = _BoundingBox.from_dict(geometry)

elif isinstance(geometry, Polygon) or isinstance(geometry, MultiPolygon):
bbox = _BoundingBox.from_polygon(geometry, crs=self.epsg)

else:
raise JobSplittingFailure("geometry must be a dict or a shapely.geometry.Polygon or MultiPolygon")

# TODO: being a meter based EPSG does not imply that offset should be 500_000
x_offset = 500_000 if self._epsg_is_meters() else 0

tiles = _SizeBasedTileGrid._split_bounding_box(to_cover=bbox, x_offset=x_offset, tile_size=self.size)

return tiles


def split_area(
aoi: Union[Dict, MultiPolygon, Polygon], projection: str = "EPSG:3857", tile_size: float = 20_000.0
) -> List[Polygon]:
"""
Split area of interest into tiles of given size and projection.
:param aoi: area of interest (bounding box or shapely polygon)
:param projection: projection to use for splitting. Default is web mercator (EPSG:3857)
:param tile_size: size of tiles in unit of measure of the projection
:return: list of tiles (polygons).
"""
# TODO EPSG 3857 is probably not a good default projection. Probably better to make it a required parameter
if isinstance(aoi, dict):
# TODO: this possibly overwrites the given projection without the user noticing, making usage confusing
projection = aoi.get("crs", projection)

tile_grid = _SizeBasedTileGrid.from_size_projection(size=tile_size, projection=projection)
return tile_grid.get_tiles(aoi)
167 changes: 167 additions & 0 deletions tests/extra/job_management/test_job_splitting.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
import pytest
import shapely

from openeo.extra.job_management._job_splitting import (
JobSplittingFailure,
_BoundingBox,
_SizeBasedTileGrid,
split_area,
)

# TODO: using fixtures for these simple objects is a bit overkill, makes the test harder to follow, and undermines opportunity to parameterize

@pytest.fixture
def mock_polygon_wgs():
return shapely.geometry.box(0.0, 0.0, 1.0, 1.0)


@pytest.fixture
def mock_polygon_utm():
return shapely.geometry.box(0.0, 0.0, 100_000.0, 100_000.0)


@pytest.fixture
def mock_dict_no_crs():
return {
"west": 0.0,
"south": 0.0,
"east": 1.0,
"north": 1.0,
}


@pytest.fixture
def mock_dict_with_crs_utm():
return {
"west": 0.0,
"south": 0.0,
"east": 100_000.0,
"north": 100_000.0,
"crs": "EPSG:3857",
}
Comment thread
soxofaan marked this conversation as resolved.


class TestBoundingBox:
def test_basic(self):
bbox = _BoundingBox(1, 2, 3, 4)
assert bbox.west == 1
assert bbox.south == 2
assert bbox.east == 3
assert bbox.north == 4
assert bbox.crs == 4326

def test_from_dict(self):
bbox = _BoundingBox.from_dict({"west": 1, "south": 2, "east": 3, "north": 4, "crs": "epsg:32633"})
assert (bbox.west, bbox.south, bbox.east, bbox.north) == (1, 2, 3, 4)
assert bbox.crs == 32633

def test_from_dict_defaults(self):
bbox = _BoundingBox.from_dict({"west": 1, "south": 2, "east": 3, "north": 4})
assert (bbox.west, bbox.south, bbox.east, bbox.north) == (1, 2, 3, 4)
assert bbox.crs == 4326

def test_from_dict_underspecified(self):
with pytest.raises(KeyError):
_ = _BoundingBox.from_dict({"west": 1, "south": 2, "color": "red"})

def test_from_dict_overspecified(self):
bbox = _BoundingBox.from_dict(
{"west": 1, "south": 2, "east": 3, "north": 4, "crs": "EPSG:4326", "color": "red"}
)
assert (bbox.west, bbox.south, bbox.east, bbox.north) == (1, 2, 3, 4)
assert bbox.crs == 4326

def test_from_polygon(self):
polygon = shapely.geometry.box(1, 2, 3, 4)
bbox = _BoundingBox.from_polygon(polygon)
assert (bbox.west, bbox.south, bbox.east, bbox.north) == (1, 2, 3, 4)
assert bbox.crs == 4326

def test_as_dict(self):
bbox = _BoundingBox(1, 2, 3, 4)
assert bbox.as_dict() == {"west": 1, "south": 2, "east": 3, "north": 4, "crs": 4326}

def test_as_polygon(self):
bbox = _BoundingBox(1, 2, 3, 4)
polygon = bbox.as_polygon()
assert isinstance(polygon, shapely.geometry.Polygon)
assert set(polygon.exterior.coords) == {(1, 2), (3, 2), (3, 4), (1, 4)}


class TestSizeBasedTileGrid:
def test_from_size_projection(self):
splitter = _SizeBasedTileGrid.from_size_projection(size=0.1, projection="EPSG:4326")
assert splitter.epsg == 4326
assert splitter.size == 0.1

def test_get_tiles_raises_exception(self):
"""test get_tiles when the input geometry is not a dict or shapely.geometry.Polygon"""
tile_grid = _SizeBasedTileGrid.from_size_projection(size=0.1, projection="EPSG:4326")
with pytest.raises(JobSplittingFailure):
tile_grid.get_tiles("invalid_geometry")

def test_simple_get_tiles_dict(self, mock_dict_with_crs_utm, mock_polygon_utm):
"""test get_tiles when the the tile grid size is equal to the size of the input geometry. The original geometry should be returned as polygon."""
tile_grid = _SizeBasedTileGrid.from_size_projection(size=100_000, projection="EPSG:3857")
tiles = tile_grid.get_tiles(mock_dict_with_crs_utm)
assert len(tiles) == 1
assert tiles[0] == mock_polygon_utm

def test_multiple_get_tile_dict(self, mock_dict_with_crs_utm):
"""test get_tiles when the the tile grid size is smaller than the size of the input geometry. The input geometry should be split into multiple tiles."""
tile_grid = _SizeBasedTileGrid.from_size_projection(size=20_000, projection="EPSG:3857")
tiles = tile_grid.get_tiles(mock_dict_with_crs_utm)
assert len(tiles) == 25
assert tiles[0] == shapely.geometry.box(0.0, 0.0, 20_000.0, 20_000.0)

def test_larger_get_tile_dict(self, mock_dict_with_crs_utm, mock_polygon_utm):
"""test get_tiles when the the tile grid size is larger than the size of the input geometry. The original geometry should be returned."""
tile_grid = _SizeBasedTileGrid.from_size_projection(size=200_000, projection="EPSG:3857")
tiles = tile_grid.get_tiles(mock_dict_with_crs_utm)
assert len(tiles) == 1
assert tiles[0] == mock_polygon_utm

def test_get_tiles_polygon_wgs(self, mock_polygon_wgs):
"""test get_tiles when the input geometry is a polygon in wgs and the tile grid is in wgs"""
tile_grid = _SizeBasedTileGrid.from_size_projection(size=0.1, projection="EPSG:4326")
tiles = tile_grid.get_tiles(mock_polygon_wgs)
assert len(tiles) == 100
assert tiles[0] == shapely.geometry.box(0.0, 0.0, 0.1, 0.1)

def test_simple_get_tiles_polygon(self, mock_polygon_utm):
"""test get_tiles when the the tile grid size is equal to the size of the input geometry. The original geometry should be returned."""
tile_grid = _SizeBasedTileGrid.from_size_projection(size=100_000.0, projection="EPSG:3857")
tiles = tile_grid.get_tiles(mock_polygon_utm)
assert len(tiles) == 1
assert tiles[0] == mock_polygon_utm

def test_larger_get_tiles_polygon(self, mock_polygon_utm):
"""test get_tiles when the the tile grid size is larger than the size of the input geometry. The original geometry should be returned."""
tile_grid = _SizeBasedTileGrid.from_size_projection(size=200_000.0, projection="EPSG:3857")
tiles = tile_grid.get_tiles(mock_polygon_utm)
assert len(tiles) == 1
assert tiles[0] == mock_polygon_utm


def test_split_area_default():
"""test split_area with default parameters"""
aoi = {"west": 0.0, "south": 0.0, "east": 20_000.0, "north": 20_000.0, "crs": "EPSG:3857"}
tiles = split_area(aoi)
assert len(tiles) == 1
assert tiles[0] == shapely.geometry.box(0.0, 0.0, 20_000.0, 20_000.0)


def test_split_area_custom():
"""test split_area with wgs projection"""
aoi = {"west": 0.0, "south": 0.0, "east": 1.0, "north": 1.0, "crs": "EPSG:4326"}
tiles = split_area(aoi, "EPSG:4326", 1.0)
assert len(tiles) == 1
assert tiles[0] == shapely.geometry.box(0.0, 0.0, 1.0, 1.0)


def test_split_area_custom_no_crs_specified():
"""test split_area with crs in dict, but not in split_area. The crs in the dict should be used."""
aoi = {"west": 0.0, "south": 0.0, "east": 1.0, "north": 1.0, "crs": "EPSG:4326"}
tiles = split_area(aoi=aoi, tile_size=1.0)
assert len(tiles) == 1
assert tiles[0] == shapely.geometry.box(0.0, 0.0, 1.0, 1.0)