32
32
## POSSIBILITY OF SUCH DAMAGE.
33
33
###############################################################################
34
34
35
+ import glob
35
36
import unittest
36
37
import subprocess
37
38
import os
41
42
from contextlib import contextmanager
42
43
43
44
# Absolute path of the directory holding this test file; other paths in this
# module are joined onto it.
cur_dir = os.path.split(os.path.abspath(__file__))[0]
44
- src_dir = os .path .join (cur_dir , "../src" )
45
- orfs_dir = os .path .join (cur_dir , "../../../flow" )
46
- os .chdir (src_dir )
47
45
48
46
49
47
@contextmanager
@@ -65,18 +63,16 @@ class ResumeCheck(unittest.TestCase):
65
63
design = "gcd"
66
64
samples = 5
67
65
iterations = 2
66
+ experiment_name = "test-resume"
68
67
69
68
def setUp (self ):
70
69
self .config = os .path .join (
71
- orfs_dir , "designs" , self .platform , self .design , "autotuner.json"
70
+ cur_dir ,
71
+ f"../../../flow/designs/{ self .platform } /{ self .design } /autotuner.json" ,
72
72
)
73
73
self .jobs = self .samples
74
74
self .num_cpus = os .cpu_count ()
75
75
76
- # How it works: Say we have 5 samples and 5 iterations.
77
- # If we want to limit to only 5 trials (and avoid any parallelism magic by Ray)
78
- # We can set resources_per_trial = NUM_CORES/5 = 3.2 (fractional resources_per_trial are allowed!)
79
-
80
76
# Cast to 1 decimal place
81
77
res_per_trial = float ("{:.1f}" .format (self .num_cpus / self .samples ))
82
78
options = ["" , "--resume" ]
@@ -87,21 +83,48 @@ def setUp(self):
87
83
f" --platform { self .platform } "
88
84
f" --config { self .config } "
89
85
f" --jobs { self .jobs } "
90
- f" --experiment test-resume "
86
+ f" --experiment { self . experiment_name } "
91
87
f" tune --iterations { self .iterations } --samples { self .samples } "
92
88
f" --resources_per_trial { res_per_trial } "
93
89
f" { c } "
94
90
for c in options
95
91
]
96
92
93
def check_trial_times(self, iteration: int = 0) -> float:
    """
    Return the newest modification time among one iteration's trial folders.

    Searches ``flow/logs/<platform>/<design>/<experiment_name>-tune`` (resolved
    relative to this test file) for folders named ``variant-*-or-<iteration>``.

    :param iteration: Zero-based iteration to check; must be smaller than
        ``self.iterations``.
    :return: The latest modification time (UNIX timestamp, as a float) of the
        matching folders, or the sentinel ``9e99`` when no folder matches.
    :raises ValueError: If ``iteration`` is outside ``[0, self.iterations)``.
    """
    if not 0 <= iteration < self.iterations:
        raise ValueError("Iteration must be between 0 and iterations - 1")

    experiment_dir = os.path.join(
        cur_dir,
        f"../../../flow/logs/{self.platform}/{self.design}/{self.experiment_name}-tune",
    )
    folders = glob.glob(os.path.join(experiment_dir, f"variant-*-or-{iteration}"))
    # The fallback is a large finite number rather than infinity: the polling
    # loop subtracts successive results, and inf - inf would yield NaN.
    return max((os.path.getmtime(folder) for folder in folders), default=9e99)
109
+
97
110
def test_tune_resume (self ):
98
111
# Goal is to first run the first config (without resume) and then run the second config (with resume)
99
112
# and check if the run is able to complete.
100
113
101
114
# Run the first config asynchronously.
102
115
print ("Running the first config" )
116
+ latest_modified_time = 0
103
117
with managed_process (self .commands [0 ], shell = True ) as proc :
104
- time .sleep (120 )
118
+ time .sleep (30 )
119
+ # Check if first config is complete
120
+ while True :
121
+ cur_modified_time = self .check_trial_times ()
122
+ print (f"Current modified time: { cur_modified_time } " )
123
+ print (f"Latest modified time: { latest_modified_time } " )
124
+ if abs (cur_modified_time - latest_modified_time ) < 1e-6 :
125
+ break
126
+ latest_modified_time = cur_modified_time
127
+ time .sleep (10 )
105
128
106
129
# Keep trying to stop the ray cluster until it is stopped
107
130
while 1 :
0 commit comments