-
Notifications
You must be signed in to change notification settings - Fork 41
/
Copy pathjob_splitting.py
131 lines (101 loc) · 4.8 KB
/
job_splitting.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import abc
import math
from typing import Dict, List, NamedTuple, Optional, Union
import shapely
from shapely.geometry import MultiPolygon, Polygon
from openeo.util import normalize_crs
class JobSplittingFailure(Exception):
pass
class _BoundingBox(NamedTuple):
"""Simple NamedTuple container for a bounding box"""
west: float
south: float
east: float
north: float
crs: int = 4326
@classmethod
def from_dict(cls, d: Dict) -> "_BoundingBox":
"""Create a bounding box from a dictionary"""
if d.get("crs") is not None:
d["crs"] = normalize_crs(d["crs"])
return cls(**{k: d[k] for k in cls._fields if k not in cls._field_defaults or k in d})
@classmethod
def from_polygon(cls, polygon: Union[MultiPolygon, Polygon], crs: Optional[int] = None) -> "_BoundingBox":
"""Create a bounding box from a shapely Polygon or MultiPolygon"""
crs = normalize_crs(crs)
return cls(*polygon.bounds, crs=4326 if crs is None else crs)
def as_dict(self) -> Dict:
return self._asdict()
def as_polygon(self) -> Polygon:
"""Get bounding box as a shapely Polygon"""
return shapely.geometry.box(minx=self.west, miny=self.south, maxx=self.east, maxy=self.north)
class _TileGridInterface(metaclass=abc.ABCMeta):
"""Interface for tile grid classes"""
@abc.abstractmethod
def get_tiles(self, geometry: Union[Dict, MultiPolygon, Polygon]) -> List[Polygon]:
"""Calculate tiles to cover given bounding box"""
...
class _SizeBasedTileGrid(_TileGridInterface):
"""
Specification of a tile grid, parsed from a size and a projection.
The size is in m for UTM projections or degrees for WGS84.
"""
def __init__(self, epsg: int, size: float):
self.epsg = normalize_crs(epsg)
self.size = size
@classmethod
def from_size_projection(cls, size: float, projection: str) -> "_SizeBasedTileGrid":
"""Create a tile grid from size and projection"""
return cls(normalize_crs(projection), size)
def _epsg_is_meters(self) -> bool:
"""Check if the projection unit is in meters. (EPSG:3857 or UTM)"""
return 32601 <= self.epsg <= 32660 or 32701 <= self.epsg <= 32760 or self.epsg == 3857
@staticmethod
def _split_bounding_box(to_cover: _BoundingBox, x_offset: float, tile_size: float) -> List[Polygon]:
"""
Split a bounding box into tiles of given size and projection.
:param to_cover: bounding box dict with keys "west", "south", "east", "north", "crs"
:param x_offset: offset to apply to the west and east coordinates
:param tile_size: size of tiles in unit of measure of the projection
:return: list of tiles (polygons)
"""
xmin = int(math.floor((to_cover.west - x_offset) / tile_size))
xmax = int(math.ceil((to_cover.east - x_offset) / tile_size)) - 1
ymin = int(math.floor(to_cover.south / tile_size))
ymax = int(math.ceil(to_cover.north / tile_size)) - 1
tiles = []
for x in range(xmin, xmax + 1):
for y in range(ymin, ymax + 1):
tiles.append(
_BoundingBox(
west=max(x * tile_size + x_offset, to_cover.west),
south=max(y * tile_size, to_cover.south),
east=min((x + 1) * tile_size + x_offset, to_cover.east),
north=min((y + 1) * tile_size, to_cover.north),
).as_polygon()
)
return tiles
def get_tiles(self, geometry: Union[Dict, MultiPolygon, Polygon]) -> List[Polygon]:
if isinstance(geometry, dict):
bbox = _BoundingBox.from_dict(geometry)
elif isinstance(geometry, Polygon) or isinstance(geometry, MultiPolygon):
bbox = _BoundingBox.from_polygon(geometry, crs=self.epsg)
else:
raise JobSplittingFailure("geometry must be a dict or a shapely.geometry.Polygon or MultiPolygon")
x_offset = 500_000 if self._epsg_is_meters() else 0
tiles = _SizeBasedTileGrid._split_bounding_box(bbox, x_offset, self.size)
return tiles
def split_area(
aoi: Union[Dict, MultiPolygon, Polygon], projection: str = "EPSG:3857", tile_size: float = 20_000.0
) -> List[Polygon]:
"""
Split area of interest into tiles of given size and projection.
:param aoi: area of interest (bounding box or shapely polygon)
:param projection: projection to use for splitting. Default is web mercator (EPSG:3857)
:param tile_size: size of tiles in unit of measure of the projection
:return: list of tiles (polygons).
"""
if isinstance(aoi, dict):
projection = aoi.get("crs", projection)
tile_grid = _SizeBasedTileGrid.from_size_projection(tile_size, projection)
return tile_grid.get_tiles(aoi)