1
1
import json
2
2
import logging
3
3
from concurrent .futures import ThreadPoolExecutor , as_completed
4
+ from typing import Optional
5
+
6
+ from geopandas import GeoDataFrame
4
7
5
8
from geospatial_tools import DATA_DIR
6
9
from geospatial_tools .stac import PLANETARY_COMPUTER , StacSearch
11
14
12
15
13
16
class BestProductsForFeatures :
17
+ """
18
+ Class made to facilitate and automate searching for Sentinel 2 products using the Sentinel 2 tiling grid as a
19
+ reference.
20
+
21
+ Current limitation is that vector features used must fit, or be completely contained
22
+ inside a single Sentinel 2 tiling grid.
23
+
24
+ For larger features, a mosaic of products will be necessary.
25
+
26
+ This class was conceived first and foremost to be used for numerous smaller vector
27
+ features, like polygon grids created from
28
+ `geospatial_tools.vector.create_vector_grid`
29
+ """
30
+
14
31
def __init__ (
15
32
self ,
16
- sentinel2_tiling_grid ,
17
- sentinel2_tiling_grid_column ,
18
- vector_features ,
19
- vector_features_column ,
20
- date_range = None ,
21
- max_cloud_cover = None ,
33
+ sentinel2_tiling_grid : GeoDataFrame ,
34
+ sentinel2_tiling_grid_column : str ,
35
+ vector_features : GeoDataFrame ,
36
+ vector_features_column : str ,
37
+ date_ranges : list [ str ] = None ,
38
+ max_cloud_cover : int = None ,
22
39
logger : logging .Logger = LOGGER ,
23
40
):
41
+ """
42
+
43
+ Parameters
44
+ ----------
45
+ sentinel2_tiling_grid
46
+ GeoDataFrame containing Sentinel 2 tiling grid
47
+ sentinel2_tiling_grid_column
48
+ Name of the column in `sentinel2_tiling_grid` that contains the tile names
49
+ (ex tile name: 10SDJ)
50
+ vector_features
51
+ GeoDataFrame containing the vector features for which the best Sentinel 2
52
+ products will be chosen for.
53
+ vector_features_column
54
+ Name of the column in `vector_features` where the best Sentinel 2 products
55
+ will be written to
56
+ date_ranges
57
+ Date range used to search for Sentinel 2 products. should be created using
58
+ `geospatial_tools.utils.create_date_range_for_specific_period` separately,
59
+ or `BestProductsForFeatures.create_date_range` after initialization.
60
+ max_cloud_cover
61
+ Maximum cloud cover used to search for Sentinel 2 products.
62
+ logger
63
+ Logger instance
64
+ """
24
65
self .logger = logger
25
66
self .sentinel2_tiling_grid = sentinel2_tiling_grid
26
67
self .sentinel2_tiling_grid_column = sentinel2_tiling_grid_column
@@ -30,51 +71,104 @@ def __init__(
30
71
self .vector_features_best_product_column = "best_s2_product_id"
31
72
self .vector_features_with_products = None
32
73
self .search_client = StacSearch (PLANETARY_COMPUTER )
33
- self ._date_range = date_range
74
+ self ._date_ranges = date_ranges
34
75
self ._max_cloud_cover = max_cloud_cover
35
- self .tile_dict = {}
36
- self .error_list = {}
76
+ self .successful_results = {}
77
+ self .incomplete_results = []
78
+ self .error_results = []
37
79
38
80
@property
39
81
def max_cloud_cover (self ):
40
82
return self ._max_cloud_cover
41
83
42
84
@max_cloud_cover .setter
43
- def max_cloud_cover (self , max_cloud_cover ):
85
+ def max_cloud_cover (self , max_cloud_cover : int ):
44
86
self ._max_cloud_cover = max_cloud_cover
45
87
46
88
@property
47
- def date_range (self ):
48
- return self ._date_range
89
+ def date_ranges (self ):
90
+ return self ._date_ranges
91
+
92
+ @date_ranges .setter
93
+ def date_ranges (self , date_range : list [str ]):
94
+ self ._date_ranges = date_range
49
95
50
- @ date_range . setter
51
- def date_range ( self , date_range ):
52
- self . _date_range = date_range
96
+ def create_date_ranges ( self , start_year : int , end_year : int , start_month : int , end_month : int ) -> list [ str ]:
97
+ """
98
+ This function create a list of date ranges.
53
99
54
- def create_date_range (self , start_year , end_year , start_month , end_month ):
55
- self .date_range = create_date_range_for_specific_period (
100
+ For example, I want to create date ranges for 2020 and 2021, but only for the months from March to May.
101
+ I therefore expect to have 2 ranges: [2020-03-01 to 2020-05-30, 2021-03-01 to 2021-05-30].
102
+
103
+ Handles the automatic definition of the last day for the end month, as well as periods that cross over years
104
+
105
+ For example, I want to create date ranges for 2020 and 2022, but only for the months from November to January.
106
+ I therefore expect to have 2 ranges: [2020-11-01 to 2021-01-31, 2021-11-01 to 2022-01-31].
107
+
108
+ Parameters
109
+ ----------
110
+ start_year
111
+ Start year for ranges
112
+ end_year
113
+ End year for ranges
114
+ start_month
115
+ Starting month for each period
116
+ end_month
117
+ End month for each period (inclusively)
118
+
119
+ Returns
120
+ -------
121
+ List containing datetime date ranges
122
+ """
123
+ self .date_ranges = create_date_range_for_specific_period (
56
124
start_year = start_year , end_year = end_year , start_month_range = start_month , end_month_range = end_month
57
125
)
126
+ return self .date_ranges
127
+
128
+ def find_best_complete_products (self ) -> dict :
129
+ """
130
+ Finds the best complete products for each Sentinel 2 tiles. This function will filter out all products that have
131
+ more than 5% of nodata values.
132
+
133
+ Filtered out tiles will be stored in `self.incomplete` and tiles for which
134
+ the search has found no results will be stored in `self.error_list`
58
135
59
- def find_best_products (self ):
60
- tile_dict , error_list = find_best_product_per_s2_tile (
61
- date_ranges = self .date_range ,
136
+ Returns
137
+ -------
138
+ tile_dict:
139
+ Tile dictionary containing the successful search results.
140
+ """
141
+ tile_dict , incomplete_list , error_list = find_best_product_per_s2_tile (
142
+ date_ranges = self .date_ranges ,
62
143
max_cloud_cover = self .max_cloud_cover ,
63
144
s2_tile_grid_list = self .sentinel2_tile_list ,
64
145
num_of_workers = 4 ,
65
146
search_client = self .search_client ,
66
147
)
67
- self .tile_dict = tile_dict
68
- self .error_list = error_list
148
+ self .successful_results = tile_dict
149
+ self .incomplete_results = incomplete_list
150
+ if incomplete_list :
151
+ self .logger .warning (
152
+ "Warning, some of the input Sentinel 2 tiles do not have products covering the entire tile. "
153
+ "These tiles will need to be handled differently (ex. creating a mosaic with multiple products"
154
+ )
155
+ self .logger .warning (f"Incomplete list: { incomplete_list } " )
156
+ self .error_results = error_list
69
157
if error_list :
70
158
self .logger .warning (
71
159
"Warning, products for some Sentinel 2 tiles could not be found. "
72
160
"Consider either extending date range input or max cloud cover"
73
161
)
74
162
self .logger .warning (f"Error list: { error_list } " )
75
- return self .tile_dict
163
+ return self .successful_results
164
+
165
+ def select_best_products_per_feature (self ) -> GeoDataFrame :
166
+ """
76
167
77
- def select_best_products_per_feature (self ):
168
+ Returns
169
+ -------
170
+
171
+ """
78
172
spatial_join_results = spatial_join_within (
79
173
polygon_features = self .sentinel2_tiling_grid ,
80
174
polygon_column = self .sentinel2_tiling_grid_column ,
@@ -83,15 +177,29 @@ def select_best_products_per_feature(self):
83
177
)
84
178
write_best_product_ids_to_dataframe (
85
179
spatial_join_results = spatial_join_results ,
86
- tile_dictionary = self .tile_dict ,
180
+ tile_dictionary = self .successful_results ,
87
181
best_product_column = self .vector_features_best_product_column ,
88
182
s2_tiles_column = self .vector_features_column ,
89
183
)
90
184
self .vector_features_with_products = spatial_join_results
91
185
return self .vector_features_with_products
92
186
187
+ def to_file (self ):
188
+ write_results_to_file (
189
+ cloud_cover = self .max_cloud_cover ,
190
+ successful_results = self .successful_results ,
191
+ incomplete_results = self .incomplete_results ,
192
+ error_results = self .error_results ,
193
+ )
194
+
93
195
94
- def sentinel_2_tile_search (tile_id , date_ranges , max_cloud_cover , search_client = None ):
196
+ def sentinel_2_complete_tile_search (
197
+ tile_id : int ,
198
+ date_ranges : list [str ],
199
+ max_cloud_cover : int ,
200
+ max_no_data_value : int = 5 ,
201
+ search_client : StacSearch = None ,
202
+ ) -> tuple [int , str , Optional [float ]]:
95
203
client = search_client
96
204
if client is None :
97
205
client = StacSearch (PLANETARY_COMPUTER )
@@ -105,24 +213,37 @@ def sentinel_2_tile_search(tile_id, date_ranges, max_cloud_cover, search_client=
105
213
)
106
214
try :
107
215
sorted_items = client .sort_results_by_cloud_coverage ()
108
- optimal_result = sorted_items [0 ]
109
- return tile_id , optimal_result .id , optimal_result .properties ["eo:cloud_cover" ]
216
+ if not sorted_items :
217
+ return tile_id , "error: No results found" , None
218
+ optimal_result = None
219
+ for item in sorted_items :
220
+ if item .properties ["s2:nodata_pixel_percentage" ] < max_no_data_value :
221
+ optimal_result = item
222
+ return tile_id , optimal_result .id , optimal_result .properties ["eo:cloud_cover" ]
223
+ if not optimal_result :
224
+ return tile_id , "incomplete: No results found that cover the entire tile" , None
225
+
110
226
except (IndexError , TypeError ) as error :
111
227
print (error )
112
228
return tile_id , f"error: { error } " , None
113
229
114
230
115
231
def find_best_product_per_s2_tile (
116
- date_ranges , max_cloud_cover , s2_tile_grid_list , num_of_workers = 4 , search_client = None
232
+ date_ranges : list [str ],
233
+ max_cloud_cover : int ,
234
+ s2_tile_grid_list : list ,
235
+ num_of_workers : int = 4 ,
236
+ search_client : StacSearch = None ,
117
237
):
118
- tile_dict = {}
238
+ successful_results = {}
119
239
for tile in s2_tile_grid_list :
120
- tile_dict [tile ] = ""
121
- error_list = []
240
+ successful_results [tile ] = ""
241
+ incomplete_results = []
242
+ error_results = []
122
243
with ThreadPoolExecutor (max_workers = num_of_workers ) as executor :
123
244
future_to_tile = {
124
245
executor .submit (
125
- sentinel_2_tile_search ,
246
+ sentinel_2_complete_tile_search ,
126
247
tile_id = tile ,
127
248
date_ranges = date_ranges ,
128
249
max_cloud_cover = max_cloud_cover ,
@@ -133,49 +254,85 @@ def find_best_product_per_s2_tile(
133
254
134
255
for future in as_completed (future_to_tile ):
135
256
tile_id , optimal_result_id , max_cloud_cover = future .result ()
136
- tile_dict [tile_id ] = {"id" : optimal_result_id , "cloud_cover" : max_cloud_cover }
137
257
if optimal_result_id .startswith ("error:" ):
138
- error_list .append (tile_id )
139
- return tile_dict , error_list
258
+ error_results .append (tile_id )
259
+ continue
260
+ if optimal_result_id .startswith ("incomplete:" ):
261
+ incomplete_results .append (tile_id )
262
+ continue
263
+ successful_results [tile_id ] = {"id" : optimal_result_id , "cloud_cover" : max_cloud_cover }
264
+ cleaned_successful_results = {k : v for k , v in successful_results .items () if v != "" }
265
+ return cleaned_successful_results , incomplete_results , error_results
140
266
141
267
142
- def _get_best_product_id_for_each_grid_tile (s2_tile_search_results , feature_s2_tiles ):
143
- print (f"s2_tiles_search_results: { s2_tile_search_results } " )
144
- print (f"feature_s2_tiles: { feature_s2_tiles } " )
145
- if len (feature_s2_tiles ) == 1 :
146
- s2_product_id = s2_tile_search_results [feature_s2_tiles [0 ]]["id" ]
147
- return s2_product_id
268
def _get_best_product_id_for_each_grid_tile(
    s2_tile_search_results: dict, feature_s2_tiles: GeoDataFrame, logger: logging.Logger = LOGGER
) -> Optional[str]:
    """
    Select the best Sentinel 2 product ID for a single feature.

    If the feature touches a single tile, that tile's product is used; otherwise
    the product with the lowest cloud cover among the feature's tiles is chosen.
    Returns None when results are missing for any required tile.
    """
    search_result_keys = s2_tile_search_results.keys()
    # Guard: every tile the feature touches must have a search result,
    # otherwise only a partial answer would be possible.
    if not all(tile in search_result_keys for tile in feature_s2_tiles):
        logger.warning(
            f"Missmatch between search results and required tiles: [{feature_s2_tiles}] "
            f"not all found in [{search_result_keys}]"
            f"\n\tOnly partial results are available; skipping"
        )
        return None
    try:
        if len(feature_s2_tiles) == 1:
            return s2_tile_search_results[feature_s2_tiles[0]]["id"]
        candidates = {tile: s2_tile_search_results[tile] for tile in feature_s2_tiles if tile in s2_tile_search_results}
        least_cloudy_tile = min(candidates, key=lambda tile: candidates[tile]["cloud_cover"])
        return candidates[least_cloudy_tile]["id"]
    except KeyError as error:
        logger.warning(error)
        logger.warning("No products found")
        return None
156
293
157
294
158
295
def write_best_product_ids_to_dataframe(
    spatial_join_results: GeoDataFrame,
    tile_dictionary: dict,
    best_product_column: str = "best_s2_product_id",
    s2_tiles_column: str = "s2_tiles",
    logger: logging.Logger = LOGGER,
):
    """
    Write the best product ID for each feature into `best_product_column`.

    For every row, the tiles listed in `s2_tiles_column` are resolved against
    `tile_dictionary` to select a single product ID (None when unresolvable).
    Mutates `spatial_join_results` in place.
    """
    logger.info("Writing best product IDs to dataframe")
    best_ids = spatial_join_results[s2_tiles_column].apply(
        lambda tiles: _get_best_product_id_for_each_grid_tile(
            s2_tile_search_results=tile_dictionary, feature_s2_tiles=tiles
        )
    )
    spatial_join_results[best_product_column] = best_ids
164
306
165
307
166
- def write_results_to_file (cloud_cover , tile_dictionary , error_list = None ):
308
def write_results_to_file(
    cloud_cover: int, successful_results: dict, incomplete_results: list = None, error_results: list = None
) -> dict:
    """
    Write search results to JSON files in `DATA_DIR`.

    Parameters
    ----------
    cloud_cover
        Maximum cloud cover used for the search; embedded in the output file names.
    successful_results
        Dictionary of successful search results; always written.
    incomplete_results
        List of tiles with only incomplete coverage; written only if non-empty.
    error_results
        List of tiles for which the search failed; written only if non-empty.

    Returns
    -------
    Dictionary with the paths of the written files; the string "None" marks a
    file that was not written.
    """

    def _dump_json(filename, payload):
        # Single place for the JSON serialization shared by all three outputs.
        with open(filename, "w", encoding="utf-8") as json_file:
            json.dump(payload, json_file, indent=4)

    tile_filename = DATA_DIR / f"data_lt{cloud_cover}cc.json"
    _dump_json(tile_filename, successful_results)
    print(f"Results have been written to {tile_filename}")

    incomplete_filename = "None"
    if incomplete_results:
        print(incomplete_results)
        incomplete_filename = DATA_DIR / f"incomplete_lt{cloud_cover}cc.json"
        _dump_json(incomplete_filename, {"incomplete": incomplete_results})
        print(f"Incomplete results have been written to {incomplete_filename}")

    error_filename = "None"
    if error_results:
        print(error_results)
        error_filename = DATA_DIR / f"errors_lt{cloud_cover}cc.json"
        _dump_json(error_filename, {"errors": error_results})
        print(f"Errors results have been written to {error_filename}")

    return {
        "tile_filename": tile_filename,
        "incomplete_filename": incomplete_filename,
        "errors_filename": error_filename,
    }
0 commit comments