|
43 | 43 | from tools.aspect_id_tools import extract_aspect_id_name_from_urn_camelcase
|
44 | 44 | from urllib.parse import urljoin
|
45 | 45 |
|
46 |
| - |
47 | 46 | import json
|
| 47 | +import logging |
| 48 | +logger = logging.getLogger(__name__) |
48 | 49 |
|
49 | 50 | class DTRManager:
|
50 | 51 | def __init__(
|
@@ -81,89 +82,174 @@ def get_dtr_url(base_dtr_url: str = '', uri: str = '', api_path: str = '') -> st
|
81 | 82 | full_url = urljoin(base_plus_uri.rstrip('/') + '/', api_path.lstrip('/'))
|
82 | 83 | return full_url
|
83 | 84 |
|
| 85 | + def _reference_from_bpn_list(self, bpn_list, fallback_id=None): |
| 86 | + keys = [] |
| 87 | + if bpn_list: |
| 88 | + keys = [ |
| 89 | + ReferenceKey(type=ReferenceKeyTypes.GLOBAL_REFERENCE, value=bpn) |
| 90 | + for bpn in bpn_list |
| 91 | + ] |
| 92 | + elif fallback_id: |
| 93 | + keys = [ReferenceKey(type=ReferenceKeyTypes.GLOBAL_REFERENCE, value=fallback_id)] |
| 94 | + return Reference( |
| 95 | + type=ReferenceTypes.EXTERNAL_REFERENCE, |
| 96 | + keys=keys, |
| 97 | + ) |
| 98 | + |
| 99 | + def _add_or_update_asset_id(self, name, value, bpn_list, fallback_id=None): |
| 100 | + ref = self._reference_from_bpn_list(bpn_list, fallback_id=fallback_id) |
| 101 | + return SpecificAssetId(name=name, value=value, externalSubjectId=ref) |
| 102 | + def create_or_update_shell_descriptor(self, |
| 103 | + aas_id: UUID, |
| 104 | + global_id: UUID, |
| 105 | + manufacturer_id: str, |
| 106 | + manufacturer_part_id: str, |
| 107 | + customer_part_ids: Dict[str, str] | None, |
| 108 | + part_category: str, |
| 109 | + digital_twin_type: str, |
| 110 | + ) -> ShellDescriptor: |
| 111 | + """ |
| 112 | + Registers or updates a twin in the DTR. |
| 113 | + """ |
| 114 | + try: |
| 115 | + existing_shell = self.aas_service.get_asset_administration_shell_descriptor_by_id(aas_id.urn) |
| 116 | + logger.info(f"Shell with ID {aas_id} already exists and will be updated.") |
| 117 | + specific_asset_ids = existing_shell.specificAssetIds or [] |
| 118 | + existing_keys = {(id.name, id.value) for id in specific_asset_ids} |
| 119 | + except Exception: |
| 120 | + existing_shell = None |
| 121 | + specific_asset_ids = [] |
| 122 | + existing_keys = set() |
| 123 | + |
| 124 | + bpn_list = list(customer_part_ids.values()) if customer_part_ids else [] |
| 125 | + |
| 126 | + if manufacturer_id and ("manufacturerId", manufacturer_id) not in existing_keys: |
| 127 | + specific_asset_ids.append(self._add_or_update_asset_id("manufacturerId", manufacturer_id, bpn_list, fallback_id=manufacturer_id)) |
| 128 | + |
| 129 | + if digital_twin_type and ("digitalTwinType", digital_twin_type) not in existing_keys: |
| 130 | + specific_asset_ids.append(self._add_or_update_asset_id("digitalTwinType", digital_twin_type, bpn_list, fallback_id=manufacturer_id)) |
| 131 | + |
| 132 | + if manufacturer_part_id and ("manufacturerPartId", manufacturer_part_id) not in existing_keys: |
| 133 | + specific_asset_ids.append(self._add_or_update_asset_id("manufacturerPartId", manufacturer_part_id, bpn_list, fallback_id=manufacturer_id)) |
| 134 | + |
| 135 | + if customer_part_ids: |
| 136 | + for customer_part_id, bpn in customer_part_ids.items(): |
| 137 | + if not customer_part_id: |
| 138 | + continue |
| 139 | + key = ("customerPartId", customer_part_id) |
| 140 | + if key not in existing_keys: |
| 141 | + specific_customer_part_asset_id = SpecificAssetId( |
| 142 | + name="customerPartId", |
| 143 | + value=customer_part_id, |
| 144 | + externalSubjectId=self._reference_from_bpn_list([bpn]), |
| 145 | + ) |
| 146 | + specific_asset_ids.append(specific_customer_part_asset_id) |
| 147 | + |
| 148 | + shell = ShellDescriptor( |
| 149 | + id=aas_id.urn, |
| 150 | + globalAssetId=global_id.urn, |
| 151 | + specificAssetIds=specific_asset_ids, |
| 152 | + ) |
| 153 | + |
| 154 | + if existing_shell: |
| 155 | + res = self.aas_service.update_asset_administration_shell_descriptor(shell) |
| 156 | + else: |
| 157 | + res = self.aas_service.create_asset_administration_shell_descriptor(shell) |
| 158 | + |
| 159 | + if isinstance(res, Result): |
| 160 | + raise Exception("Error creating or updating shell descriptor", res.to_json_string()) |
| 161 | + |
| 162 | + return res |
| 163 | + |
| 164 | + |
84 | 165 | def create_shell_descriptor(
|
85 | 166 | self,
|
86 | 167 | aas_id: UUID,
|
87 | 168 | global_id: UUID,
|
88 | 169 | manufacturer_id: str,
|
89 | 170 | manufacturer_part_id: str,
|
90 |
| - customer_part_ids: Dict[str, str], |
| 171 | + customer_part_ids: Dict[str, str] | None, |
91 | 172 | part_category: str,
|
92 | 173 | digital_twin_type: str,
|
93 | 174 | ) -> ShellDescriptor:
|
94 | 175 | """
|
95 | 176 | Registers a twin in the DTR.
|
96 | 177 | """
|
97 |
| - # List with specific asset ids that |
98 |
| - # are required by the Industry Core KIT |
99 | 178 | specific_asset_ids = []
|
100 |
| - # Adds the customerPartId of the customers from which we have |
101 |
| - # this information |
102 |
| - for customer_part_id, bpn in customer_part_ids.items(): |
103 |
| - # This value is optional, so if we don't have it, we skip it |
104 |
| - if not customer_part_id: |
105 |
| - continue |
106 |
| - specific_customer_part_asset_id = SpecificAssetId( |
107 |
| - name="customerPartId", |
108 |
| - value=customer_part_id, |
| 179 | + # Prepare BPN list from customer_part_ids, if present |
| 180 | + bpn_list = list(customer_part_ids.values()) if customer_part_ids else [] |
| 181 | + |
| 182 | + # manufacturerId |
| 183 | + if manufacturer_id: |
| 184 | + ref_keys = ( |
| 185 | + [ReferenceKey(type=ReferenceKeyTypes.GLOBAL_REFERENCE, value=bpn) for bpn in bpn_list] |
| 186 | + if bpn_list else |
| 187 | + [ReferenceKey(type=ReferenceKeyTypes.GLOBAL_REFERENCE, value=manufacturer_id)] |
| 188 | + ) |
| 189 | + specific_manufacturer_asset_id = SpecificAssetId( |
| 190 | + name="manufacturerId", |
| 191 | + value=manufacturer_id, |
109 | 192 | externalSubjectId=Reference(
|
110 | 193 | type=ReferenceTypes.EXTERNAL_REFERENCE,
|
111 |
| - keys=[ |
112 |
| - ReferenceKey( |
113 |
| - type=ReferenceKeyTypes.GLOBAL_REFERENCE, value=bpn |
114 |
| - ), |
115 |
| - ], |
| 194 | + keys=ref_keys, |
116 | 195 | ),
|
117 | 196 | ) # type: ignore
|
118 |
| - specific_asset_ids.append(specific_customer_part_asset_id) |
| 197 | + specific_asset_ids.append(specific_manufacturer_asset_id) |
119 | 198 |
|
120 |
| - # Adds the manufacturerId that we have assigned to the part |
121 |
| - # The visibility of this id is restricted to partners we have |
122 |
| - # a contract with, so we use set their BPN |
123 |
| - specific_manufacturer_asset_id = SpecificAssetId( |
124 |
| - name="manufacturerId", |
125 |
| - value=manufacturer_id, |
126 |
| - externalSubjectId=Reference( |
127 |
| - type=ReferenceTypes.EXTERNAL_REFERENCE, |
128 |
| - keys=[ |
129 |
| - ReferenceKey(type=ReferenceKeyTypes.GLOBAL_REFERENCE, value=bpn) |
130 |
| - for bpn in customer_part_ids.values() |
131 |
| - ], |
132 |
| - ), |
133 |
| - ) # type: ignore |
134 |
| - specific_asset_ids.append(specific_manufacturer_asset_id) |
| 199 | + # digitalTwinType |
| 200 | + if digital_twin_type: |
| 201 | + ref_keys = ( |
| 202 | + [ReferenceKey(type=ReferenceKeyTypes.GLOBAL_REFERENCE, value=bpn) for bpn in bpn_list] |
| 203 | + if bpn_list else |
| 204 | + [ReferenceKey(type=ReferenceKeyTypes.GLOBAL_REFERENCE, value=manufacturer_id)] |
| 205 | + ) |
| 206 | + digital_twin_asset_id = SpecificAssetId( |
| 207 | + name="digitalTwinType", |
| 208 | + value=digital_twin_type, |
| 209 | + externalSubjectId=Reference( |
| 210 | + type=ReferenceTypes.EXTERNAL_REFERENCE, |
| 211 | + keys=ref_keys, |
| 212 | + ), |
| 213 | + ) # type: ignore |
| 214 | + specific_asset_ids.append(digital_twin_asset_id) |
135 | 215 |
|
136 |
| - # Is added to allow data consumers to search for all digital twins of a particular type |
137 |
| - digital_twin_asset_id = SpecificAssetId( |
138 |
| - name="digitalTwinType", |
139 |
| - value=digital_twin_type, |
140 |
| - externalSubjectId=Reference( |
141 |
| - type=ReferenceTypes.EXTERNAL_REFERENCE, |
142 |
| - keys=[ |
143 |
| - ReferenceKey(type=ReferenceKeyTypes.GLOBAL_REFERENCE, value=bpn) |
144 |
| - for bpn in customer_part_ids.values() |
145 |
| - ], |
146 |
| - ), |
147 |
| - ) # type: ignore |
148 |
| - specific_asset_ids.append(digital_twin_asset_id) |
| 216 | + # manufacturerPartId |
| 217 | + if manufacturer_part_id: |
| 218 | + ref_keys = ( |
| 219 | + [ReferenceKey(type=ReferenceKeyTypes.GLOBAL_REFERENCE, value=bpn) for bpn in bpn_list] |
| 220 | + if bpn_list else |
| 221 | + [ReferenceKey(type=ReferenceKeyTypes.GLOBAL_REFERENCE, value=manufacturer_id)] |
| 222 | + ) |
| 223 | + specific_manufacturer_part_asset_id = SpecificAssetId( |
| 224 | + name="manufacturerPartId", |
| 225 | + value=manufacturer_part_id, |
| 226 | + externalSubjectId=Reference( |
| 227 | + type=ReferenceTypes.EXTERNAL_REFERENCE, |
| 228 | + keys=ref_keys, |
| 229 | + ), |
| 230 | + ) # type: ignore |
| 231 | + specific_asset_ids.append(specific_manufacturer_part_asset_id) |
149 | 232 |
|
150 |
| - # The ID of the type/catalog part from the manufacturer |
151 |
| - # visble to everyone |
152 |
| - specific_manufacturer_part_asset_id = SpecificAssetId( |
153 |
| - name="manufacturerPartId", |
154 |
| - value=manufacturer_part_id, |
155 |
| - externalSubjectId=Reference( |
156 |
| - type=ReferenceTypes.EXTERNAL_REFERENCE, |
157 |
| - keys=[ |
158 |
| - ReferenceKey( |
159 |
| - type=ReferenceKeyTypes.GLOBAL_REFERENCE, value="PUBLIC_READABLE" |
| 233 | + # customerPartId(s) |
| 234 | + if customer_part_ids is not None and customer_part_ids != {}: |
| 235 | + for customer_part_id, bpn in customer_part_ids.items(): |
| 236 | + if not customer_part_id: |
| 237 | + continue |
| 238 | + ref_keys = ( |
| 239 | + [ReferenceKey(type=ReferenceKeyTypes.GLOBAL_REFERENCE, value=bpn)] |
| 240 | + if bpn else |
| 241 | + ([ReferenceKey(type=ReferenceKeyTypes.GLOBAL_REFERENCE, value=manufacturer_id)] if manufacturer_id else []) |
| 242 | + ) |
| 243 | + specific_customer_part_asset_id = SpecificAssetId( |
| 244 | + name="customerPartId", |
| 245 | + value=customer_part_id, |
| 246 | + externalSubjectId=Reference( |
| 247 | + type=ReferenceTypes.EXTERNAL_REFERENCE, |
| 248 | + keys=ref_keys, |
160 | 249 | ),
|
161 |
| - ], |
162 |
| - ), |
163 |
| - ) # type: ignore |
164 |
| - specific_asset_ids.append(specific_manufacturer_part_asset_id) |
| 250 | + ) # type: ignore |
| 251 | + specific_asset_ids.append(specific_customer_part_asset_id) |
165 | 252 |
|
166 |
| - # ids need to be saved as urn |
167 | 253 | shell = ShellDescriptor(
|
168 | 254 | id=aas_id.urn,
|
169 | 255 | globalAssetId=global_id.urn,
|
|
0 commit comments