Merged
9 changes: 9 additions & 0 deletions README
@@ -263,6 +263,15 @@ options.
Disable multirail functionality. Enabling this will restrict all
communications to occur over a single NIC per system.

SHMEM_OFI_DISABLE_SINGLE_EP (default: off)
Disable the single endpoint resource optimization. Setting this (to any
value) will enable at least 2 separate endpoints per PE, one for
transmission on the default context and one as the target of
communication. If unset, the default context and the target endpoint
are merged to conserve context resources. Regardless of this
parameter, each PE consumes another endpoint for each OpenSHMEM user
context that is created.

Team Environment variables:

SHMEM_TEAMS_MAX (default: 10)
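The SHMEM_OFI_DISABLE_SINGLE_EP entry added above follows the same convention as the other boolean switches in that README section: the transport only checks whether the variable is present in the environment, not what value it carries. A minimal usage sketch (the oshrun launcher and program name are illustrative, not part of this change):

    SHMEM_OFI_DISABLE_SINGLE_EP=1 oshrun -np 2 ./shmem_app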
2 changes: 2 additions & 0 deletions src/shmem_env_defs.h
@@ -109,6 +109,8 @@ SHMEM_INTERNAL_ENV_DEF(OFI_STX_DISABLE_PRIVATE, bool, false, SHMEM_INTERNAL_ENV_
"Disallow private contexts from having exclusive STX access")
SHMEM_INTERNAL_ENV_DEF(OFI_DISABLE_MULTIRAIL, bool, false, SHMEM_INTERNAL_ENV_CAT_TRANSPORT,
"Disable usage of multirail functionality")
SHMEM_INTERNAL_ENV_DEF(OFI_DISABLE_SINGLE_EP, bool, false, SHMEM_INTERNAL_ENV_CAT_TRANSPORT,
"Disable single endpoint resource optimization (enable separate Tx and Rx EPs)")
#endif

#ifdef USE_UCX
84 changes: 64 additions & 20 deletions src/transport_ofi.c
@@ -114,6 +114,8 @@ shmem_internal_mutex_t shmem_transport_ofi_lock;
pthread_mutex_t shmem_transport_ofi_progress_lock = PTHREAD_MUTEX_INITIALIZER;
#endif /* ENABLE_THREADS */

int shmem_transport_ofi_single_ep;

/* Temporarily redefine SHM_INTERNAL integer types to their FI counterparts to
* translate the DTYPE_* types (defined by autoconf according to system ABI)
* into FI types in the table below */
@@ -205,6 +207,8 @@ struct shmem_internal_tid shmem_transport_ofi_gettid(void)
return tid;
}

#define SHMEM_TRANSPORT_OFI_PROV_SOCKETS "sockets"

static struct fabric_info shmem_transport_ofi_info = {0};

static size_t shmem_transport_ofi_grow_size = 128;
@@ -622,15 +626,25 @@ int bind_enable_ep_resources(shmem_transport_ctx_t *ctx)
* removed below. However, there aren't currently any cases where removing
* FI_RECV significantly improves performance or resource usage. */

ret = fi_ep_bind(ctx->ep, &ctx->cq->fid,
FI_SELECTIVE_COMPLETION | FI_TRANSMIT | FI_RECV);
OFI_CHECK_RETURN_STR(ret, "fi_ep_bind CQ to endpoint failed");
if (ctx->ep != shmem_transport_ofi_target_ep) {
ret = fi_ep_bind(ctx->ep, &ctx->cq->fid,
FI_SELECTIVE_COMPLETION | FI_TRANSMIT | FI_RECV);
OFI_CHECK_RETURN_STR(ret, "fi_ep_bind CQ to endpoint failed");

ret = fi_ep_bind(ctx->ep, &shmem_transport_ofi_avfd->fid, 0);
OFI_CHECK_RETURN_STR(ret, "fi_ep_bind AV to endpoint failed");
ret = fi_ep_bind(ctx->ep, &shmem_transport_ofi_avfd->fid, 0);
OFI_CHECK_RETURN_STR(ret, "fi_ep_bind AV to endpoint failed");

ret = fi_enable(ctx->ep);
OFI_CHECK_RETURN_STR(ret, "fi_enable on endpoint failed");
ret = fi_enable(ctx->ep);
OFI_CHECK_RETURN_STR(ret, "fi_enable on endpoint failed");
} /* In single-endpoint mode, the sockets provider requires re-enabling the EP, but other
providers require NOT re-enabling the EP (e.g. as of v2.1.0, tcp, verbs, and opx) */
else if (shmem_transport_ofi_info.p_info->fabric_attr->prov_name != NULL &&
strncmp(shmem_transport_ofi_info.p_info->fabric_attr->prov_name,
SHMEM_TRANSPORT_OFI_PROV_SOCKETS,
strlen(SHMEM_TRANSPORT_OFI_PROV_SOCKETS)) == 0) {
ret = fi_enable(ctx->ep);
OFI_CHECK_RETURN_STR(ret, "fi_enable on endpoint failed");
}

return ret;
}
@@ -1668,6 +1682,9 @@ static int shmem_transport_ofi_target_ep_init(void)
struct fabric_info* info = &shmem_transport_ofi_info;
info->p_info->ep_attr->tx_ctx_cnt = 0;
info->p_info->caps = FI_RMA | FI_ATOMIC | FI_REMOTE_READ | FI_REMOTE_WRITE;
if (shmem_transport_ofi_single_ep) {
info->p_info->caps |= FI_WRITE | FI_READ | FI_RECV;
}
#if ENABLE_TARGET_CNTR
info->p_info->caps |= FI_RMA_EVENT;
#endif
@@ -1693,7 +1710,7 @@
OFI_CHECK_RETURN_MSG(ret, "cq_open failed (%s)\n", fi_strerror(errno));

ret = fi_ep_bind(shmem_transport_ofi_target_ep,
&shmem_transport_ofi_target_cq->fid, FI_TRANSMIT | FI_RECV);
&shmem_transport_ofi_target_cq->fid, FI_SELECTIVE_COMPLETION | FI_TRANSMIT | FI_RECV);
OFI_CHECK_RETURN_STR(ret, "fi_ep_bind CQ to target endpoint failed");

ret = fi_enable(shmem_transport_ofi_target_ep);
@@ -1756,15 +1773,20 @@ static int shmem_transport_ofi_ctx_init(shmem_transport_ctx_t *ctx, int id)
&ctx->get_cntr, NULL);
OFI_CHECK_RETURN_MSG(ret, "get_cntr creation failed (%s)\n", fi_strerror(errno));

ret = fi_cq_open(shmem_transport_ofi_domainfd, &cq_attr, &ctx->cq, NULL);
if (ret && errno == FI_EMFILE) {
DEBUG_STR("Context creation failed because of open files limit, consider increasing with 'ulimit' command");
}
OFI_CHECK_RETURN_MSG(ret, "cq_open failed (%s)\n", fi_strerror(errno));
if (shmem_transport_ofi_single_ep && id == SHMEM_TRANSPORT_CTX_DEFAULT_ID) {
ctx->cq = shmem_transport_ofi_target_cq;
ctx->ep = shmem_transport_ofi_target_ep;
} else {
ret = fi_cq_open(shmem_transport_ofi_domainfd, &cq_attr, &ctx->cq, NULL);
if (ret && errno == FI_EMFILE) {
DEBUG_STR("Context creation failed because of open files limit, consider increasing with 'ulimit' command");
}
OFI_CHECK_RETURN_MSG(ret, "cq_open failed (%s)\n", fi_strerror(errno));

ret = fi_endpoint(shmem_transport_ofi_domainfd,
info->p_info, &ctx->ep, NULL);
OFI_CHECK_RETURN_MSG(ret, "ep creation failed (%s)\n", fi_strerror(errno));
ret = fi_endpoint(shmem_transport_ofi_domainfd,
info->p_info, &ctx->ep, NULL);
OFI_CHECK_RETURN_MSG(ret, "ep creation failed (%s)\n", fi_strerror(errno));
}

/* TODO: Fill in TX attr */

@@ -1819,6 +1841,13 @@ int shmem_transport_init(void)
else
shmem_transport_ofi_info.domain_name = NULL;

/* Unless SHMEM_OFI_DISABLE_SINGLE_EP env var is set, each PE opens a single libfabric endpoint
* for both transmission (on the default context) and as the target of communication */
if (shmem_internal_params.OFI_DISABLE_SINGLE_EP_provided)
shmem_transport_ofi_single_ep = 0;
else
shmem_transport_ofi_single_ep = 1;

/* Check STX resource settings */
if ((shmem_internal_thread_level == SHMEM_THREAD_SINGLE ||
shmem_internal_thread_level == SHMEM_THREAD_FUNNELED ) &&
@@ -2045,6 +2074,7 @@ int shmem_transport_ctx_create(struct shmem_internal_team_t *team, long options,
void shmem_transport_ctx_destroy(shmem_transport_ctx_t *ctx)
{
int ret;
bool close_default_ctx = false;

if (ctx == NULL)
return;
@@ -2070,7 +2100,12 @@ void shmem_transport_ctx_destroy(shmem_transport_ctx_t *ctx)
SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx);
}

if (ctx->ep) {
/* When in single-endpoint mode, defer closing the default context because it also
* serves as the target endpoint, which is cleaned up later in transport_fini(). */
if (!shmem_transport_ofi_single_ep || ctx->id != SHMEM_TRANSPORT_CTX_DEFAULT_ID)
close_default_ctx = true;

if (ctx->ep && close_default_ctx) {
ret = fi_close(&ctx->ep->fid);
OFI_CHECK_ERROR_MSG(ret, "Context endpoint close failed (%s)\n", fi_strerror(errno));
}
@@ -2107,17 +2142,17 @@ void shmem_transport_ctx_destroy(shmem_transport_ctx_t *ctx)
SHMEM_MUTEX_UNLOCK(shmem_transport_ofi_lock);
}

if (ctx->put_cntr) {
if (ctx->put_cntr && close_default_ctx) {
ret = fi_close(&ctx->put_cntr->fid);
OFI_CHECK_ERROR_MSG(ret, "Context put CNTR close failed (%s)\n", fi_strerror(errno));
}

if (ctx->get_cntr) {
if (ctx->get_cntr && close_default_ctx) {
ret = fi_close(&ctx->get_cntr->fid);
OFI_CHECK_ERROR_MSG(ret, "Context get CNTR close failed (%s)\n", fi_strerror(errno));
}

if (ctx->cq) {
if (ctx->cq && close_default_ctx) {
ret = fi_close(&ctx->cq->fid);
OFI_CHECK_ERROR_MSG(ret, "Context CQ close failed (%s)\n", fi_strerror(errno));
}
@@ -2208,6 +2243,15 @@ int shmem_transport_fini(void)
ret = fi_close(&shmem_transport_ofi_target_ep->fid);
OFI_CHECK_ERROR_MSG(ret, "Target endpoint close failed (%s)\n", fi_strerror(errno));

/* If in single-endpoint mode, we also need to close the default context's put and get counters */
if (shmem_transport_ofi_single_ep) {
ret = fi_close(&shmem_transport_ctx_default.put_cntr->fid);
OFI_CHECK_ERROR_MSG(ret, "Default EP put CNTR close failed (%s)\n", fi_strerror(errno));

ret = fi_close(&shmem_transport_ctx_default.get_cntr->fid);
OFI_CHECK_ERROR_MSG(ret, "Default EP get CNTR close failed (%s)\n", fi_strerror(errno));
}

ret = fi_close(&shmem_transport_ofi_target_cq->fid);
OFI_CHECK_ERROR_MSG(ret, "Target CQ close failed (%s)\n", fi_strerror(errno));

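Taken together, the transport_ofi.c changes above implement one ownership rule: in single-endpoint mode the default context borrows the target endpoint and its CQ, so shmem_transport_ctx_destroy() must skip the shared objects (and the counters tied to that context) for the default context, and shmem_transport_fini() releases them instead. The following is a minimal standalone sketch of that lifetime logic, not SOS code; the types (res_t, ctx_t) and helper names are invented stand-ins for the libfabric objects:

#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

/* Stand-ins for the libfabric objects involved (not the real fid types). */
typedef struct { const char *name; } res_t;

static res_t target_ep = { "target_ep" };
static res_t target_cq = { "target_cq" };
static int   single_ep = 1;            /* mirrors shmem_transport_ofi_single_ep */

typedef struct {
    res_t *ep, *cq;                     /* aliased to the target EP/CQ when merged */
    res_t *put_cntr, *get_cntr;         /* always owned by the context             */
    bool   is_default;
} ctx_t;

static res_t *open_res(const char *name)
{
    res_t *r = malloc(sizeof *r);
    r->name = name;
    return r;
}

static void close_res(res_t *r)
{
    printf("close %s\n", r->name);
    if (r != &target_ep && r != &target_cq)
        free(r);
}

static void ctx_init(ctx_t *c, bool is_default)
{
    c->is_default = is_default;
    c->put_cntr = open_res("put_cntr");
    c->get_cntr = open_res("get_cntr");
    if (single_ep && is_default) {      /* merged mode: borrow the target EP/CQ */
        c->ep = &target_ep;
        c->cq = &target_cq;
    } else {                            /* separate EP/CQ per context           */
        c->ep = open_res("ctx_ep");
        c->cq = open_res("ctx_cq");
    }
}

static void ctx_destroy(ctx_t *c)
{
    /* The merged default context defers all of its closes to fini(); any other
     * context (or any context when single_ep is disabled) closes its own. */
    bool defer = single_ep && c->is_default;
    if (!defer) {
        close_res(c->ep);
        close_res(c->put_cntr);
        close_res(c->get_cntr);
        close_res(c->cq);
    }
}

static void transport_fini(ctx_t *default_ctx)
{
    close_res(&target_ep);
    if (single_ep) {                    /* release the deferred counters */
        close_res(default_ctx->put_cntr);
        close_res(default_ctx->get_cntr);
    }
    close_res(&target_cq);
}

int main(void)
{
    ctx_t dflt;
    ctx_init(&dflt, true);
    ctx_destroy(&dflt);                 /* closes nothing in merged mode        */
    transport_fini(&dflt);              /* target EP/CQ plus deferred counters  */
    return 0;
}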
9 changes: 7 additions & 2 deletions src/transport_ofi.h
@@ -73,6 +73,8 @@ extern long shmem_transport_ofi_max_bounce_buffers;

extern pthread_mutex_t shmem_transport_ofi_progress_lock;

extern int shmem_transport_ofi_single_ep;

#ifndef MIN
#define MIN(a,b) (((a)<(b))?(a):(b))
#endif
@@ -386,8 +388,11 @@ void shmem_transport_probe(void)
if (0 == pthread_mutex_trylock(&shmem_transport_ofi_progress_lock)) {
# endif
struct fi_cq_entry buf;
int ret = fi_cq_read(shmem_transport_ofi_target_cq, &buf, 1);
if (ret == 1)
/* Do not read a CQ entry in single-endpoint mode, just make progress. */
/* The target EP and default ctx share resources, so a CQ entry is valid */
int ret = fi_cq_read(shmem_transport_ofi_target_cq, (void *)&buf,
!shmem_transport_ofi_single_ep);
if (!shmem_transport_ofi_single_ep && ret == 1)
RAISE_WARN_STR("Unexpected event");
# ifdef USE_THREAD_COMPLETION
pthread_mutex_unlock(&shmem_transport_ofi_progress_lock);
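Reconstructed from the hunk above, the probe path after this change reads approximately as follows (locking and the USE_THREAD_COMPLETION guard elided; this is a reading aid, not additional code):

/* shmem_transport_probe(), post-patch sketch.  In single-endpoint mode the
 * target CQ is shared with the default context, so fi_cq_read() is called with
 * a count of 0 purely to drive provider progress, and no warning is raised:
 * any completion sitting on that CQ is a legitimate default-context event. */
struct fi_cq_entry buf;
int ret = fi_cq_read(shmem_transport_ofi_target_cq, (void *)&buf,
                     !shmem_transport_ofi_single_ep);
if (!shmem_transport_ofi_single_ep && ret == 1)
    RAISE_WARN_STR("Unexpected event");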