Skip to content

First version of tile based job splitter #756

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 9 commits into from
136 changes: 136 additions & 0 deletions openeo/extra/job_management/job_splitting.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import abc
import math
from typing import Dict, List, NamedTuple, Optional, Union

import pyproj
import shapely
from shapely.geometry import shape
from shapely.ops import transform


class JobSplittingFailure(Exception):
pass


# TODO: This function is also defined in openeo-python-driver. But maybe we want to avoid a dependency on openeo-python-driver?
def reproject_bounding_box(bbox: Dict, from_crs: Optional[str], to_crs: str) -> Dict:
"""
Reproject given bounding box dictionary

:param bbox: bbox dict with fields "west", "south", "east", "north"
:param from_crs: source CRS. Specify `None` to use the "crs" field of input bbox dict
:param to_crs: target CRS
:return: bbox dict (fields "west", "south", "east", "north", "crs")
"""
box = shapely.geometry.box(bbox["west"], bbox["south"], bbox["east"], bbox["north"])
if from_crs is None:
from_crs = bbox["crs"]
tranformer = pyproj.Transformer.from_crs(crs_from=from_crs, crs_to=to_crs, always_xy=True)
reprojected = transform(tranformer.transform, box)
return dict(zip(["west", "south", "east", "north"], reprojected.bounds), crs=to_crs)


# TODO: This class is also defined in openeo-aggregator. But maybe we want to avoid a dependency on openeo-aggregator?
class BoundingBox(NamedTuple):
"""Simple NamedTuple container for a bounding box"""

west: float
south: float
east: float
north: float
crs: str = "EPSG:4326"

@classmethod
def from_dict(cls, d: Dict) -> "BoundingBox":
return cls(**{k: d[k] for k in cls._fields if k not in cls._field_defaults or k in d})

@classmethod
def from_polygon(cls, polygon: shapely.geometry.Polygon, projection: Optional[str] = None) -> "BoundingBox":
"""Create a bounding box from a shapely Polygon"""
return cls(*polygon.bounds, projection if projection is not None else cls.crs)

def as_dict(self) -> Dict:
return self._asdict()

def as_polygon(self) -> shapely.geometry.Polygon:
"""Get bounding box as a shapely Polygon"""
return shapely.geometry.box(minx=self.west, miny=self.south, maxx=self.east, maxy=self.north)


class TileGridInterface(metaclass=abc.ABCMeta):
"""Interface for tile grid classes"""

@abc.abstractmethod
def get_tiles(self, geometry: Union[Dict, shapely.geometry.Polygon]) -> List[Union[Dict, shapely.geometry.Polygon]]:
"""Calculate tiles to cover given bounding box"""
...


class SizeBasedTileGrid(TileGridInterface):
"""
Specification of a tile grid, parsed from a size and a projection.
"""

def __init__(self, epsg: str, size: float):
self.epsg = epsg
self.size = size

@classmethod
def from_size_projection(cls, size: float, projection: str) -> "SizeBasedTileGrid":
"""Create a tile grid from size and projection"""
return cls(projection.lower(), size)

def get_tiles(self, geometry: Union[Dict, shapely.geometry.Polygon]) -> List[Union[Dict, shapely.geometry.Polygon]]:
if isinstance(geometry, dict):
bbox = BoundingBox.from_dict(geometry)
bbox_crs = bbox.crs
elif isinstance(geometry, shapely.geometry.Polygon):
bbox = BoundingBox.from_polygon(geometry, projection=self.epsg)
bbox_crs = self.epsg
else:
raise JobSplittingFailure("geometry must be a dict or a shapely.geometry.Polygon")

if self.epsg == "epsg:4326":
tile_size = self.size
x_offset = 0
else:
tile_size = self.size * 1000
x_offset = 500_000

to_cover = BoundingBox.from_dict(reproject_bounding_box(bbox.as_dict(), from_crs=bbox_crs, to_crs=self.epsg))
xmin = int(math.floor((to_cover.west - x_offset) / tile_size))
xmax = int(math.ceil((to_cover.east - x_offset) / tile_size)) - 1
ymin = int(math.floor(to_cover.south / tile_size))
ymax = int(math.ceil(to_cover.north / tile_size)) - 1

