1
1
import logging
2
+ from typing import ClassVar
2
3
3
4
import pydantic
5
+ from mxcubecore import HardwareRepository as HWR
6
+ from mxcubecore .HardwareObjects .Beamline import Beamline
4
7
5
- BEAMLINE_ADAPTER = None
8
+ from mxcubeweb .core .adapter .adapter_base import ActuatorAdapterBase
9
+ from mxcubeweb .core .models .configmodels import ResourceHandlerConfigModel
10
+ from mxcubeweb .routes import signals
6
11
7
- # Singleton like interface is needed to keep the same reference to the
8
- # adapter object and its corresponding hardware objects, so that the signal
9
- # system won't clean up signal handlers. (PyDispatcher removes signal handlers
10
- # when an object is garbage collected )
12
+ resource_handler_config = ResourceHandlerConfigModel (
13
+ commands = [ "prepare_beamline_for_sample" ],
14
+ attributes = [ "data" , "get_value" ],
15
+ )
11
16
12
17
13
- def BeamlineAdapter (* args ):
14
- global BEAMLINE_ADAPTER
15
-
16
- if BEAMLINE_ADAPTER is None :
17
- BEAMLINE_ADAPTER = _BeamlineAdapter (* args )
18
-
19
- return BEAMLINE_ADAPTER
20
-
21
-
22
- class _BeamlineAdapter :
18
+ class BeamlineAdapter (ActuatorAdapterBase ):
23
19
"""
24
20
Adapter between Beamline route and Beamline hardware object.
25
21
"""
26
22
27
- def __init__ (self , beamline_hwobj , app ):
23
+ SUPPORTED_TYPES : ClassVar [list [object ]] = [Beamline ]
24
+
25
+ def __init__ (self , ho , role , app ):
26
+ super ().__init__ (ho , role , app , resource_handler_config )
28
27
self .app = app
29
- self ._bl = beamline_hwobj
30
28
self .adapter_dict = {}
31
29
32
- workflow = self ._bl .workflow
30
+ workflow = self ._ho .workflow
33
31
if workflow :
34
- workflow .connect ("parametersNeeded" , self .wf_parameters_needed )
32
+ workflow .connect ("parametersNeeded" , self ._wf_parameters_needed )
35
33
36
- gphl_workflow = self ._bl .gphl_workflow
34
+ gphl_workflow = self ._ho .gphl_workflow
37
35
if gphl_workflow :
38
36
gphl_workflow .connect (
39
- "GphlJsonParametersNeeded" , self .gphl_json_wf_parameters_needed
37
+ "GphlJsonParametersNeeded" , self ._gphl_json_wf_parameters_needed
40
38
)
41
39
gphl_workflow .connect (
42
- "GphlUpdateUiParameters" , self .gphl_json_wf_update_ui_parameters
40
+ "GphlUpdateUiParameters" , self ._gphl_json_wf_update_ui_parameters
43
41
)
44
42
45
- def wf_parameters_needed (self , params ):
43
+ if HWR .beamline .xrf_spectrum :
44
+ HWR .beamline .xrf_spectrum .connect (
45
+ HWR .beamline .xrf_spectrum ,
46
+ "xrf_task_progress" ,
47
+ signals .xrf_task_progress ,
48
+ )
49
+
50
+ def _wf_parameters_needed (self , params ):
46
51
self .app .server .emit ("workflowParametersDialog" , params , namespace = "/hwr" )
47
52
48
- def gphl_json_wf_parameters_needed (self , schema , ui_schema ):
53
+ def _gphl_json_wf_parameters_needed (self , schema , ui_schema ):
49
54
params = {}
50
55
params ["schema" ] = schema
51
56
params ["ui_schema" ] = ui_schema
52
57
self .app .server .emit ("gphlWorkflowParametersDialog" , params , namespace = "/hwr" )
53
58
54
- def gphl_json_wf_update_ui_parameters (self , update_dict ):
59
+ def _gphl_json_wf_update_ui_parameters (self , update_dict ):
55
60
self .app .server .emit (
56
61
"gphlWorkflowUpdateUiParametersDialog" , update_dict , namespace = "/hwr"
57
62
)
58
63
59
- def get_object (self , name ):
60
- return self ._ho .get_hardware_object (name )
64
+ def _get_available_elements (self ):
65
+ escan = self ._ho .energy_scan
66
+ return escan .get_elements () if escan else []
61
67
62
- def dict (self ):
68
+ def get_value (self ) -> dict :
63
69
"""
64
70
Build dictionary value-representation for each beamline attribute
65
71
Returns:
@@ -68,6 +74,10 @@ def dict(self):
68
74
attributes = {}
69
75
70
76
for attr_name in self .app .mxcubecore .adapter_dict :
77
+ # We skip the beamline attribute to avoid endless recursion
78
+ if attr_name == "beamline" :
79
+ continue
80
+
71
81
try :
72
82
_d = self .app .mxcubecore .get_adapter (attr_name ).data ().dict ()
73
83
except pydantic .ValidationError :
@@ -76,13 +86,15 @@ def dict(self):
76
86
77
87
attributes .update ({attr_name : _d })
78
88
79
- return {"hardwareObjects" : attributes }
80
-
81
- def get_available_elements (self ):
82
- escan = self ._bl .energy_scan
83
- elements = []
89
+ return {
90
+ "energyScanElements" : self ._get_available_elements (),
91
+ "path" : HWR .beamline .session .get_base_image_directory (),
92
+ "actionsList" : [],
93
+ "hardwareObjects" : attributes ,
94
+ }
84
95
85
- if escan :
86
- elements = escan .get_elements ()
96
+ def prepare_beamline_for_sample (self ) -> dict :
97
+ if hasattr (HWR .beamline .collect , "prepare_for_new_sample" ):
98
+ HWR .beamline .collect .prepare_for_new_sample ()
87
99
88
- return {"elements" : elements }
100
+ return {}
0 commit comments