Skip to content

Cache avoidance experiment #9

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: mg/reduce-cache-avoidance
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/MPIBenchmarks.jl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ struct Configuration{T}
iters::Function
stdout::IO
filename::Union{String,Nothing}
off_cache::Union{Int64,Nothing}
end

function iterations(::Type{T}, s::Int) where {T}
Expand All @@ -24,6 +25,7 @@ function Configuration(T::Type;
verbose::Bool=true,
filename::Union{String,Nothing}=nothing,
iterations::Function=iterations,
off_cache::Union{Int64,Nothing}=0,
)
ispow2(max_size) || throw(ArgumentError("Maximum size must be a power of 2, found $(max_size)"))
isprimitivetype(T) || throw(ArgumentError("Type $(T) is not a primitive type"))
Expand All @@ -38,7 +40,7 @@ function Configuration(T::Type;
if isnothing(stdout)
stdout = verbose ? Base.stdout : Base.devnull
end
return Configuration(T, lengths, iterations, stdout, filename)
return Configuration(T, lengths, iterations, stdout, filename, off_cache)
end

"""
Expand Down
19 changes: 15 additions & 4 deletions src/imb_allreduce.jl
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,25 @@ function IMBAllreduce(T::Type=Float32;
)
end

function imb_allreduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm)
send_buffer = zeros(T, bufsize)
recv_buffer = zeros(T, bufsize)
function imb_allreduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm, off_cache::Int64)
# If `off_cache` is zero, no cache avoidance is performed: only a single send_buffer and a single recv_buffer are created.
cache_size = off_cache # Required in Bytes

# To avoid integer division error when bufsize is equal to zero
if bufsize == 0
num_buffers = max(1, 2 * cache_size)
else
num_buffers = max(1, 2 * cache_size ÷ (sizeof(T) * bufsize))
end

send_buffer = [zeros(T, bufsize) for _ in 1:num_buffers]
recv_buffer = [zeros(T, bufsize) for _ in 1:num_buffers]

timer = 0.0
MPI.Barrier(comm)
for i in 1:iters
tic = MPI.Wtime()
MPI.Allreduce!(send_buffer, recv_buffer, +, comm)
MPI.Allreduce!(@inbounds(send_buffer[mod1(i, num_buffers)]), @inbounds(recv_buffer[mod1(i, num_buffers)]), +, comm)
toc = MPI.Wtime()
timer += toc - tic
end
Expand Down
18 changes: 14 additions & 4 deletions src/imb_alltoall.jl
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,26 @@ function IMBAlltoall(T::Type=UInt8;
)
end

function imb_alltoall(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm)
function imb_alltoall(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm, off_cache::Int64 )
# If `off_cache` is zero, no cache avoidance is performed: only a single `buffer` array is created.
cache_size = off_cache # Required in Bytes

# To avoid integer division error when bufsize is equal to zero
if bufsize == 0
num_buffers = max(1, 2 * cache_size)
else
num_buffers = max(1, 2 * cache_size ÷ (sizeof(T) * bufsize))
end

rank = MPI.Comm_rank(comm)
nranks = MPI.Comm_size(comm)
buffer = zeros(T, bufsize * nranks)
root = 0
buffer = [zeros(T, bufsize * nranks) for _ in 1:num_buffers]
timer = 0.0

MPI.Barrier(comm)
for i in 1:iters
tic = MPI.Wtime()
MPI.Alltoall!(UBuffer(buffer, Cint(bufsize), Cint(nranks), MPI.Datatype(T)), comm)
MPI.Alltoall!(UBuffer(@inbounds(buffer[mod1(i, num_buffers)]), Cint(bufsize), Cint(nranks), MPI.Datatype(T)), comm)
toc = MPI.Wtime()
timer += toc - tic
end
Expand Down
4 changes: 2 additions & 2 deletions src/imb_collective.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ function run_imb_collective(benchmark::MPIBenchmark, func::Function, conf::Confi
nranks = MPI.Comm_size(comm)

# Warmup
func(conf.T, 1, 10, comm)
func(conf.T, 1, 10, comm, conf.off_cache)

if iszero(rank)
print_header(io) = println(io, "size (bytes),iteratons,min_time (seconds),max_time (seconds),avg_time (seconds)")
Expand All @@ -28,7 +28,7 @@ function run_imb_collective(benchmark::MPIBenchmark, func::Function, conf::Confi
size = 1 << s
iters = conf.iters(conf.T, s)
# Measure time on current rank
time = func(conf.T, size, iters, comm)
time = func(conf.T, size, iters, comm, conf.off_cache)

if !iszero(rank)
# If we aren't on rank 0, send to it our time
Expand Down
21 changes: 16 additions & 5 deletions src/imb_gatherv.jl
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,32 @@ function IMBGatherv(T::Type=UInt8;
)
end

function imb_gatherv(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm)
function imb_gatherv(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm, off_cache::Int64)
# If `off_cache` is zero, no cache avoidance is performed: only a single send_buffer and a single recv_buffer are created.
cache_size = off_cache # Required in Bytes

# To avoid integer division error when bufsize is equal to zero
if bufsize == 0
num_buffers = max(1, 2 * cache_size)
else
num_buffers = max(1, 2 * cache_size ÷ (sizeof(T) * bufsize))
end

rank = MPI.Comm_rank(comm)
nranks = MPI.Comm_size(comm)
send_buffer = zeros(T, bufsize)
recv_buffer = zeros(T, bufsize * nranks)
send_buffer = [zeros(T, bufsize) for _ in 1:num_buffers]
recv_buffer = [zeros(T, bufsize * nranks) for _ in 1:num_buffers]

counts = [bufsize for _ in 1:nranks]
root = 0
timer = 0.0
MPI.Barrier(comm)
for i in 1:iters
tic = MPI.Wtime()
if rank == root
MPI.Gatherv!(MPI.IN_PLACE, VBuffer(recv_buffer, counts), comm; root)
MPI.Gatherv!(MPI.IN_PLACE, VBuffer(@inbounds(recv_buffer[mod1(i, num_buffers)]), counts), comm; root)
else
MPI.Gatherv!(send_buffer, nothing, comm; root)
MPI.Gatherv!(@inbounds(send_buffer[mod1(i, num_buffers)]), nothing, comm; root)
end
toc = MPI.Wtime()
timer += toc - tic
Expand Down
19 changes: 14 additions & 5 deletions src/imb_reduce.jl
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,20 @@ function IMBReduce(T::Type=Float32;
)
end

function imb_reduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm)
cache_size = 2 ^ 16 # Assume cache size of 64 KiB
# To avoid hitting the cache, create buffers which are arrays of arrays of size
# `bufsize` so that they exceed the cache size
num_buffers = max(1, 2 * cache_size ÷ (sizeof(T) * bufsize))
function imb_reduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm, off_cache::Int64)
# For Noctua 1, the L3 cache is 27.5 MiB:
# 27.5 * 1024 * 1024 = 28835840 bytes

# If `off_cache` is zero, no cache avoidance is performed: only a single send_buffer and a single recv_buffer are created.
cache_size = off_cache # Required in Bytes

# To avoid integer division error when bufsize is equal to zero
if bufsize == 0
num_buffers = max(1, 2 * cache_size)
else
num_buffers = max(1, 2 * cache_size ÷ (sizeof(T) * bufsize))
end

send_buffer = [zeros(T, bufsize) for _ in 1:num_buffers]
recv_buffer = [zeros(T, bufsize) for _ in 1:num_buffers]
timer = 0.0
Expand Down
20 changes: 16 additions & 4 deletions src/osu_allreduce.jl
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,26 @@ function OSUAllreduce(T::Type=Float32;
)
end

function osu_allreduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm)
send_buffer = ones(T, bufsize)
recv_buffer = zeros(T, bufsize)
function osu_allreduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm, off_cache::Int64)

# If `off_cache` is zero, no cache avoidance is performed: only a single send_buffer and a single recv_buffer are created.
cache_size = off_cache # Required in Bytes

# To avoid integer division error when bufsize is equal to zero
if bufsize == 0
num_buffers = max(1, 2 * cache_size)
else
num_buffers = max(1, 2 * cache_size ÷ (sizeof(T) * bufsize))
end

send_buffer = [ones(T, bufsize) for _ in 1:num_buffers]
recv_buffer = [zeros(T, bufsize) for _ in 1:num_buffers]

timer = 0.0
MPI.Barrier(comm)
for i in 1:iters
tic = MPI.Wtime()
MPI.Allreduce!(send_buffer, recv_buffer, +, comm)
MPI.Allreduce!(@inbounds(send_buffer[mod1(i, num_buffers)]), @inbounds(recv_buffer[mod1(i, num_buffers)]), +, comm)
toc = MPI.Wtime()
timer += toc - tic
end
Expand Down
23 changes: 16 additions & 7 deletions src/osu_alltoall.jl
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,28 @@ function OSUAlltoall(T::Type=UInt8;
)
end

function osu_alltoall(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm)
rank = MPI.Comm_rank(comm)
function osu_alltoall(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm, off_cache::Int64)
# If `off_cache` is zero, no cache avoidance is performed: only a single send_buffer and a single recv_buffer are created.
cache_size = off_cache # Required in Bytes

# To avoid integer division error when bufsize is equal to zero
if bufsize == 0
num_buffers = max(1, 2 * cache_size)
else
num_buffers = max(1, 2 * cache_size ÷ (sizeof(T) * bufsize))
end

nranks = MPI.Comm_size(comm)
send_buffer = ones(T, bufsize * nranks)
recv_buffer = zeros(T, bufsize * nranks)
root = 0
send_buffer = [ones(T, bufsize * nranks) for _ in 1:num_buffers]
recv_buffer = [zeros(T, bufsize * nranks) for _ in 1:num_buffers]

timer = 0.0
MPI.Barrier(comm)
for i in 1:iters
tic = MPI.Wtime()
MPI.Alltoall!(
UBuffer(send_buffer, Cint(bufsize), Cint(nranks), MPI.Datatype(T)),
UBuffer(recv_buffer, Cint(bufsize), Cint(nranks), MPI.Datatype(T)),
UBuffer(@inbounds(send_buffer[mod1(i, num_buffers)]), Cint(bufsize), Cint(nranks), MPI.Datatype(T)),
UBuffer(@inbounds(recv_buffer[mod1(i, num_buffers)]), Cint(bufsize), Cint(nranks), MPI.Datatype(T)),
comm)
toc = MPI.Wtime()
timer += toc - tic
Expand Down
18 changes: 13 additions & 5 deletions src/osu_reduce.jl
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,21 @@ function OSUReduce(T::Type=Float32;
)
end

function osu_reduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm)
cache_size = 2 ^ 16 # Assume cache size of 64 KiB
# To avoid hitting the cache, create buffers which are arrays of arrays of size
# `bufsize` so that they exceed the cache size
num_buffers = max(1, 2 * cache_size ÷ (sizeof(T) * bufsize))
function osu_reduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm, off_cache::Int64 )
# For Noctua 1, the L3 cache is 27.5 MiB:
# 27.5 * 1024 * 1024 = 28835840 bytes
# If `off_cache` is zero, no cache avoidance is performed: only a single send_buffer and a single recv_buffer are created.
cache_size = off_cache # Required in Bytes

# To avoid integer division error when bufsize is equal to zero
if bufsize == 0
num_buffers = max(1, 2 * cache_size)
else
num_buffers = max(1, 2 * cache_size ÷ (sizeof(T) * bufsize))
end
send_buffer = [zeros(T, bufsize) for _ in 1:num_buffers]
recv_buffer = [ones(T, bufsize) for _ in 1:num_buffers]

timer = 0.0
MPI.Barrier(comm)
for i in 1:iters
Expand Down
1 change: 1 addition & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ end
const verbose = false
mktemp() do filename, io
benchmark(IMBAllreduce(; verbose, filename))
benchmark(IMBAllreduce(; verbose, filename, off_cache=28835))
benchmark(IMBAlltoall(; verbose, filename, max_size=1<<16))
benchmark(IMBGatherv(; verbose, filename))
benchmark(IMBReduce(; verbose, filename))
Expand Down