Skip to content
Merged
14 changes: 14 additions & 0 deletions mpp/shmemx.h4
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,20 @@ static inline void shmemx_ibget(shmem_ctx_t ctx, $2 *target, const $2 *source,
}')dnl
SHMEM_CXX_DEFINE_FOR_RMA(`SHMEM_CXX_IBGET')

dnl C++ overload template for the exclusive sum scan.  Each expansion emits a
dnl type-generic shmemx_sum_exscan that forwards its arguments unchanged to the
dnl typed C routine shmemx_<name>_sum_exscan and returns its result.
dnl   $1 - type-name token spliced into the C routine name
dnl   $2 - element type of the dest and source buffers
define(`SHMEM_CXX_SUM_EXSCAN',
`static inline int shmemx_sum_exscan(shmem_team_t team, $2 *dest,
                                    const $2 *source, size_t nelems) {
    return shmemx_$1_sum_exscan(team, dest, source, nelems);
}')dnl
SHMEM_CXX_DEFINE_FOR_COLL_SUM_PROD(`SHMEM_CXX_SUM_EXSCAN')

dnl C++ overload template for the inclusive sum scan.  Each expansion emits a
dnl type-generic shmemx_sum_inscan that forwards its arguments unchanged to the
dnl typed C routine shmemx_<name>_sum_inscan and returns its result.
dnl   $1 - type-name token spliced into the C routine name
dnl   $2 - element type of the dest and source buffers
define(`SHMEM_CXX_SUM_INSCAN',
`static inline int shmemx_sum_inscan(shmem_team_t team, $2 *dest,
                                    const $2 *source, size_t nelems) {
    return shmemx_$1_sum_inscan(team, dest, source, nelems);
}')dnl
SHMEM_CXX_DEFINE_FOR_COLL_SUM_PROD(`SHMEM_CXX_SUM_INSCAN')

/* C11 Generic Macros */
#elif (defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L && !defined(SHMEM_INTERNAL_INCLUDE))

Expand Down
10 changes: 10 additions & 0 deletions mpp/shmemx_c_func.h4
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ SH_PAD(`$1') ptrdiff_t tst, ptrdiff_t sst,
SH_PAD(`$1') size_t bsize, size_t nblocks, int pe)')dnl
SHMEM_DECLARE_FOR_SIZES(`SHMEM_C_CTX_IBGET_N')

dnl Prototype template for the typed exclusive scan, shmemx_<name>_<op>_exscan.
dnl   $1 - type-name token, $2 - element type, $4 - operation name
dnl   (the binding call below passes sum as the operation).
define(`SHMEM_C_EXSCAN',
`SHMEM_FUNCTION_ATTRIBUTES int SHPRE()shmemx_$1_$4_exscan(shmem_team_t team,
        $2 *dest, const $2 *source, size_t nelems);')dnl

SHMEM_BIND_C_COLL_SUM_PROD(`SHMEM_C_EXSCAN', `sum')

dnl Prototype template for the typed inclusive scan, shmemx_<name>_<op>_inscan.
dnl   $1 - type-name token, $2 - element type, $4 - operation name
dnl   (the binding call below passes sum as the operation).
define(`SHMEM_C_INSCAN',
`SHMEM_FUNCTION_ATTRIBUTES int SHPRE()shmemx_$1_$4_inscan(shmem_team_t team,
        $2 *dest, const $2 *source, size_t nelems);')dnl

SHMEM_BIND_C_COLL_SUM_PROD(`SHMEM_C_INSCAN', `sum')

/* Performance Counter Query Routines */
SHMEM_FUNCTION_ATTRIBUTES void SHPRE()shmemx_pcntr_get_issued_write(shmem_ctx_t ctx, uint64_t *cntr_value);
SHMEM_FUNCTION_ATTRIBUTES void SHPRE()shmemx_pcntr_get_issued_read(shmem_ctx_t ctx, uint64_t *cntr_value);
Expand Down
210 changes: 210 additions & 0 deletions src/collectives.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@

#define SHMEM_INTERNAL_INCLUDE
#include "shmem.h"
#include "shmemx.h"
#include "shmem_internal.h"
#include "shmem_collectives.h"
#include "shmem_internal_op.h"

/* Algorithm selector for each collective; every selector starts out as AUTO. */
coll_type_t shmem_internal_barrier_type = AUTO;
coll_type_t shmem_internal_bcast_type = AUTO;
coll_type_t shmem_internal_reduce_type = AUTO;
/* Scan selector: shmem_internal_collectives_init() sets it to AUTO, LINEAR or
 * RING when the SCAN_ALGORITHM parameter is provided with "auto", "linear" or
 * "ring"; any other value is ignored with a warning. */
coll_type_t shmem_internal_scan_type = AUTO;
coll_type_t shmem_internal_collect_type = AUTO;
coll_type_t shmem_internal_fcollect_type = AUTO;
long *shmem_internal_barrier_all_psync;
Expand Down Expand Up @@ -206,6 +208,18 @@ shmem_internal_collectives_init(void)
} else {
RAISE_WARN_MSG("Ignoring bad reduction algorithm '%s'\n", type);
}
}
if (shmem_internal_params.SCAN_ALGORITHM_provided) {
type = shmem_internal_params.SCAN_ALGORITHM;
if (0 == strcmp(type, "auto")) {
shmem_internal_scan_type = AUTO;
} else if (0 == strcmp(type, "linear")) {
shmem_internal_scan_type = LINEAR;
} else if (0 == strcmp(type, "ring")) {
shmem_internal_scan_type = RING;
} else {
RAISE_WARN_MSG("Ignoring bad scan algorithm '%s'\n", type);
}
}
if (shmem_internal_params.COLLECT_ALGORITHM_provided) {
type = shmem_internal_params.COLLECT_ALGORITHM;
Expand Down Expand Up @@ -971,6 +985,202 @@ shmem_internal_op_to_all_recdbl_sw(void *target, const void *source, size_t coun
}


/*****************************************
*
* SCAN
*
*****************************************/
/*
 * Linear prefix scan over the PEs PE_start, PE_start + PE_stride, ...
 * (PE_size members in total).
 *
 *   target, source   symmetric result / contribution buffers
 *   count, type_size number of elements and size of one element in bytes
 *   PE_start, PE_stride, PE_size
 *                    describe the participating set of PEs
 *   pWrk             not used by this algorithm
 *   pSync            symmetric synchronization word; it is 0 again on return
 *   op, datatype     reduction operator and element type for the atomics
 *   scantype         0 for an inclusive scan, 1 for an exclusive scan
 *
 * Protocol: the root (PE_start) writes its source into the target of every
 * member (skipping itself and zeroing its own target for an exclusive scan),
 * then releases the other members through pSync.  Every other member
 * atomically accumulates its source into the target of each member at or
 * (for an exclusive scan) after its own position, acknowledges to the root,
 * and waits for the root to signal that all acknowledgements have arrived.
 */
void
shmem_internal_scan_linear(void *target, const void *source, size_t count, size_t type_size,
                           int PE_start, int PE_stride, int PE_size,
                           void *pWrk, long *pSync,
                           shm_internal_op_t op, shm_internal_datatype_t datatype, int scantype)
{
    /* scantype is 0 for inscan and 1 for exscan */
    long zero = 0, one = 1;
    long completion = 0;
    int pe, i;

    if (count == 0) return;

    if (PE_start == shmem_internal_my_pe) {

        /* initialize target buffer.  The put will flush any atomic cache
           value that may currently exist. */
        if (scantype) {
            /* Exclude own value for EXSCAN: overwrite the local target with
               count * type_size zero bytes. */
            uint8_t *zeroes = calloc(count, type_size);
            if (NULL == zeroes) {
                /* Passing NULL to the put would crash anyway; fail
                   deterministically instead.
                   NOTE(review): switch to the project fatal-error macro if
                   one is preferred here. */
                abort();
            }
            shmem_internal_put_nb(SHMEM_CTX_DEFAULT, target, zeroes, count * type_size,
                                  shmem_internal_my_pe, &completion);
            shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion);
            shmem_internal_quiet(SHMEM_CTX_DEFAULT);
            free(zeroes);
        }

        /* Send contribution to all */
        for (pe = PE_start + PE_stride * scantype, i = scantype ;
             i < PE_size ;
             i++, pe += PE_stride) {
            shmem_internal_put_nb(SHMEM_CTX_DEFAULT, target, source, count * type_size,
                                  pe, &completion);
            shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion);
            shmem_internal_fence(SHMEM_CTX_DEFAULT);
        }

        /* A fence only orders operations aimed at the same PE.  Once released,
           a member sends atomics to the targets of OTHER members, so every
           initial put has to be complete before anyone is signalled. */
        shmem_internal_quiet(SHMEM_CTX_DEFAULT);

        /* Release the other members: their targets are initialized */
        for (pe = PE_start + PE_stride, i = 1 ;
             i < PE_size ;
             i++, pe += PE_stride) {
            shmem_internal_put_scalar(SHMEM_CTX_DEFAULT, pSync, &one, sizeof(one), pe);
        }

        /* Wait for every other member to acknowledge its contribution */
        SHMEM_WAIT_UNTIL(pSync, SHMEM_CMP_EQ, PE_size - 1);

        /* reset pSync */
        shmem_internal_put_scalar(SHMEM_CTX_DEFAULT, pSync, &zero, sizeof(zero), shmem_internal_my_pe);
        SHMEM_WAIT_UNTIL(pSync, SHMEM_CMP_EQ, 0);

        /* Let everyone know that all contributions have been acknowledged */
        for (pe = PE_start + PE_stride, i = 1 ;
             i < PE_size ;
             i++, pe += PE_stride) {
            shmem_internal_put_scalar(SHMEM_CTX_DEFAULT, pSync, &one, sizeof(one), pe);
        }

    } else {
        /* Position of this PE inside the active set.  The loop below counts
           members, not global PE numbers, so it must start from this index;
           seeding it with shmem_internal_my_pe is only right when
           PE_start == 0 and PE_stride == 1.  PE_stride cannot be 0 here
           because this PE differs from PE_start. */
        const int my_idx = (shmem_internal_my_pe - PE_start) / PE_stride;

        /* wait for clear to initialization */
        SHMEM_WAIT(pSync, 0);

        /* reset pSync */
        shmem_internal_put_scalar(SHMEM_CTX_DEFAULT, pSync, &zero, sizeof(zero), shmem_internal_my_pe);
        SHMEM_WAIT_UNTIL(pSync, SHMEM_CMP_EQ, 0);

        /* Send contribution to this PE (inscan only) and all later members */
        for (pe = shmem_internal_my_pe + PE_stride * scantype, i = my_idx + scantype ;
             i < PE_size ;
             i++, pe += PE_stride) {
            shmem_internal_atomicv(SHMEM_CTX_DEFAULT, target, source, count * type_size,
                                   pe, op, datatype, &completion);
            shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion);
            shmem_internal_fence(SHMEM_CTX_DEFAULT);
        }

        /* The acknowledgement goes to the root while the data went to other
           PEs; complete the atomics first so the root's final release really
           means that every contribution has landed. */
        shmem_internal_quiet(SHMEM_CTX_DEFAULT);

        shmem_internal_atomic(SHMEM_CTX_DEFAULT, pSync, &one, sizeof(one),
                              PE_start, SHM_INTERNAL_SUM, SHM_INTERNAL_LONG);

        /* Wait for the root's final release */
        SHMEM_WAIT(pSync, 0);

        /* reset pSync */
        shmem_internal_put_scalar(SHMEM_CTX_DEFAULT, pSync, &zero, sizeof(zero), shmem_internal_my_pe);
        SHMEM_WAIT_UNTIL(pSync, SHMEM_CMP_EQ, 0);
    }
}


/*
 * Ring-ordered prefix scan over the PEs PE_start, PE_start + PE_stride, ...
 * (PE_size members in total).
 *
 *   target, source   symmetric result / contribution buffers
 *   count, type_size number of elements and size of one element in bytes
 *   PE_start, PE_stride, PE_size
 *                    describe the participating set of PEs
 *   pWrk             not used by this algorithm
 *   pSync            symmetric synchronization word; it is 0 again on return
 *   op, datatype     reduction operator and element type for the atomics
 *   scantype         0 for an inclusive scan, 1 for an exclusive scan
 *
 * Protocol: the root (PE_start) writes its source into the target of every
 * member (skipping itself and zeroing its own target for an exclusive scan)
 * and then releases only the next member.  Each other member waits for its
 * release, atomically accumulates its source into the target of every member
 * at or (for an exclusive scan) after its own position, releases the next
 * member, and increments the root's pSync.  The root returns once it has
 * counted PE_size - 1 increments.
 */
void
shmem_internal_scan_ring(void *target, const void *source, size_t count, size_t type_size,
                         int PE_start, int PE_stride, int PE_size,
                         void *pWrk, long *pSync,
                         shm_internal_op_t op, shm_internal_datatype_t datatype, int scantype)
{
    /* scantype is 0 for inscan and 1 for exscan */
    long zero = 0, one = 1;
    long completion = 0;
    int pe, i;

    if (count == 0) return;

    if (PE_start == shmem_internal_my_pe) {
        uint8_t *zeroes = NULL;

        /* initialize target buffer.  The put will flush any atomic cache
           value that may currently exist. */
        if (scantype) {
            /* Exclude own value for EXSCAN: overwrite the local target with
               count * type_size zero bytes. */
            zeroes = calloc(count, type_size);
            if (NULL == zeroes) {
                /* Passing NULL to the put would crash anyway; fail
                   deterministically instead.
                   NOTE(review): switch to the project fatal-error macro if
                   one is preferred here. */
                abort();
            }
            shmem_internal_put_nb(SHMEM_CTX_DEFAULT, target, zeroes, count * type_size,
                                  shmem_internal_my_pe, &completion);
        }

        shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion);
        shmem_internal_quiet(SHMEM_CTX_DEFAULT);

        /* The non-blocking put may still read the buffer until it has been
           waited on, so release it only now (free(NULL) is a no-op). */
        free(zeroes);

        /* Send contribution to all */
        for (pe = PE_start + PE_stride * scantype, i = scantype ;
             i < PE_size ;
             i++, pe += PE_stride) {
            shmem_internal_put_nb(SHMEM_CTX_DEFAULT, target, source, count * type_size,
                                  pe, &completion);
            shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion);
            shmem_internal_fence(SHMEM_CTX_DEFAULT);
        }

        /* A fence only orders operations aimed at the same PE.  The next
           member will send atomics to targets on OTHER members, so every
           initial put has to be complete before it is released. */
        shmem_internal_quiet(SHMEM_CTX_DEFAULT);

        /* Let next pe know that it is safe to send; the root has a successor
           whenever the set has more than one member. */
        if (PE_size > 1)
            shmem_internal_put_scalar(SHMEM_CTX_DEFAULT, pSync, &one, sizeof(one),
                                      shmem_internal_my_pe + PE_stride);

        /* Wait for others to acknowledge sending data */
        SHMEM_WAIT_UNTIL(pSync, SHMEM_CMP_EQ, PE_size - 1);

        /* reset pSync */
        shmem_internal_put_scalar(SHMEM_CTX_DEFAULT, pSync, &zero, sizeof(zero), shmem_internal_my_pe);
        SHMEM_WAIT_UNTIL(pSync, SHMEM_CMP_EQ, 0);

    } else {
        /* Position of this PE inside the active set.  Loop bounds and the
           "am I last" test compare against PE_size, which counts members,
           so they must use this index rather than the global PE number.
           PE_stride cannot be 0 here because this PE differs from PE_start. */
        const int my_idx = (shmem_internal_my_pe - PE_start) / PE_stride;

        /* wait for clear to send */
        SHMEM_WAIT(pSync, 0);

        /* reset pSync */
        shmem_internal_put_scalar(SHMEM_CTX_DEFAULT, pSync, &zero, sizeof(zero), shmem_internal_my_pe);
        SHMEM_WAIT_UNTIL(pSync, SHMEM_CMP_EQ, 0);

        /* Send contribution to this PE (inscan only) and all later members */
        for (pe = shmem_internal_my_pe + PE_stride * scantype, i = my_idx + scantype ;
             i < PE_size ;
             i++, pe += PE_stride) {
            shmem_internal_atomicv(SHMEM_CTX_DEFAULT, target, source, count * type_size,
                                   pe, op, datatype, &completion);
            shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion);
            shmem_internal_fence(SHMEM_CTX_DEFAULT);
        }

        /* Complete the atomics before signalling: the release below goes to
           one PE while the data went to several, and a member returns as
           soon as it has been released and has sent its own data. */
        shmem_internal_quiet(SHMEM_CTX_DEFAULT);

        /* Let next pe know that it is safe to send, unless this is the last
           member of the set */
        if (my_idx + 1 < PE_size)
            shmem_internal_put_scalar(SHMEM_CTX_DEFAULT, pSync, &one, sizeof(one),
                                      shmem_internal_my_pe + PE_stride);

        shmem_internal_atomic(SHMEM_CTX_DEFAULT, pSync, &one, sizeof(one),
                              PE_start, SHM_INTERNAL_SUM, SHM_INTERNAL_LONG);
    }
}
/*****************************************
*
* COLLECT (variable size)
Expand Down
61 changes: 61 additions & 0 deletions src/collectives_c.c4
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ include(shmem_bind_c.m4)dnl

#define SHMEM_INTERNAL_INCLUDE
#include "shmem.h"
#include "shmemx.h"
#include "shmem_internal.h"
#include "shmem_comm.h"
#include "shmem_collectives.h"
Expand Down Expand Up @@ -81,6 +82,18 @@ SHMEM_BIND_C_COLL_SUM_PROD(`SHMEM_PROF_DEF_REDUCE', `prod', `SHM_INTERNAL_PROD')
SHMEM_BIND_C_COLL_MIN_MAX(`SHMEM_PROF_DEF_REDUCE', `min', `SHM_INTERNAL_MIN')
SHMEM_BIND_C_COLL_MIN_MAX(`SHMEM_PROF_DEF_REDUCE', `max', `SHM_INTERNAL_MAX')

dnl Profiling aliases for the typed exclusive scan.  The routine defined later
dnl in this file is named shmemx_<name>_<op>_exscan, so the weak symbol and the
dnl rename must use the shmemx_/pshmemx_ prefixes; aliasing shmem_/pshmem_
dnl names would leave the real entry point without a profiling counterpart.
dnl   $1 - type-name token, $4 - operation name (sum in the binding below)
define(`SHMEM_PROF_DEF_EXSCAN',
`#pragma weak shmemx_$1_$4_exscan = pshmemx_$1_$4_exscan
#define shmemx_$1_$4_exscan pshmemx_$1_$4_exscan')dnl
dnl
SHMEM_BIND_C_COLL_SUM_PROD(`SHMEM_PROF_DEF_EXSCAN', `sum', `SHM_INTERNAL_SUM')

dnl Profiling aliases for the typed inclusive scan.  The routine defined later
dnl in this file is named shmemx_<name>_<op>_inscan, so the weak symbol and the
dnl rename must use the shmemx_/pshmemx_ prefixes; aliasing shmem_/pshmem_
dnl names would leave the real entry point without a profiling counterpart.
dnl   $1 - type-name token, $4 - operation name (sum in the binding below)
define(`SHMEM_PROF_DEF_INSCAN',
`#pragma weak shmemx_$1_$4_inscan = pshmemx_$1_$4_inscan
#define shmemx_$1_$4_inscan pshmemx_$1_$4_inscan')dnl
dnl
SHMEM_BIND_C_COLL_SUM_PROD(`SHMEM_PROF_DEF_INSCAN', `sum', `SHM_INTERNAL_SUM')

define(`SHMEM_PROF_DEF_BCAST',
`#pragma weak shmem_$1_broadcast = pshmem_$1_broadcast
#define shmem_$1_broadcast pshmem_$1_broadcast')dnl
Expand Down Expand Up @@ -279,6 +292,54 @@ SHMEM_BIND_C_COLL_SUM_PROD(`SHMEM_DEF_REDUCE', `prod', `SHM_INTERNAL_PROD')
SHMEM_BIND_C_COLL_MIN_MAX(`SHMEM_DEF_REDUCE', `min', `SHM_INTERNAL_MIN')
SHMEM_BIND_C_COLL_MIN_MAX(`SHMEM_DEF_REDUCE', `max', `SHM_INTERNAL_MAX')

/*
 * Generator for the typed exclusive-scan entry points
 * shmemx_<STYPE>_<SOP>_exscan(team, dest, source, nelems).
 *
 * The generated routine runs the initialization, team, symmetry and overlap
 * argument checks, picks the SCAN psync of the team, hands the team start,
 * stride and size to shmem_internal_exscan together with a NULL work array,
 * releases the psync again and always returns 0.
 */
#define SHMEM_DEF_EXSCAN(STYPE,TYPE,ITYPE,SOP,IOP)                          \
    int SHMEM_FUNCTION_ATTRIBUTES                                           \
    shmemx_##STYPE##_##SOP##_exscan(shmem_team_t team, TYPE *dest,          \
                                    const TYPE *source, size_t nelems)      \
    {                                                                       \
        SHMEM_ERR_CHECK_INITIALIZED();                                      \
        SHMEM_ERR_CHECK_TEAM_VALID(team);                                   \
        SHMEM_ERR_CHECK_SYMMETRIC(dest, sizeof(TYPE)*nelems);               \
        SHMEM_ERR_CHECK_SYMMETRIC(source, sizeof(TYPE)*nelems);             \
        SHMEM_ERR_CHECK_OVERLAP(dest, source, sizeof(TYPE)*nelems,          \
                                sizeof(TYPE)*nelems, 1, 1);                 \
                                                                            \
        shmem_internal_team_t *team_obj = (shmem_internal_team_t *) team;   \
        TYPE *scratch = NULL;                                               \
        long *scan_sync = shmem_internal_team_choose_psync(team_obj, SCAN); \
                                                                            \
        shmem_internal_exscan(dest, source, nelems, sizeof(TYPE),           \
                              team_obj->start, team_obj->stride,            \
                              team_obj->size, scratch, scan_sync,           \
                              IOP, ITYPE);                                  \
        shmem_internal_team_release_psyncs(team_obj, SCAN);                 \
        return 0;                                                           \
    }
