Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions ompi/mca/coll/acoll/coll_acoll.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ END_C_DECLS
#define MCA_COLL_ACOLL_ROOT_CHANGE_THRESH 10
#define MCA_COLL_ACOLL_SPLIT_FACTOR_LIST_LEN 6
#define MCA_COLL_ACOLL_SPLIT_FACTOR_LIST {2, 4, 8, 16, 32, 64}
#define MCA_COLL_ACOLL_PROGRESS_COUNT 10000

/* Hybrid backoff spin-wait thresholds for intra-node synchronization */
#define MCA_COLL_ACOLL_SPIN_FAST_PATH_ITERS 200 /* Pure spinning iterations */
#define MCA_COLL_ACOLL_SPIN_MEDIUM_PATH_ITERS 300 /* Moderate progress iterations */
#define MCA_COLL_ACOLL_SPIN_MEDIUM_PATH_FREQ 20 /* Progress call frequency in medium path */
#define MCA_COLL_ACOLL_SPIN_SLOW_PATH_MAX_FREQ 3 /* Max progress call frequency in slow path */

typedef enum MCA_COLL_ACOLL_SG_SIZES {
MCA_COLL_ACOLL_SG_SIZE_1 = 8,
Expand Down Expand Up @@ -234,6 +241,8 @@ struct mca_coll_acoll_module_t {
int force_numa;
int use_dyn_rules;
int disable_shmbcast;
int disable_fallback;
int red_algo;
// Todo: Use substructure for every API related ones
int use_mnode;
int use_lin0;
Expand Down
89 changes: 88 additions & 1 deletion ompi/mca/coll/acoll/coll_acoll_allreduce.c
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,94 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count,

alg = coll_allreduce_decision_fixed(size, total_dsize);

if (1 == num_nodes) {
/* Try with socket/node based split */
if (num_nodes > 1) {
if (total_dsize > 16384) {
return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype, op,
comm, module);
}
int use_socket = acoll_module->use_socket != -1 ? acoll_module->use_socket : 0;
coll_acoll_subcomms_t *soc_subc = NULL;
ompi_communicator_t *soc_comm = use_socket ? subc->socket_comm : subc->local_comm;
ompi_communicator_t *ldr_comm = use_socket ? subc->socket_ldr_comm : subc->leader_comm;
int ldr_root = use_socket ? subc->socket_ldr_root : subc->outer_grp_root;
int soc_root = use_socket ? subc->local_root[MCA_COLL_ACOLL_LYR_SOCKET] : subc->local_root[MCA_COLL_ACOLL_LYR_NODE];

/* Validate communicator hierarchy before proceeding */
if (NULL == soc_comm || NULL == ldr_comm) {
return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype, op,
comm, module);
}

err = check_and_create_subc(soc_comm, acoll_module, &soc_subc);
if (NULL != soc_subc) {
if (!soc_subc->initialized || (soc_root != soc_subc->prev_init_root)) {
err = mca_coll_acoll_comm_split_init(soc_comm, acoll_module, soc_subc, soc_root);
if (MPI_SUCCESS != err)
return err;
}
char *inplacebuf_free = NULL, *inplacebuf = NULL;
void *tmp_rbuf = rbuf;
void *tmp_sbuf = (void *)sbuf;
/* Socket level reduce */
if (ompi_comm_size(soc_comm) > 1) {
ptrdiff_t span, gap = 0;
span = opal_datatype_span(&dtype->super, count, &gap);
if (ompi_comm_rank(soc_comm) == soc_root) {
inplacebuf_free = (char*) malloc(span);
if (NULL == inplacebuf_free) { err = -1; return err; }
inplacebuf = inplacebuf_free - gap;
tmp_rbuf = (void *)inplacebuf;
tmp_sbuf = tmp_rbuf;
}

if((total_dsize > 8192) &&
((subc->smsc_use_sr_buf != 0) || (subc->smsc_buf_size > 2 * total_dsize)) &&
(subc->without_smsc != 1) && is_opt) {
err = mca_coll_acoll_reduce_smsc_h(sbuf, tmp_rbuf, count, dtype, op,
soc_comm, module, soc_subc);
} else {
acoll_module->red_algo = total_dsize <= 8192 ? 0 : 1;
err = mca_coll_acoll_reduce_intra(sbuf, tmp_rbuf, count, dtype, op,
soc_root, soc_comm, module);
acoll_module->red_algo = -1;
}

if (MPI_SUCCESS != err) {
if (NULL != inplacebuf_free) { free(inplacebuf_free); }
return err;
}
}
/* Allreduce across socket leaders */
if (ompi_comm_size(ldr_comm) > 1 && ldr_root != -1) {
if ((MPI_IN_PLACE == sbuf)) {
err = ompi_coll_base_allreduce_intra_recursivedoubling(MPI_IN_PLACE, rbuf, count, dtype, op,
ldr_comm, module);
} else {
err = ompi_coll_base_allreduce_intra_recursivedoubling(tmp_sbuf, rbuf, count, dtype, op,
ldr_comm, module);
}
if (MPI_SUCCESS != err) {
if (NULL != inplacebuf_free) { free(inplacebuf_free); }
return err;
}
}
if (ompi_comm_size(soc_comm) > 1) {
acoll_module->disable_fallback = 1;
err = mca_coll_acoll_bcast(rbuf, count, dtype, soc_root,
soc_comm, module);
acoll_module->disable_fallback = 0;
if (MPI_SUCCESS != err) {
if (NULL != inplacebuf_free) { free(inplacebuf_free); }
return err;
}
}
if (NULL != inplacebuf_free) { free(inplacebuf_free); }
return err;
}
}

if (num_nodes == 1) {
if (total_dsize < 32) {
return ompi_coll_base_allreduce_intra_recursivedoubling(sbuf, rbuf, count, dtype, op,
comm, module);
Expand Down
65 changes: 8 additions & 57 deletions ompi/mca/coll/acoll/coll_acoll_barrier.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@
#include "coll_acoll_utils.h"



#define PROGRESS_COUNT 10000

int mca_coll_acoll_barrier_shm_h(struct ompi_communicator_t *comm, mca_coll_base_module_t *module, coll_acoll_subcomms_t *subc);
int mca_coll_acoll_barrier_shm_f(struct ompi_communicator_t *comm, mca_coll_base_module_t *module, coll_acoll_subcomms_t *subc);

Expand Down Expand Up @@ -146,35 +143,22 @@ int mca_coll_acoll_barrier_shm_h(struct ompi_communicator_t *comm, mca_coll_base
my_leader_shm = (int *) ((char *) data->allshmmmap_sbuf[l1_gp[0]] + offset_barrier
+ CACHE_LINE_SIZE * l1_gp[0]);
int ready;
int count = 0;
if (rank == root) {
ready = *leader_shm;
for (int i = 0; i < l2_gp_size; i++) {
if (l2_gp[i] == root)
continue;
volatile int *val = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_barrier
+ CACHE_LINE_SIZE * l2_gp[i]);
while (*val != ready + 1) {
count++;
if (count == PROGRESS_COUNT) {
count = 0;
opal_progress();
}
}
spin_wait_with_progress(val, ready + 1);
}
ready++;
for (int i = 0; i < l1_gp_size; i++) {
if (l1_gp[i] == root)
continue;
volatile int *val = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_barrier
+ CACHE_LINE_SIZE * l1_gp[i]);
while (*val != ready) {
count++;
if (count == PROGRESS_COUNT) {
count = 0;
opal_progress();
}
}
spin_wait_with_progress(val, ready);
}
*leader_shm = ready;
} else if (rank == l1_gp[0]) {
Expand All @@ -183,38 +167,18 @@ int mca_coll_acoll_barrier_shm_h(struct ompi_communicator_t *comm, mca_coll_base
if (l1_gp[i] == l1_gp[0])
continue;
volatile int *vali = (int *) ((char *) data->allshmmmap_sbuf[l1_gp[0]] + offset_barrier
+ CACHE_LINE_SIZE
* l1_gp[i]); // do we need atomic_load here?
while (*vali != val + 1) {
count++;
if (PROGRESS_COUNT == count) {
count = 0;
opal_progress();
}
}
+ CACHE_LINE_SIZE * l1_gp[i]);
spin_wait_with_progress(vali, val + 1);
}
val++;
*root_rank_offset = val;
while (*leader_shm != val) {
count++;
if (PROGRESS_COUNT == count) {
count = 0;
opal_progress();
}
}
spin_wait_with_progress((volatile int *)leader_shm, val);
*l1_rank_offset = val;
} else {

int done = *l1_rank_offset;
done++;
*l1_rank_offset = done;
while (done != *my_leader_shm) {
count++;
if (10000 == count) {
count = 0;
opal_progress();
}
}
spin_wait_with_progress((volatile int *)my_leader_shm, done);
}
return err;
}
Expand Down Expand Up @@ -246,31 +210,18 @@ int mca_coll_acoll_barrier_shm_f(struct ompi_communicator_t *comm, mca_coll_base
+ CACHE_LINE_SIZE * root);

int ready = *leader_shm;
int count = 0;
if (rank == root) {
for (int i = 0; i < size; i++) {
if (i == root)
continue;
volatile int *val = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_barrier
+ CACHE_LINE_SIZE * i);
while (*val != ready + 1) {
count++;
if (count == PROGRESS_COUNT) {
count = 0;
opal_progress();
}
}
spin_wait_with_progress(val, ready + 1);
}
(*leader_shm)++;
} else {
int val = ++(*root_rank_offset);
while (*leader_shm != val) {
count++;
if (PROGRESS_COUNT == count) {
count = 0;
opal_progress();
}
}
spin_wait_with_progress((volatile int *)leader_shm, val);
}
return err;
}
Expand Down
Loading