tiles = []
for x in range(xmin, xmax + 1):
for y in range(ymin, ymax + 1):
tile = BoundingBox(
west=max(x * tile_size + x_offset, to_cover.west),
south=max(y * tile_size, to_cover.south),
east=min((x + 1) * tile_size + x_offset, to_cover.east),
north=min((y + 1) * tile_size, to_cover.north),
crs=self.epsg,
)

if isinstance(geometry, dict):
tiles.append(reproject_bounding_box(tile.as_dict(), from_crs=self.epsg, to_crs=bbox_crs))
else:
tiles.append(tile.as_polygon())

return tiles


def split_area(
aoi: Union[Dict, shapely.geometry.Polygon], projection="EPSG:3857", tile_size: float = 20.0
) -> List[Union[Dict, shapely.geometry.Polygon]]:
"""
Split area of interest into tiles of given size and projection.
:param aoi: area of interest (bounding box or shapely polygon)
:param projection: projection to use for splitting. Default is web mercator (EPSG:3857)
:param tile_size: size of tiles in km for UTM projections or degrees for WGS84
:return: list of tiles (dicts with keys "west", "south", "east", "north", "crs" or shapely polygons). For dicts the original crs is preserved. For polygons the projection is set to the given projection.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For polygons the projection is set to the given projection.

I'm confused here: shapely polygons don't have a CRS attribute, so you can't set the projection, you can at most assume something

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Proposed wording change: For dicts the original crs is preserved. For polygons the projection is assumed to be the given tilegrid projection.

"""
tile_grid = SizeBasedTileGrid.from_size_projection(tile_size, projection)
return tile_grid.get_tiles(aoi)
242 changes: 242 additions & 0 deletions tests/extra/job_management/test_job_splitting.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
import pytest
import shapely

from openeo.extra.job_management.job_splitting import (
BoundingBox,
JobSplittingFailure,
SizeBasedTileGrid,
reproject_bounding_box,
split_area,
)


@pytest.fixture
def mock_polygon_wgs():
return shapely.geometry.box(0.0, 0.0, 1.0, 1.0)


@pytest.fixture
def mock_polygon_utm():
return shapely.geometry.box(0.0, 0.0, 100_000.0, 100_000.0)


@pytest.fixture
def mock_dict_no_crs():
return {
"west": 0.0,
"south": 0.0,
"east": 1.0,
"north": 1.0,
}


@pytest.fixture
def mock_dict_with_crs_utm():
return {
"west": 0.0,
"south": 0.0,
"east": 100_000.0,
"north": 100_000.0,
"crs": "EPSG:3857",
}


@pytest.mark.parametrize(
["crs", "bbox"],
[
(
"EPSG:32631",
{"west": 640800, "south": 5676000, "east": 642200, "north": 5677000},
),
("EPSG:4326", {"west": 5.01, "south": 51.2, "east": 5.1, "north": 51.5}),
],
)
def test_reproject_bounding_box_same(crs, bbox):
reprojected = reproject_bounding_box(bbox, from_crs=crs, to_crs=crs)
assert reprojected == dict(crs=crs, **bbox)


def test_reproject_bounding_box():
bbox = {"west": 640800, "south": 5676000, "east": 642200.0, "north": 5677000.0}
reprojected = reproject_bounding_box(bbox, from_crs="EPSG:32631", to_crs="EPSG:4326")
assert reprojected == {
"west": pytest.approx(5.016118467277098),
"south": pytest.approx(51.217660146353246),
"east": pytest.approx(5.036548264535997),
"north": pytest.approx(51.22699369149726),
"crs": "EPSG:4326",
}


class TestBoundingBox:
def test_basic(self):
bbox = BoundingBox(1, 2, 3, 4)
assert bbox.west == 1
assert bbox.south == 2
assert bbox.east == 3
assert bbox.north == 4
assert bbox.crs == "EPSG:4326"

def test_from_dict(self):
bbox = BoundingBox.from_dict({"west": 1, "south": 2, "east": 3, "north": 4, "crs": "epsg:32633"})
assert (bbox.west, bbox.south, bbox.east, bbox.north) == (1, 2, 3, 4)
assert bbox.crs == "epsg:32633"

def test_from_dict_defaults(self):
bbox = BoundingBox.from_dict({"west": 1, "south": 2, "east": 3, "north": 4})
assert (bbox.west, bbox.south, bbox.east, bbox.north) == (1, 2, 3, 4)
assert bbox.crs == "EPSG:4326"

