@@ -118,6 +118,9 @@ struct Server{L <: Listener}
118
118
connections:: Set{Connection}
119
119
# server listenandserve loop task
120
120
task:: Task
121
+ # Protects the connections Set which is mutated in the listenloop
122
+ # while potentially being accessed by the close method at the same time
123
+ connections_lock:: ReentrantLock
121
124
end
122
125
123
126
port (s:: Server ) = Int (s. listener. addr. port)
@@ -127,8 +130,10 @@ Base.wait(s::Server) = wait(s.task)
127
130
function forceclose (s:: Server )
128
131
shutdown (s. on_shutdown)
129
132
close (s. listener)
130
- for c in s. connections
131
- close (c)
133
+ Base. @lock s. connections_lock begin
134
+ for c in s. connections
135
+ close (c)
136
+ end
132
137
end
133
138
return wait (s. task)
134
139
end
@@ -166,14 +171,19 @@ function Base.close(s::Server)
166
171
shutdown (s. on_shutdown)
167
172
close (s. listener)
168
173
# first pass to mark or request connections to close
169
- for c in s. connections
170
- requestclose! (c)
174
+ Base. @lock s. connections_lock begin
175
+ for c in s. connections
176
+ requestclose! (c)
177
+ end
171
178
end
172
179
# second pass to wait for connections to close
173
180
# we wait for connections to empty because as
174
181
# connections close themselves, they are removed
175
182
# from our connections Set
176
- while ! isempty (s. connections)
183
+ while true
184
+ Base. @lock s. connections_lock begin
185
+ isempty (s. connections) && break
186
+ end
177
187
sleep (0.5 + rand () * 0.1 )
178
188
end
179
189
return wait (s. task)
@@ -346,25 +356,28 @@ function listen!(f, listener::Listener;
346
356
access_log:: Union{Function,Nothing} = nothing ,
347
357
verbose= false , kw... )
348
358
conns = Set {Connection} ()
359
+ conns_lock = ReentrantLock ()
349
360
ready_to_accept = Threads. Event ()
350
361
if verbose > 0
351
362
tsk = @_spawn_interactive LoggingExtras. withlevel (Logging. Debug; verbosity= verbose) do
352
- listenloop (f, listener, conns, tcpisvalid, max_connections, readtimeout, access_log, ready_to_accept, verbose)
363
+ listenloop (f, listener, conns, tcpisvalid, max_connections, readtimeout, access_log, ready_to_accept, conns_lock, verbose)
353
364
end
354
365
else
355
- tsk = @_spawn_interactive listenloop (f, listener, conns, tcpisvalid, max_connections, readtimeout, access_log, ready_to_accept, verbose)
366
+ tsk = @_spawn_interactive listenloop (f, listener, conns, tcpisvalid, max_connections, readtimeout, access_log, ready_to_accept, conns_lock, verbose)
356
367
end
357
368
# wait until the listenloop enters the loop
358
369
wait (ready_to_accept)
359
- return Server (listener, on_shutdown, conns, tsk)
370
+ return Server (listener, on_shutdown, conns, tsk, conns_lock )
360
371
end
361
372
362
373
""" "
363
374
Main server loop.
364
375
Accepts new tcp connections and spawns async tasks to handle them."
365
376
"""
366
- function listenloop (f, listener, conns, tcpisvalid,
367
- max_connections, readtimeout, access_log, ready_to_accept, verbose)
377
+ function listenloop (
378
+ f, listener, conns, tcpisvalid, max_connections, readtimeout, access_log, ready_to_accept,
379
+ conns_lock, verbose
380
+ )
368
381
sem = Base. Semaphore (max_connections)
369
382
verbose >= 0 && @infov 1 " Listening on: $(listener. hostname) :$(listener. hostport) , thread id: $(Threads. threadid ()) "
370
383
notify (ready_to_accept)
@@ -382,13 +395,13 @@ function listenloop(f, listener, conns, tcpisvalid,
382
395
end
383
396
conn = Connection (io)
384
397
conn. state = IDLE
385
- push! (conns, conn)
398
+ Base . @lock conns_lock push! (conns, conn)
386
399
conn. host, conn. port = listener. hostname, listener. hostport
387
400
@async try
388
401
handle_connection (f, conn, listener, readtimeout, access_log)
389
402
finally
390
403
# handle_connection is in charge of closing the underlying io
391
- delete! (conns, conn)
404
+ Base . @lock conns_lock delete! (conns, conn)
392
405
Base. release (sem)
393
406
end
394
407
catch e
0 commit comments