Feat: restructuring of communication methods (and buffered communication for CUDA-Aware MPI) #167
Open
tharittk wants to merge 29 commits into PyLops:main from tharittk:cuda-aware
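This PR hinges on the distinction between mpi4py's object primitives (lowercase `send`/`recv`/`bcast`, which pickle arbitrary Python objects) and its buffered primitives (uppercase `Send`/`Recv`/`Bcast`, which operate directly on buffer-like objects such as NumPy arrays and, under a CUDA-Aware MPI installation, CuPy arrays). A minimal sketch of the difference, not taken from this PR:

```python
# Sketch of mpi4py's two communication styles (illustrative, not PR code).
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD

if comm.Get_rank() == 0:
    arr = np.arange(4.0)
    comm.send(arr, dest=1, tag=0)  # object primitive: pickles the array
    comm.Send(arr, dest=1, tag=1)  # buffered primitive: ships the raw buffer
elif comm.Get_rank() == 1:
    obj = comm.recv(source=0, tag=0)  # new array rebuilt from the pickle
    buf = np.empty(4)
    comm.Recv(buf, source=0, tag=1)   # data written in place into buf
```

The buffered calls skip the pickle round-trip and, with CUDA-Aware MPI, can move CuPy arrays directly between devices; without CUDA-Aware support, GPU arrays must fall back to the object primitives, which is exactly the choice the new mixin automates.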
Changes from 18 commits

Commits (29)
- 1df4f21 Buffered Send/Recv (tharittk)
- 647ce65 Buffered Allreduce (tharittk)
- 31068f9 minor clean up (tharittk)
- ca558fd feat: WIP DistributedMix (mrava87)
- 64854bb feat: added _mpi file with actual mpi comm. implementations (mrava87)
- 838ed0b feat: moved _send to Distributed (mrava87)
- ab97e3d mpi_recv for MixIn (tharittk)
- dbe1f30 MixIn for allgather. (tharittk)
- a08924b fix flake8 (tharittk)
- b8bcd29 feat: added _bcast to DistributedMixIn and added comms as input for a… (mrava87SW)
- f362436 feat: adapted all comm calls in DistributedArray to new method signat… (mrava87SW)
- 693f078 feat: adapted all comm calls in VStack to new method signatures (mrava87SW)
- c852fc4 feat: adapted all comm calls in Fredholm1 to new method signatures (mrava87SW)
- 78d7538 feat: moved methods shared by _mpi and _nccl to _common (mrava87SW)
- 0138e3a fix env flag precedence bug (tharittk)
- ec88371 fix flake8 (tharittk)
- 02ba45b doc: added details about cuda-aware mpi in doc (mrava87SW)
- a317a88 Merge branch 'main' into cuda-aware (tharittk)
- 473cd97 doc: finalized gpu doc (mrava87SW)
- 2cdb8f7 doc: added some docstrings to Distributed (mrava87SW)
- 1511744 Merge branch 'cuda-aware' of https://github.com/tharittk/pylops-mpi i… (mrava87SW)
- 563db16 minor: fix flake8 (mrava87SW)
- 50c5bd2 bug: fix import of methods in test_ncclutils_nccl (mrava87SW)
- a80f00e bug: added engine to x array i test_matrixmult (mrava87SW)
- 2c67755 feat: finalized passing parameters to all methods in Distributed (mrava87SW)
- e33e8f1 doc: added documentation and type hints to _mpi (mrava87SW)
- e0fd716 minor: added TEST_CUPY_PYLOPS=0 in tests target (mrava87SW)
- 83f7a8b minor: fix flake8 (mrava87SW)
- 02efdbb minor: fix more flake8 (mrava87SW)
@@ -0,0 +1,114 @@
from mpi4py import MPI
from pylops.utils import deps as pylops_deps  # avoid namespace crashes with pylops_mpi.utils
from pylops_mpi.utils._mpi import mpi_allreduce, mpi_allgather, mpi_bcast, mpi_send, mpi_recv, _prepare_allgather_inputs, _unroll_allgather_recv
from pylops_mpi.utils import deps

cupy_message = pylops_deps.cupy_import("the DistributedArray module")
nccl_message = deps.nccl_import("the DistributedArray module")

if nccl_message is None and cupy_message is None:
    from pylops_mpi.utils._nccl import (
        nccl_allgather, nccl_allreduce, nccl_bcast, nccl_send, nccl_recv
    )


class DistributedMixIn:
    r"""Distributed Mixin class

    This class implements all methods associated with communication primitives
    from MPI and NCCL. It is mostly in charge of identifying which communicator
    to use and whether the buffered or object MPI primitives should be used
    (the former in the case of NumPy arrays, or CuPy arrays when a CUDA-Aware
    MPI installation is available; the latter with CuPy arrays when a CUDA-Aware
    MPI installation is not available).
    """
    def _allreduce(self, base_comm, base_comm_nccl,
                   send_buf, recv_buf=None, op: MPI.Op = MPI.SUM,
                   engine="numpy"):
        """Allreduce operation
        """
        if deps.nccl_enabled and base_comm_nccl is not None:
            return nccl_allreduce(base_comm_nccl, send_buf, recv_buf, op)
        else:
            return mpi_allreduce(base_comm, send_buf,
                                 recv_buf, engine, op)

    def _allreduce_subcomm(self, sub_comm, base_comm_nccl,
                           send_buf, recv_buf=None, op: MPI.Op = MPI.SUM,
                           engine="numpy"):
        """Allreduce operation with subcommunicator
        """
        if deps.nccl_enabled and base_comm_nccl is not None:
            return nccl_allreduce(sub_comm, send_buf, recv_buf, op)
        else:
            return mpi_allreduce(sub_comm, send_buf,
                                 recv_buf, engine, op)

    def _allgather(self, base_comm, base_comm_nccl,
                   send_buf, recv_buf=None,
                   engine="numpy"):
        """Allgather operation
        """
        if deps.nccl_enabled and base_comm_nccl is not None:
            if isinstance(send_buf, (tuple, list, int)):
                return nccl_allgather(base_comm_nccl, send_buf, recv_buf)
            else:
                send_shapes = base_comm.allgather(send_buf.shape)
                (padded_send, padded_recv) = _prepare_allgather_inputs(send_buf, send_shapes, engine="cupy")
                raw_recv = nccl_allgather(base_comm_nccl, padded_send, recv_buf if recv_buf else padded_recv)
                return _unroll_allgather_recv(raw_recv, padded_send.shape, send_shapes)
        else:
            if isinstance(send_buf, (tuple, list, int)):
                return base_comm.allgather(send_buf)
            return mpi_allgather(base_comm, send_buf, recv_buf, engine)

    def _allgather_subcomm(self, send_buf, recv_buf=None):
        """Allgather operation with subcommunicator
        """
        if deps.nccl_enabled and getattr(self, "base_comm_nccl"):
            if isinstance(send_buf, (tuple, list, int)):
                return nccl_allgather(self.sub_comm, send_buf, recv_buf)
            else:
                send_shapes = self._allgather_subcomm(send_buf.shape)
                (padded_send, padded_recv) = _prepare_allgather_inputs(send_buf, send_shapes, engine="cupy")
                raw_recv = nccl_allgather(self.sub_comm, padded_send, recv_buf if recv_buf else padded_recv)
                return _unroll_allgather_recv(raw_recv, padded_send.shape, send_shapes)
        else:
            return mpi_allgather(self.sub_comm, send_buf, recv_buf, self.engine)

    def _bcast(self, local_array, index, value):
        """BCast operation
        """
        if deps.nccl_enabled and getattr(self, "base_comm_nccl"):
            nccl_bcast(self.base_comm_nccl, local_array, index, value)
        else:
            # self.local_array[index] = self.base_comm.bcast(value)
            mpi_bcast(self.base_comm, self.rank, self.local_array, index, value,
                      engine=self.engine)

    def _send(self, send_buf, dest, count=None, tag=0):
        """Send operation
        """
        if deps.nccl_enabled and self.base_comm_nccl:
            if count is None:
                count = send_buf.size
            nccl_send(self.base_comm_nccl, send_buf, dest, count)
        else:
            mpi_send(self.base_comm,
                     send_buf, dest, count, tag=tag,
                     engine=self.engine)

    def _recv(self, recv_buf=None, source=0, count=None, tag=0):
        """Receive operation
        """
        if deps.nccl_enabled and self.base_comm_nccl:
            if recv_buf is None:
                raise ValueError("recv_buf must be supplied when using NCCL")
            if count is None:
                count = recv_buf.size
            nccl_recv(self.base_comm_nccl, recv_buf, source, count)
            return recv_buf
        else:
            return mpi_recv(self.base_comm,
                            recv_buf, source, count, tag=tag,
                            engine=self.engine)
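For orientation, a consumer of the mixin could look roughly like the following; the class name, the import path of `DistributedMixIn`, and the attribute layout are assumptions for illustration, not code from this PR:

```python
# Hypothetical consumer of DistributedMixIn (names and import path assumed).
import numpy as np
from mpi4py import MPI

from pylops_mpi.Distributed import DistributedMixIn  # assumed import path


class SumOfRanks(DistributedMixIn):
    """Toy class whose only job is a global sum over all ranks."""

    def __init__(self, base_comm=MPI.COMM_WORLD, base_comm_nccl=None,
                 engine="numpy"):
        self.base_comm = base_comm
        self.base_comm_nccl = base_comm_nccl  # set to a NCCL comm for GPU runs
        self.engine = engine

    def global_sum(self, local_array):
        # _allreduce dispatches to nccl_allreduce or mpi_allreduce depending
        # on whether a NCCL communicator was provided
        return self._allreduce(self.base_comm, self.base_comm_nccl,
                               local_array, op=MPI.SUM, engine=self.engine)


op = SumOfRanks()
print(op.global_sum(np.ones(3)))  # with N ranks: [N., N., N.]
```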
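One non-obvious step in `_allgather`/`_allgather_subcomm` above is the padding: NCCL's allgather expects every rank to contribute the same number of elements, so the code gathers all local shapes first, pads each send buffer to a common shape (`_prepare_allgather_inputs`), and strips the padding from the result (`_unroll_allgather_recv`). A CPU-side sketch of the same pad-and-trim idea using plain mpi4py buffers (illustrative only, not the PR's helpers):

```python
# Pad-and-trim allgather for 1-D arrays of unequal length (illustrative).
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()

local = np.arange(rank + 1, dtype=np.float64)  # a different length per rank
lengths = comm.allgather(local.size)           # every rank learns all lengths
pad_to = max(lengths)

padded_send = np.zeros(pad_to, dtype=local.dtype)   # zero-pad to max length
padded_send[:local.size] = local
padded_recv = np.empty(pad_to * size, dtype=local.dtype)
comm.Allgather(padded_send, padded_recv)       # equal-size buffered exchange

# trim each rank's chunk back to its true length
chunks = [padded_recv[i * pad_to:i * pad_to + n] for i, n in enumerate(lengths)]
```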