-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcreate_dag.py
More file actions
103 lines (75 loc) · 3.68 KB
/
create_dag.py
File metadata and controls
103 lines (75 loc) · 3.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import glob
import os
import re
import shutil
import subprocess
import sys
# Append the custom module path
sys.path.append("/data/user/tvaneede/GlobalFit/reco_processing")
# Import the datasets module
from datasets import datasets
# ---------------------------------------------------------------------------
# Build one HTCondor DAG per dataset/subfolder and (optionally) submit it.
#
# For every subfolder of every dataset registered under `reco_version` in the
# project `datasets` module, this script:
#   1. creates the dag working dir, log dir, output dir, and backup dir,
#   2. copies the submit/wrapper/reco scripts for reproducibility,
#   3. writes a submit.dag with one JOB per Level2 input file (capped at
#      `nfiles`), and
#   4. runs condor_submit_dag in the dag directory when `submit_jobs` is True.
# ---------------------------------------------------------------------------

# set the inputs
reco_version = "v10"

# Dynamically select the desired dataset collection for this reco version.
simulation_datasets = getattr(datasets, reco_version)

# fixed paths
dag_base_path = "/scratch/tvaneede/reco/run_taupede_tianlu"
work_path = "/data/user/tvaneede/GlobalFit/reco_processing/"

nfiles = 200        # process at most this many files per subfolder
submit_jobs = True  # actually submit the dag jobs

for simulation_name in simulation_datasets:

    simulation_subfolders = simulation_datasets[simulation_name]['subfolders']
    simulation_flavor = simulation_datasets[simulation_name]["flavor"]
    simulation_dataset = simulation_datasets[simulation_name]['dataset']
    simulation_reco_base_in_path = simulation_datasets[simulation_name]['reco_base_in_path']
    simulation_reco_base_out_path = simulation_datasets[simulation_name]['reco_base_out_path']

    for simulation_subfolder in simulation_subfolders:

        # Input/output locations for this dataset + subfolder.
        reco_input_path = f"{simulation_reco_base_in_path}/{simulation_dataset}/{simulation_subfolder}"
        reco_out_path = f"{simulation_reco_base_out_path}/{simulation_dataset}/{simulation_subfolder}"

        # DAG working directory, log directory, and script-backup directory.
        dag_name = f"reco_dag_{reco_version}_{simulation_dataset}_{simulation_subfolder}"
        dag_path = f"{dag_base_path}/{reco_version}/{dag_name}"
        log_dir = f"{dag_path}/logs"
        backup_path = f"{work_path}/backup_scripts/{reco_version}"

        # Create folders (idempotent, no shell involved).
        print("creating", dag_path)
        for path in (dag_path, log_dir, reco_out_path, backup_path):
            os.makedirs(path, exist_ok=True)

        # Copy the submit file next to the dag. NOTE: the source is an
        # absolute path on purpose — the old relative `cp rec_tau.sub`
        # silently copied from the previous dag dir once os.chdir() had run.
        shutil.copy(os.path.join(work_path, "rec_tau.sub"), dag_path)

        # Backup the scripts used for this production.
        for script in ("rec_tau.sub", "wrapper.sh", "rec_tau.py"):
            shutil.copy(os.path.join(work_path, script), backup_path)

        # Collect the Level2 input files and sort them by their 6-digit run
        # number (assumes every matched file ends in ".NNNNNN.i3.zst").
        infiles_list = glob.glob(f"{reco_input_path}/Level2_{simulation_flavor}_*.i3.zst")
        print(f"found {len(infiles_list)} files")
        infiles_list.sort(key=lambda x: int(re.search(r'\.(\d{6})\.i3\.zst$', x).group(1)))

        # Write the dag: one JOB + VARS block per input file, at most nfiles.
        # `with` guarantees the dag file is flushed and closed before submit.
        with open(f"{dag_path}/submit.dag", 'w') as outfile:
            for INFILES in infiles_list[:nfiles]:
                filename = os.path.basename(INFILES)
                JOBID = filename.split("_")[2]  # gives the run number
                OUTFILE = f"{reco_out_path}/Reco_{simulation_flavor}_{JOBID}_out.i3.bz2"
                outfile.write(f"JOB {JOBID} rec_tau.sub\n")
                outfile.write(f'VARS {JOBID} LOGDIR="{log_dir}"\n')
                outfile.write(f'VARS {JOBID} JOBID="{JOBID}"\n')
                outfile.write(f'VARS {JOBID} INFILES="{INFILES}"\n')
                outfile.write(f'VARS {JOBID} OUTFILE="{OUTFILE}"\n')

        if submit_jobs:
            print(f"Submitting from: {dag_path}")
            # Run in the dag directory via cwd= instead of os.chdir(), so the
            # process working directory is untouched for later iterations;
            # argument list + shell=False avoids shell-quoting issues.
            process = subprocess.run(
                ["condor_submit_dag", "submit.dag"],
                cwd=dag_path,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,  # ensures output is in string format
            )

            # Log output and errors.
            print("STDOUT:\n", process.stdout)
            print("STDERR:\n", process.stderr)
            print("Exit Code:", process.returncode)