def test_from_dict_underspecified(self):
with pytest.raises(KeyError):
_ = BoundingBox.from_dict({"west": 1, "south": 2, "color": "red"})

def test_from_dict_overspecified(self):
bbox = BoundingBox.from_dict({"west": 1, "south": 2, "east": 3, "north": 4, "crs": "EPSG:4326", "color": "red"})
assert (bbox.west, bbox.south, bbox.east, bbox.north) == (1, 2, 3, 4)
assert bbox.crs == "EPSG:4326"

def test_from_polygon(self):
polygon = shapely.geometry.box(1, 2, 3, 4)
bbox = BoundingBox.from_polygon(polygon, projection="EPSG:4326")
assert (bbox.west, bbox.south, bbox.east, bbox.north) == (1, 2, 3, 4)
assert bbox.crs == "EPSG:4326"

def test_as_dict(self):
bbox = BoundingBox(1, 2, 3, 4)
assert bbox.as_dict() == {"west": 1, "south": 2, "east": 3, "north": 4, "crs": "EPSG:4326"}

def test_as_polygon(self):
bbox = BoundingBox(1, 2, 3, 4)
polygon = bbox.as_polygon()
assert isinstance(polygon, shapely.geometry.Polygon)
assert set(polygon.exterior.coords) == {(1, 2), (3, 2), (3, 4), (1, 4)}


class TestSizeBasedTileGrid:

def test_from_size_projection(self):
splitter = SizeBasedTileGrid.from_size_projection(0.1, "EPSG:4326")
assert splitter.epsg == "epsg:4326"
assert splitter.size == 0.1

def test_get_tiles_raises_exception(self):
"""test get_tiles when the input geometry is not a dict or shapely.geometry.Polygon"""
tile_grid = SizeBasedTileGrid.from_size_projection(0.1, "EPSG:4326")
with pytest.raises(JobSplittingFailure):
tile_grid.get_tiles("invalid_geometry")

def test_get_tiles_dict_returns_dict(self, mock_dict_no_crs):
"""test get_tiles when the input geometry dict returns a list of dicts"""
tile_grid = SizeBasedTileGrid.from_size_projection(0.1, "EPSG:4326")
tiles = tile_grid.get_tiles(mock_dict_no_crs)
assert isinstance(tiles, list)
assert all(isinstance(tile, dict) for tile in tiles)

def test_get_tiles_polygon_returns_polygon(self, mock_polygon_wgs):
"""test get_tiles when the input geometry is a polygon and the tile grid is in wgs"""
tile_grid = SizeBasedTileGrid.from_size_projection(0.1, "EPSG:4326")
tiles = tile_grid.get_tiles(mock_polygon_wgs)
assert isinstance(tiles, list)
assert all(isinstance(tile, shapely.geometry.Polygon) for tile in tiles)

def test_get_tiles_dict_no_crs_utm(self, mock_dict_no_crs):
"""test get_tiles when the input geometry dict has no crs and the tile grid is in utm"""
tile_grid = SizeBasedTileGrid.from_size_projection(20.0, "EPSG:3857")
tiles = tile_grid.get_tiles(mock_dict_no_crs)
assert tiles[0].get("crs") == "EPSG:4326"
assert len(tiles) == 36

def test_get_tiles_dict_no_crs_wgs(self, mock_dict_no_crs):
"""test get_tiles when the input geometry dict has no crs and the tile grid is in wgs"""
tile_grid = SizeBasedTileGrid.from_size_projection(0.1, "EPSG:4326")
tiles = tile_grid.get_tiles(mock_dict_no_crs)
assert tiles[0].get("crs") == "EPSG:4326"
assert len(tiles) == 100

def test_get_tiles_dict_with_crs_same(self, mock_dict_with_crs_utm):
"""test get_tiles when the input geometry dict and the tile grid have the same crs"""
tile_grid = SizeBasedTileGrid.from_size_projection(20.0, "EPSG:3857")
tiles = tile_grid.get_tiles(mock_dict_with_crs_utm)
assert tiles[0].get("crs") == "EPSG:3857"
assert len(tiles) == 25

def test_get_tiles_dict_with_crs_different(self, mock_dict_with_crs_utm):
"""test get_tiles when the input geometry dict and the tile grid have different crs. The original crs from the geometry should be preserved."""
tile_grid = SizeBasedTileGrid.from_size_projection(0.1, "EPSG:4326")
tiles = tile_grid.get_tiles(mock_dict_with_crs_utm)
assert tiles[0].get("crs") == "EPSG:3857"
assert len(tiles) == 81

