1
1
from __future__ import annotations
2
2
from typing import TYPE_CHECKING , Tuple , List
3
3
import os
4
- import subprocess
4
+ import tarfile
5
5
import pandas as pd
6
6
import xarray as xr
7
7
import numpy as np
@@ -99,7 +99,7 @@ def __download_era5_from_cds(self,start_time, end_time, lon, lat, variable, fol
99
99
Downloads ERA5 data from the Copernicus Climate Data Store for a
100
100
given point and time period
101
101
"""
102
- import cdsapi
102
+ import cdsapi #Optional dependency
103
103
start_time = pd .Timestamp (start_time )
104
104
end_time = pd .Timestamp (end_time )
105
105
c = cdsapi .Client ()
@@ -164,19 +164,15 @@ def import_data(self, ts: TimeSeries, save_csv=True, save_nc=False, use_cache=Fa
164
164
"tidal_elevation" ,
165
165
"total_water_level" ,
166
166
]
167
- filenames = self .__download_gtsm_from_cds (ts .start_time , ts .end_time , ts .variable , folder = 'cache' )
167
+ filenames = self .__download_gtsm_from_cds (ts .start_time , ts .end_time , ts .variable , folder = 'cache' , use_cache = use_cache )
168
168
169
169
if not isinstance (filenames , list ):
170
170
filenames = [filenames ]
171
171
172
172
all_nc_files = []
173
173
for filename in filenames :
174
- temppath = os .path .dirname (filename )
175
174
# Unpack the tar.gz file.
176
- nc_files = subprocess .run (['tar' , '-ztf' , filename ], stdout = subprocess .PIPE , check = True ).stdout .decode ('utf-8' ).split ('\n ' )[0 :- 1 ]
177
- nc_files = sorted ([ff .strip ('\r ' ) for ff in nc_files ])
178
- subprocess .run (['tar' , '-xzvf' , filename , '--directory' , temppath ], stdout = subprocess .PIPE , check = True ) # Extract tar file
179
- all_nc_files .extend ([os .path .join (temppath , file ) for file in nc_files ])
175
+ all_nc_files .extend (self .__unpack_files (filename ,use_cache ))
180
176
181
177
# Open multiple netCDF files as a single xarray dataset
182
178
with xr .open_mfdataset (all_nc_files ) as ds :
@@ -193,19 +189,37 @@ def import_data(self, ts: TimeSeries, save_csv=True, save_nc=False, use_cache=Fa
193
189
194
190
return df
195
191
196
- def __download_gtsm_from_cds (self ,start_time , end_time , variable , folder = 'cache' ) -> str :
192
def __unpack_files(self, filename, use_cache) -> List[str]:
    """
    Extracts the netCDF (.nc) members of a tar archive into the folder
    that contains the archive and returns the paths of those files.

    Parameters:
        filename: Path to the tar archive (any compression tarfile can detect).
        use_cache: If True, a member whose target file already exists on disk
            is not extracted again.

    Returns:
        Paths (archive folder joined with member name) of all .nc members,
        in the order they appear in the archive.

    Raises:
        RuntimeError: If the archive cannot be opened or extracted, or if a
            member would be written outside the archive folder. The original
            exception is chained as the cause.
    """
    temppath = os.path.dirname(filename)
    base_dir = os.path.realpath(temppath)
    files_from_tar = []
    try:
        # Opening happens inside the try block: a corrupted archive usually
        # fails already here (tarfile.ReadError), and that case should get
        # the same actionable error message as a failure during extraction.
        with tarfile.open(filename, "r:*") as tar:
            for member in tar.getmembers():
                if not member.name.endswith(".nc"):
                    continue
                nc_file = os.path.join(temppath, member.name)
                # Refuse member names (absolute paths, "..") that would end up
                # outside the folder of the archive.
                target = os.path.realpath(nc_file)
                if os.path.commonpath([base_dir, target]) != base_dir:
                    raise ValueError(f"Unsafe member path {member.name!r}")
                if use_cache and os.path.exists(nc_file):
                    print(f"Reusing cached file {nc_file}")
                else:
                    print(f"Extracting {member.name} from {filename}")
                    tar.extract(member, path=temppath)
                files_from_tar.append(nc_file)
    except Exception as e:
        # Seen with corrupted tar files from CDS
        raise RuntimeError(f"Error extracting {filename}: {e}, consider deleting the file to download it again") from e
    return files_from_tar
210
+
211
+ def __download_gtsm_from_cds (self ,start_time , end_time , variable , folder = 'cache' ,use_cache = True ) -> List [str ]:
197
212
"""
198
213
Downloads GTSM model water level data from the Copernicus Climate Data Store for a
199
214
given point and time period
200
215
"""
201
- import cdsapi
216
+ import cdsapi #Optional dependency
202
217
filename = []
203
218
filename_list = []
204
219
start_time = pd .Timestamp (start_time )
205
220
end_time = pd .Timestamp (end_time )
206
221
c = cdsapi .Client ()
207
222
208
-
209
223
days = pd .date_range (start = start_time , end = end_time , freq = 'D' )
210
224
years = days .year
211
225
years = years .unique ()
@@ -241,8 +255,12 @@ def __download_gtsm_from_cds(self,start_time, end_time, variable, folder='cache
241
255
242
256
}
243
257
print ('Download variable:' ,var , year )
244
- c .retrieve ('sis-water-level-change-timeseries-cmip6' , cds_command , filename )
245
- filename_list .append (filename )
258
+ if use_cache and os .path .exists (filename ):
259
+ print (f'Reusing cached file { filename } ' )
260
+ else :
261
+ print (f'Download variable { var } for year { year } ' )
262
+ c .retrieve ('sis-water-level-change-timeseries-cmip6' , cds_command , filename )
263
+ filename_list .append (filename )
246
264
return filename_list
247
265
248
266
def download_temporary_files (self , ts : TimeSeries , use_cache : bool = False ) -> Tuple [List [str ], float , float ]:
0 commit comments