Skip to content

Commit e0c3454

Browse files
authored
Merge pull request #14 from NSLS2/tw-validation
Add Data Validation in Tiled
2 parents ca1cd19 + b259495 commit e0c3454

File tree

6 files changed

+82
-27
lines changed

6 files changed

+82
-27
lines changed

.env

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
PREFECT_LOGGING_EXTRA_LOGGERS=validator

Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ RUN apt-get -y update && \
77

88
COPY pixi.toml .
99
COPY pixi.lock .
10+
COPY .env .
1011
# use `--locked` to ensure the lockfile is up to date with pixi.toml
1112
RUN pixi install --locked
1213
# create the shell-hook bash script to activate the environment

data_validation.py

Lines changed: 58 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,71 @@
1-
import time as ttime
1+
import time
22

33
from prefect import flow, get_run_logger, task
44
from prefect.blocks.system import Secret
5-
from tiled.client import from_profile
5+
6+
from bluesky_tiled_plugins.writing.validator import validate
7+
from tiled.client import from_uri
8+
9+
10+
@task(retries=2, retry_delay_seconds=10)
11+
def get_run(uid, api_key=None):
12+
tiled_client = from_uri("https://tiled.nsls2.bnl.gov", api_key=api_key)
13+
run = tiled_client["cms/raw"][uid]
14+
return run
15+
16+
17+
@task(retries=2, retry_delay_seconds=10)
18+
def get_run_migration(uid, api_key=None):
19+
tiled_client = from_uri("https://tiled.nsls2.bnl.gov", api_key=api_key)
20+
run = tiled_client["cms/migration"][uid]
21+
return run
22+
23+
24+
@task
25+
def read_stream(run, stream):
26+
stream_data = run[stream].read()
27+
return stream_data
628

729

830
@task(retries=2, retry_delay_seconds=10)
9-
def read_all_streams(beamline_acronym, uid):
31+
def read_all_streams(uid, api_key=None):
1032
logger = get_run_logger()
11-
api_key = Secret.load("tiled-cms-api-key", _sync=True).get()
12-
tiled_client = from_profile("nsls2", api_key=api_key)
13-
run = tiled_client[beamline_acronym]["raw"][uid]
33+
run = get_run(uid, api_key=api_key)
1434
logger.info(f"Validating uid {run.start['uid']}")
15-
start_time = ttime.monotonic()
35+
start_time = time.monotonic()
1636
for stream in run:
1737
logger.info(f"{stream}:")
18-
stream_start_time = ttime.monotonic()
19-
stream_data = run[stream].read()
20-
stream_elapsed_time = ttime.monotonic() - stream_start_time
38+
stream_start_time = time.monotonic()
39+
stream_data = read_stream(run, stream)
40+
stream_elapsed_time = time.monotonic() - stream_start_time
2141
logger.info(f"{stream} elapsed_time = {stream_elapsed_time}")
2242
logger.info(f"{stream} nbytes = {stream_data.nbytes:_}")
23-
elapsed_time = ttime.monotonic() - start_time
43+
elapsed_time = time.monotonic() - start_time
2444
logger.info(f"{elapsed_time = }")
45+
46+
47+
@task(retries=3, retry_delay_seconds=20)
48+
def data_validation_task(uid, api_key=None):
49+
"""Task to validate the data structure and accessibility in Tiled
50+
51+
Parameters
52+
----------
53+
uid : str
54+
The UID of the run to validate
55+
api_key : str, optional
56+
The API key used to authenticate with the Tiled server (default is None)
57+
"""
58+
59+
logger = get_run_logger()
60+
logger.info("Connecting to Tiled client for beamline cms")
61+
run = get_run_migration(uid, api_key=api_key)
62+
logger.info(f"Validating uid {uid}")
63+
start_time = time.monotonic()
64+
validate(run, fix_errors=True, try_reading=True, raise_on_error=True)
65+
elapsed_time = time.monotonic() - start_time
66+
logger.info(f"Finished validating data; {elapsed_time = }")
67+
68+
69+
@flow(log_prints=True)
70+
def data_validation_flow(uid, api_key=None):
71+
data_validation_task(uid, api_key=api_key)

