1
1
import logging
2
+ from typing import ClassVar
2
3
3
4
import pydantic
5
+ from mxcubecore import HardwareRepository as HWR
6
+ from mxcubecore .HardwareObjects .Beamline import Beamline
4
7
5
- BEAMLINE_ADAPTER = None
8
+ from mxcubeweb .core .adapter .adapter_base import ActuatorAdapterBase
9
+ from mxcubeweb .core .models .configmodels import ResourceHandlerConfigModel
6
10
7
- # Singleton like interface is needed to keep the same reference to the
8
- # adapter object and its corresponding hardware objects, so that the signal
9
- # system won't clean up signal handlers. (PyDispatcher removes signal handlers
10
- # when an object is garbage collected )
11
+ resource_handler_config = ResourceHandlerConfigModel (
12
+ commands = [ "prepare_beamline_for_sample" ],
13
+ attributes = [ "data" , "get_value" ],
14
+ )
11
15
12
16
13
- def BeamlineAdapter (* args ):
14
- global BEAMLINE_ADAPTER
15
-
16
- if BEAMLINE_ADAPTER is None :
17
- BEAMLINE_ADAPTER = _BeamlineAdapter (* args )
18
-
19
- return BEAMLINE_ADAPTER
20
-
21
-
22
- class _BeamlineAdapter :
17
+ class BeamlineAdapter (ActuatorAdapterBase ):
23
18
"""
24
19
Adapter between Beamline route and Beamline hardware object.
25
20
"""
26
21
27
- def __init__ (self , beamline_hwobj , app ):
22
+ SUPPORTED_TYPES : ClassVar [list [object ]] = [Beamline ]
23
+
24
+ def __init__ (self , ho , role , app ):
25
+ super ().__init__ (ho , role , app , resource_handler_config )
28
26
self .app = app
29
- self ._bl = beamline_hwobj
30
27
self .adapter_dict = {}
31
28
32
- workflow = self ._bl .workflow
29
+ workflow = self ._ho .workflow
33
30
if workflow :
34
- workflow .connect ("parametersNeeded" , self .wf_parameters_needed )
31
+ workflow .connect ("parametersNeeded" , self ._wf_parameters_needed )
35
32
36
- gphl_workflow = self ._bl .gphl_workflow
33
+ gphl_workflow = self ._ho .gphl_workflow
37
34
if gphl_workflow :
38
35
gphl_workflow .connect (
39
- "GphlJsonParametersNeeded" , self .gphl_json_wf_parameters_needed
36
+ "GphlJsonParametersNeeded" , self ._gphl_json_wf_parameters_needed
40
37
)
41
38
gphl_workflow .connect (
42
- "GphlUpdateUiParameters" , self .gphl_json_wf_update_ui_parameters
39
+ "GphlUpdateUiParameters" , self ._gphl_json_wf_update_ui_parameters
43
40
)
44
41
45
- def wf_parameters_needed (self , params ):
42
+ from mxcubeweb .routes import signals
43
+
44
+ if HWR .beamline .xrf_spectrum :
45
+ HWR .beamline .xrf_spectrum .connect (
46
+ HWR .beamline .xrf_spectrum ,
47
+ "xrf_task_progress" ,
48
+ signals .xrf_task_progress ,
49
+ )
50
+
51
+ def _wf_parameters_needed (self , params ):
46
52
self .app .server .emit ("workflowParametersDialog" , params , namespace = "/hwr" )
47
53
48
- def gphl_json_wf_parameters_needed (self , schema , ui_schema ):
54
+ def _gphl_json_wf_parameters_needed (self , schema , ui_schema ):
49
55
params = {}
50
56
params ["schema" ] = schema
51
57
params ["ui_schema" ] = ui_schema
52
58
self .app .server .emit ("gphlWorkflowParametersDialog" , params , namespace = "/hwr" )
53
59
54
- def gphl_json_wf_update_ui_parameters (self , update_dict ):
60
+ def _gphl_json_wf_update_ui_parameters (self , update_dict ):
55
61
self .app .server .emit (
56
62
"gphlWorkflowUpdateUiParametersDialog" , update_dict , namespace = "/hwr"
57
63
)
58
64
59
- def get_object (self , name ):
60
- return self ._ho .get_hardware_object (name )
65
+ def _get_available_elements (self ):
66
+ escan = self ._ho .energy_scan
67
+ return escan .get_elements () if escan else []
61
68
62
- def dict (self ):
69
+ def get_value (self ) -> dict :
63
70
"""
64
71
Build dictionary value-representation for each beamline attribute
65
72
Returns:
@@ -68,6 +75,10 @@ def dict(self):
68
75
attributes = {}
69
76
70
77
for attr_name in self .app .mxcubecore .adapter_dict :
78
+ # We skip the beamline attribute to avoid endless recursion
79
+ if attr_name == "beamline" :
80
+ continue
81
+
71
82
try :
72
83
_d = self .app .mxcubecore .get_adapter (attr_name ).data ().dict ()
73
84
except pydantic .ValidationError :
@@ -76,13 +87,15 @@ def dict(self):
76
87
77
88
attributes .update ({attr_name : _d })
78
89
79
- return {"hardwareObjects" : attributes }
80
-
81
- def get_available_elements (self ):
82
- escan = self ._bl .energy_scan
83
- elements = []
90
+ return {
91
+ "energyScanElements" : self ._get_available_elements (),
92
+ "path" : HWR .beamline .session .get_base_image_directory (),
93
+ "actionsList" : [],
94
+ "hardwareObjects" : attributes ,
95
+ }
84
96
85
- if escan :
86
- elements = escan .get_elements ()
97
+ def prepare_beamline_for_sample (self ) -> dict :
98
+ if hasattr (HWR .beamline .collect , "prepare_for_new_sample" ):
99
+ HWR .beamline .collect .prepare_for_new_sample ()
87
100
88
- return {"elements" : elements }
101
+ return {}
0 commit comments