@@ -99,7 +99,7 @@ mutable struct Worker
99
99
del_msgs:: Array{Any,1} # XXX : Could del_msgs and add_msgs be Channels?
100
100
add_msgs:: Array{Any,1}
101
101
@atomic gcflag:: Bool
102
- state:: WorkerState
102
+ @atomic state:: WorkerState
103
103
c_state:: Threads.Condition # wait for state changes, lock for state
104
104
ct_time:: Float64 # creation time
105
105
conn_func:: Any # used to setup connections lazily
@@ -145,15 +145,13 @@ end
145
145
146
146
function set_worker_state (w, state)
147
147
lock (w. c_state) do
148
- w. state = state
148
+ @atomic w. state = state
149
149
notify (w. c_state; all= true )
150
150
end
151
151
end
152
152
153
153
function check_worker_state (w:: Worker )
154
- lock (w. c_state)
155
154
if w. state === W_CREATED
156
- unlock (w. c_state)
157
155
if ! isclusterlazy ()
158
156
if PGRP. topology === :all_to_all
159
157
# Since higher pids connect with lower pids, the remote worker
@@ -173,9 +171,8 @@ function check_worker_state(w::Worker)
173
171
errormonitor (t)
174
172
wait_for_conn (w)
175
173
end
176
- else
177
- unlock (w. c_state)
178
174
end
175
+ return nothing
179
176
end
180
177
181
178
exec_conn_func (id:: Int ) = exec_conn_func (worker_from_id (id):: Worker )
@@ -193,9 +190,7 @@ function exec_conn_func(w::Worker)
193
190
end
194
191
195
192
function wait_for_conn (w)
196
- lock (w. c_state)
197
193
if w. state === W_CREATED
198
- unlock (w. c_state)
199
194
timeout = worker_timeout () - (time () - w. ct_time)
200
195
timeout <= 0 && error (" peer $(w. id) has not connected to $(myid ()) " )
201
196
@@ -210,8 +205,6 @@ function wait_for_conn(w)
210
205
wait (w. c_state)
211
206
w. state === W_CREATED && error (" peer $(w. id) didn't connect to $(myid ()) within $timeout seconds" )
212
207
end
213
- else
214
- unlock (w. c_state)
215
208
end
216
209
nothing
217
210
end
@@ -667,8 +660,8 @@ function create_worker(manager, wconfig)
667
660
for jw in PGRP. workers
668
661
if (jw. id != 1 ) && (jw. id < w. id)
669
662
# wait for wl to join
670
- lock ( jw. c_state) do
671
- if jw. state === W_CREATED
663
+ if jw. state === W_CREATED
664
+ lock ( jw. c_state) do
672
665
wait (jw. c_state)
673
666
end
674
667
end
0 commit comments