end_of_run_workflow.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,18 @@
1+
import os
12
from prefect import flow, get_run_logger, task
23
from prefect.task_runners import ConcurrentTaskRunner
34

45
#from analysis import run_analysis
5-
from data_validation import read_all_streams
6+
from data_validation import read_all_streams, data_validation_task
67
from linker import create_symlinks
8+
from dotenv import load_dotenv
9+
10+
11+
def get_api_key_from_env():
12+
with open("/srv/container.secret", "r") as secrets:
13+
load_dotenv(stream=secrets)
14+
api_key = os.environ["TILED_API_KEY"]
15+
return api_key
716

817

918
@task
@@ -13,23 +22,27 @@ def log_completion():
1322

1423

1524
@flow(task_runner=ConcurrentTaskRunner())
16-
def end_of_run_workflow(stop_doc):
25+
def end_of_run_workflow(stop_doc, api_key=None):
1726
logger = get_run_logger()
1827
uid = stop_doc["run_start"]
28+
if not api_key:
29+
api_key = get_api_key_from_env()
1930

2031
# Launch validation, analysis, and linker tasks concurrently
21-
linker_task = create_symlinks.submit(uid)
32+
linker_task = create_symlinks.submit(uid, api_key=api_key)
2233
logger.info("Launched linker task")
2334

24-
validation_task = read_all_streams.submit("cms", uid)
25-
logger.info("Launched validation task")
35+
read_streams_task = read_all_streams.submit(uid, api_key=api_key)
36+
validation_task = data_validation_task.submit(uid, api_key=api_key)
37+
logger.info("Launched validation tasks")
2638

2739
# analysis_task = run_analysis(raw_ref=uid)
2840
# logger.info("Launched analysis task")
2941

3042
# Wait for all tasks to complete
3143
logger.info("Waiting for tasks to complete")
3244
linker_task.result()
45+
read_streams_task.result()
3346
validation_task.result()
3447
# analysis_task.result()
3548
log_completion()

linker.py

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,9 @@
11
from prefect import task, get_run_logger
2-
from prefect.blocks.system import Secret
32
from pathlib import Path
4-
from tiled.client import from_profile
53
import os
64
import glob
5+
from data_validation import get_run
76

8-
#tiled_client = from_uri('https://tiled.nsls2.bnl.gov')
9-
api_key = Secret.load("tiled-cms-api-key", _sync=True).get()
10-
tiled_client = from_profile("nsls2", api_key=api_key)['cms']
11-
tiled_client_raw = tiled_client["raw"]
12-
13-
#logger = logging.getLogger()
147

158

169
def detector_mapping(detector):
@@ -33,7 +26,7 @@ def chmod_and_chown(path, *, uid=None, gid=None, mode=0o775):
3326
# os.chown(path, uid, gid)
3427

3528
@task
36-
def create_symlinks(ref):
29+
def create_symlinks(ref, api_key=None):
3730
"""
3831
Parameters
3932
----------
@@ -43,7 +36,7 @@ def create_symlinks(ref):
4336
"""
4437
logger = get_run_logger()
4538

46-
hrf = tiled_client_raw[ref]
39+
hrf = get_run(ref, api_key=api_key)
4740
for name, doc in hrf.documents():
4841
if name == "start":
4942
if doc.get('experiment_project'):

pixi.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ platforms = ["linux-64"]
55

66
[dependencies]
77
prefect = "3.*"
8-
tiled-client = ">=0.2.3"
8+
tiled = ">=0.2.3"
99
python = "<3.12"
1010
bluesky-tiled-plugins = ">=2"
1111
prefect-docker = "*"

0 commit comments

Comments
 (0)