diff --git a/ompi/mca/coll/acoll/coll_acoll.h b/ompi/mca/coll/acoll/coll_acoll.h
index 2aaa7537c1b..fe1b44081e1 100644
--- a/ompi/mca/coll/acoll/coll_acoll.h
+++ b/ompi/mca/coll/acoll/coll_acoll.h
@@ -89,6 +89,13 @@ END_C_DECLS
 #define MCA_COLL_ACOLL_ROOT_CHANGE_THRESH 10
 #define MCA_COLL_ACOLL_SPLIT_FACTOR_LIST_LEN 6
 #define MCA_COLL_ACOLL_SPLIT_FACTOR_LIST {2, 4, 8, 16, 32, 64}
+#define MCA_COLL_ACOLL_PROGRESS_COUNT 10000
+
+/* Hybrid backoff spin-wait thresholds for intra-node synchronization */
+#define MCA_COLL_ACOLL_SPIN_FAST_PATH_ITERS 200 /* Pure spinning iterations */
+#define MCA_COLL_ACOLL_SPIN_MEDIUM_PATH_ITERS 300 /* Moderate progress iterations */
+#define MCA_COLL_ACOLL_SPIN_MEDIUM_PATH_FREQ 20 /* Progress call frequency in medium path */
+#define MCA_COLL_ACOLL_SPIN_SLOW_PATH_MAX_FREQ 3 /* Max progress call frequency in slow path */

 typedef enum MCA_COLL_ACOLL_SG_SIZES {
     MCA_COLL_ACOLL_SG_SIZE_1 = 8,
@@ -234,6 +241,8 @@ struct mca_coll_acoll_module_t {
     int force_numa;
     int use_dyn_rules;
     int disable_shmbcast;
+    int disable_fallback;
+    int red_algo; // Todo: Use substructure for every API related ones

     int use_mnode;
     int use_lin0;
diff --git a/ompi/mca/coll/acoll/coll_acoll_allreduce.c b/ompi/mca/coll/acoll/coll_acoll_allreduce.c
index 133b01af0f9..95523c24e1f 100644
--- a/ompi/mca/coll/acoll/coll_acoll_allreduce.c
+++ b/ompi/mca/coll/acoll/coll_acoll_allreduce.c
@@ -509,7 +509,94 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count,

     alg = coll_allreduce_decision_fixed(size, total_dsize);

-    if (1 == num_nodes) {
+    /* Try with socket/node based split */
+    if (num_nodes > 1) {
+        if (total_dsize > 16384) {
+            return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype, op,
+                                                                    comm, module);
+        }
+        int use_socket = acoll_module->use_socket != -1 ? acoll_module->use_socket : 0;
+        coll_acoll_subcomms_t *soc_subc = NULL;
+        ompi_communicator_t *soc_comm = use_socket ? subc->socket_comm : subc->local_comm;
+        ompi_communicator_t *ldr_comm = use_socket ? subc->socket_ldr_comm : subc->leader_comm;
+        int ldr_root = use_socket ? subc->socket_ldr_root : subc->outer_grp_root;
+        int soc_root = use_socket ? subc->local_root[MCA_COLL_ACOLL_LYR_SOCKET] : subc->local_root[MCA_COLL_ACOLL_LYR_NODE];
+
+        /* Validate communicator hierarchy before proceeding */
+        if (NULL == soc_comm || NULL == ldr_comm) {
+            return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype, op,
+                                                                    comm, module);
+        }
+
+        err = check_and_create_subc(soc_comm, acoll_module, &soc_subc);
+        if (NULL != soc_subc) {
+            if (!soc_subc->initialized || (soc_root != soc_subc->prev_init_root)) {
+                err = mca_coll_acoll_comm_split_init(soc_comm, acoll_module, soc_subc, soc_root);
+                if (MPI_SUCCESS != err)
+                    return err;
+            }
+            char *inplacebuf_free = NULL, *inplacebuf = NULL;
+            void *tmp_rbuf = rbuf;
+            void *tmp_sbuf = (void *)sbuf;
+            /* Socket level reduce */
+            if (ompi_comm_size(soc_comm) > 1) {
+                ptrdiff_t span, gap = 0;
+                span = opal_datatype_span(&dtype->super, count, &gap);
+                if (ompi_comm_rank(soc_comm) == soc_root) {
+                    inplacebuf_free = (char*) malloc(span);
+                    if (NULL == inplacebuf_free) { err = -1; return err; }
+                    inplacebuf = inplacebuf_free - gap;
+                    tmp_rbuf = (void *)inplacebuf;
+                    tmp_sbuf = tmp_rbuf;
+                }
+
+                if((total_dsize > 8192) &&
+                   ((subc->smsc_use_sr_buf != 0) || (subc->smsc_buf_size > 2 * total_dsize)) &&
+                   (subc->without_smsc != 1) && is_opt) {
+                    err = mca_coll_acoll_reduce_smsc_h(sbuf, tmp_rbuf, count, dtype, op,
+                                                       soc_comm, module, soc_subc);
+                } else {
+                    acoll_module->red_algo = total_dsize <= 8192 ? 0 : 1;
+                    err = mca_coll_acoll_reduce_intra(sbuf, tmp_rbuf, count, dtype, op,
+                                                      soc_root, soc_comm, module);
+                    acoll_module->red_algo = -1;
+                }
+
+                if (MPI_SUCCESS != err) {
+                    if (NULL != inplacebuf_free) { free(inplacebuf_free); }
+                    return err;
+                }
+            }
+            /* Allreduce across socket leaders */
+            if (ompi_comm_size(ldr_comm) > 1 && ldr_root != -1) {
+                if ((MPI_IN_PLACE == sbuf)) {
+                    err = ompi_coll_base_allreduce_intra_recursivedoubling(MPI_IN_PLACE, rbuf, count, dtype, op,
+                                                                           ldr_comm, module);
+                } else {
+                    err = ompi_coll_base_allreduce_intra_recursivedoubling(tmp_sbuf, rbuf, count, dtype, op,
+                                                                           ldr_comm, module);
+                }
+                if (MPI_SUCCESS != err) {
+                    if (NULL != inplacebuf_free) { free(inplacebuf_free); }
+                    return err;
+                }
+            }
+            if (ompi_comm_size(soc_comm) > 1) {
+                acoll_module->disable_fallback = 1;
+                err = mca_coll_acoll_bcast(rbuf, count, dtype, soc_root,
+                                           soc_comm, module);
+                acoll_module->disable_fallback = 0;
+                if (MPI_SUCCESS != err) {
+                    if (NULL != inplacebuf_free) { free(inplacebuf_free); }
+                    return err;
+                }
+            }
+            if (NULL != inplacebuf_free) { free(inplacebuf_free); }
+            return err;
+        }
+    }
+
+    if (num_nodes == 1) {
         if (total_dsize < 32) {
             return ompi_coll_base_allreduce_intra_recursivedoubling(sbuf, rbuf, count, dtype, op,
                                                                     comm, module);
diff --git a/ompi/mca/coll/acoll/coll_acoll_barrier.c b/ompi/mca/coll/acoll/coll_acoll_barrier.c
index 02f3a6500cb..970562232da 100644
--- a/ompi/mca/coll/acoll/coll_acoll_barrier.c
+++ b/ompi/mca/coll/acoll/coll_acoll_barrier.c
@@ -23,9 +23,6 @@
 #include "coll_acoll_utils.h"

-
-#define PROGRESS_COUNT 10000
-
 int mca_coll_acoll_barrier_shm_h(struct ompi_communicator_t *comm, mca_coll_base_module_t *module,
                                  coll_acoll_subcomms_t *subc);
 int mca_coll_acoll_barrier_shm_f(struct ompi_communicator_t *comm, mca_coll_base_module_t *module,
                                  coll_acoll_subcomms_t *subc);
@@ -146,7 +143,6 @@ int mca_coll_acoll_barrier_shm_h(struct ompi_communicator_t *comm, mca_coll_base
     my_leader_shm = (int *) ((char *) data->allshmmmap_sbuf[l1_gp[0]] + offset_barrier
                              + CACHE_LINE_SIZE * l1_gp[0]);
     int ready;
-    int count = 0;
     if (rank == root) {
         ready = *leader_shm;
         for (int i = 0; i < l2_gp_size; i++) {
@@ -154,13 +150,7 @@ int mca_coll_acoll_barrier_shm_h(struct ompi_communicator_t *comm, mca_coll_base
                 continue;
             volatile int *val = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_barrier
                                          + CACHE_LINE_SIZE * l2_gp[i]);
-            while (*val != ready + 1) {
-                count++;
-                if (count == PROGRESS_COUNT) {
-                    count = 0;
-                    opal_progress();
-                }
-            }
+            spin_wait_with_progress(val, ready + 1);
         }
         ready++;
         for (int i = 0; i < l1_gp_size; i++) {
@@ -168,13 +158,7 @@ int mca_coll_acoll_barrier_shm_h(struct ompi_communicator_t *comm, mca_coll_base
                 continue;
             volatile int *val = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_barrier
                                          + CACHE_LINE_SIZE * l1_gp[i]);
-            while (*val != ready) {
-                count++;
-                if (count == PROGRESS_COUNT) {
-                    count = 0;
-                    opal_progress();
-                }
-            }
+            spin_wait_with_progress(val, ready);
         }
         *leader_shm = ready;
     } else if (rank == l1_gp[0]) {
@@ -183,38 +167,18 @@ int mca_coll_acoll_barrier_shm_h(struct ompi_communicator_t *comm, mca_coll_base
             if (l1_gp[i] == l1_gp[0])
                 continue;
             volatile int *vali = (int *) ((char *) data->allshmmmap_sbuf[l1_gp[0]] + offset_barrier
-                                          + CACHE_LINE_SIZE
-                                          * l1_gp[i]); // do we need atomic_load here?
-            while (*vali != val + 1) {
-                count++;
-                if (PROGRESS_COUNT == count) {
-                    count = 0;
-                    opal_progress();
-                }
-            }
+                                          + CACHE_LINE_SIZE * l1_gp[i]);
+            spin_wait_with_progress(vali, val + 1);
         }
         val++;
         *root_rank_offset = val;
-        while (*leader_shm != val) {
-            count++;
-            if (PROGRESS_COUNT == count) {
-                count = 0;
-                opal_progress();
-            }
-        }
+        spin_wait_with_progress((volatile int *)leader_shm, val);
         *l1_rank_offset = val;
     } else {
-        int done = *l1_rank_offset;
         done++;
         *l1_rank_offset = done;
-        while (done != *my_leader_shm) {
-            count++;
-            if (10000 == count) {
-                count = 0;
-                opal_progress();
-            }
-        }
+        spin_wait_with_progress((volatile int *)my_leader_shm, done);
     }
     return err;
 }
@@ -246,31 +210,18 @@ int mca_coll_acoll_barrier_shm_f(struct ompi_communicator_t *comm, mca_coll_base
                                      + CACHE_LINE_SIZE * root);

     int ready = *leader_shm;
-    int count = 0;
     if (rank == root) {
         for (int i = 0; i < size; i++) {
             if (i == root)
                 continue;
             volatile int *val = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_barrier
                                          + CACHE_LINE_SIZE * i);
-            while (*val != ready + 1) {
-                count++;
-                if (count == PROGRESS_COUNT) {
-                    count = 0;
-                    opal_progress();
-                }
-            }
+            spin_wait_with_progress(val, ready + 1);
         }
         (*leader_shm)++;
     } else {
         int val = ++(*root_rank_offset);
-        while (*leader_shm != val) {
-            count++;
-            if (PROGRESS_COUNT == count) {
-                count = 0;
-                opal_progress();
-            }
-        }
+        spin_wait_with_progress((volatile int *)leader_shm, val);
     }
     return err;
 }
diff --git a/ompi/mca/coll/acoll/coll_acoll_bcast.c b/ompi/mca/coll/acoll/coll_acoll_bcast.c
index 2b06692fe98..b0116fb2361 100644
--- a/ompi/mca/coll/acoll/coll_acoll_bcast.c
+++ b/ompi/mca/coll/acoll/coll_acoll_bcast.c
@@ -143,8 +143,22 @@ static inline void coll_bcast_decision_fixed(int size, size_t total_dsize, int n
     if (size <= node_size) {
         if (total_dsize <= 8192 && size >= 16 && !acoll_module->disable_shmbcast) {
             *use_shm = 1;
+            if (-1 != acoll_module->use_socket) {
+                *use_socket = acoll_module->use_socket;
+            }
             return;
         }
+        if (size >= 256 && total_dsize <= 8192) {
+            if (total_dsize <= 64) {
+                *sg_cnt = size;
+                SET_BCAST_PARAMS(0, 0, 0)
+                return;
+            } else if (!acoll_module->disable_shmbcast) {
+                *use_shm = 1;
+                *use_socket = (-1 != acoll_module->use_socket) ? acoll_module->use_socket : 0;
+                return;
+            }
+        }
         if (acoll_module->use_dyn_rules) {
             *sg_cnt = (acoll_module->mnode_sg_size == acoll_module->sg_cnt) ? acoll_module->sg_cnt : node_size;
             *use_0 = 0;
@@ -254,9 +268,11 @@ static inline void coll_bcast_decision_fixed(int size, size_t total_dsize, int n
             if (total_dsize <= 64) {
                 *use_socket = 1;
                 SET_BCAST_PARAMS(1, 1, 0)
-            } else if (total_dsize <= 512) {
+            } else if (total_dsize <= 2048) {
                 *use_shm = 1;
                 SET_BCAST_PARAMS(1, 1, 0)
+            } else if (total_dsize <= 131072) {
+                SET_BCAST_PARAMS(1, 1, 1)
             } else if (total_dsize <= 2097152) {
                 *use_socket = 1;
                 SET_BCAST_PARAMS(1, 1, 1)
@@ -479,7 +495,7 @@ int mca_coll_acoll_bcast_shm(void *buff, size_t count, struct ompi_datatype_t *d
     coll_acoll_subcomms_t *subc = NULL;
     err = check_and_create_subc(comm, acoll_module, &subc);
-    if (!subc->initialized) {
+    if (!subc->initialized || (root != subc->prev_init_root)) {
         err = mca_coll_acoll_comm_split_init(comm, acoll_module, subc, root);
         if (MPI_SUCCESS != err) {
             return err;
@@ -498,7 +514,7 @@ int mca_coll_acoll_bcast_shm(void *buff, size_t count, struct ompi_datatype_t *d
     int *l2_gp = data->l2_gp;
     int l2_gp_size = data->l2_gp_size;
     /* 16 * 1024 + 2 * 64 * size + 8 * 1024 * size */
-    int offset_bcast = LEADER_SHM_SIZE + 2*CACHE_LINE_SIZE*size + PER_RANK_SHM_SIZE*size; 
+    int offset_bcast = LEADER_SHM_SIZE + 2*CACHE_LINE_SIZE*size + PER_RANK_SHM_SIZE*size;
     volatile int *leader_shm;

     if (rank == l1_gp[0]) {
@@ -524,73 +540,96 @@ int mca_coll_acoll_bcast_shm(void *buff, size_t count, struct ompi_datatype_t *d
     int ready;
     if (rank == root) {
         memcpy((char *) data->allshmmmap_sbuf[root], buff, count * dsize);
-        ready = __atomic_load_n(leader_shm, __ATOMIC_RELAXED); // we don't need atomic hear!
+        /* Ensure data copy completes before setting ready flag */
+        opal_atomic_wmb();
+
+        ready = __atomic_load_n(leader_shm, __ATOMIC_ACQUIRE);
         ready++;
-        __atomic_store_n(leader_shm, ready, __ATOMIC_RELAXED);
+        /* Use RELEASE to ensure data is visible before flag update */
+        __atomic_store_n(leader_shm, ready, __ATOMIC_RELEASE);
+
+        /* Memory barrier to ensure flag store is visible before checking responses */
+        opal_atomic_mb();
+
         for (int i = 0; i < l2_gp_size; i++) {
             if (l2_gp[i] == root)
                 continue;
             volatile int *val = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_bcast
                                          + CACHE_LINE_SIZE * l2_gp[i]);
-            while (*val != ready) {
-                ;
-            }
+            spin_wait_with_progress(val, ready);
         }
         for (int i = 0; i < l1_gp_size; i++) {
             if (l1_gp[i] == root)
                 continue;
             volatile int *val = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_bcast
                                          + CACHE_LINE_SIZE * l1_gp[i]);
-            while (*val != ready) {
-                ;
-            }
+            spin_wait_with_progress(val, ready);
         }
     } else if (rank == l1_gp[0]) {
-        volatile int leader_ready = __atomic_load_n(leader_shm, __ATOMIC_RELAXED);
         int done = __atomic_load_n((int *) ((char *) data->allshmmmap_sbuf[root] + offset_bcast
                                             + CACHE_LINE_SIZE * rank),
-                                   __ATOMIC_RELAXED);
-        while (done == leader_ready) {
-            leader_ready = __atomic_load_n(leader_shm, __ATOMIC_RELAXED);
-        }
+                                   __ATOMIC_ACQUIRE);
+        /* Use ACQUIRE to ensure we see root's data writes */
+        done++;
+        spin_wait_with_progress((volatile int *)leader_shm, done);
+
+        /* Memory barrier to ensure flag read completes before data copy */
+        opal_atomic_rmb();
+
         memcpy(buff, (char *) data->allshmmmap_sbuf[root], count * dsize);
         memcpy((char *) data->allshmmmap_sbuf[rank], (char *) data->allshmmmap_sbuf[root],
                count * dsize);
+
+        /* Ensure data copies complete before updating flags */
+        opal_atomic_wmb();
+
         int val = __atomic_load_n((int *) ((char *) data->allshmmmap_sbuf[rank] + offset_bcast
                                            + CACHE_LINE_SIZE * rank),
-                                  __ATOMIC_RELAXED); // do we need atomic load?
+                                  __ATOMIC_ACQUIRE);
         val++;
         int local_val = val;
+
+        /* Use RELEASE to ensure data is visible before flag updates */
         __atomic_store_n((int *) ((char *) data->allshmmmap_sbuf[root] + offset_bcast
                                   + CACHE_LINE_SIZE * rank),
-                         val, __ATOMIC_RELAXED); // do we need atomic store?
+                         val, __ATOMIC_RELEASE);
         __atomic_store_n((int *) ((char *) data->allshmmmap_sbuf[rank] + offset_bcast
                                   + CACHE_LINE_SIZE * rank),
-                         val, __ATOMIC_RELAXED); // do we need atomic store?
-        // do we need wmb() here?
+                         val, __ATOMIC_RELEASE);
+
+        /* Memory barrier to ensure flag stores are visible */
+        opal_atomic_mb();
+
         for (int i = 0; i < l1_gp_size; i++) {
             if (l1_gp[i] == l1_gp[0])
                 continue;
             volatile int *vali = (int *) ((char *) data->allshmmmap_sbuf[l1_gp[0]] + offset_bcast
-                                          + CACHE_LINE_SIZE * l1_gp[i]); // do we need atomic_load here?
-            while (*vali != local_val) {
-                ; // can we use a more specific condition than "!=" ?
-            }
+                                          + CACHE_LINE_SIZE * l1_gp[i]);
+            spin_wait_with_progress(vali, local_val);
         }
     } else {
         int done = __atomic_load_n((int *) ((char *) data->allshmmmap_sbuf[l1_gp[0]] + offset_bcast
                                             + CACHE_LINE_SIZE * rank),
-                                   __ATOMIC_RELAXED);
-        while (done == *leader_shm) {
-            ;
-        }
+                                   __ATOMIC_ACQUIRE);
+        /* Use ACQUIRE to ensure we see leader's data writes */
+        done++;
+        spin_wait_with_progress((volatile int *)leader_shm, done);
+
+        /* Memory barrier to ensure flag read completes before data copy */
+        opal_atomic_rmb();
+
         memcpy(buff, (char *) data->allshmmmap_sbuf[l1_gp[0]], count * dsize);
+
+        /* Ensure data copy completes before updating flag */
+        opal_atomic_wmb();
+
         int val = __atomic_load_n((int *) ((char *) data->allshmmmap_sbuf[l1_gp[0]] + offset_bcast
                                            + CACHE_LINE_SIZE * rank),
-                                  __ATOMIC_RELAXED); // do we need atomic load?
+                                  __ATOMIC_ACQUIRE);
         val++;
+
+        /* Use RELEASE to ensure data is visible before flag update */
         __atomic_store_n((int *) ((char *) data->allshmmmap_sbuf[l1_gp[0]] + offset_bcast
                                   + CACHE_LINE_SIZE * rank),
-                         val, __ATOMIC_RELAXED); // do we need atomic store?
-        // do we need wmb() here?
+                         val, __ATOMIC_RELEASE);
     }
     return err;
 }
@@ -648,7 +687,11 @@ int mca_coll_acoll_bcast(void *buff, size_t count, struct ompi_datatype_t *datat
     /* Fallback to knomial if no. of root changes is beyond a threshold */
     if ((subc->num_root_change > MCA_COLL_ACOLL_ROOT_CHANGE_THRESH)
         && (root != subc->prev_init_root)) {
-        return ompi_coll_base_bcast_intra_knomial(buff, count, datatype, root, comm, module, 0, 4);
+        if (acoll_module->disable_fallback) {
+            return ompi_coll_base_bcast_intra_basic_linear(buff, count, datatype, root, comm, module);
+        } else {
+            return ompi_coll_base_bcast_intra_knomial(buff, count, datatype, root, comm, module, 0, 4);
+        }
     }
     if ((!subc->initialized || (root != subc->prev_init_root)) && size > 2) {
         err = mca_coll_acoll_comm_split_init(comm, acoll_module, subc, root);
@@ -670,8 +713,9 @@ int mca_coll_acoll_bcast(void *buff, size_t count, struct ompi_datatype_t *datat
     }

     /* Use knomial for nodes 8 and above and non-large messages */
-    if ((num_nodes >= 8 && total_dsize <= 65536)
-        || (1 == num_nodes && size >= 256 && total_dsize < 16384)) {
+    if (((num_nodes >= 8 && total_dsize <= 65536)
+         || (1 == num_nodes && size >= 256 && total_dsize < 16384)) &&
+        !acoll_module->disable_fallback) {
         return ompi_coll_base_bcast_intra_knomial(buff, count, datatype, root, comm, module, 0, 4);
     }
@@ -683,8 +727,9 @@ int mca_coll_acoll_bcast(void *buff, size_t count, struct ompi_datatype_t *datat
                               &use_numa, &use_socket, &use_shm, &lin_0, &lin_1, &lin_2,
                               num_nodes, acoll_module, subc);
     no_sg = (sg_cnt == node_size) ? 1 : 0;
-    if (size <= 2)
+    if (size <= 2) {
         no_sg = 1;
+    }

     /* Disable shm based bcast if: */
     /* - datatype is not a predefined type */
diff --git a/ompi/mca/coll/acoll/coll_acoll_gather.c b/ompi/mca/coll/acoll/coll_acoll_gather.c
index 429b61296aa..fadc0b67f02 100644
--- a/ompi/mca/coll/acoll/coll_acoll_gather.c
+++ b/ompi/mca/coll/acoll/coll_acoll_gather.c
@@ -74,7 +74,7 @@ int mca_coll_acoll_gather_intra(const void *sbuf, size_t scount, struct ompi_dat
     ompi_datatype_type_extent(rdtype, &rextent);
     /* Just use the recv buffer */
     wkg = (char *) rbuf;
-    if (sbuf != MPI_IN_PLACE) {
+    if (MPI_IN_PLACE != sbuf) {
         MPI_Aint root_ofst = rextent * (ptrdiff_t) (rcount * root);
         err = ompi_datatype_sndrcv((void *) sbuf, scount, sdtype,
                                    wkg + (ptrdiff_t) root_ofst, rcount, rdtype);
@@ -160,6 +160,7 @@ int mca_coll_acoll_gather_intra(const void *sbuf, size_t scount, struct ompi_dat
         int local_root = (root_node == cur_node) ? root : startn;
         for (i = startn; i < endn; i += sg_cnt) {
             int i_sg = i / sg_cnt;
+            int i_node = i / node_cnt;
             if ((rank != local_root) && (rank == i) && is_base) {
                 err = MCA_PML_CALL(send(workbuf - sgap, total_recv, sdtype, local_root,
                                         MCA_COLL_BASE_TAG_GATHER, MCA_PML_BASE_SEND_STANDARD,
@@ -167,7 +168,12 @@ int mca_coll_acoll_gather_intra(const void *sbuf, size_t scount, struct ompi_dat
             }
             if ((rank == local_root) && (rank != i) && (i_sg != root_sg)) {
                 size_t recv_amt = (i + sg_cnt > size) ? rcount * (size - i) : rcount * sg_cnt;
-                MPI_Aint rcv_ofst = rextent * (ptrdiff_t) (rcount * (i - startn));
+                MPI_Aint rcv_ofst;
+                if (rank == root) {
+                    rcv_ofst = rextent * (ptrdiff_t) (rcount * (i_node * node_cnt + i - startn));
+                } else {
+                    rcv_ofst = rextent * (ptrdiff_t) (rcount * (i - startn));
+                }
                 err = MCA_PML_CALL(recv(wkg + (ptrdiff_t) rcv_ofst, recv_amt, rdtype, i,
                                         MCA_COLL_BASE_TAG_GATHER, comm, &status));
@@ -182,11 +188,11 @@ int mca_coll_acoll_gather_intra(const void *sbuf, size_t scount, struct ompi_dat
         }
     }

-    /* All local roots ranks send to root */
+    /* All local roots send to root */
     if (node_cnt < size && num_nodes > 1) {
         for (i = 0; i < size; i += node_cnt) {
             int i_node = i / node_cnt;
-            if ((rank != root) && (rank == i) && is_base) {
+            if ((rank != root) && (rank == i) && is_local_root) {
                 err = MCA_PML_CALL(send(workbuf - sgap, total_recv, sdtype, root,
                                         MCA_COLL_BASE_TAG_GATHER, MCA_PML_BASE_SEND_STANDARD,
                                         comm));
diff --git a/ompi/mca/coll/acoll/coll_acoll_module.c b/ompi/mca/coll/acoll/coll_acoll_module.c
index 8d54830085d..fcae061d37c 100644
--- a/ompi/mca/coll/acoll/coll_acoll_module.c
+++ b/ompi/mca/coll/acoll/coll_acoll_module.c
@@ -163,6 +163,9 @@ mca_coll_base_module_t *mca_coll_acoll_comm_query(struct ompi_communicator_t *co
     acoll_module->allg_lin = mca_coll_acoll_allgather_lin;
     acoll_module->allg_ring = mca_coll_acoll_allgather_ring_1;

+    acoll_module->disable_fallback = 0;
+    acoll_module->red_algo = -1;
+
     /* Choose whether to use [intra|inter], and [subgroup|normal]-based
      * algorithms. */
     acoll_module->super.coll_module_enable = acoll_module_enable;
diff --git a/ompi/mca/coll/acoll/coll_acoll_reduce.c b/ompi/mca/coll/acoll/coll_acoll_reduce.c
index 7913af1678d..b01e7f84f59 100644
--- a/ompi/mca/coll/acoll/coll_acoll_reduce.c
+++ b/ompi/mca/coll/acoll/coll_acoll_reduce.c
@@ -63,6 +63,8 @@ static inline int coll_acoll_reduce_topo(const void *sbuf, void *rbuf, size_t co
     rank = ompi_comm_rank(comm);

+    int use_socket = 1;
+
     tmp_sbuf = (char *) sbuf;
     if ((MPI_IN_PLACE == sbuf) && (rank == root)) {
         tmp_sbuf = (char *) rbuf;
@@ -70,8 +72,9 @@ static inline int coll_acoll_reduce_topo(const void *sbuf, void *rbuf, size_t co
     int i;
     int ind1 = MCA_COLL_ACOLL_L3CACHE;
-    int ind2 = MCA_COLL_ACOLL_LYR_NODE;
-    int is_base = rank == subc->base_rank[ind1][ind2] ? 1 : 0;
+    int ind2 = use_socket ? MCA_COLL_ACOLL_LYR_SOCKET : MCA_COLL_ACOLL_LYR_NODE;
+    int cur_rank = use_socket ? ompi_comm_rank(subc->socket_comm) : rank;
+    int is_base = cur_rank == subc->base_rank[ind1][ind2] ? 1 : 0;
     int bound = subc->subgrp_size;

     sz = ompi_comm_size(subc->base_comm[ind1][ind2]);
@@ -114,7 +117,9 @@ static inline int coll_acoll_reduce_topo(const void *sbuf, void *rbuf, size_t co
     }
     /* perform reduction at root */
     if (is_base && (sz > 1)) {
-        if (rank != root) {
+        int ldr_root = use_socket ? subc->socket_rank : root;
+        int local_rank = (use_socket && subc->num_nodes > 1) ? ompi_comm_rank(subc->local_comm) : rank;
+        if (local_rank != ldr_root) {
             ret = MCA_PML_CALL(send(tmp_rbuf, count, dtype, subc->base_root[ind1][ind2],
                                     MCA_COLL_BASE_TAG_REDUCE, MCA_PML_BASE_SEND_STANDARD,
                                     subc->base_comm[ind1][ind2]));
@@ -126,7 +131,7 @@ static inline int coll_acoll_reduce_topo(const void *sbuf, void *rbuf, size_t co
                 return ret;
             }
         }
-        if (rank == root) {
+        if (local_rank == ldr_root) {
             for (i = 0; i < sz; i++) {
                 if (i == subc->base_root[ind1][ind2]) {
                     continue;
@@ -137,12 +142,44 @@ static inline int coll_acoll_reduce_topo(const void *sbuf, void *rbuf, size_t co
                     free(free_buffer);
                     return ret;
                 }
-                ompi_op_reduce(op, pml_buffer, rbuf, count, dtype);
+                ompi_op_reduce(op, pml_buffer, tmp_rbuf, count, dtype);
+            }
+        }
+    }
+
+    if (use_socket) {
+        int soc_sz = ompi_comm_size(subc->socket_ldr_comm);
+        if (soc_sz > 1 && -1 != subc->socket_ldr_root) {
+            if (rank != root) {
+                ret = MCA_PML_CALL(send(tmp_rbuf, count, dtype, subc->socket_ldr_root,
+                                        MCA_COLL_BASE_TAG_REDUCE, MCA_PML_BASE_SEND_STANDARD,
+                                        subc->socket_ldr_comm));
+                if (ret != MPI_SUCCESS) {
+                    free(free_buffer);
+                    if (NULL != tmp_rbuf) {
+                        coll_acoll_buf_free(reserve_mem_rbuf_reduce, tmp_rbuf);
+                    }
+                    return ret;
+                }
+            }
+            if (rank == root) {
+                for (i = 0; i < soc_sz; i++) {
+                    if (i == subc->socket_ldr_root) {
+                        continue;
+                    }
+                    ret = MCA_PML_CALL(recv(pml_buffer, count, dtype, i, MCA_COLL_BASE_TAG_REDUCE,
+                                            subc->socket_ldr_comm, MPI_STATUS_IGNORE));
+                    if (ret != MPI_SUCCESS) {
+                        free(free_buffer);
+                        return ret;
+                    }
+                    ompi_op_reduce(op, pml_buffer, rbuf, count, dtype);
+                }
             }
         }
     }

-    /* if local root, reduce at root */
+    /* if local root, free the scratch buffers */
     if (is_base) {
         free(free_buffer);
         if (rank != root && NULL != tmp_rbuf) {
@@ -346,8 +383,11 @@ int mca_coll_acoll_reduce_intra(const void *sbuf, void *rbuf, size_t count,
     ompi_datatype_type_size(dtype, &dsize);
     total_dsize = dsize * count;
-
-    alg = coll_reduce_decision_fixed(size, total_dsize);
+    if (-1 == acoll_module->red_algo) {
+        alg = coll_reduce_decision_fixed(size, total_dsize);
+    } else {
+        alg = acoll_module->red_algo;
+    }

     /* Obtain the subcomms structure */
     coll_acoll_subcomms_t *subc = NULL;
@@ -367,10 +407,14 @@ int mca_coll_acoll_reduce_intra(const void *sbuf, void *rbuf, size_t count,
     }
     num_nodes = subc->num_nodes;

-    if (1 == num_nodes) {
-        if (total_dsize < 262144) {
-            if (-1 == alg /* interaction with xpmem implementation causing issues 0*/) {
+    int is_dsize_lt_thresh = total_dsize < 262144 ? 1 : 0;
+    if (-1 != acoll_module->red_algo) {
+        is_dsize_lt_thresh = 1;
+        alg = acoll_module->red_algo;
+    }
+    if (is_dsize_lt_thresh) {
+        if (0 == alg) {
             return coll_acoll_reduce_topo(sbuf, rbuf, count, dtype, op, root, comm, module,
                                           subc);
         } else if (1 == alg) {
@@ -379,7 +423,7 @@ int mca_coll_acoll_reduce_intra(const void *sbuf, void *rbuf, size_t count,
             } else if (2 == alg) {
                 return ompi_coll_base_reduce_intra_binomial(sbuf, rbuf, count, dtype, op, root,
                                                             comm, module, 0, 0);
-            } else { /*(alg == 3)*/
+            } else { /* either alg == 3 or acoll_module->red_algo is not 0, 1, 2*/
                 return ompi_coll_base_reduce_intra_in_order_binary(sbuf, rbuf, count, dtype, op,
                                                                    root, comm, module, 0, 0);
             }
@@ -397,8 +441,13 @@ int mca_coll_acoll_reduce_intra(const void *sbuf, void *rbuf, size_t count,
             }
         }
     } else {
-        return ompi_coll_base_reduce_intra_binomial(sbuf, rbuf, count, dtype, op, root, comm,
-                                                    module, 0, 0);
+        if (total_dsize <= 4096) {
+            return coll_acoll_reduce_topo(sbuf, rbuf, count, dtype, op, root, comm, module,
+                                          subc);
+        } else {
+            return ompi_coll_base_reduce_intra_binomial(sbuf, rbuf, count, dtype, op, root, comm,
+                                                        module, 0, 0);
+        }
     }
     return MPI_SUCCESS;
 }
diff --git a/ompi/mca/coll/acoll/coll_acoll_utils.h b/ompi/mca/coll/acoll/coll_acoll_utils.h
index 5707165c0a1..49586ffafc6 100644
--- a/ompi/mca/coll/acoll/coll_acoll_utils.h
+++ b/ompi/mca/coll/acoll/coll_acoll_utils.h
@@ -32,6 +32,41 @@
 extern int mca_coll_acoll_without_smsc;
 extern int mca_coll_acoll_smsc_use_sr_buf;
 extern int mca_coll_acoll_barrier_algo;

+/*
+ * Hybrid backoff spin-wait with adaptive progress calls.
+ * Optimized for intra-node shared memory synchronization.
+ *
+ * Three-phase strategy:
+ * - Fast path (0-MCA_COLL_ACOLL_SPIN_FAST_PATH_ITERS): Pure spinning for typical shared memory case
+ * - Medium path (FAST_PATH-MCA_COLL_ACOLL_SPIN_MEDIUM_PATH_ITERS): Moderate progress for NUMA delays
+ * - Slow path (MEDIUM_PATH+): Aggressive progress for unexpected contention
+ */
+static inline void spin_wait_with_progress(volatile int *flag, int expected_value)
+{
+    int pcount = 0;
+    int progress_freq = 1;
+    int observed;
+
+    while ((observed = __atomic_load_n(flag, __ATOMIC_ACQUIRE)) != expected_value) {
+        pcount++;
+        if (pcount < MCA_COLL_ACOLL_SPIN_FAST_PATH_ITERS) {
+            /* Fast path: pure spinning for intra-node shared memory (typical case) */
+            continue;
+        } else if (pcount < MCA_COLL_ACOLL_SPIN_MEDIUM_PATH_ITERS) {
+            /* Medium path: moderate progress for NUMA delays or contention */
+            if (pcount % MCA_COLL_ACOLL_SPIN_MEDIUM_PATH_FREQ == 0) {
+                opal_progress();
+            }
+        } else {
+            /* Slow path: aggressive progress for unexpected delays */
+            for (int j = 0; j < progress_freq; j++) {
+                opal_progress();
+            }
+            if (progress_freq < MCA_COLL_ACOLL_SPIN_SLOW_PATH_MAX_FREQ) progress_freq++;
+        }
+    }
+}
+
 /* Function to allocate scratch buffer */
 static inline void *coll_acoll_buf_alloc(coll_acoll_reserve_mem_t *reserve_mem_ptr, uint64_t size)
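
For reference, the sketch below shows how a caller is expected to pair spin_wait_with_progress() with a monotonically increasing per-rank flag, in the style of the flat shared-memory barrier above. It is illustrative only and not part of the patch: the names flat_barrier_sketch, flags, and round are hypothetical; the real flags live in the mmap'd segment reached through data->allshmmmap_sbuf, are spaced CACHE_LINE_SIZE apart, and the real callers add the acquire/release and opal_atomic_*mb() ordering shown in the bcast changes.

/* Illustrative sketch only (hypothetical names); assumes "flags" points to a
 * shared array with one int slot per rank, all initialized to zero. */
#include "coll_acoll.h"        /* MCA_COLL_ACOLL_SPIN_* thresholds */
#include "coll_acoll_utils.h"  /* spin_wait_with_progress() */

static void flat_barrier_sketch(volatile int *flags, int rank, int root, int size)
{
    int round = flags[root];   /* value released by root in the previous round */

    if (rank == root) {
        /* Wait until every other rank has advanced its own slot ... */
        for (int i = 0; i < size; i++) {
            if (i == root) {
                continue;
            }
            spin_wait_with_progress(&flags[i], round + 1);
        }
        /* ... then release everyone by advancing the root slot. */
        flags[root] = round + 1;
    } else {
        /* Announce arrival, then wait for the root's release. */
        flags[rank] = round + 1;
        spin_wait_with_progress(&flags[root], round + 1);
    }
}

Centralizing the wait in this helper means the fast/medium/slow thresholds are tuned once in coll_acoll.h rather than being duplicated at every PROGRESS_COUNT spin loop, which is what the barrier and bcast hunks above delete.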