From 7fb3adc58e3b38859880d04a732f36cd234e9865 Mon Sep 17 00:00:00 2001 From: Khalid Bin Huda Date: Fri, 18 Nov 2022 18:29:56 +0100 Subject: [PATCH 1/9] remove division error --- src/osu_reduce.jl | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/osu_reduce.jl b/src/osu_reduce.jl index ddf6699..6821d4b 100644 --- a/src/osu_reduce.jl +++ b/src/osu_reduce.jl @@ -16,12 +16,20 @@ function OSUReduce(T::Type=Float32; end function osu_reduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm) - cache_size = 2 ^ 16 # Assume cache size of 64 KiB - # To avoid hitting the cache, create buffers which are arrays of arrays of size - # `bufsize` so that they exceed the cache size - num_buffers = max(1, 2 * cache_size ÷ (sizeof(T) * bufsize)) + # for Noctua 1, L3 cache is 27.5 MiB + # l3: 27.5*1024*1024 = 28835840 + #cache_size = 28835840 + cache_size = 6291456 + + # To avoid integer division error when bufsize is equal to zero + if bufsize == 0 + num_buffers = max(1, 2 * cache_size) + else + num_buffers = max(1, 2 * cache_size ÷ (sizeof(T) * bufsize)) + end send_buffer = [zeros(T, bufsize) for _ in 1:num_buffers] recv_buffer = [ones(T, bufsize) for _ in 1:num_buffers] + timer = 0.0 MPI.Barrier(comm) for i in 1:iters From a4250f11ae831311b8aa668efd41883743cdfbaa Mon Sep 17 00:00:00 2001 From: Khalid Bin Huda Date: Fri, 18 Nov 2022 18:58:08 +0100 Subject: [PATCH 2/9] corrected the cache size --- src/osu_reduce.jl | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/osu_reduce.jl b/src/osu_reduce.jl index 6821d4b..fd8b6da 100644 --- a/src/osu_reduce.jl +++ b/src/osu_reduce.jl @@ -18,9 +18,8 @@ end function osu_reduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm) # for Noctua 1, L3 cache is 27.5 MiB # l3: 27.5*1024*1024 = 28835840 - #cache_size = 28835840 - cache_size = 6291456 - + cache_size = 28835840 + # To avoid integer division error when bufsize is equal to zero if bufsize == 0 num_buffers = max(1, 2 * cache_size) From 5d9963727f45209ca9fc63ab31ca1882d77029c9 Mon Sep 17 00:00:00 2001 From: Khalid Bin Huda Date: Fri, 18 Nov 2022 19:10:14 +0100 Subject: [PATCH 3/9] remove division error from imb_reduce --- src/imb_reduce.jl | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/imb_reduce.jl b/src/imb_reduce.jl index 4b585bf..3d7f4f0 100644 --- a/src/imb_reduce.jl +++ b/src/imb_reduce.jl @@ -16,10 +16,17 @@ function IMBReduce(T::Type=Float32; end function imb_reduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm) - cache_size = 2 ^ 16 # Assume cache size of 64 KiB - # To avoid hitting the cache, create buffers which are arrays of arrays of size - # `bufsize` so that they exceed the cache size - num_buffers = max(1, 2 * cache_size ÷ (sizeof(T) * bufsize)) + # for Noctua 1, L3 cache is 27.5 MiB + # l3: 27.5*1024*1024 = 28835840 + cache_size = 28835840 + + # To avoid integer division error when bufsize is equal to zero + if bufsize == 0 + num_buffers = max(1, 2 * cache_size) + else + num_buffers = max(1, 2 * cache_size ÷ (sizeof(T) * bufsize)) + end + send_buffer = [zeros(T, bufsize) for _ in 1:num_buffers] recv_buffer = [zeros(T, bufsize) for _ in 1:num_buffers] timer = 0.0 From 2e2d34ff573667a2c5a47c1078e21f894e7a972d Mon Sep 17 00:00:00 2001 From: Khalid Bin Huda Date: Sun, 11 Dec 2022 14:18:03 +0100 Subject: [PATCH 4/9] added cache avoidance --- src/MPIBenchmarks.jl | 4 +++- src/imb_allreduce.jl | 19 +++++++++++++++---- src/imb_alltoall.jl | 18 ++++++++++++++---- src/imb_collective.jl | 4 ++-- src/imb_gatherv.jl | 19 +++++++++++++++---- src/imb_reduce.jl | 8 +++++--- src/osu_allreduce.jl | 20 ++++++++++++++++---- src/osu_alltoall.jl | 22 +++++++++++++++------- src/osu_reduce.jl | 5 +++-- 9 files changed, 88 insertions(+), 31 deletions(-) diff --git a/src/MPIBenchmarks.jl b/src/MPIBenchmarks.jl index 8de21d0..a43eb59 100644 --- a/src/MPIBenchmarks.jl +++ b/src/MPIBenchmarks.jl @@ -11,6 +11,7 @@ struct Configuration{T} iters::Function stdout::IO filename::Union{String,Nothing} + off_cache::Union{Int64,Nothing} end function iterations(::Type{T}, s::Int) where {T} @@ -24,6 +25,7 @@ function Configuration(T::Type; verbose::Bool=true, filename::Union{String,Nothing}=nothing, iterations::Function=iterations, + off_cache::Union{Int64,Nothing}=0, ) ispow2(max_size) || throw(ArgumentError("Maximum size must be a power of 2, found $(max_size)")) isprimitivetype(T) || throw(ArgumentError("Type $(T) is not a primitive type")) @@ -38,7 +40,7 @@ function Configuration(T::Type; if isnothing(stdout) stdout = verbose ? Base.stdout : Base.devnull end - return Configuration(T, lengths, iterations, stdout, filename) + return Configuration(T, lengths, iterations, stdout, filename, off_cache) end """ diff --git a/src/imb_allreduce.jl b/src/imb_allreduce.jl index 20a512d..7e56d0a 100644 --- a/src/imb_allreduce.jl +++ b/src/imb_allreduce.jl @@ -15,14 +15,25 @@ function IMBAllreduce(T::Type=Float32; ) end -function imb_allreduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm) - send_buffer = zeros(T, bufsize) - recv_buffer = zeros(T, bufsize) +function imb_allreduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm, off_cache::Int64) + # If the "off_cache" is equal to zero then there will be no cache avoidance, and only single array of send_buffer & recv_buffer will be created. + cache_size = off_cache # Required in Bytes + + # To avoid integer division error when bufsize is equal to zero + if bufsize == 0 + num_buffers = max(1, 2 * cache_size) + else + num_buffers = max(1, 2 * cache_size ÷ (sizeof(T) * bufsize)) + end + + send_buffer = [zeros(T, bufsize) for _ in 1:num_buffers] + recv_buffer = [zeros(T, bufsize) for _ in 1:num_buffers] + timer = 0.0 MPI.Barrier(comm) for i in 1:iters tic = MPI.Wtime() - MPI.Allreduce!(send_buffer, recv_buffer, +, comm) + MPI.Allreduce!(@inbounds(send_buffer[mod1(i, num_buffers)]), @inbounds(recv_buffer[mod1(i, num_buffers)]), +, comm) toc = MPI.Wtime() timer += toc - tic end diff --git a/src/imb_alltoall.jl b/src/imb_alltoall.jl index 8873695..4f9f8bd 100644 --- a/src/imb_alltoall.jl +++ b/src/imb_alltoall.jl @@ -15,16 +15,26 @@ function IMBAlltoall(T::Type=UInt8; ) end -function imb_alltoall(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm) +function imb_alltoall(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm, off_cache::Int64 ) + # If the "off_cache" is equal to zero then there will be no cache avoidance, and only single array of send_buffer & recv_buffer will be created. + cache_size = off_cache # Required in Bytes + + # To avoid integer division error when bufsize is equal to zero + if bufsize == 0 + num_buffers = max(1, 2 * cache_size) + else + num_buffers = max(1, 2 * cache_size ÷ (sizeof(T) * bufsize)) + end + + buffer = [zeros(T, bufsize * nranks) for _ in 1:num_buffers] rank = MPI.Comm_rank(comm) nranks = MPI.Comm_size(comm) - buffer = zeros(T, bufsize * nranks) - root = 0 timer = 0.0 + MPI.Barrier(comm) for i in 1:iters tic = MPI.Wtime() - MPI.Alltoall!(UBuffer(buffer, Cint(bufsize), Cint(nranks), MPI.Datatype(T)), comm) + MPI.Alltoall!(UBuffer(@inbounds(buffer[mod1(i, num_buffers)]), Cint(bufsize), Cint(nranks), MPI.Datatype(T)), comm) toc = MPI.Wtime() timer += toc - tic end diff --git a/src/imb_collective.jl b/src/imb_collective.jl index 5507b45..0b98145 100644 --- a/src/imb_collective.jl +++ b/src/imb_collective.jl @@ -8,7 +8,7 @@ function run_imb_collective(benchmark::MPIBenchmark, func::Function, conf::Confi nranks = MPI.Comm_size(comm) # Warmup - func(conf.T, 1, 10, comm) + func(conf.T, 1, 10, comm, conf.off_cache) if iszero(rank) print_header(io) = println(io, "size (bytes),iteratons,min_time (seconds),max_time (seconds),avg_time (seconds)") @@ -28,7 +28,7 @@ function run_imb_collective(benchmark::MPIBenchmark, func::Function, conf::Confi size = 1 << s iters = conf.iters(conf.T, s) # Measure time on current rank - time = func(conf.T, size, iters, comm) + time = func(conf.T, size, iters, comm, conf.off_cache) if !iszero(rank) # If we aren't on rank 0, send to it our time diff --git a/src/imb_gatherv.jl b/src/imb_gatherv.jl index 93b111d..c9d57d5 100644 --- a/src/imb_gatherv.jl +++ b/src/imb_gatherv.jl @@ -16,10 +16,21 @@ function IMBGatherv(T::Type=UInt8; end function imb_gatherv(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm) + # If the "off_cache" is equal to zero then there will be no cache avoidance, and only single array of send_buffer & recv_buffer will be created. + cache_size = off_cache # Required in Bytes + + # To avoid integer division error when bufsize is equal to zero + if bufsize == 0 + num_buffers = max(1, 2 * cache_size) + else + num_buffers = max(1, 2 * cache_size ÷ (sizeof(T) * bufsize)) + end + + send_buffer = [zeros(T, bufsize) for _ in 1:num_buffers] + recv_buffer = [zeros(T, bufsize * nranks) for _ in 1:num_buffers] rank = MPI.Comm_rank(comm) nranks = MPI.Comm_size(comm) - send_buffer = zeros(T, bufsize) - recv_buffer = zeros(T, bufsize * nranks) + counts = [bufsize for _ in 1:nranks] root = 0 timer = 0.0 @@ -27,9 +38,9 @@ function imb_gatherv(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm) for i in 1:iters tic = MPI.Wtime() if rank == root - MPI.Gatherv!(MPI.IN_PLACE, VBuffer(recv_buffer, counts), comm; root) + MPI.Gatherv!(MPI.IN_PLACE, VBuffer(@inbounds(recv_buffer[mod1(i, num_buffers)]), counts), comm; root) else - MPI.Gatherv!(send_buffer, nothing, comm; root) + MPI.Gatherv!(@inbounds(send_buffer[mod1(i, num_buffers)]), nothing, comm; root) end toc = MPI.Wtime() timer += toc - tic diff --git a/src/imb_reduce.jl b/src/imb_reduce.jl index 3d7f4f0..d60ff8e 100644 --- a/src/imb_reduce.jl +++ b/src/imb_reduce.jl @@ -15,10 +15,12 @@ function IMBReduce(T::Type=Float32; ) end -function imb_reduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm) +function imb_reduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm, off_cache::Int64) # for Noctua 1, L3 cache is 27.5 MiB - # l3: 27.5*1024*1024 = 28835840 - cache_size = 28835840 + # l3: 27.5*1024*1024 = 28835840 Bytes + + # If the "off_cache" is equal to zero then there will be no cache avoidance, and only single array of send_buffer & recv_buffer will be created. + cache_size = off_cache # Required in Bytes # To avoid integer division error when bufsize is equal to zero if bufsize == 0 diff --git a/src/osu_allreduce.jl b/src/osu_allreduce.jl index 1071500..0e7a759 100644 --- a/src/osu_allreduce.jl +++ b/src/osu_allreduce.jl @@ -15,14 +15,26 @@ function OSUAllreduce(T::Type=Float32; ) end -function osu_allreduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm) - send_buffer = ones(T, bufsize) - recv_buffer = zeros(T, bufsize) +function osu_allreduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm, off_cache::Int64) + + # If the "off_cache" is equal to zero then there will be no cache avoidance, and only single array of send_buffer & recv_buffer will be created. + cache_size = off_cache # Required in Bytes + + # To avoid integer division error when bufsize is equal to zero + if bufsize == 0 + num_buffers = max(1, 2 * cache_size) + else + num_buffers = max(1, 2 * cache_size ÷ (sizeof(T) * bufsize)) + end + + send_buffer = [ones(T, bufsize) for _ in 1:num_buffers] + recv_buffer = [zeros(T, bufsize) for _ in 1:num_buffers] + timer = 0.0 MPI.Barrier(comm) for i in 1:iters tic = MPI.Wtime() - MPI.Allreduce!(send_buffer, recv_buffer, +, comm) + MPI.Allreduce!(@inbounds(send_buffer[mod1(i, num_buffers)]), @inbounds(recv_buffer[mod1(i, num_buffers)]), +, comm) toc = MPI.Wtime() timer += toc - tic end diff --git a/src/osu_alltoall.jl b/src/osu_alltoall.jl index 542ead0..4e756a3 100644 --- a/src/osu_alltoall.jl +++ b/src/osu_alltoall.jl @@ -15,19 +15,27 @@ function OSUAlltoall(T::Type=UInt8; ) end -function osu_alltoall(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm) - rank = MPI.Comm_rank(comm) +function osu_alltoall(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm, off_cache::Int64) + # If the "off_cache" is equal to zero then there will be no cache avoidance, and only single array of send_buffer & recv_buffer will be created. + cache_size = off_cache # Required in Bytes + + # To avoid integer division error when bufsize is equal to zero + if bufsize == 0 + num_buffers = max(1, 2 * cache_size) + else + num_buffers = max(1, 2 * cache_size ÷ (sizeof(T) * bufsize)) + end + + send_buffer = [ones(T, bufsize * nranks) for _ in 1:num_buffers] + recv_buffer = [zeros(T, bufsize * nranks) for _ in 1:num_buffers] nranks = MPI.Comm_size(comm) - send_buffer = ones(T, bufsize * nranks) - recv_buffer = zeros(T, bufsize * nranks) - root = 0 timer = 0.0 MPI.Barrier(comm) for i in 1:iters tic = MPI.Wtime() MPI.Alltoall!( - UBuffer(send_buffer, Cint(bufsize), Cint(nranks), MPI.Datatype(T)), - UBuffer(recv_buffer, Cint(bufsize), Cint(nranks), MPI.Datatype(T)), + UBuffer(@inbounds(send_buffer[mod1(i, num_buffers)]), Cint(bufsize), Cint(nranks), MPI.Datatype(T)), + UBuffer(@inbounds(recv_buffer[mod1(i, num_buffers)]), Cint(bufsize), Cint(nranks), MPI.Datatype(T)), comm) toc = MPI.Wtime() timer += toc - tic diff --git a/src/osu_reduce.jl b/src/osu_reduce.jl index fd8b6da..755e9c3 100644 --- a/src/osu_reduce.jl +++ b/src/osu_reduce.jl @@ -15,10 +15,11 @@ function OSUReduce(T::Type=Float32; ) end -function osu_reduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm) +function osu_reduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm, off_cache::Int64 ) # for Noctua 1, L3 cache is 27.5 MiB # l3: 27.5*1024*1024 = 28835840 - cache_size = 28835840 + # If the "off_cache" is equal to zero then there will be no cache avoidance, and only single array of send_buffer & recv_buffer will be created. + cache_size = off_cache # Required in Bytes # To avoid integer division error when bufsize is equal to zero if bufsize == 0 From 245626ee101657fa9a9ab9bfa2a178265da8ac5f Mon Sep 17 00:00:00 2001 From: Khalid Bin Huda Date: Sun, 11 Dec 2022 14:34:14 +0100 Subject: [PATCH 5/9] remove error --- src/imb_alltoall.jl | 2 +- src/imb_gatherv.jl | 4 ++-- src/osu_alltoall.jl | 3 ++- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/imb_alltoall.jl b/src/imb_alltoall.jl index 4f9f8bd..97ca6c5 100644 --- a/src/imb_alltoall.jl +++ b/src/imb_alltoall.jl @@ -26,9 +26,9 @@ function imb_alltoall(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm, off_cac num_buffers = max(1, 2 * cache_size ÷ (sizeof(T) * bufsize)) end - buffer = [zeros(T, bufsize * nranks) for _ in 1:num_buffers] rank = MPI.Comm_rank(comm) nranks = MPI.Comm_size(comm) + buffer = [zeros(T, bufsize * nranks) for _ in 1:num_buffers] timer = 0.0 MPI.Barrier(comm) diff --git a/src/imb_gatherv.jl b/src/imb_gatherv.jl index c9d57d5..eb6f029 100644 --- a/src/imb_gatherv.jl +++ b/src/imb_gatherv.jl @@ -26,10 +26,10 @@ function imb_gatherv(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm) num_buffers = max(1, 2 * cache_size ÷ (sizeof(T) * bufsize)) end - send_buffer = [zeros(T, bufsize) for _ in 1:num_buffers] - recv_buffer = [zeros(T, bufsize * nranks) for _ in 1:num_buffers] rank = MPI.Comm_rank(comm) nranks = MPI.Comm_size(comm) + send_buffer = [zeros(T, bufsize) for _ in 1:num_buffers] + recv_buffer = [zeros(T, bufsize * nranks) for _ in 1:num_buffers] counts = [bufsize for _ in 1:nranks] root = 0 diff --git a/src/osu_alltoall.jl b/src/osu_alltoall.jl index 4e756a3..7199457 100644 --- a/src/osu_alltoall.jl +++ b/src/osu_alltoall.jl @@ -26,9 +26,10 @@ function osu_alltoall(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm, off_cac num_buffers = max(1, 2 * cache_size ÷ (sizeof(T) * bufsize)) end + nranks = MPI.Comm_size(comm) send_buffer = [ones(T, bufsize * nranks) for _ in 1:num_buffers] recv_buffer = [zeros(T, bufsize * nranks) for _ in 1:num_buffers] - nranks = MPI.Comm_size(comm) + timer = 0.0 MPI.Barrier(comm) for i in 1:iters From 0538a020bce2071e0ea041b6bd60fcc37935e472 Mon Sep 17 00:00:00 2001 From: Khalid Bin Huda Date: Sun, 11 Dec 2022 14:50:22 +0100 Subject: [PATCH 6/9] add cache_off parameter to imb_gatherv --- src/imb_gatherv.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/imb_gatherv.jl b/src/imb_gatherv.jl index eb6f029..803ead9 100644 --- a/src/imb_gatherv.jl +++ b/src/imb_gatherv.jl @@ -15,7 +15,7 @@ function IMBGatherv(T::Type=UInt8; ) end -function imb_gatherv(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm) +function imb_gatherv(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm, off_cache::Int64) # If the "off_cache" is equal to zero then there will be no cache avoidance, and only single array of send_buffer & recv_buffer will be created. cache_size = off_cache # Required in Bytes From f8c0b391d26bac81720640456c1cf5b0e8fb2999 Mon Sep 17 00:00:00 2001 From: Khalid Bin Huda Date: Sun, 11 Dec 2022 14:58:23 +0100 Subject: [PATCH 7/9] added test for cache avoidance --- test/runtests.jl | 1 + 1 file changed, 1 insertion(+) diff --git a/test/runtests.jl b/test/runtests.jl index 69ed55f..d1b808a 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -40,6 +40,7 @@ end const verbose = false mktemp() do filename, io benchmark(IMBAllreduce(; verbose, filename)) + benchmark(IMBAllreduce(; verbose, filename, off_cache=28835)) benchmark(IMBAlltoall(; verbose, filename, max_size=1<<16)) benchmark(IMBGatherv(; verbose, filename)) benchmark(IMBReduce(; verbose, filename)) From 0b8bdb531e36316fa71ca14c7563459df19ee26d Mon Sep 17 00:00:00 2001 From: Khalid Bin Huda Siddiqui Date: Sun, 11 Dec 2022 23:41:07 +0100 Subject: [PATCH 8/9] Update src/imb_allreduce.jl MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Mosè Giordano --- src/imb_allreduce.jl | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/imb_allreduce.jl b/src/imb_allreduce.jl index 7e56d0a..33e376c 100644 --- a/src/imb_allreduce.jl +++ b/src/imb_allreduce.jl @@ -20,11 +20,7 @@ function imb_allreduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm, off_ca cache_size = off_cache # Required in Bytes # To avoid integer division error when bufsize is equal to zero - if bufsize == 0 - num_buffers = max(1, 2 * cache_size) - else - num_buffers = max(1, 2 * cache_size ÷ (sizeof(T) * bufsize)) - end + num_buffers = max(1, 2 * cache_size ÷ max(1, (sizeof(T) * bufsize))) send_buffer = [zeros(T, bufsize) for _ in 1:num_buffers] recv_buffer = [zeros(T, bufsize) for _ in 1:num_buffers] From e1a511e39edcd9fa91eeacd745ea67a64b52871c Mon Sep 17 00:00:00 2001 From: Khalid Bin Huda Date: Sun, 11 Dec 2022 23:47:46 +0100 Subject: [PATCH 9/9] simplify num_buffer --- src/imb_allreduce.jl | 7 +------ src/imb_alltoall.jl | 12 +----------- src/imb_gatherv.jl | 11 +---------- src/imb_reduce.jl | 13 +------------ src/osu_allreduce.jl | 12 +----------- src/osu_alltoall.jl | 11 +---------- src/osu_reduce.jl | 12 +----------- 7 files changed, 7 insertions(+), 71 deletions(-) diff --git a/src/imb_allreduce.jl b/src/imb_allreduce.jl index 33e376c..dd53f94 100644 --- a/src/imb_allreduce.jl +++ b/src/imb_allreduce.jl @@ -16,15 +16,10 @@ function IMBAllreduce(T::Type=Float32; end function imb_allreduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm, off_cache::Int64) - # If the "off_cache" is equal to zero then there will be no cache avoidance, and only single array of send_buffer & recv_buffer will be created. - cache_size = off_cache # Required in Bytes - - # To avoid integer division error when bufsize is equal to zero + cache_size = off_cache # Required in Bytes num_buffers = max(1, 2 * cache_size ÷ max(1, (sizeof(T) * bufsize))) - send_buffer = [zeros(T, bufsize) for _ in 1:num_buffers] recv_buffer = [zeros(T, bufsize) for _ in 1:num_buffers] - timer = 0.0 MPI.Barrier(comm) for i in 1:iters diff --git a/src/imb_alltoall.jl b/src/imb_alltoall.jl index 97ca6c5..2c5c9e9 100644 --- a/src/imb_alltoall.jl +++ b/src/imb_alltoall.jl @@ -16,21 +16,11 @@ function IMBAlltoall(T::Type=UInt8; end function imb_alltoall(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm, off_cache::Int64 ) - # If the "off_cache" is equal to zero then there will be no cache avoidance, and only single array of send_buffer & recv_buffer will be created. cache_size = off_cache # Required in Bytes - - # To avoid integer division error when bufsize is equal to zero - if bufsize == 0 - num_buffers = max(1, 2 * cache_size) - else - num_buffers = max(1, 2 * cache_size ÷ (sizeof(T) * bufsize)) - end - - rank = MPI.Comm_rank(comm) + num_buffers = max(1, 2 * cache_size ÷ max(1, (sizeof(T) * bufsize))) nranks = MPI.Comm_size(comm) buffer = [zeros(T, bufsize * nranks) for _ in 1:num_buffers] timer = 0.0 - MPI.Barrier(comm) for i in 1:iters tic = MPI.Wtime() diff --git a/src/imb_gatherv.jl b/src/imb_gatherv.jl index 803ead9..a7efe11 100644 --- a/src/imb_gatherv.jl +++ b/src/imb_gatherv.jl @@ -16,21 +16,12 @@ function IMBGatherv(T::Type=UInt8; end function imb_gatherv(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm, off_cache::Int64) - # If the "off_cache" is equal to zero then there will be no cache avoidance, and only single array of send_buffer & recv_buffer will be created. cache_size = off_cache # Required in Bytes - - # To avoid integer division error when bufsize is equal to zero - if bufsize == 0 - num_buffers = max(1, 2 * cache_size) - else - num_buffers = max(1, 2 * cache_size ÷ (sizeof(T) * bufsize)) - end - + num_buffers = max(1, 2 * cache_size ÷ max(1, (sizeof(T) * bufsize))) rank = MPI.Comm_rank(comm) nranks = MPI.Comm_size(comm) send_buffer = [zeros(T, bufsize) for _ in 1:num_buffers] recv_buffer = [zeros(T, bufsize * nranks) for _ in 1:num_buffers] - counts = [bufsize for _ in 1:nranks] root = 0 timer = 0.0 diff --git a/src/imb_reduce.jl b/src/imb_reduce.jl index d60ff8e..8bfbf27 100644 --- a/src/imb_reduce.jl +++ b/src/imb_reduce.jl @@ -16,19 +16,8 @@ function IMBReduce(T::Type=Float32; end function imb_reduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm, off_cache::Int64) - # for Noctua 1, L3 cache is 27.5 MiB - # l3: 27.5*1024*1024 = 28835840 Bytes - - # If the "off_cache" is equal to zero then there will be no cache avoidance, and only single array of send_buffer & recv_buffer will be created. cache_size = off_cache # Required in Bytes - - # To avoid integer division error when bufsize is equal to zero - if bufsize == 0 - num_buffers = max(1, 2 * cache_size) - else - num_buffers = max(1, 2 * cache_size ÷ (sizeof(T) * bufsize)) - end - + num_buffers = max(1, 2 * cache_size ÷ max(1, (sizeof(T) * bufsize))) send_buffer = [zeros(T, bufsize) for _ in 1:num_buffers] recv_buffer = [zeros(T, bufsize) for _ in 1:num_buffers] timer = 0.0 diff --git a/src/osu_allreduce.jl b/src/osu_allreduce.jl index 0e7a759..2787d61 100644 --- a/src/osu_allreduce.jl +++ b/src/osu_allreduce.jl @@ -16,20 +16,10 @@ function OSUAllreduce(T::Type=Float32; end function osu_allreduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm, off_cache::Int64) - - # If the "off_cache" is equal to zero then there will be no cache avoidance, and only single array of send_buffer & recv_buffer will be created. cache_size = off_cache # Required in Bytes - - # To avoid integer division error when bufsize is equal to zero - if bufsize == 0 - num_buffers = max(1, 2 * cache_size) - else - num_buffers = max(1, 2 * cache_size ÷ (sizeof(T) * bufsize)) - end - + num_buffers = max(1, 2 * cache_size ÷ max(1, (sizeof(T) * bufsize))) send_buffer = [ones(T, bufsize) for _ in 1:num_buffers] recv_buffer = [zeros(T, bufsize) for _ in 1:num_buffers] - timer = 0.0 MPI.Barrier(comm) for i in 1:iters diff --git a/src/osu_alltoall.jl b/src/osu_alltoall.jl index 7199457..2ac8205 100644 --- a/src/osu_alltoall.jl +++ b/src/osu_alltoall.jl @@ -16,20 +16,11 @@ function OSUAlltoall(T::Type=UInt8; end function osu_alltoall(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm, off_cache::Int64) - # If the "off_cache" is equal to zero then there will be no cache avoidance, and only single array of send_buffer & recv_buffer will be created. cache_size = off_cache # Required in Bytes - - # To avoid integer division error when bufsize is equal to zero - if bufsize == 0 - num_buffers = max(1, 2 * cache_size) - else - num_buffers = max(1, 2 * cache_size ÷ (sizeof(T) * bufsize)) - end - + num_buffers = max(1, 2 * cache_size ÷ max(1, (sizeof(T) * bufsize))) nranks = MPI.Comm_size(comm) send_buffer = [ones(T, bufsize * nranks) for _ in 1:num_buffers] recv_buffer = [zeros(T, bufsize * nranks) for _ in 1:num_buffers] - timer = 0.0 MPI.Barrier(comm) for i in 1:iters diff --git a/src/osu_reduce.jl b/src/osu_reduce.jl index 755e9c3..dc857dc 100644 --- a/src/osu_reduce.jl +++ b/src/osu_reduce.jl @@ -16,20 +16,10 @@ function OSUReduce(T::Type=Float32; end function osu_reduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm, off_cache::Int64 ) - # for Noctua 1, L3 cache is 27.5 MiB - # l3: 27.5*1024*1024 = 28835840 - # If the "off_cache" is equal to zero then there will be no cache avoidance, and only single array of send_buffer & recv_buffer will be created. cache_size = off_cache # Required in Bytes - - # To avoid integer division error when bufsize is equal to zero - if bufsize == 0 - num_buffers = max(1, 2 * cache_size) - else - num_buffers = max(1, 2 * cache_size ÷ (sizeof(T) * bufsize)) - end + num_buffers = max(1, 2 * cache_size ÷ max(1, (sizeof(T) * bufsize))) send_buffer = [zeros(T, bufsize) for _ in 1:num_buffers] recv_buffer = [ones(T, bufsize) for _ in 1:num_buffers] - timer = 0.0 MPI.Barrier(comm) for i in 1:iters