SHMEM_BIND_C_COLL_SUM_PROD(`SHMEM_DEF_EXSCAN', `sum', `SHM_INTERNAL_SUM')

/*
 * Generator for the typed inclusive-scan entry points
 * shmemx_<STYPE>_<SOP>_inscan(team, dest, source, nelems).
 *
 * The generated routine runs the initialization, team, symmetry and overlap
 * argument checks, picks the SCAN psync of the team, hands the team start,
 * stride and size to shmem_internal_inscan together with a NULL work array,
 * releases the psync again and always returns 0.
 */
#define SHMEM_DEF_INSCAN(STYPE,TYPE,ITYPE,SOP,IOP)                          \
    int SHMEM_FUNCTION_ATTRIBUTES                                           \
    shmemx_##STYPE##_##SOP##_inscan(shmem_team_t team, TYPE *dest,          \
                                    const TYPE *source, size_t nelems)      \
    {                                                                       \
        SHMEM_ERR_CHECK_INITIALIZED();                                      \
        SHMEM_ERR_CHECK_TEAM_VALID(team);                                   \
        SHMEM_ERR_CHECK_SYMMETRIC(dest, sizeof(TYPE)*nelems);               \
        SHMEM_ERR_CHECK_SYMMETRIC(source, sizeof(TYPE)*nelems);             \
        SHMEM_ERR_CHECK_OVERLAP(dest, source, sizeof(TYPE)*nelems,          \
                                sizeof(TYPE)*nelems, 1, 1);                 \
                                                                            \
        shmem_internal_team_t *team_obj = (shmem_internal_team_t *) team;   \
        TYPE *scratch = NULL;                                               \
        long *scan_sync = shmem_internal_team_choose_psync(team_obj, SCAN); \
                                                                            \
        shmem_internal_inscan(dest, source, nelems, sizeof(TYPE),           \
                              team_obj->start, team_obj->stride,            \
                              team_obj->size, scratch, scan_sync,           \
                              IOP, ITYPE);                                  \
        shmem_internal_team_release_psyncs(team_obj, SCAN);                 \
        return 0;                                                           \
    }
SHMEM_BIND_C_COLL_SUM_PROD(`SHMEM_DEF_INSCAN', `sum', `SHM_INTERNAL_SUM')

void SHMEM_FUNCTION_ATTRIBUTES
shmem_broadcast32(void *target, const void *source, size_t nlong,
int PE_root, int PE_start, int logPE_stride, int PE_size,
Expand Down
Loading
Loading