Skip to content

Cache avoidance experiment #9

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: mg/reduce-cache-avoidance
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/MPIBenchmarks.jl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ struct Configuration{T}
iters::Function
stdout::IO
filename::Union{String,Nothing}
off_cache::Union{Int64,Nothing}
end

function iterations(::Type{T}, s::Int) where {T}
Expand All @@ -24,6 +25,7 @@ function Configuration(T::Type;
verbose::Bool=true,
filename::Union{String,Nothing}=nothing,
iterations::Function=iterations,
off_cache::Union{Int64,Nothing}=0,
)
ispow2(max_size) || throw(ArgumentError("Maximum size must be a power of 2, found $(max_size)"))
isprimitivetype(T) || throw(ArgumentError("Type $(T) is not a primitive type"))
Expand All @@ -38,7 +40,7 @@ function Configuration(T::Type;
if isnothing(stdout)
stdout = verbose ? Base.stdout : Base.devnull
end
return Configuration(T, lengths, iterations, stdout, filename)
return Configuration(T, lengths, iterations, stdout, filename, off_cache)
end

"""
Expand Down
10 changes: 6 additions & 4 deletions src/imb_allreduce.jl
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@ function IMBAllreduce(T::Type=Float32;
)
end

function imb_allreduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm)
send_buffer = zeros(T, bufsize)
recv_buffer = zeros(T, bufsize)
function imb_allreduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm, off_cache::Int64)
cache_size = off_cache # Size of the cache to avoid, in bytes
num_buffers = max(1, 2 * cache_size ÷ max(1, (sizeof(T) * bufsize)))
send_buffer = [zeros(T, bufsize) for _ in 1:num_buffers]
recv_buffer = [zeros(T, bufsize) for _ in 1:num_buffers]
timer = 0.0
MPI.Barrier(comm)
for i in 1:iters
tic = MPI.Wtime()
MPI.Allreduce!(send_buffer, recv_buffer, +, comm)
MPI.Allreduce!(@inbounds(send_buffer[mod1(i, num_buffers)]), @inbounds(recv_buffer[mod1(i, num_buffers)]), +, comm)
toc = MPI.Wtime()
timer += toc - tic
end
Expand Down
10 changes: 5 additions & 5 deletions src/imb_alltoall.jl
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@ function IMBAlltoall(T::Type=UInt8;
)
end

function imb_alltoall(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm)
rank = MPI.Comm_rank(comm)
function imb_alltoall(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm, off_cache::Int64 )
cache_size = off_cache # Size of the cache to avoid, in bytes
num_buffers = max(1, 2 * cache_size ÷ max(1, (sizeof(T) * bufsize)))
nranks = MPI.Comm_size(comm)
buffer = zeros(T, bufsize * nranks)
root = 0
buffer = [zeros(T, bufsize * nranks) for _ in 1:num_buffers]
timer = 0.0
MPI.Barrier(comm)
for i in 1:iters
tic = MPI.Wtime()
MPI.Alltoall!(UBuffer(buffer, Cint(bufsize), Cint(nranks), MPI.Datatype(T)), comm)
MPI.Alltoall!(UBuffer(@inbounds(buffer[mod1(i, num_buffers)]), Cint(bufsize), Cint(nranks), MPI.Datatype(T)), comm)
toc = MPI.Wtime()
timer += toc - tic
end
Expand Down
4 changes: 2 additions & 2 deletions src/imb_collective.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ function run_imb_collective(benchmark::MPIBenchmark, func::Function, conf::Confi
nranks = MPI.Comm_size(comm)

# Warmup
func(conf.T, 1, 10, comm)
func(conf.T, 1, 10, comm, conf.off_cache)

if iszero(rank)
print_header(io) = println(io, "size (bytes),iteratons,min_time (seconds),max_time (seconds),avg_time (seconds)")
Expand All @@ -28,7 +28,7 @@ function run_imb_collective(benchmark::MPIBenchmark, func::Function, conf::Confi
size = 1 << s
iters = conf.iters(conf.T, s)
# Measure time on current rank
time = func(conf.T, size, iters, comm)
time = func(conf.T, size, iters, comm, conf.off_cache)

if !iszero(rank)
# If we aren't on rank 0, send to it our time
Expand Down
12 changes: 7 additions & 5 deletions src/imb_gatherv.jl
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,23 @@ function IMBGatherv(T::Type=UInt8;
)
end

function imb_gatherv(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm)
function imb_gatherv(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm, off_cache::Int64)
cache_size = off_cache # Size of the cache to avoid, in bytes
num_buffers = max(1, 2 * cache_size ÷ max(1, (sizeof(T) * bufsize)))
rank = MPI.Comm_rank(comm)
nranks = MPI.Comm_size(comm)
send_buffer = zeros(T, bufsize)
recv_buffer = zeros(T, bufsize * nranks)
send_buffer = [zeros(T, bufsize) for _ in 1:num_buffers]
recv_buffer = [zeros(T, bufsize * nranks) for _ in 1:num_buffers]
counts = [bufsize for _ in 1:nranks]
root = 0
timer = 0.0
MPI.Barrier(comm)
for i in 1:iters
tic = MPI.Wtime()
if rank == root
MPI.Gatherv!(MPI.IN_PLACE, VBuffer(recv_buffer, counts), comm; root)
MPI.Gatherv!(MPI.IN_PLACE, VBuffer(@inbounds(recv_buffer[mod1(i, num_buffers)]), counts), comm; root)
else
MPI.Gatherv!(send_buffer, nothing, comm; root)
MPI.Gatherv!(@inbounds(send_buffer[mod1(i, num_buffers)]), nothing, comm; root)
end
toc = MPI.Wtime()
timer += toc - tic
Expand Down
8 changes: 3 additions & 5 deletions src/imb_reduce.jl
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@ function IMBReduce(T::Type=Float32;
)
end

function imb_reduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm)
cache_size = 2 ^ 16 # Assume cache size of 64 KiB
# To avoid hitting the cache, create buffers which are arrays of arrays of size
# `bufsize` so that they exceed the cache size
num_buffers = max(1, 2 * cache_size ÷ (sizeof(T) * bufsize))
function imb_reduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm, off_cache::Int64)
cache_size = off_cache # Size of the cache to avoid, in bytes
num_buffers = max(1, 2 * cache_size ÷ max(1, (sizeof(T) * bufsize)))
send_buffer = [zeros(T, bufsize) for _ in 1:num_buffers]
recv_buffer = [zeros(T, bufsize) for _ in 1:num_buffers]
timer = 0.0
Expand Down
10 changes: 6 additions & 4 deletions src/osu_allreduce.jl
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@ function OSUAllreduce(T::Type=Float32;
)
end

function osu_allreduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm)
send_buffer = ones(T, bufsize)
recv_buffer = zeros(T, bufsize)
function osu_allreduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm, off_cache::Int64)
cache_size = off_cache # Size of the cache to avoid, in bytes
num_buffers = max(1, 2 * cache_size ÷ max(1, (sizeof(T) * bufsize)))
send_buffer = [ones(T, bufsize) for _ in 1:num_buffers]
recv_buffer = [zeros(T, bufsize) for _ in 1:num_buffers]
timer = 0.0
MPI.Barrier(comm)
for i in 1:iters
tic = MPI.Wtime()
MPI.Allreduce!(send_buffer, recv_buffer, +, comm)
MPI.Allreduce!(@inbounds(send_buffer[mod1(i, num_buffers)]), @inbounds(recv_buffer[mod1(i, num_buffers)]), +, comm)
toc = MPI.Wtime()
timer += toc - tic
end
Expand Down
14 changes: 7 additions & 7 deletions src/osu_alltoall.jl
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,19 @@ function OSUAlltoall(T::Type=UInt8;
)
end

function osu_alltoall(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm)
rank = MPI.Comm_rank(comm)
function osu_alltoall(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm, off_cache::Int64)
cache_size = off_cache # Size of the cache to avoid, in bytes
num_buffers = max(1, 2 * cache_size ÷ max(1, (sizeof(T) * bufsize)))
nranks = MPI.Comm_size(comm)
send_buffer = ones(T, bufsize * nranks)
recv_buffer = zeros(T, bufsize * nranks)
root = 0
send_buffer = [ones(T, bufsize * nranks) for _ in 1:num_buffers]
recv_buffer = [zeros(T, bufsize * nranks) for _ in 1:num_buffers]
timer = 0.0
MPI.Barrier(comm)
for i in 1:iters
tic = MPI.Wtime()
MPI.Alltoall!(
UBuffer(send_buffer, Cint(bufsize), Cint(nranks), MPI.Datatype(T)),
UBuffer(recv_buffer, Cint(bufsize), Cint(nranks), MPI.Datatype(T)),
UBuffer(@inbounds(send_buffer[mod1(i, num_buffers)]), Cint(bufsize), Cint(nranks), MPI.Datatype(T)),
UBuffer(@inbounds(recv_buffer[mod1(i, num_buffers)]), Cint(bufsize), Cint(nranks), MPI.Datatype(T)),
comm)
toc = MPI.Wtime()
timer += toc - tic
Expand Down
8 changes: 3 additions & 5 deletions src/osu_reduce.jl
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@ function OSUReduce(T::Type=Float32;
)
end

function osu_reduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm)
cache_size = 2 ^ 16 # Assume cache size of 64 KiB
# To avoid hitting the cache, create buffers which are arrays of arrays of size
# `bufsize` so that they exceed the cache size
num_buffers = max(1, 2 * cache_size ÷ (sizeof(T) * bufsize))
function osu_reduce(T::Type, bufsize::Int, iters::Int, comm::MPI.Comm, off_cache::Int64 )
cache_size = off_cache # Size of the cache to avoid, in bytes
num_buffers = max(1, 2 * cache_size ÷ max(1, (sizeof(T) * bufsize)))
send_buffer = [zeros(T, bufsize) for _ in 1:num_buffers]
recv_buffer = [ones(T, bufsize) for _ in 1:num_buffers]
timer = 0.0
Expand Down
1 change: 1 addition & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ end
const verbose = false
mktemp() do filename, io
benchmark(IMBAllreduce(; verbose, filename))
benchmark(IMBAllreduce(; verbose, filename, off_cache=28835))
benchmark(IMBAlltoall(; verbose, filename, max_size=1<<16))
benchmark(IMBGatherv(; verbose, filename))
benchmark(IMBReduce(; verbose, filename))
Expand Down