1
1
from adapters .adapter_base import AdapterBase
2
2
from nmdc_schema .migrators .migrator_base import MigratorBase
3
3
from nmdc_schema .migrators .helpers import create_schema_view , logger
4
- from nmdc_schema .migrators .migration_reporter import create_migration_reporter
4
+ from nmdc_schema .migrators .migration_reporter import (
5
+ create_migration_reporter ,
6
+ get_most_specific_class_for_reporting ,
7
+ parse_schema_path ,
8
+ get_clean_schema_path ,
9
+ resolve_class_from_schema_path
10
+ )
5
11
from pymongo .client_session import ClientSession
6
- from typing import Optional , Set , Dict , List
12
+ from typing import Optional , List
7
13
from functools import lru_cache
8
14
import logging
9
15
import copy
@@ -108,7 +114,13 @@ class Migrator(MigratorBase):
108
114
>>> m = Migrator(DictionaryAdapter(database))
109
115
>>> # Initialize required dependencies for standalone testing
110
116
>>> from nmdc_schema.migrators.helpers import create_schema_view
111
- >>> from nmdc_schema.migrators.migration_reporter import create_migration_reporter
117
+ >>> from nmdc_schema.migrators.migration_reporter import (
118
+ create_migration_reporter,
119
+ get_most_specific_class_for_reporting,
120
+ parse_schema_path,
121
+ get_clean_schema_path,
122
+ resolve_class_from_schema_path
123
+ )
112
124
>>> m._schema_view = create_schema_view()
113
125
>>> m._unit_alias_map = m._build_unit_alias_map(m._schema_view)
114
126
>>> m.reporter = create_migration_reporter(m.logger)
@@ -261,7 +273,7 @@ def upgrade(self, commit_changes: bool = False) -> None:
261
273
"""
262
274
Migrates all QuantityValue instances in records to have non-null has_unit values conformant to enumeration PVs.
263
275
264
- All operations are wrapped in a MongoDB transaction for atomicity and rollback capability.
276
+ All operations are wrapped in a MongoDB transaction for rollback capability.
265
277
All actions are logged in a reporter class so that we can see some statistics at the end of the migration.
266
278
267
279
Args:
@@ -473,12 +485,12 @@ def _fix_quantity_value_unit(self, quantity_value: dict, document_root: dict, pa
473
485
# Get root collection class for reporting (the class that has a MongoDB collection)
474
486
root_collection_class = document_root .get ('type' , 'nmdc:Unknown' )
475
487
# Get clean schema path without array indices for reporting
476
- clean_schema_path = self . _get_clean_schema_path (path )
488
+ clean_schema_path = get_clean_schema_path (path )
477
489
478
490
# Check if `has_unit` is missing or is None
479
491
if 'has_unit' not in quantity_value or quantity_value ['has_unit' ] is None :
480
492
# Get most specific class for unit lookup (for special cases)
481
- most_specific_class = self ._get_most_specific_class_for_reporting ( document_root , path )
493
+ most_specific_class = get_most_specific_class_for_reporting ( self ._schema_view , document_root , path )
482
494
483
495
# Check for special cases where we can extract unit from raw_value
484
496
unit = self ._handle_one_off_unit_cases (quantity_value , most_specific_class , path , None )
@@ -504,7 +516,7 @@ def _fix_quantity_value_unit(self, quantity_value: dict, document_root: dict, pa
504
516
# has_unit exists, check if it needs normalization
505
517
current_unit = quantity_value ['has_unit' ]
506
518
# Get most specific class for unit lookup (for special cases)
507
- most_specific_class = self ._get_most_specific_class_for_reporting ( document_root , path )
519
+ most_specific_class = get_most_specific_class_for_reporting ( self ._schema_view , document_root , path )
508
520
509
521
# Check if current unit is an alias that should be normalized
510
522
if current_unit in self ._unit_alias_map :
@@ -560,7 +572,7 @@ def _infer_unit_from_context(self, full_document: dict, path: str) -> Optional[s
560
572
str or None: The inferred unit, or None if not found
561
573
"""
562
574
# Parse path into components, filtering out array indices
563
- path_parts = self . _parse_schema_path (path )
575
+ path_parts = parse_schema_path (path )
564
576
if not path_parts :
565
577
return None
566
578
@@ -572,120 +584,13 @@ def _infer_unit_from_context(self, full_document: dict, path: str) -> Optional[s
572
584
root_class = doc_type .replace ('nmdc:' , '' ) if doc_type .startswith ('nmdc:' ) else doc_type
573
585
574
586
# Use schema to resolve the class context for this field
575
- target_class = self ._resolve_class_from_schema_path ( root_class , slot_path )
587
+ target_class = resolve_class_from_schema_path ( self ._schema_view , root_class , slot_path )
576
588
if target_class :
577
589
return self ._get_unit_for_class_slot (f"nmdc:{ target_class } " , field_name , None )
578
590
579
591
# Fallback to document type if schema resolution fails
580
592
return self ._get_unit_for_class_slot (doc_type , field_name , None )
581
593
582
- def _parse_schema_path (self , path : str ) -> List [str ]:
583
- r"""
584
- Parses a document path into schema-relevant components, filtering out array indices.
585
-
586
- Args:
587
- path: Path like "substances_used[0].volume" or "extraction.input_mass"
588
-
589
- Returns:
590
- List of schema slot names: ["substances_used", "volume"] or ["extraction", "input_mass"]
591
- """
592
- if not path :
593
- return []
594
-
595
- parts = []
596
- for part in path .split ('.' ):
597
- if '[' in part :
598
- # Extract slot name, ignore array index
599
- slot_name = part .split ('[' )[0 ]
600
- if slot_name : # Only add non-empty slot names
601
- parts .append (slot_name )
602
- else :
603
- parts .append (part )
604
-
605
- return parts
606
-
607
- def _get_clean_schema_path (self , path : str ) -> str :
608
- """
609
- Converts a document path with array indices to a clean schema path for reporting.
610
-
611
- Args:
612
- path: Path like "substances_used[0].volume" or "extraction.input_mass"
613
-
614
- Returns:
615
- Clean schema path: "substances_used.volume" or "extraction.input_mass"
616
- """
617
- if not path :
618
- return "root"
619
-
620
- # Parse and rejoin without array indices
621
- schema_parts = self ._parse_schema_path (path )
622
- return '.' .join (schema_parts ) if schema_parts else "root"
623
-
624
- def _resolve_class_from_schema_path (self , root_class : str , slot_path : List [str ]) -> Optional [str ]:
625
- r"""
626
- Uses schema definitions to resolve the target class for a nested slot path.
627
-
628
- Args:
629
- root_class: Starting class name (without nmdc: prefix)
630
- slot_path: List of slot names leading to the target field
631
-
632
- Returns:
633
- str or None: The resolved class name (without nmdc: prefix), or None if not found
634
- """
635
- if not slot_path :
636
- return root_class
637
-
638
- current_class = root_class
639
-
640
- try :
641
- for slot_name in slot_path :
642
- # Get the slot definition for this class
643
- slot_def = self ._schema_view .induced_slot (slot_name , current_class )
644
- if not slot_def or not slot_def .range :
645
- return None
646
-
647
- # Move to the range class
648
- current_class = slot_def .range
649
-
650
- return current_class
651
-
652
- except Exception :
653
- # If schema traversal fails, return None
654
- return None
655
-
656
- def _get_most_specific_class_for_reporting (self , document_root : dict , path : str ) -> str :
657
- """
658
- Determines the most specific class type for reporting purposes.
659
- For nested objects, uses schema resolution to find the immediate parent class.
660
-
661
- Args:
662
- document_root: The root document for fallback context
663
- path: Path to the QuantityValue in the document
664
-
665
- Returns:
666
- str: The most specific class URI (e.g., "nmdc:PortionOfSubstance")
667
- """
668
- # Parse path to get components leading to the QuantityValue
669
- path_parts = self ._parse_schema_path (path )
670
- if not path_parts :
671
- return document_root .get ('type' , 'unknown' )
672
-
673
- # Remove the final field name to get the path to the containing object
674
- container_path = path_parts [:- 1 ] if len (path_parts ) > 1 else []
675
-
676
- # Start with document's root class
677
- doc_type = document_root .get ('type' , 'nmdc:Unknown' )
678
- root_class = doc_type .replace ('nmdc:' , '' ) if doc_type .startswith ('nmdc:' ) else doc_type
679
-
680
- # Use schema to resolve the class context for the container
681
- if container_path :
682
- target_class = self ._resolve_class_from_schema_path (root_class , container_path )
683
- if target_class :
684
- return f"nmdc:{ target_class } "
685
-
686
- # Fallback to document type
687
- return doc_type
688
-
689
594
def _add_unit_to_quantity_value (self , quantity_value : dict , class_uri : str , slot_name : str , full_document : dict = None ) -> None :
690
595
r"""
691
596
Adds an appropriate unit to a QuantityValue instance if it doesn't have one,
0 commit comments