2
2
import math
3
3
from typing import Dict , List , NamedTuple , Optional , Union
4
4
5
- import pyproj
6
5
import shapely
7
- from shapely .geometry import shape
8
- from shapely .ops import transform
6
+ from shapely .geometry import MultiPolygon , Polygon
7
+
8
+ from openeo .util import normalize_crs
9
9
10
10
11
11
class JobSplittingFailure (Exception ):
12
12
pass
13
13
14
-
15
- # TODO: This function is also defined in openeo-python-driver. But maybe we want to avoid a dependency on openeo-python-driver?
16
- def reproject_bounding_box (bbox : Dict , from_crs : Optional [str ], to_crs : str ) -> Dict :
17
- """
18
- Reproject given bounding box dictionary
19
-
20
- :param bbox: bbox dict with fields "west", "south", "east", "north"
21
- :param from_crs: source CRS. Specify `None` to use the "crs" field of input bbox dict
22
- :param to_crs: target CRS
23
- :return: bbox dict (fields "west", "south", "east", "north", "crs")
24
- """
25
- box = shapely .geometry .box (bbox ["west" ], bbox ["south" ], bbox ["east" ], bbox ["north" ])
26
- if from_crs is None :
27
- from_crs = bbox ["crs" ]
28
- tranformer = pyproj .Transformer .from_crs (crs_from = from_crs , crs_to = to_crs , always_xy = True )
29
- reprojected = transform (tranformer .transform , box )
30
- return dict (zip (["west" , "south" , "east" , "north" ], reprojected .bounds ), crs = to_crs )
31
-
32
-
33
- # TODO: This class is also defined in openeo-aggregator. But maybe we want to avoid a dependency on openeo-aggregator?
34
- class BoundingBox (NamedTuple ):
14
+ class _BoundingBox (NamedTuple ):
35
15
"""Simple NamedTuple container for a bounding box"""
36
16
37
17
west : float
38
18
south : float
39
19
east : float
40
20
north : float
41
- crs : str = "EPSG: 4326"
21
+ crs : int = 4326
42
22
43
23
@classmethod
44
- def from_dict (cls , d : Dict ) -> "BoundingBox" :
24
+ def from_dict (cls , d : Dict ) -> "_BoundingBox" :
25
+ """Create a bounding box from a dictionary"""
26
+ if d .get ("crs" ) is not None :
27
+ d ["crs" ] = normalize_crs (d ["crs" ])
45
28
return cls (** {k : d [k ] for k in cls ._fields if k not in cls ._field_defaults or k in d })
46
29
47
30
@classmethod
48
- def from_polygon (cls , polygon : shapely .geometry .Polygon , projection : Optional [str ] = None ) -> "BoundingBox" :
49
- """Create a bounding box from a shapely Polygon"""
50
- return cls (* polygon .bounds , projection if projection is not None else cls .crs )
31
+ def from_polygon (cls , polygon : Union [MultiPolygon , Polygon ], crs : Optional [int ] = None ) -> "_BoundingBox" :
32
+ """Create a bounding box from a shapely Polygon or MultiPolygon"""
33
+ crs = normalize_crs (crs )
34
+ return cls (* polygon .bounds , crs = 4326 if crs is None else crs )
51
35
52
36
def as_dict (self ) -> Dict :
53
37
return self ._asdict ()
54
38
55
- def as_polygon (self ) -> shapely . geometry . Polygon :
39
+ def as_polygon (self ) -> Polygon :
56
40
"""Get bounding box as a shapely Polygon"""
57
41
return shapely .geometry .box (minx = self .west , miny = self .south , maxx = self .east , maxy = self .north )
58
42
59
43
60
- class TileGridInterface (metaclass = abc .ABCMeta ):
44
+ class _TileGridInterface (metaclass = abc .ABCMeta ):
61
45
"""Interface for tile grid classes"""
62
46
63
47
@abc .abstractmethod
64
- def get_tiles (self , geometry : Union [Dict , shapely . geometry . Polygon ]) -> List [Union [ Dict , shapely . geometry . Polygon ] ]:
48
+ def get_tiles (self , geometry : Union [Dict , MultiPolygon , Polygon ]) -> List [Polygon ]:
65
49
"""Calculate tiles to cover given bounding box"""
66
50
...
67
51
68
52
69
- class SizeBasedTileGrid ( TileGridInterface ):
53
+ class _SizeBasedTileGrid ( _TileGridInterface ):
70
54
"""
71
55
Specification of a tile grid, parsed from a size and a projection.
56
+ The size is in m for UTM projections or degrees for WGS84.
72
57
"""
73
58
74
- def __init__ (self , epsg : str , size : float ):
75
- self .epsg = epsg
59
+ def __init__ (self , epsg : int , size : float ):
60
+ self .epsg = normalize_crs ( epsg )
76
61
self .size = size
77
62
78
63
@classmethod
79
- def from_size_projection (cls , size : float , projection : str ) -> "SizeBasedTileGrid " :
64
+ def from_size_projection (cls , size : float , projection : str ) -> "_SizeBasedTileGrid " :
80
65
"""Create a tile grid from size and projection"""
81
- return cls (projection .lower (), size )
82
-
83
- def get_tiles (self , geometry : Union [Dict , shapely .geometry .Polygon ]) -> List [Union [Dict , shapely .geometry .Polygon ]]:
84
- if isinstance (geometry , dict ):
85
- bbox = BoundingBox .from_dict (geometry )
86
- bbox_crs = bbox .crs
87
- elif isinstance (geometry , shapely .geometry .Polygon ):
88
- bbox = BoundingBox .from_polygon (geometry , projection = self .epsg )
89
- bbox_crs = self .epsg
90
- else :
91
- raise JobSplittingFailure ("geometry must be a dict or a shapely.geometry.Polygon" )
92
-
93
- if self .epsg == "epsg:4326" :
94
- tile_size = self .size
95
- x_offset = 0
96
- else :
97
- tile_size = self .size * 1000
98
- x_offset = 500_000
99
-
100
- to_cover = BoundingBox .from_dict (reproject_bounding_box (bbox .as_dict (), from_crs = bbox_crs , to_crs = self .epsg ))
66
+ return cls (normalize_crs (projection ), size )
67
+
68
+ def _epsg_is_meters (self ) -> bool :
69
+ """Check if the projection unit is in meters. (EPSG:3857 or UTM)"""
70
+ return 32601 <= self .epsg <= 32660 or 32701 <= self .epsg <= 32760 or self .epsg == 3857
71
+
72
+ @staticmethod
73
+ def _split_bounding_box (to_cover : _BoundingBox , x_offset : float , tile_size : float ) -> List [Polygon ]:
74
+ """
75
+ Split a bounding box into tiles of given size and projection.
76
+ :param to_cover: bounding box dict with keys "west", "south", "east", "north", "crs"
77
+ :param x_offset: offset to apply to the west and east coordinates
78
+ :param tile_size: size of tiles in unit of measure of the projection
79
+ :return: list of tiles (polygons)
80
+ """
101
81
xmin = int (math .floor ((to_cover .west - x_offset ) / tile_size ))
102
82
xmax = int (math .ceil ((to_cover .east - x_offset ) / tile_size )) - 1
103
83
ymin = int (math .floor (to_cover .south / tile_size ))
@@ -106,31 +86,46 @@ def get_tiles(self, geometry: Union[Dict, shapely.geometry.Polygon]) -> List[Uni
106
86
tiles = []
107
87
for x in range (xmin , xmax + 1 ):
108
88
for y in range (ymin , ymax + 1 ):
109
- tile = BoundingBox (
110
- west = max (x * tile_size + x_offset , to_cover .west ),
111
- south = max (y * tile_size , to_cover .south ),
112
- east = min ((x + 1 ) * tile_size + x_offset , to_cover .east ),
113
- north = min ((y + 1 ) * tile_size , to_cover .north ),
114
- crs = self .epsg ,
89
+ tiles .append (
90
+ _BoundingBox (
91
+ west = max (x * tile_size + x_offset , to_cover .west ),
92
+ south = max (y * tile_size , to_cover .south ),
93
+ east = min ((x + 1 ) * tile_size + x_offset , to_cover .east ),
94
+ north = min ((y + 1 ) * tile_size , to_cover .north ),
95
+ ).as_polygon ()
115
96
)
116
97
117
- if isinstance (geometry , dict ):
118
- tiles .append (reproject_bounding_box (tile .as_dict (), from_crs = self .epsg , to_crs = bbox_crs ))
119
- else :
120
- tiles .append (tile .as_polygon ())
98
+ return tiles
99
+
100
+ def get_tiles (self , geometry : Union [Dict , MultiPolygon , Polygon ]) -> List [Polygon ]:
101
+ if isinstance (geometry , dict ):
102
+ bbox = _BoundingBox .from_dict (geometry )
103
+
104
+ elif isinstance (geometry , Polygon ) or isinstance (geometry , MultiPolygon ):
105
+ bbox = _BoundingBox .from_polygon (geometry , crs = self .epsg )
106
+
107
+ else :
108
+ raise JobSplittingFailure ("geometry must be a dict or a shapely.geometry.Polygon or MultiPolygon" )
109
+
110
+ x_offset = 500_000 if self ._epsg_is_meters () else 0
111
+
112
+ tiles = _SizeBasedTileGrid ._split_bounding_box (bbox , x_offset , self .size )
121
113
122
114
return tiles
123
115
124
116
125
117
def split_area (
126
- aoi : Union [Dict , shapely . geometry . Polygon ], projection = "EPSG:3857" , tile_size : float = 20 .0
127
- ) -> List [Union [ Dict , shapely . geometry . Polygon ] ]:
118
+ aoi : Union [Dict , MultiPolygon , Polygon ], projection : str = "EPSG:3857" , tile_size : float = 20_000 .0
119
+ ) -> List [Polygon ]:
128
120
"""
129
121
Split area of interest into tiles of given size and projection.
130
122
:param aoi: area of interest (bounding box or shapely polygon)
131
123
:param projection: projection to use for splitting. Default is web mercator (EPSG:3857)
132
- :param tile_size: size of tiles in km for UTM projections or degrees for WGS84
133
- :return: list of tiles (dicts with keys "west", "south", "east", "north", "crs" or shapely polygons). For dicts the original crs is preserved. For polygons the projection is set to the given projection .
124
+ :param tile_size: size of tiles in unit of measure of the projection
125
+ :return: list of tiles (polygons).
134
126
"""
135
- tile_grid = SizeBasedTileGrid .from_size_projection (tile_size , projection )
127
+ if isinstance (aoi , dict ):
128
+ projection = aoi .get ("crs" , projection )
129
+
130
+ tile_grid = _SizeBasedTileGrid .from_size_projection (tile_size , projection )
136
131
return tile_grid .get_tiles (aoi )
0 commit comments