diff --git a/src/MPIBenchmarks.jl b/src/MPIBenchmarks.jl index 8de21d0..a43eb59 100644 --- a/src/MPIBenchmarks.jl +++ b/src/MPIBenchmarks.jl @@ -11,6 +11,7 @@ struct Configuration{T} iters::Function stdout::IO filename::Union{String,Nothing} + off_cache::Union{Int64,Nothing} end function iterations(::Type{T}, s::Int) where {T} @@ -24,6 +25,7 @@ function Configuration(T::Type; verbose::Bool=true, filename::Union{String,Nothing}=nothing, iterations::Function=iterations, + off_cache::Union{Int64,Nothing}=0, ) ispow2(max_size) || throw(ArgumentError("Maximum size must be a power of 2, found $(max_size)")) isprimitivetype(T) || throw(ArgumentError("Type $(T) is not a primitive type")) @@ -38,7 +40,7 @@ function Configuration(T::Type; if isnothing(stdout) stdout = verbose ? Base.stdout : Base.devnull end - return Configuration(T, lengths, iterations, stdout, filename) + return Configuration(T, lengths, iterations, stdout, filename, off_cache) end """ diff --git a/src/imb_allreduce.jl b/src/imb_allreduce.jl index 20a512d..dd53f94 100644 --- a/src/imb_allreduce.jl +++ b/src/imb_allreduce.jl @@ -15,14 +15,16 @@ function IMBAllreduce(T::Type=Float32; ) end -function imb_allreduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm) - send_buffer = zeros(T, bufsize) - recv_buffer = zeros(T, bufsize) +function imb_allreduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm, off_cache::Int64) + cache_size = off_cache # Required in Bytes + num_buffers = max(1, 2 * cache_size ÷ max(1, (sizeof(T) * bufsize))) + send_buffer = [zeros(T, bufsize) for _ in 1:num_buffers] + recv_buffer = [zeros(T, bufsize) for _ in 1:num_buffers] timer = 0.0 MPI.Barrier(comm) for i in 1:iters tic = MPI.Wtime() - MPI.Allreduce!(send_buffer, recv_buffer, +, comm) + MPI.Allreduce!(@inbounds(send_buffer[mod1(i, num_buffers)]), @inbounds(recv_buffer[mod1(i, num_buffers)]), +, comm) toc = MPI.Wtime() timer += toc - tic end diff --git a/src/imb_alltoall.jl b/src/imb_alltoall.jl index 8873695..2c5c9e9 100644 --- a/src/imb_alltoall.jl +++ b/src/imb_alltoall.jl @@ -15,16 +15,16 @@ function IMBAlltoall(T::Type=UInt8; ) end -function imb_alltoall(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm) - rank = MPI.Comm_rank(comm) +function imb_alltoall(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm, off_cache::Int64 ) + cache_size = off_cache # Required in Bytes + num_buffers = max(1, 2 * cache_size ÷ max(1, (sizeof(T) * bufsize))) nranks = MPI.Comm_size(comm) - buffer = zeros(T, bufsize * nranks) - root = 0 + buffer = [zeros(T, bufsize * nranks) for _ in 1:num_buffers] timer = 0.0 MPI.Barrier(comm) for i in 1:iters tic = MPI.Wtime() - MPI.Alltoall!(UBuffer(buffer, Cint(bufsize), Cint(nranks), MPI.Datatype(T)), comm) + MPI.Alltoall!(UBuffer(@inbounds(buffer[mod1(i, num_buffers)]), Cint(bufsize), Cint(nranks), MPI.Datatype(T)), comm) toc = MPI.Wtime() timer += toc - tic end diff --git a/src/imb_collective.jl b/src/imb_collective.jl index 5507b45..0b98145 100644 --- a/src/imb_collective.jl +++ b/src/imb_collective.jl @@ -8,7 +8,7 @@ function run_imb_collective(benchmark::MPIBenchmark, func::Function, conf::Confi nranks = MPI.Comm_size(comm) # Warmup - func(conf.T, 1, 10, comm) + func(conf.T, 1, 10, comm, conf.off_cache) if iszero(rank) print_header(io) = println(io, "size (bytes),iteratons,min_time (seconds),max_time (seconds),avg_time (seconds)") @@ -28,7 +28,7 @@ function run_imb_collective(benchmark::MPIBenchmark, func::Function, conf::Confi size = 1 << s iters = conf.iters(conf.T, s) # Measure time on current rank - time = func(conf.T, size, iters, comm) + time = func(conf.T, size, iters, comm, conf.off_cache) if !iszero(rank) # If we aren't on rank 0, send to it our time diff --git a/src/imb_gatherv.jl b/src/imb_gatherv.jl index 93b111d..a7efe11 100644 --- a/src/imb_gatherv.jl +++ b/src/imb_gatherv.jl @@ -15,11 +15,13 @@ function IMBGatherv(T::Type=UInt8; ) end -function imb_gatherv(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm) +function imb_gatherv(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm, off_cache::Int64) + cache_size = off_cache # Required in Bytes + num_buffers = max(1, 2 * cache_size ÷ max(1, (sizeof(T) * bufsize))) rank = MPI.Comm_rank(comm) nranks = MPI.Comm_size(comm) - send_buffer = zeros(T, bufsize) - recv_buffer = zeros(T, bufsize * nranks) + send_buffer = [zeros(T, bufsize) for _ in 1:num_buffers] + recv_buffer = [zeros(T, bufsize * nranks) for _ in 1:num_buffers] counts = [bufsize for _ in 1:nranks] root = 0 timer = 0.0 @@ -27,9 +29,9 @@ function imb_gatherv(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm) for i in 1:iters tic = MPI.Wtime() if rank == root - MPI.Gatherv!(MPI.IN_PLACE, VBuffer(recv_buffer, counts), comm; root) + MPI.Gatherv!(MPI.IN_PLACE, VBuffer(@inbounds(recv_buffer[mod1(i, num_buffers)]), counts), comm; root) else - MPI.Gatherv!(send_buffer, nothing, comm; root) + MPI.Gatherv!(@inbounds(send_buffer[mod1(i, num_buffers)]), nothing, comm; root) end toc = MPI.Wtime() timer += toc - tic diff --git a/src/imb_reduce.jl b/src/imb_reduce.jl index 4b585bf..8bfbf27 100644 --- a/src/imb_reduce.jl +++ b/src/imb_reduce.jl @@ -15,11 +15,9 @@ function IMBReduce(T::Type=Float32; ) end -function imb_reduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm) - cache_size = 2 ^ 16 # Assume cache size of 64 KiB - # To avoid hitting the cache, create buffers which are arrays of arrays of size - # `bufsize` so that they exceed the cache size - num_buffers = max(1, 2 * cache_size ÷ (sizeof(T) * bufsize)) +function imb_reduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm, off_cache::Int64) + cache_size = off_cache # Required in Bytes + num_buffers = max(1, 2 * cache_size ÷ max(1, (sizeof(T) * bufsize))) send_buffer = [zeros(T, bufsize) for _ in 1:num_buffers] recv_buffer = [zeros(T, bufsize) for _ in 1:num_buffers] timer = 0.0 diff --git a/src/osu_allreduce.jl b/src/osu_allreduce.jl index 1071500..2787d61 100644 --- a/src/osu_allreduce.jl +++ b/src/osu_allreduce.jl @@ -15,14 +15,16 @@ function OSUAllreduce(T::Type=Float32; ) end -function osu_allreduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm) - send_buffer = ones(T, bufsize) - recv_buffer = zeros(T, bufsize) +function osu_allreduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm, off_cache::Int64) + cache_size = off_cache # Required in Bytes + num_buffers = max(1, 2 * cache_size ÷ max(1, (sizeof(T) * bufsize))) + send_buffer = [ones(T, bufsize) for _ in 1:num_buffers] + recv_buffer = [zeros(T, bufsize) for _ in 1:num_buffers] timer = 0.0 MPI.Barrier(comm) for i in 1:iters tic = MPI.Wtime() - MPI.Allreduce!(send_buffer, recv_buffer, +, comm) + MPI.Allreduce!(@inbounds(send_buffer[mod1(i, num_buffers)]), @inbounds(recv_buffer[mod1(i, num_buffers)]), +, comm) toc = MPI.Wtime() timer += toc - tic end diff --git a/src/osu_alltoall.jl b/src/osu_alltoall.jl index 542ead0..2ac8205 100644 --- a/src/osu_alltoall.jl +++ b/src/osu_alltoall.jl @@ -15,19 +15,19 @@ function OSUAlltoall(T::Type=UInt8; ) end -function osu_alltoall(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm) - rank = MPI.Comm_rank(comm) +function osu_alltoall(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm, off_cache::Int64) + cache_size = off_cache # Required in Bytes + num_buffers = max(1, 2 * cache_size ÷ max(1, (sizeof(T) * bufsize))) nranks = MPI.Comm_size(comm) - send_buffer = ones(T, bufsize * nranks) - recv_buffer = zeros(T, bufsize * nranks) - root = 0 + send_buffer = [ones(T, bufsize * nranks) for _ in 1:num_buffers] + recv_buffer = [zeros(T, bufsize * nranks) for _ in 1:num_buffers] timer = 0.0 MPI.Barrier(comm) for i in 1:iters tic = MPI.Wtime() MPI.Alltoall!( - UBuffer(send_buffer, Cint(bufsize), Cint(nranks), MPI.Datatype(T)), - UBuffer(recv_buffer, Cint(bufsize), Cint(nranks), MPI.Datatype(T)), + UBuffer(@inbounds(send_buffer[mod1(i, num_buffers)]), Cint(bufsize), Cint(nranks), MPI.Datatype(T)), + UBuffer(@inbounds(recv_buffer[mod1(i, num_buffers)]), Cint(bufsize), Cint(nranks), MPI.Datatype(T)), comm) toc = MPI.Wtime() timer += toc - tic diff --git a/src/osu_reduce.jl b/src/osu_reduce.jl index ddf6699..dc857dc 100644 --- a/src/osu_reduce.jl +++ b/src/osu_reduce.jl @@ -15,11 +15,9 @@ function OSUReduce(T::Type=Float32; ) end -function osu_reduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm) - cache_size = 2 ^ 16 # Assume cache size of 64 KiB - # To avoid hitting the cache, create buffers which are arrays of arrays of size - # `bufsize` so that they exceed the cache size - num_buffers = max(1, 2 * cache_size ÷ (sizeof(T) * bufsize)) +function osu_reduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm, off_cache::Int64 ) + cache_size = off_cache # Required in Bytes + num_buffers = max(1, 2 * cache_size ÷ max(1, (sizeof(T) * bufsize))) send_buffer = [zeros(T, bufsize) for _ in 1:num_buffers] recv_buffer = [ones(T, bufsize) for _ in 1:num_buffers] timer = 0.0 diff --git a/test/runtests.jl b/test/runtests.jl index 69ed55f..d1b808a 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -40,6 +40,7 @@ end const verbose = false mktemp() do filename, io benchmark(IMBAllreduce(; verbose, filename)) + benchmark(IMBAllreduce(; verbose, filename, off_cache=28835)) benchmark(IMBAlltoall(; verbose, filename, max_size=1<<16)) benchmark(IMBGatherv(; verbose, filename)) benchmark(IMBReduce(; verbose, filename))