@@ -43,7 +43,7 @@ def __init__(self, *args: t.Any, **kwargs: t.Any):
43
43
except Exception as e :
44
44
raise RuntimeError (f"Failed to set database context to '{ self .database } '. Reason: { e } " )
45
45
46
- def _get_schema_name (self , name : t .Union [str , exp .Table , exp . Identifier ]) -> t .Optional [str ]:
46
+ def _get_schema_name (self , name : t .Union [str , exp .Table ]) -> t .Optional [str ]:
47
47
"""
48
48
Safely extracts the schema name from a table or schema name, which can be
49
49
a string or a sqlglot expression.
@@ -112,14 +112,31 @@ def _get_data_objects(
112
112
catalog = catalog ,
113
113
schema = row .schema_name ,
114
114
name = row .name ,
115
- type = DataObjectType .from_str (row .type ),
115
+ type = DataObjectType .from_str (str ( row .type ) ),
116
116
)
117
117
for row in dataframe .itertuples ()
118
118
]
119
119
120
def schema_exists(self, schema_name: SchemaName) -> bool:
    """
    Check whether a schema exists in the current database.

    The name is resolved with `to_schema`, the same helper `create_schema`
    uses, so that a bare schema name (e.g. "my_schema") is interpreted as a
    schema rather than as a table name with an empty schema part.

    Args:
        schema_name: The schema to look up, as a string or sqlglot expression.

    Returns:
        True if INFORMATION_SCHEMA.SCHEMATA contains a row for the schema in
        the catalog named by `self.database`; False otherwise, including when
        no schema name can be extracted from `schema_name`.
    """
    schema = to_schema(schema_name).db
    if not schema:
        return False

    # Build the predicates as expressions with string literals instead of
    # interpolating into SQL text, so names containing quotes are escaped by
    # the SQL generator.
    sql = (
        exp.select("1")
        .from_("INFORMATION_SCHEMA.SCHEMATA")
        .where(exp.column("SCHEMA_NAME").eq(exp.Literal.string(schema)))
        .where(exp.column("CATALOG_NAME").eq(exp.Literal.string(str(self.database))))
    )
    result = self.fetchone(sql, quote_identifiers=True)
    return bool(result and result[0] == 1)
136
+
120
137
def create_schema (
121
138
self ,
122
- schema_name : SchemaName ,
139
+ schema_name : t . Optional [ SchemaName ] ,
123
140
ignore_if_exists : bool = True ,
124
141
warn_on_error : bool = True ,
125
142
** kwargs : t .Any ,
@@ -128,53 +145,51 @@ def create_schema(
128
145
Creates a schema in a Microsoft Fabric Warehouse.
129
146
130
147
Overridden to handle Fabric's specific T-SQL requirements.
131
- T-SQL's `CREATE SCHEMA` command does not support `IF NOT EXISTS` directly
132
- as part of the statement in all contexts, and error messages suggest
133
- issues with batching or preceding statements like USE.
134
148
"""
135
- if schema_name is None :
149
+ if not schema_name :
136
150
return
137
151
138
- schema_name_str = (
139
- schema_name .name if isinstance (schema_name , exp .Identifier ) else str (schema_name )
140
- )
141
-
142
- if not schema_name_str :
143
- logger .warning ("Attempted to create a schema with an empty name. Skipping." )
144
- return
145
-
146
- schema_name_str = schema_name_str .strip ('[]"' ).rstrip ("." )
152
+ schema_exp = to_schema (schema_name )
153
+ simple_schema_name_str = exp .to_identifier (schema_exp .db ).name if schema_exp .db else None
147
154
148
- if not schema_name_str :
155
+ if not simple_schema_name_str :
149
156
logger .warning (
150
- "Attempted to create a schema with an empty name after sanitization . Skipping."
157
+ f"Could not determine simple schema name from ' { schema_name } ' . Skipping schema creation ."
151
158
)
152
159
return
153
160
154
161
try :
155
- if self .schema_exists (schema_name_str ):
162
+ if self .schema_exists (simple_schema_name_str ):
156
163
if ignore_if_exists :
157
164
return
158
- raise RuntimeError (f"Schema '{ schema_name_str } ' already exists." )
165
+ raise RuntimeError (f"Schema '{ simple_schema_name_str } ' already exists." )
159
166
except Exception as e :
160
167
if warn_on_error :
161
- logger .warning (f"Failed to check for existence of schema '{ schema_name_str } ': { e } " )
168
+ logger .warning (
169
+ f"Failed to check for existence of schema '{ simple_schema_name_str } ': { e } "
170
+ )
162
171
else :
163
172
raise
164
173
165
174
try :
166
- create_sql = f"CREATE SCHEMA [{ schema_name_str } ]"
175
+ create_sql = f"CREATE SCHEMA [{ simple_schema_name_str } ]"
167
176
self .execute (create_sql )
168
177
except Exception as e :
169
- if "already exists" in str (e ).lower () or "There is already an object named" in str (e ):
178
+ error_message = str (e ).lower ()
179
+ if (
180
+ "already exists" in error_message
181
+ or "there is already an object named" in error_message
182
+ ):
170
183
if ignore_if_exists :
171
184
return
172
- raise RuntimeError (f"Schema '{ schema_name_str } ' already exists." ) from e
185
+ raise RuntimeError (
186
+ f"Schema '{ simple_schema_name_str } ' already exists due to race condition."
187
+ ) from e
173
188
else :
174
189
if warn_on_error :
175
- logger .warning (f"Failed to create schema { schema_name_str } . Reason: { e } " )
190
+ logger .warning (f"Failed to create schema { simple_schema_name_str } . Reason: { e } " )
176
191
else :
177
- raise RuntimeError (f"Failed to create schema { schema_name_str } ." ) from e
192
+ raise RuntimeError (f"Failed to create schema { simple_schema_name_str } ." ) from e
178
193
179
194
def _create_table_from_columns (
180
195
self ,
@@ -251,7 +266,7 @@ def _fully_qualify(self, name: t.Union[TableName, SchemaName]) -> exp.Table:
251
266
and isinstance (table .this , exp .Identifier )
252
267
and (table .this .name .startswith ("#" ))
253
268
):
254
- temp_identifier = exp .Identifier (this = table .this .this , quoted = True )
269
+ temp_identifier = exp .Identifier (this = table .this .name , quoted = True )
255
270
return exp .Table (this = temp_identifier )
256
271
257
272
schema = self ._get_schema_name (name )
@@ -308,6 +323,8 @@ def create_view(
308
323
def columns (
309
324
self , table_name : TableName , include_pseudo_columns : bool = False
310
325
) -> t .Dict [str , exp .DataType ]:
326
+ import numpy as np
327
+
311
328
table = exp .to_table (table_name )
312
329
schema = self ._get_schema_name (table_name )
313
330
@@ -346,6 +363,7 @@ def columns(
346
363
)
347
364
348
365
df = self .fetchdf (sql )
366
+ df = df .replace ({np .nan : None })
349
367
350
368
def build_var_length_col (
351
369
column_name : str ,
@@ -356,11 +374,9 @@ def build_var_length_col(
356
374
) -> t .Tuple [str , str ]:
357
375
data_type = data_type .lower ()
358
376
359
- char_len_int = (
360
- int (character_maximum_length ) if character_maximum_length is not None else None
361
- )
362
- prec_int = int (numeric_precision ) if numeric_precision is not None else None
363
- scale_int = int (numeric_scale ) if numeric_scale is not None else None
377
+ char_len_int = character_maximum_length
378
+ prec_int = numeric_precision
379
+ scale_int = numeric_scale
364
380
365
381
if data_type in self .VARIABLE_LENGTH_DATA_TYPES and char_len_int is not None :
366
382
if char_len_int > 0 :
@@ -378,79 +394,31 @@ def build_var_length_col(
378
394
379
395
return (column_name , data_type )
380
396
381
- columns_raw = [
382
- (
383
- row .COLUMN_NAME ,
384
- row .DATA_TYPE ,
385
- getattr (row , "CHARACTER_MAXIMUM_LENGTH" , None ),
386
- getattr (row , "NUMERIC_PRECISION" , None ),
387
- getattr (row , "NUMERIC_SCALE" , None ),
397
def _to_optional_int(val: t.Any) -> t.Optional[int]:
    """
    Convert a DataFrame cell value to an int, or None if it has no int value.

    Args:
        val: The raw cell value (may be None, a number, a string, etc.).

    Returns:
        `int(val)` when the conversion succeeds; None when `val` is None or
        cannot be converted.
    """
    if val is None:
        return None
    try:
        return int(val)
    except (ValueError, TypeError, OverflowError):
        # int() raises ValueError for NaN and non-numeric text, TypeError for
        # unsupported types, and OverflowError for +/- infinity.
        return None
405
+
406
+ columns_processed = [
407
+ build_var_length_col (
408
+ str (row .COLUMN_NAME ),
409
+ str (row .DATA_TYPE ),
410
+ _to_optional_int (row .CHARACTER_MAXIMUM_LENGTH ),
411
+ _to_optional_int (row .NUMERIC_PRECISION ),
412
+ _to_optional_int (row .NUMERIC_SCALE ),
388
413
)
389
414
for row in df .itertuples ()
390
415
]
391
416
392
- columns_processed = [build_var_length_col (* row ) for row in columns_raw ]
393
-
394
417
return {
395
418
column_name : exp .DataType .build (data_type , dialect = self .dialect )
396
419
for column_name , data_type in columns_processed
397
420
}
398
421
399
- def create_schema (
400
- self ,
401
- schema_name : SchemaName ,
402
- ignore_if_exists : bool = True ,
403
- warn_on_error : bool = True ,
404
- ** kwargs : t .Any ,
405
- ) -> None :
406
- if schema_name is None :
407
- return
408
-
409
- schema_exp = to_schema (schema_name )
410
- simple_schema_name_str = None
411
- if schema_exp .db :
412
- simple_schema_name_str = exp .to_identifier (schema_exp .db ).name
413
-
414
- if not simple_schema_name_str :
415
- logger .warning (
416
- f"Could not determine simple schema name from '{ schema_name } '. Skipping schema creation."
417
- )
418
- return
419
-
420
- if ignore_if_exists :
421
- try :
422
- if self .schema_exists (simple_schema_name_str ):
423
- return
424
- except Exception as e :
425
- if warn_on_error :
426
- logger .warning (
427
- f"Failed to check for existence of schema '{ simple_schema_name_str } ': { e } "
428
- )
429
- else :
430
- raise
431
- elif self .schema_exists (simple_schema_name_str ):
432
- raise RuntimeError (f"Schema '{ simple_schema_name_str } ' already exists." )
433
-
434
- try :
435
- create_sql = f"CREATE SCHEMA [{ simple_schema_name_str } ]"
436
- self .execute (create_sql )
437
- except Exception as e :
438
- error_message = str (e ).lower ()
439
- if (
440
- "already exists" in error_message
441
- or "there is already an object named" in error_message
442
- ):
443
- if ignore_if_exists :
444
- return
445
- raise RuntimeError (
446
- f"Schema '{ simple_schema_name_str } ' already exists due to race condition."
447
- ) from e
448
- else :
449
- if warn_on_error :
450
- logger .warning (f"Failed to create schema { simple_schema_name_str } . Reason: { e } " )
451
- else :
452
- raise RuntimeError (f"Failed to create schema { simple_schema_name_str } ." ) from e
453
-
454
422
def _insert_overwrite_by_condition (
455
423
self ,
456
424
table_name : TableName ,
0 commit comments