2
2
import re
3
3
import warnings
4
4
from collections import Counter
5
- from collections .abc import Mapping , Sequence
5
+ from collections .abc import Mapping
6
6
from datetime import datetime
7
7
from pathlib import Path
8
- from typing import Any , Callable , TextIO
8
+ from typing import Any , Callable , Sequence , TextIO
9
9
10
10
import yaml
11
11
@@ -88,23 +88,24 @@ class PfsDocument(PfsSection):
88
88
89
89
def __init__ (
90
90
self ,
91
- data : TextIO | PfsSection | Mapping [str | PfsSection , Any ] | str | Path ,
91
+ data : TextIO
92
+ | Mapping [str | PfsSection , Any ]
93
+ | Sequence [PfsSection ]
94
+ | str
95
+ | Path ,
92
96
* ,
93
97
encoding : str = "cp1252" ,
94
- names : Sequence [str ] | None = None ,
95
98
unique_keywords : bool = False ,
96
99
) -> None :
97
100
if isinstance (data , (str , Path )) or hasattr (data , "read" ):
98
- if names is not None :
99
- raise ValueError ("names cannot be given as argument if input is a file" )
100
101
names , sections = self ._read_pfs_file (data , encoding , unique_keywords ) # type: ignore
101
102
else :
102
- names , sections = self ._parse_non_file_input (data , names )
103
+ names , sections = self ._parse_non_file_input (data )
103
104
104
105
d = self ._to_nonunique_key_dict (names , sections )
105
106
super ().__init__ (d )
106
107
107
- self ._ALIAS_LIST = ["_ALIAS_LIST" ] # ignore these in key list
108
+ self ._ALIAS_LIST = set ( ["_ALIAS_LIST" ]) # ignore these in key list
108
109
if self ._is_FM_engine :
109
110
self ._add_all_FM_aliases ()
110
111
@@ -204,52 +205,31 @@ def _read_pfs_file(
204
205
raise FileNotFoundError (str (e ))
205
206
except Exception as e :
206
207
raise ValueError (f"{ filename } could not be parsed. " + str (e ))
207
- sections = [PfsSection (list (d .values ())[0 ]) for d in target_list ] # type: ignore
208
- names = [list (d .keys ())[0 ] for d in target_list ] # type: ignore
209
- return names , sections
208
+ return PfsDocument ._extract_names_from_list (target_list ) # type: ignore
209
+
210
+ @staticmethod
211
+ def _extract_names_from_list (
212
+ targets : Sequence [PfsSection ],
213
+ ) -> tuple [list [str ], list [PfsSection ]]:
214
+ names , sections = zip (
215
+ * ((k , PfsSection (v )) for target in targets for k , v in target .items ())
216
+ )
217
+ return list (names ), list (sections )
210
218
211
219
@staticmethod
212
220
def _parse_non_file_input (
213
- input : (
214
- Mapping [str | PfsSection , Any ]
215
- | PfsSection
216
- | Sequence [PfsSection ]
217
- | Sequence [dict ]
218
- ),
219
- names : Sequence [str ] | None = None ,
220
- ) -> tuple [Sequence [str ], list [PfsSection ]]:
221
- """dict/PfsSection or lists of these can be parsed."""
222
- if names is None :
223
- assert isinstance (input , Mapping ), "input must be a mapping"
224
- names , sections = PfsDocument ._unravel_items (input .items )
225
- for sec in sections :
226
- assert isinstance (
227
- sec , Mapping
228
- ), "all targets must be PfsSections/dict (no key-value pairs allowed in the root)"
229
- return names , sections
230
-
231
- if isinstance (names , str ):
232
- names = [names ]
233
-
234
- if isinstance (input , PfsSection ):
235
- sections = [input ]
236
- elif isinstance (input , dict ):
237
- sections = [PfsSection (input )]
238
- elif isinstance (input , Sequence ):
239
- if isinstance (input [0 ], PfsSection ):
240
- sections = input # type: ignore
241
- elif isinstance (input [0 ], dict ):
242
- sections = [PfsSection (d ) for d in input ]
243
- else :
244
- raise ValueError ("List input must contain either dict or PfsSection" )
245
- else :
246
- raise ValueError (
247
- f"Input of type ({ type (input )} ) could not be parsed (pfs file, dict, PfsSection, lists of dict or PfsSection)"
248
- )
249
- if len (names ) != len (sections ):
250
- raise ValueError (
251
- f"Length of names ({ len (names )} ) does not match length of target sections ({ len (sections )} )"
252
- )
221
+ input : Mapping [str | PfsSection , Any ] | Sequence [PfsSection ],
222
+ ) -> tuple [list [str ], list [PfsSection ]]:
223
+ if isinstance (input , Sequence ):
224
+ return PfsDocument ._extract_names_from_list (input )
225
+
226
+ assert isinstance (input , Mapping ), "input must be a mapping"
227
+ names , sections = PfsDocument ._unravel_items (input .items )
228
+ for sec in sections :
229
+ if not isinstance (sec , Mapping ):
230
+ raise ValueError (
231
+ "all targets must be PfsSections/dict (no key-value pairs allowed in the root)"
232
+ )
253
233
return names , sections
254
234
255
235
@property
@@ -258,36 +238,38 @@ def _is_FM_engine(self) -> bool:
258
238
259
239
def _add_all_FM_aliases (self ) -> None :
260
240
"""create MIKE FM module aliases."""
261
- self ._add_FM_alias ("HD" , "HYDRODYNAMIC_MODULE" )
262
- self ._add_FM_alias ("SW" , "SPECTRAL_WAVE_MODULE" )
263
- self ._add_FM_alias ("TR" , "TRANSPORT_MODULE" )
264
- self ._add_FM_alias ("MT" , "MUD_TRANSPORT_MODULE" )
265
- self ._add_FM_alias ("EL" , "ECOLAB_MODULE" )
266
- self ._add_FM_alias ("ST" , "SAND_TRANSPORT_MODULE" )
267
- self ._add_FM_alias ("PT" , "PARTICLE_TRACKING_MODULE" )
268
- self ._add_FM_alias ("DA" , "DATA_ASSIMILATION_MODULE" )
241
+ ALIASES = {
242
+ "HD" : "HYDRODYNAMIC_MODULE" ,
243
+ "SW" : "SPECTRAL_WAVE_MODULE" ,
244
+ "TR" : "TRANSPORT_MODULE" ,
245
+ "MT" : "MUD_TRANSPORT_MODULE" ,
246
+ "EL" : "ECOLAB_MODULE" ,
247
+ "ST" : "SAND_TRANSPORT_MODULE" ,
248
+ "PT" : "PARTICLE_TRACKING_MODULE" ,
249
+ "DA" : "DATA_ASSIMILATION_MODULE" ,
250
+ }
251
+ for alias , module in ALIASES .items ():
252
+ self ._add_FM_alias (alias , module )
269
253
270
254
def _add_FM_alias (self , alias : str , module : str ) -> None :
271
255
"""Add short-hand alias for MIKE FM module, e.g. SW, but only if active!"""
272
- if hasattr (self .targets [0 ], module ) and hasattr (
273
- self .targets [0 ], "MODULE_SELECTION"
274
- ):
256
+ target = self .targets [0 ]
257
+ if hasattr (target , module ) and hasattr (target , "MODULE_SELECTION" ):
275
258
mode_name = f"mode_of_{ module .lower ()} "
276
- mode_of = int (self . targets [ 0 ] .MODULE_SELECTION .get (mode_name , 0 ))
259
+ mode_of = int (target .MODULE_SELECTION .get (mode_name , 0 ))
277
260
if mode_of > 0 :
278
- setattr (self , alias , self . targets [ 0 ] [module ])
279
- self ._ALIAS_LIST .append (alias )
261
+ setattr (self , alias , target [module ])
262
+ self ._ALIAS_LIST .add (alias )
280
263
281
264
def _pfs2yaml (
282
265
self , filename : str | Path | TextIO , encoding : str | None = None
283
266
) -> str :
284
267
if hasattr (filename , "read" ): # To read in memory strings StringIO
285
268
pfsstring = filename .read ()
286
269
else :
287
- with open (filename , encoding = encoding ) as f :
288
- pfsstring = f .read ()
270
+ pfsstring = Path (filename ).read_text (encoding = encoding )
289
271
290
- lines = pfsstring .split ( " \n " )
272
+ lines = pfsstring .splitlines ( )
291
273
292
274
output = []
293
275
output .append ("---" )
@@ -303,7 +285,16 @@ def _pfs2yaml(
303
285
def _parse_line (self , line : str , level : int = 0 ) -> tuple [str , int ]:
304
286
section_header = False
305
287
s = line .strip ()
306
- s = re .sub (r"\s*//.*" , "" , s ) # remove comments
288
+ parts = re .split (r'(".*?"|\'.*?\')' , s ) # Preserve quoted strings
289
+ for i , part in enumerate (parts ):
290
+ if not (
291
+ part .startswith ('"' ) or part .startswith ("'" )
292
+ ): # Ignore quoted parts
293
+ part = re .sub (
294
+ r"\s*//.*" , "" , part
295
+ ) # Remove comments only outside quotes
296
+ parts [i ] = part
297
+ s = "" .join (parts ) # Reassemble the line
307
298
308
299
if len (s ) > 0 :
309
300
if s [0 ] == "[" :
@@ -317,7 +308,7 @@ def _parse_line(self, line: str, level: int = 0) -> tuple[str, int]:
317
308
if s [- 1 ] == "]" :
318
309
s = s .replace ("]" , ":" )
319
310
320
- s = s .replace ("//" , "" )
311
+ # s = s.replace("//", "")
321
312
s = s .replace ("\t " , " " )
322
313
323
314
if len (s ) > 0 and s [0 ] != "!" :
@@ -370,21 +361,21 @@ def _parse_token(self, token: str, context: str = "") -> str:
370
361
# Example of complicated string:
371
362
# '<CLOB:22,1,1,false,1,0,"",0,"",0,"",0,"",0,"",0,"",0,"",0,"",||,false>'
372
363
if s .count ("|" ) == 2 and "CLOB" not in context :
373
- parts = s .split ("|" )
374
- if len (parts [ 1 ] ) > 1 and parts [ 1 ] .count ("'" ) > 0 :
364
+ prefix , content , suffix = s .split ("|" )
365
+ if len (content ) > 1 and content .count ("'" ) > 0 :
375
366
# string containing single quotes that needs escaping
376
367
warnings .warn (
377
368
f"The string { s } contains a single quote character which will be temporarily converted to \U0001f600 . If you write back to a pfs file again it will be converted back."
378
369
)
379
- parts [ 1 ] = parts [ 1 ] .replace ("'" , "\U0001f600 " )
380
- s = parts [ 0 ] + "'|" + parts [ 1 ] + "|'" + parts [ 2 ]
370
+ content = content .replace ("'" , "\U0001f600 " )
371
+ s = f" { prefix } '| { content } |' { suffix } "
381
372
382
373
if len (s ) > 2 : # ignore foo = ''
383
374
s = s .replace ("''" , '"' )
384
375
385
376
return s
386
377
387
- def write (self , filename : str ) -> None :
378
+ def write (self , filename : str | Path ) -> None :
388
379
"""Write object to a pfs file.
389
380
390
381
Parameters
@@ -399,19 +390,12 @@ def write(self, filename: str) -> None:
399
390
"""
400
391
from mikeio import __version__ as mikeio_version
401
392
402
- # if filename is None:
403
- # return self._to_txt_lines()
404
-
405
- with open (filename , "w" ) as f :
406
- now = datetime .now ().strftime ("%Y-%m-%d %H:%M:%S" )
407
- f .write (f"// Created : { now } \n " )
408
- f .write (r"// By : MIKE IO" )
409
- f .write ("\n " )
410
- f .write (rf"// Version : { mikeio_version } " )
411
- f .write ("\n \n " )
412
-
413
- self ._write_with_func (f .write , level = 0 )
393
+ now = datetime .now ().strftime ("%Y-%m-%d %H:%M:%S" )
394
+ header = f"""// Created : { now }
395
+ // By : MIKE IO
396
+ // Version : { mikeio_version }
414
397
398
+ """
399
+ txt = header + "\n " .join (self ._to_txt_lines ())
415
400
416
- # TODO remove this alias
417
- Pfs = PfsDocument
401
+ Path (filename ).write_text (txt )
0 commit comments