def test_simple_get_tiles_dict(self, mock_dict_with_crs_utm):
"""test get_tiles when the the tile grid size is equal to the size of the input geometry. The original geometry should be returned."""
tile_grid = SizeBasedTileGrid.from_size_projection(100, "EPSG:3857")
tiles = tile_grid.get_tiles(mock_dict_with_crs_utm)
assert len(tiles) == 1
assert tiles[0] == mock_dict_with_crs_utm

def test_multiple_get_tile_dict(self, mock_dict_with_crs_utm):
"""test get_tiles when the the tile grid size is smaller than the size of the input geometry. The input geometry should be split into multiple tiles."""
tile_grid = SizeBasedTileGrid.from_size_projection(20, "EPSG:3857")
tiles = tile_grid.get_tiles(mock_dict_with_crs_utm)
assert len(tiles) == 25
assert tiles[0].get("crs") == "EPSG:3857"
assert tiles[0].get("west") == 0
assert tiles[0].get("south") == 0
assert tiles[0].get("east") == 20_000
assert tiles[0].get("north") == 20_000

def test_larger_get_tile_dict(self, mock_dict_with_crs_utm):
"""test get_tiles when the the tile grid size is larger than the size of the input geometry. The original geometry should be returned."""
tile_grid = SizeBasedTileGrid.from_size_projection(200, "EPSG:3857")
tiles = tile_grid.get_tiles(mock_dict_with_crs_utm)
assert len(tiles) == 1
assert tiles[0] == mock_dict_with_crs_utm

def test_get_tiles_polygon_utm(self, mock_polygon_utm):
"""test get_tiles when the input geometry is a polygon in wgs and the tile grid is in utm"""
tile_grid = SizeBasedTileGrid.from_size_projection(20.0, "EPSG:3857")
tiles = tile_grid.get_tiles(mock_polygon_utm)
assert isinstance(tiles, list)
assert all(isinstance(tile, shapely.geometry.Polygon) for tile in tiles)
assert len(tiles) == 25
assert tiles[0] == shapely.geometry.box(0.0, 0.0, 20_000.0, 20_000.0)

def test_get_tiles_polygon_wgs(self, mock_polygon_wgs):
"""test get_tiles when the input geometry is a polygon in wgs and the tile grid is in wgs"""
tile_grid = SizeBasedTileGrid.from_size_projection(0.1, "EPSG:4326")
tiles = tile_grid.get_tiles(mock_polygon_wgs)
assert isinstance(tiles, list)
assert all(isinstance(tile, shapely.geometry.Polygon) for tile in tiles)
assert len(tiles) == 100
assert tiles[0] == shapely.geometry.box(0.0, 0.0, 0.1, 0.1)

def test_simple_get_tiles_polygon(self, mock_polygon_utm):
"""test get_tiles when the the tile grid size is equal to the size of the input geometry. The original geometry should be returned."""
tile_grid = SizeBasedTileGrid.from_size_projection(100.0, "EPSG:3857")
tiles = tile_grid.get_tiles(mock_polygon_utm)
assert len(tiles) == 1
assert tiles[0] == mock_polygon_utm

def test_larger_get_tiles_polygon(self, mock_polygon_utm):
"""test get_tiles when the the tile grid size is larger than the size of the input geometry. The original geometry should be returned."""
tile_grid = SizeBasedTileGrid.from_size_projection(200.0, "EPSG:3857")
tiles = tile_grid.get_tiles(mock_polygon_utm)
assert len(tiles) == 1
assert tiles[0] == mock_polygon_utm


def test_split_area_default():
"""test split_area with default parameters"""
aoi = {"west": 0.0, "south": 0.0, "east": 20_000.0, "north": 20_000.0, "crs": "EPSG:3857"}
tiles = split_area(aoi)
assert len(tiles) == 1
assert tiles[0] == aoi


def test_split_area_custom():
"""test split_area with wgs projection"""
aoi = {"west": 0.0, "south": 0.0, "east": 1.0, "north": 1.0, "crs": "EPSG:4326"}
tiles = split_area(aoi, "EPSG:4326", 1.0)
assert len(tiles) == 1
assert tiles[0] == aoi