diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d8f14f5..9aad012 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -28,14 +28,14 @@ jobs: - '1.9' os: - ubuntu-latest - - macOS-latest - windows-latest arch: - x64 - x86 - exclude: + include: - os: macOS-latest - arch: x86 + arch: aarch64 + exclude: - os: windows-latest # Killing workers doesn't work on windows in 1.9 version: '1.9' @@ -58,7 +58,7 @@ jobs: - uses: julia-actions/julia-buildpkg@v1 - uses: julia-actions/julia-runtest@v1 env: - JULIA_NUM_THREADS: 4 + JULIA_NUM_THREADS: 4,4 - uses: julia-actions/julia-processcoverage@v1 - uses: codecov/codecov-action@v5 with: diff --git a/Project.toml b/Project.toml index 26447ea..b5d8749 100644 --- a/Project.toml +++ b/Project.toml @@ -8,17 +8,22 @@ Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b" Sockets = "6462fe0b-24de-5631-8697-dd941f90decc" [compat] +Aqua = "0.8" Distributed = "1" +LibSSH = "0.7" +LinearAlgebra = "1" Random = "1" Serialization = "1" Sockets = "1" +Test = "1" julia = "1.9" [extras] +Aqua = "4c88cf16-eb10-579e-8560-4a9242c79595" Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" LibSSH = "00483490-30f8-4353-8aba-35b82f51f4d0" LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e" Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" [targets] -test = ["LinearAlgebra", "Test", "LibSSH", "Distributed"] +test = ["Aqua", "Distributed", "LibSSH", "LinearAlgebra", "Test"] diff --git a/src/cluster.jl b/src/cluster.jl index 3ca82d9..1f52db7 100644 --- a/src/cluster.jl +++ b/src/cluster.jl @@ -777,7 +777,6 @@ function redirect_output_from_additional_worker(pid, port) end function check_master_connect() - timeout = worker_timeout() * 1e9 # If we do not have at least process 1 connect to us within timeout # we log an error and exit, unless we're running on valgrind if ccall(:jl_running_on_valgrind,Cint,()) != 0 @@ -785,14 +784,10 @@ function check_master_connect() end errormonitor( - @async begin - start = time_ns() - 
while !haskey(map_pid_wrkr, 1) && (time_ns() - start) < timeout - sleep(1.0) - end - - if !haskey(map_pid_wrkr, 1) - print(stderr, "Master process (id 1) could not connect within $(timeout/1e9) seconds.\nexiting.\n") + Threads.@spawn begin + timeout = worker_timeout() + if timedwait(() -> haskey(map_pid_wrkr, 1), timeout) === :timed_out + print(stderr, "Master process (id 1) could not connect within $(timeout) seconds.\nexiting.\n") exit(1) end end diff --git a/src/clusterserialize.jl b/src/clusterserialize.jl index d2f09e7..347ca9c 100644 --- a/src/clusterserialize.jl +++ b/src/clusterserialize.jl @@ -167,10 +167,21 @@ function deserialize_global_from_main(s::ClusterSerializer, sym) return nothing end end - Core.eval(Main, Expr(:global, sym)) + if sym_isconst - ccall(:jl_set_const, Cvoid, (Any, Any, Any), Main, sym, v) + @static if VERSION >= v"1.12" + # Note that the post-lowering const form is not allowed in value + # position, so there needs to be a dummy `nothing` argument to drop the + # return value. + Core.eval(Main, Expr(:block, + Expr(:const, GlobalRef(Main, sym), v), + nothing)) + else + Core.eval(Main, Expr(:global, sym)) + ccall(:jl_set_const, Cvoid, (Any, Any, Any), Main, sym, v) + end else + Core.eval(Main, Expr(:global, sym)) invokelatest(setglobal!, Main, sym, v) end return nothing diff --git a/src/managers.jl b/src/managers.jl index fccc5ea..ab79abe 100644 --- a/src/managers.jl +++ b/src/managers.jl @@ -126,7 +126,7 @@ addprocs([ * `exeflags`: additional flags passed to the worker processes. It can either be a `Cmd`, a `String` holding one flag, or a collection of strings, with one element per flag. - E.g. `\`--threads=auto project=.\``, `"--compile-trace=stderr"` or `["--threads=auto", "--compile=all"]`. + E.g. `\`--threads=auto --project=.\``, `"--compile-trace=stderr"` or `["--threads=auto", "--compile=all"]`. * `topology`: Specifies how the workers connect to each other. Sending a message between unconnected workers results in an error. 
@@ -767,7 +767,8 @@ function kill(manager::SSHManager, pid::Int, config::WorkerConfig) nothing end -function kill(manager::LocalManager, pid::Int, config::WorkerConfig; exit_timeout = 15, term_timeout = 15) +function kill(manager::LocalManager, pid::Int, config::WorkerConfig; profile_wait = 6, exit_timeout = 15, term_timeout = 15) + # profile_wait = 6 is 1s for profile, 5s for the report to show # First, try sending `exit()` to the remote over the usual control channels remote_do(exit, pid) @@ -776,7 +777,14 @@ function kill(manager::LocalManager, pid::Int, config::WorkerConfig; exit_timeou # Check to see if our child exited, and if not, send an actual kill signal if !process_exited(config.process) - @warn("Failed to gracefully kill worker $(pid), sending SIGQUIT") + @warn "Failed to gracefully kill worker $(pid)" + profile_sig = Sys.iswindows() ? nothing : Sys.isbsd() ? ("SIGINFO", 29) : ("SIGUSR1" , 10) + if profile_sig !== nothing + @warn("Sending profile $(profile_sig[1]) to worker $(pid)") + kill(config.process, profile_sig[2]) + sleep(profile_wait) + end + @warn("Sending SIGQUIT to worker $(pid)") kill(config.process, Base.SIGQUIT) sleep(term_timeout) diff --git a/src/workerpool.jl b/src/workerpool.jl index 6d03bc9..b28c726 100644 --- a/src/workerpool.jl +++ b/src/workerpool.jl @@ -149,6 +149,7 @@ function remotecall_pool(rc_f::typeof(remotecall), f, pool::AbstractWorkerPool, t = Threads.@spawn try wait(x) + catch # just wait, ignore errors here finally put!(pool, worker) end @@ -400,3 +401,29 @@ function remotecall_pool(rc_f, f, pool::CachingPool, args...; kwargs...) put!(pool, worker) end end + +# Specialization for remotecall. We have to wait for the Future it returns +# before putting the worker back in the pool. +function remotecall_pool(rc_f::typeof(remotecall), f, pool::CachingPool, args...; kwargs...) 
+ worker = take!(pool) + f_ref = get(pool.map_obj2ref, (worker, f), (f, RemoteChannel(worker))) + isa(f_ref, Tuple) && (pool.map_obj2ref[(worker, f)] = f_ref[2]) # Add to tracker + + local x + try + x = rc_f(exec_from_cache, worker, f_ref, args...; kwargs...) + catch + put!(pool, worker) + rethrow() + end + + t = Threads.@spawn Threads.threadpool() try + wait(x) + catch # just wait, ignore errors here + finally + put!(pool, worker) + end + errormonitor(t) + + return x +end diff --git a/test/distributed_exec.jl b/test/distributed_exec.jl index 23fba89..e6338f1 100644 --- a/test/distributed_exec.jl +++ b/test/distributed_exec.jl @@ -722,6 +722,8 @@ end @test nworkers() == length(unique(remotecall_fetch(wp->pmap(_->myid(), wp, 1:100), id_other, wp))) wp = WorkerPool(2:3) @test sort(unique(pmap(_->myid(), wp, 1:100))) == [2,3] + @test fetch(remotecall(myid, wp)) in wp.workers + @test_throws RemoteException fetch(remotecall(error, wp)) # wait on worker pool wp = WorkerPool(2:2) @@ -747,6 +749,8 @@ end # CachingPool tests wp = CachingPool(workers()) @test [1:100...] == pmap(x->x, wp, 1:100) + @test fetch(remotecall(myid, wp)) in wp.workers + @test_throws RemoteException fetch(remotecall(error, wp)) clear!(wp) @test length(wp.map_obj2ref) == 0 @@ -1017,15 +1021,19 @@ f16091b = () -> 1 # Test the behaviour of remotecall(f, ::AbstractWorkerPool), this should # keep the worker out of the pool until the underlying remotecall has # finished. - remotechan = RemoteChannel(wrkr1) - pool = WorkerPool([wrkr1]) - put_future = remotecall(() -> wait(remotechan), pool) - @test !isready(pool) - put!(remotechan, 1) - wait(put_future) - # The task that waits on the future to put it back into the pool runs - # asynchronously so we use timedwait() to check when the worker is back in. 
- @test timedwait(() -> isready(pool), 10) === :ok + for PoolType in (WorkerPool, CachingPool) + let + remotechan = RemoteChannel(wrkr1) + pool = PoolType([wrkr1]) + put_future = remotecall(() -> wait(remotechan), pool) + @test !isready(pool) + put!(remotechan, 1) + wait(put_future) + # The task that waits on the future to put it back into the pool runs + # asynchronously so we use timedwait() to check when the worker is back in. + @test timedwait(() -> isready(pool), 10) === :ok + end + end # Test calling @everywhere from a module not defined on the workers LocalBar.bar() @@ -1707,18 +1715,17 @@ end end # Ensure that the code has indeed been successfully executed everywhere - @test all(in(results), procs()) + return all(in(results), procs()) end # Test that the client port is reused. SO_REUSEPORT may not be supported on # all UNIX platforms, Linux kernels prior to 3.9 and older versions of OSX @assert nprocs() == 1 addprocs_with_testenv(4; lazy=false) - if ccall(:jl_has_so_reuseport, Int32, ()) == 1 - reuseport_tests() - else - @info "SO_REUSEPORT is unsupported, skipping reuseport tests" - end + + skip_reuseport = ccall(:jl_has_so_reuseport, Int32, ()) != 1 + skip_reuseport && @debug "SO_REUSEPORT support missing, reuseport_tests skipped" + @test reuseport_tests() skip = skip_reuseport end @testset "Even more various individual issues" begin @@ -1848,11 +1855,11 @@ end end """ cmd = setenv(`$(julia) --project=$(project) -e $(testcode * extracode)`, env) - @test success(cmd) + @test success(pipeline(cmd; stdout, stderr)) # JULIA_PROJECT cmd = setenv(`$(julia) -e $(testcode * extracode)`, (env["JULIA_PROJECT"] = project; env)) - @test success(cmd) + @test success(pipeline(cmd; stdout, stderr)) # Pkg.activate(...) 
activateish = """ Base.ACTIVE_PROJECT[] = $(repr(project)) @@ -1860,7 +1867,7 @@ end addprocs(1) """ cmd = setenv(`$(julia) -e $(activateish * testcode * extracode)`, env) - @test success(cmd) + @test success(pipeline(cmd; stdout, stderr)) # JULIA_(LOAD|DEPOT)_PATH shufflecode = """ d = reverse(DEPOT_PATH) @@ -1879,7 +1886,7 @@ end end """ cmd = setenv(`$(julia) -e $(shufflecode * addcode * testcode * extracode)`, env) - @test success(cmd) + @test success(pipeline(cmd; stdout, stderr)) # Mismatch when shuffling after proc addition. Note that the use of # `addcode` mimics the behaviour of -p1 as the first worker is started # before `shufflecode` executes. @@ -1891,7 +1898,7 @@ end end """ cmd = setenv(`$(julia) -e $(failcode)`, env) - @test success(cmd) + @test success(pipeline(cmd; stdout, stderr)) # Hideous hack to double escape path separators on Windows so that it gets # interpolated into the string (and then Cmd) correctly. @@ -1918,7 +1925,7 @@ end end """ cmd = setenv(`$(julia) -e $(envcode)`, env) - @test success(cmd) + @test success(pipeline(cmd; stdout, stderr)) end end end @@ -1935,7 +1942,7 @@ include("splitrange.jl") # Next, ensure we get a log message when a worker does not cleanly exit w = only(addprocs(1)) - @test_logs (:warn, r"sending SIGQUIT") begin + @test_logs (:warn, r"Sending SIGQUIT") match_mode=:any begin remote_do(w) do # Cause the 'exit()' message that `rmprocs()` sends to do nothing Core.eval(Base, :(exit() = nothing)) diff --git a/test/runtests.jl b/test/runtests.jl index 5eea288..0e7441d 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,6 +1,8 @@ # This file is a part of Julia. License is MIT: https://julialang.org/license using Test +import DistributedNext +import Aqua # Run the distributed test outside of the main driver since it needs its own # set of dedicated workers. 
@@ -22,3 +24,7 @@ include("distributed_exec.jl") include("managers.jl") include("distributed_stdlib_detection.jl") + +@testset "Aqua" begin + Aqua.test_all(DistributedNext) +end