-
Notifications
You must be signed in to change notification settings - Fork 35
schemaview.py - misc minor edits #463
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,7 +8,7 @@ | |
import uuid | ||
import warnings | ||
from collections import defaultdict, deque | ||
from collections.abc import Mapping | ||
from collections.abc import Callable, Mapping | ||
from copy import copy, deepcopy | ||
from dataclasses import dataclass | ||
from enum import Enum | ||
|
@@ -92,7 +92,13 @@ class OrderedBy(Enum): | |
""" | ||
|
||
|
||
def _closure(f, x, reflexive: bool = True, depth_first: bool = True, **kwargs: dict[str, Any] | None) -> list: # noqa: ARG001 | ||
def _closure( | ||
f: Callable, | ||
x, | ||
reflexive: bool = True, | ||
depth_first: bool = True, | ||
**kwargs: dict[str, Any] | None, # noqa: ARG001 | ||
) -> list[str | ElementName | ClassDefinitionName | EnumDefinitionName | SlotDefinitionName | TypeDefinitionName]: | ||
rv = [x] if reflexive else [] | ||
visited = [] | ||
todo = [x] | ||
|
@@ -686,7 +692,7 @@ def in_schema(self, element_name: ElementName) -> SchemaDefinitionName: | |
@lru_cache(None) | ||
def element_by_schema_map(self) -> dict[ElementName, SchemaDefinitionName]: | ||
ix = {} | ||
schemas = self.all_schema(True) | ||
schemas = self.all_schema(imports=True) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. just for clarity |
||
for schema in schemas: | ||
for type_key in SCHEMA_ELEMENTS: | ||
for k in getattr(schema, type_key, {}): | ||
|
@@ -697,7 +703,7 @@ def element_by_schema_map(self) -> dict[ElementName, SchemaDefinitionName]: | |
return ix | ||
|
||
@lru_cache(None) | ||
def get_class(self, class_name: CLASS_NAME, imports: bool = True, strict: bool = False) -> ClassDefinition: | ||
def get_class(self, class_name: CLASS_NAME, imports: bool = True, strict: bool = False) -> ClassDefinition | None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These methods can all return None |
||
"""Retrieve a class from the schema. | ||
|
||
:param class_name: name of the class to be retrieved | ||
|
@@ -713,7 +719,7 @@ def get_class(self, class_name: CLASS_NAME, imports: bool = True, strict: bool = | |
@lru_cache(None) | ||
def get_slot( | ||
self, slot_name: SLOT_NAME, imports: bool = True, attributes: bool = True, strict: bool = False | ||
) -> SlotDefinition: | ||
) -> SlotDefinition | None: | ||
"""Retrieve a slot from the schema. | ||
|
||
:param slot_name: name of the slot to be retrieved | ||
|
@@ -738,7 +744,9 @@ def get_slot( | |
return slot | ||
|
||
@lru_cache(None) | ||
def get_subset(self, subset_name: SUBSET_NAME, imports: bool = True, strict: bool = False) -> SubsetDefinition: | ||
def get_subset( | ||
self, subset_name: SUBSET_NAME, imports: bool = True, strict: bool = False | ||
) -> SubsetDefinition | None: | ||
"""Retrieve a subset from the schema. | ||
|
||
:param subset_name: name of the subsey to be retrieved | ||
|
@@ -752,7 +760,7 @@ def get_subset(self, subset_name: SUBSET_NAME, imports: bool = True, strict: boo | |
return s | ||
|
||
@lru_cache(None) | ||
def get_enum(self, enum_name: ENUM_NAME, imports: bool = True, strict: bool = False) -> EnumDefinition: | ||
def get_enum(self, enum_name: ENUM_NAME, imports: bool = True, strict: bool = False) -> EnumDefinition | None: | ||
"""Retrieve an enum from the schema. | ||
|
||
:param enum_name: name of the enum to be retrieved | ||
|
@@ -766,7 +774,7 @@ def get_enum(self, enum_name: ENUM_NAME, imports: bool = True, strict: bool = Fa | |
return e | ||
|
||
@lru_cache(None) | ||
def get_type(self, type_name: TYPE_NAME, imports: bool = True, strict: bool = False) -> TypeDefinition: | ||
def get_type(self, type_name: TYPE_NAME, imports: bool = True, strict: bool = False) -> TypeDefinition | None: | ||
"""Retrieve a type from the schema. | ||
|
||
:param type_name: name of the type to be retrieved | ||
|
@@ -812,7 +820,7 @@ def enum_parents( | |
@lru_cache(None) | ||
def permissible_value_parent( | ||
self, permissible_value: str, enum_name: ENUM_NAME | ||
) -> str | PermissibleValueText | None | ValueError: | ||
) -> list[str | PermissibleValueText] | None: | ||
""":param enum_name: child enum name | ||
:param permissible_value: permissible value | ||
:return: all direct parent enum names (is_a) | ||
|
@@ -829,7 +837,7 @@ def permissible_value_parent( | |
@lru_cache(None) | ||
def permissible_value_children( | ||
self, permissible_value: str, enum_name: ENUM_NAME | ||
) -> str | PermissibleValueText | None | ValueError: | ||
) -> list[str | PermissibleValueText] | None: | ||
""":param enum_name: parent enum name | ||
:param permissible_value: permissible value | ||
:return: all direct child permissible values (is_a) | ||
|
@@ -1019,7 +1027,7 @@ def enum_ancestors( | |
|
||
@lru_cache(None) | ||
def type_ancestors( | ||
self, type_name: TYPES, imports: bool = True, reflexive: bool = True, depth_first: bool = True | ||
self, type_name: TYPE_NAME, imports: bool = True, reflexive: bool = True, depth_first: bool = True | ||
) -> list[TypeDefinitionName]: | ||
"""Return all ancestors of a type via typeof. | ||
|
||
|
@@ -1172,7 +1180,7 @@ def slot_is_true_for_metadata_property(self, slot_name: SlotDefinition, metadata | |
msg = 'property to introspect must be of type "boolean"' | ||
raise ValueError(msg) | ||
|
||
def get_element(self, element: ElementName | Element, imports: bool = True) -> Element: | ||
def get_element(self, element: ElementName | Element | str, imports: bool = True) -> Element | None: | ||
"""Fetch an element by name. | ||
|
||
:param element: query element | ||
|
@@ -1181,16 +1189,14 @@ def get_element(self, element: ElementName | Element, imports: bool = True) -> E | |
""" | ||
if isinstance(element, Element): | ||
return element | ||
e = self.get_class(element, imports=imports) | ||
if e is None: | ||
e = self.get_slot(element, imports=imports) | ||
if e is None: | ||
e = self.get_type(element, imports=imports) | ||
if e is None: | ||
e = self.get_enum(element, imports=imports) | ||
if e is None: | ||
e = self.get_subset(element, imports=imports) | ||
return e | ||
|
||
return ( | ||
self.get_class(element, imports=imports) | ||
or self.get_slot(element, imports=imports) | ||
or self.get_type(element, imports=imports) | ||
or self.get_enum(element, imports=imports) | ||
or self.get_subset(element, imports=imports) | ||
) | ||
|
||
def get_uri( | ||
self, | ||
|
@@ -1700,22 +1706,20 @@ def is_inlined(self, slot: SlotDefinition, imports: bool = True) -> bool: | |
return True | ||
|
||
id_slot = self.get_identifier_slot(slot_range, imports=imports) | ||
if id_slot is None: | ||
# must be inlined as has no identifier | ||
return True | ||
# not explicitly declared inline and has an identifier: assume is ref, not inlined | ||
return False | ||
# if slot_id is None, it must be inlined as has no identifier | ||
# if slot_id is defined, it not explicitly declared inline and has an identifier: assume is ref, not inlined | ||
return id_slot is None | ||
return False | ||
|
||
def slot_applicable_range_elements(self, slot: SlotDefinition) -> list[ClassDefinitionName]: | ||
"""Retrieve all applicable metamodel elements for a slot range. | ||
|
||
(metamodel class names returned: class_definition, enum_definition, type_definition) | ||
|
||
Typically any given slot has exactly one range, and one metamodel element type, | ||
but a proposed feature in LinkML 1.2 is range expressions, where ranges can be defined as unions | ||
Typically any given slot has exactly one range, and one metamodel element type. | ||
Starting at LinkML 1.2, ranges can be defined as unions of multiple types. | ||
Comment on lines
+1720
to
+1721
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. linkml 1.2 is in the far distant past now! |
||
|
||
Additionally, if linkml:Any is a class_uri then this maps to the any element | ||
Additionally, if linkml:Any is a class_uri then this maps to the "Any" element | ||
|
||
:param slot: | ||
:return: list of element types | ||
|
@@ -1740,8 +1744,8 @@ def slot_applicable_range_elements(self, slot: SlotDefinition) -> list[ClassDefi | |
def slot_range_as_union(self, slot: SlotDefinition) -> list[ElementName]: | ||
"""Retrieve all applicable ranges for a slot. | ||
|
||
Typically, any given slot has exactly one range, and one metamodel element type, | ||
but a proposed feature in LinkML 1.2 is range expressions, where ranges can be defined as unions | ||
Typically, any given slot has exactly one range, and one metamodel element type. | ||
LinkML 1.2 allows the use of range expressions, where ranges can be defined as unions. | ||
|
||
:param slot: | ||
:return: list of ranges | ||
|
@@ -1832,24 +1836,28 @@ def usage_index(self) -> dict[ElementName, list[SchemaUsage]]: | |
|
||
:return: dictionary of SchemaUsages keyed by used elements | ||
""" | ||
roles = ["domain", "range", "any_of", "exactly_one_of", "none_of", "all_of"] | ||
attrs = ["domain", "range", "any_of", "exactly_one_of", "none_of", "all_of"] | ||
ix = defaultdict(list) | ||
for cn, c in self.all_classes().items(): | ||
direct_slots = c.slots | ||
for sn in self.class_slots(cn): | ||
s = self.induced_slot(sn, cn) | ||
for k in roles: | ||
v = getattr(s, k) | ||
vl = v if isinstance(v, list) else [v] | ||
for x in vl: | ||
if x is not None: | ||
if isinstance(x, AnonymousSlotExpression): | ||
x = x.range | ||
k = f"{k}[range]" | ||
k = k.split("[")[0] + "[range]" if "[range]" in k else k | ||
u = SchemaUsage(used_by=cn, slot=sn, metaslot=k, used=x) | ||
u.inferred = sn in direct_slots | ||
ix[x].append(u) | ||
slot = self.induced_slot(sn, cn) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Renamed these variables whilst I was trying to work out how this method worked. Created a couple of new variables to prevent linter complaints about editing an iterator during looping. |
||
for attr in attrs: | ||
attr_value = getattr(slot, attr) | ||
value_list = attr_value if isinstance(attr_value, list) else [attr_value] | ||
for a_value in value_list: | ||
if a_value is not None: | ||
attr_name = attr | ||
real_a_value = a_value | ||
Comment on lines
+1851
to
+1852
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. create new variables for these values rather than editing the loop iterator |
||
if isinstance(a_value, AnonymousSlotExpression): | ||
real_a_value = a_value.range | ||
attr_name = f"{attr_name}[range]" | ||
if "[range]" in attr_name: | ||
attr_name = attr_name.split("[")[0] + "[range]" | ||
attr_name = attr_name.split("[")[0] + "[range]" if "[range]" in attr_name else attr_name | ||
usage = SchemaUsage(used_by=cn, slot=sn, metaslot=attr_name, used=real_a_value) | ||
usage.inferred = sn in direct_slots | ||
ix[real_a_value].append(usage) | ||
return ix | ||
|
||
# MUTATION OPERATIONS | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -901,6 +901,9 @@ def test_creature_schema_entities_with_without_imports( | |
for entity_name in CREATURE_EXPECTED[entity][src]: | ||
e = getattr(creature_view, f"get_{entity}")(entity_name, imports=True) | ||
assert e.from_schema == f"{CREATURE_SCHEMA_BASE_URL}/{src}" | ||
# N.b. this may fail in schemas where there are different types | ||
# of element with the same name | ||
assert e == creature_view.get_element(entity_name) | ||
Comment on lines
+904
to
+906
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add in tests of the |
||
|
||
|
||
@pytest.mark.parametrize("entity", CREATURE_EXPECTED.keys()) | ||
|
@@ -914,12 +917,15 @@ def test_get_entities_with_without_imports(creature_view: SchemaView, entity: st | |
# if the source is the main schema, we can use the method directly | ||
e = getattr(creature_view, get_fn)(entity_name, imports=False) | ||
assert e.name == entity_name | ||
assert e == creature_view.get_element(entity_name, imports=False) | ||
# N.b. BUG: due to caching and how the `from_schema` element is generated, | ||
# we cannot know whether it will be populated. | ||
# assert e.from_schema is None | ||
else: | ||
# if the source is an imported schema, we expect None without imports | ||
assert getattr(creature_view, get_fn)(entity_name, imports=False) is None | ||
assert creature_view.get_element(entity_name, imports=False) is None | ||
|
||
if entity != "element": | ||
# in strict mode, we expect an error if the entity does not exist | ||
with pytest.raises(ValueError, match=f'No such {entity}: "{entity_name}"'): | ||
|
@@ -928,6 +934,7 @@ def test_get_entities_with_without_imports(creature_view: SchemaView, entity: st | |
# turn on imports | ||
e = getattr(creature_view, f"get_{entity}")(entity_name, imports=True) | ||
assert e.from_schema == f"{CREATURE_SCHEMA_BASE_URL}/{src}" | ||
assert e == creature_view.get_element(entity_name, imports=True) | ||
|
||
|
||
@pytest.mark.parametrize("entity", argvalues=[e for e in CREATURE_EXPECTED if e != "element"]) | ||
|
@@ -937,6 +944,7 @@ def test_get_entity_does_not_exist(creature_view: SchemaView, entity: str) -> No | |
|
||
# returns None unless the `strict` flag is passed | ||
assert getattr(creature_view, get_fn)("does_not_exist") is None | ||
assert creature_view.get_element("does_not_exist") is None | ||
|
||
# raises an error with `strict` flag on | ||
with pytest.raises(ValueError, match=f'No such {entity}: "does_not_exist"'): | ||
|
@@ -1374,14 +1382,18 @@ def sv_range_riid_gen(request: pytest.FixtureRequest) -> tuple[SchemaView, tuple | |
{"RangeEnum", "RangeClass", "string"}, | ||
{CD, ED, TD}, | ||
], | ||
# invalid - can't have both any_of and exactly_one_of | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. some extra docs about these combinations |
||
"any_of_and_exactly_one_of_range": [ | ||
"AnyOldRange", | ||
{"AnyOldRange", "RangeEnum", "RangeClass", "string", "range_string"}, | ||
{"RangeEnum", "RangeClass", "string", "range_string"}, | ||
{CD, ED, TD}, | ||
], | ||
# invalid: linkml:Any not specified | ||
"invalid_any_range_no_linkml_any": [None, {"string", "range_string"}, set(), {TD}], | ||
# invalid: RangeEnum instead of linkml:Any | ||
"invalid_any_range_enum": ["RangeEnum", {"RangeEnum", "string", "range_string"}, {"RangeEnum"}, {ED, TD}], | ||
# invalid: RangeClass instead of linkml:Any | ||
"invalid_any_range_class": ["RangeClass", {"RangeClass", "string", "range_string"}, {"RangeClass"}, {CD, TD}], | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding this type annotation reformatted the whole line