From 6685d0ab60f46c5938b08ea200ae9f95f63c99f3 Mon Sep 17 00:00:00 2001 From: avinashkethineedi Date: Thu, 17 Oct 2024 21:54:58 +0000 Subject: [PATCH] Add fine grained memory buffers for work/sync arrays * Add interanl put_mem/get_mem{_wave, _wg} functions to read/write to work/sync arrays * Add condition check to ensure all MPI processes are on the same compute node for IPC conduit --- src/ipc/backend_ipc.cpp | 158 +++++++++++++++++++++++----- src/ipc/backend_ipc.hpp | 47 +++++++++ src/ipc/context_ipc_device.cpp | 52 +++++++++ src/ipc/context_ipc_device.hpp | 26 +++++ src/ipc/context_ipc_device_coll.cpp | 6 +- 5 files changed, 262 insertions(+), 27 deletions(-) diff --git a/src/ipc/backend_ipc.cpp b/src/ipc/backend_ipc.cpp index 7475fb2c8b..4b5bd6ef08 100644 --- a/src/ipc/backend_ipc.cpp +++ b/src/ipc/backend_ipc.cpp @@ -66,6 +66,12 @@ IPCBackend::IPCBackend(MPI_Comm comm) initIPC(); + /** + * Check if num_pes == ipcImpl.shm_size) + * All the PEs must be with in a node for IPC conduit + */ + assert(num_pes == ipcImpl.shm_size); + auto *bp{ipc_backend_proxy.get()}; bp->heap_ptr = &heap; @@ -84,7 +90,7 @@ IPCBackend::IPCBackend(MPI_Comm comm) setup_team_world(); - TeamInfo *tinfo = team_tracker.get_team_world()->tinfo_wrt_world; + init_wrk_sync_buffer(); roc_shmem_collective_init(); @@ -92,6 +98,8 @@ IPCBackend::IPCBackend(MPI_Comm comm) teams_init(); + TeamInfo *tinfo = team_tracker.get_team_world()->tinfo_wrt_world; + default_context_proxy_ = IPCDefaultContextProxyT(this, tinfo); setup_ctxs(); @@ -119,6 +127,7 @@ IPCBackend::~IPCBackend() { * and team world */ teams_destroy(); + cleanup_wrk_sync_buffer(); auto *team_world{team_tracker.get_team_world()}; team_world->~Team(); CHECK_HIP(hipFree(team_world)); @@ -281,23 +290,111 @@ void IPCBackend::global_exit(int status) { } void IPCBackend::teams_destroy() { - roc_shmem_free(barrier_pSync_pool); - roc_shmem_free(reduce_pSync_pool); - roc_shmem_free(bcast_pSync_pool); - roc_shmem_free(alltoall_pSync_pool); - roc_shmem_free(pWrk_pool); - roc_shmem_free(pAta_pool); - free(pool_bitmask_); free(reduced_bitmask_); } +void IPCBackend::init_wrk_sync_buffer() { + /** + * calcualte work/sync buffer size + */ + auto max_num_teams{team_tracker.get_max_num_teams()}; + + /** + * size of barrier sync + */ + Wrk_Sync_buffer_size_ += sizeof(*barrier_sync) * ROC_SHMEM_BARRIER_SYNC_SIZE; + + /** + * Size of sync arrays for the teams + */ + Wrk_Sync_buffer_size_ += sizeof(long) * max_num_teams * + (ROC_SHMEM_BARRIER_SYNC_SIZE + + ROC_SHMEM_REDUCE_SYNC_SIZE + + ROC_SHMEM_BCAST_SYNC_SIZE + + ROC_SHMEM_ALLTOALL_SYNC_SIZE); + + /** + * Size of work arrays for the teams + * Accommodate largest possible data type for pWrk + */ + Wrk_Sync_buffer_size_ += sizeof(double) * max_num_teams * + (ROC_SHMEM_REDUCE_MIN_WRKDATA_SIZE + + ROC_SHMEM_ATA_MAX_WRKDATA_SIZE); + + /** + * Size of fence array + */ + Wrk_Sync_buffer_size_ += sizeof(int) * num_pes; + + /** + * Allocate a buffer of size Wrk_Sync_buffer_size_, using fine-grained + * memory allocator + */ + fine_grained_allocator_.allocate((void**)&Wrk_Sync_buffer_ptr_, + Wrk_Sync_buffer_size_); + assert(Wrk_Sync_buffer_ptr_); + temp_Wrk_Sync_buff_ptr_ = Wrk_Sync_buffer_ptr_; + + /* + * Allocate a c-array to hold the IPC handles + */ + hipIpcMemHandle_t *ipc_handle = reinterpret_cast( + malloc(num_pes * sizeof(hipIpcMemHandle_t))); + + /* + * Call into the hip runtime to get an IPC handle for the allocated + * Wrk_Sync_buffer_ and store that IPC handle + */ + CHECK_HIP(hipIpcGetMemHandle(&ipc_handle[my_pe], Wrk_Sync_buffer_ptr_)); + + /* + * all-to-all exchange with each PE to share the IPC handles. + */ + MPI_Allgather(MPI_IN_PLACE, sizeof(hipIpcMemHandle_t), MPI_CHAR, + ipc_handle, sizeof(hipIpcMemHandle_t), MPI_CHAR, thread_comm); + + /* + * Allocate device-side fine grained memory to hold IPC addresses of + * work/sync buffers + */ + fine_grained_allocator_.allocate( + reinterpret_cast(&Wrk_Sync_buffer_bases_), + num_pes * sizeof(char*)); + assert(Wrk_Sync_buffer_bases_); + + /* + * For all local processing elements, initialize the device-side array + * with the IPC work/sync buffer addresses. + */ + for (size_t i = 0; i < num_pes; i++) { + if (i != my_pe) { + CHECK_HIP(hipIpcOpenMemHandle( + reinterpret_cast(&Wrk_Sync_buffer_bases_[i]), + ipc_handle[i], + hipIpcMemLazyEnablePeerAccess)); + } else { + Wrk_Sync_buffer_bases_[i] = Wrk_Sync_buffer_ptr_; + } + } +} + +void IPCBackend::cleanup_wrk_sync_buffer() { + for (size_t i = 0; i < num_pes; i++) { + if (i != my_pe) { + CHECK_HIP(hipIpcCloseMemHandle(Wrk_Sync_buffer_bases_[i])); + } + } + fine_grained_allocator_.deallocate(Wrk_Sync_buffer_bases_); + fine_grained_allocator_.deallocate(Wrk_Sync_buffer_ptr_); +} + void IPCBackend::setup_fence_buffer() { /* - * Allocate heap space for fence + * Allocate memory for fence */ - fence_pool = reinterpret_cast(roc_shmem_malloc( - sizeof(int) * num_pes)); + fence_pool = reinterpret_cast(temp_Wrk_Sync_buff_ptr_); + temp_Wrk_Sync_buff_ptr_ += sizeof(int) * num_pes; } void IPCBackend::roc_shmem_collective_init() { @@ -306,7 +403,8 @@ void IPCBackend::roc_shmem_collective_init() { */ size_t one_sync_size_bytes{sizeof(*barrier_sync)}; size_t sync_size_bytes{one_sync_size_bytes * ROC_SHMEM_BARRIER_SYNC_SIZE}; - heap.malloc(reinterpret_cast(&barrier_sync), sync_size_bytes); + barrier_sync = reinterpret_cast(temp_Wrk_Sync_buff_ptr_); + temp_Wrk_Sync_buff_ptr_ += sync_size_bytes; /* * Initialize the barrier synchronization array with default values. @@ -327,20 +425,32 @@ void IPCBackend::teams_init() { * Allocate pools for the teams sync and work arrary from the SHEAP. */ auto max_num_teams{team_tracker.get_max_num_teams()}; - barrier_pSync_pool = reinterpret_cast(roc_shmem_malloc( - sizeof(long) * ROC_SHMEM_BARRIER_SYNC_SIZE * max_num_teams)); - reduce_pSync_pool = reinterpret_cast(roc_shmem_malloc( - sizeof(long) * ROC_SHMEM_REDUCE_SYNC_SIZE * max_num_teams)); - bcast_pSync_pool = reinterpret_cast(roc_shmem_malloc( - sizeof(long) * ROC_SHMEM_BCAST_SYNC_SIZE * max_num_teams)); - alltoall_pSync_pool = reinterpret_cast(roc_shmem_malloc( - sizeof(long) * ROC_SHMEM_ALLTOALL_SYNC_SIZE * max_num_teams)); + + barrier_pSync_pool = reinterpret_cast(temp_Wrk_Sync_buff_ptr_); + temp_Wrk_Sync_buff_ptr_ += sizeof(long) * ROC_SHMEM_BARRIER_SYNC_SIZE + * max_num_teams; + + reduce_pSync_pool = reinterpret_cast(temp_Wrk_Sync_buff_ptr_); + temp_Wrk_Sync_buff_ptr_ += sizeof(long) * ROC_SHMEM_REDUCE_SYNC_SIZE + * max_num_teams; + + bcast_pSync_pool = reinterpret_cast(temp_Wrk_Sync_buff_ptr_); + temp_Wrk_Sync_buff_ptr_ += sizeof(long) * ROC_SHMEM_BCAST_SYNC_SIZE + * max_num_teams; + + alltoall_pSync_pool = reinterpret_cast(temp_Wrk_Sync_buff_ptr_); + temp_Wrk_Sync_buff_ptr_ += sizeof(long) * ROC_SHMEM_BCAST_SYNC_SIZE + * max_num_teams; /* Accommodating for largest possible data type for pWrk */ - pWrk_pool = roc_shmem_malloc( - sizeof(double) * ROC_SHMEM_REDUCE_MIN_WRKDATA_SIZE * max_num_teams); - pAta_pool = roc_shmem_malloc(sizeof(double) * ROC_SHMEM_ATA_MAX_WRKDATA_SIZE * - max_num_teams); + pWrk_pool = reinterpret_cast(temp_Wrk_Sync_buff_ptr_); + temp_Wrk_Sync_buff_ptr_ += sizeof(double) * ROC_SHMEM_REDUCE_MIN_WRKDATA_SIZE + * max_num_teams; + + + pAta_pool = reinterpret_cast(temp_Wrk_Sync_buff_ptr_); + temp_Wrk_Sync_buff_ptr_ += sizeof(double) * ROC_SHMEM_ATA_MAX_WRKDATA_SIZE + * max_num_teams; /** * Initialize the sync arrays in the pool with default values. diff --git a/src/ipc/backend_ipc.hpp b/src/ipc/backend_ipc.hpp index de8cb9c617..2f84b9c534 100644 --- a/src/ipc/backend_ipc.hpp +++ b/src/ipc/backend_ipc.hpp @@ -114,6 +114,13 @@ class IPCBackend : public Backend { */ void team_destroy(roc_shmem_team_t team) override; + /** + * @brief Accessor for work/sync bases + * + * @return Vector containing the addresses of the work/sync bases + */ + char** get_wrk_sync_bases() { return Wrk_Sync_buffer_bases_; } + /** * @brief Handle to device memory fields. */ @@ -263,6 +270,46 @@ class IPCBackend : public Backend { */ int bitmask_size_{-1}; + /** + * Fine grained memory allocator for buffers used in collectives Routines + */ + HIPDefaultFinegrainedAllocator fine_grained_allocator_ {}; + + /** + * @brief Collective routines work/sync buffer size + */ + size_t Wrk_Sync_buffer_size_{}; + + /** + * @brief Collective routines work/sync buffer base ptr + */ + char* const Wrk_Sync_buffer_ptr_{nullptr}; + + /** + * @brief Temporary buffer pointer pointing to the same address as + * Wrk_Sync_buffer_ptr_, used to calculate the starting addresses of + * different work and sync buffers. + */ + char *temp_Wrk_Sync_buff_ptr_{nullptr}; + + /** + * @brief Array containing the addresses of the work/sync buffer bases + * of other PEs + */ + char** Wrk_Sync_buffer_bases_{nullptr}; + + /** + * @brief Initialize memory required for work/sync buffers and open IPC + * handle on PE's Wrk_Sync_buffer_ptr. + */ + void init_wrk_sync_buffer(); + + /** + * @brief Close IPC memory handles for work/sync buffers and deallocate + * work/sync buffer. + */ + void cleanup_wrk_sync_buffer(); + }; } // namespace rocshmem diff --git a/src/ipc/context_ipc_device.cpp b/src/ipc/context_ipc_device.cpp index d208a070fb..275b15fe82 100644 --- a/src/ipc/context_ipc_device.cpp +++ b/src/ipc/context_ipc_device.cpp @@ -48,6 +48,7 @@ __host__ IPCContext::IPCContext(Backend *b) g_ret = bp->g_ret; atomic_base_ptr = bp->atomic_ret->atomic_base_ptr; fence_pool = backend->fence_pool; + Wrk_Sync_buffer_bases_ = backend->get_wrk_sync_bases(); orders_.store = detail::atomic::rocshmem_memory_order::memory_order_seq_cst; @@ -165,4 +166,55 @@ __device__ void IPCContext::getmem_nbi_wave(void *dest, const void *source, getmem_wave(dest, source, nelems, pe); } +__device__ void IPCContext::internal_putmem(void *dest, const void *source, + size_t nelems, int pe) { + uint64_t L_offset = + reinterpret_cast(dest) - Wrk_Sync_buffer_bases_[my_pe]; + memcpy(Wrk_Sync_buffer_bases_[pe] + L_offset, + const_cast(source), nelems); +} + +__device__ void IPCContext::internal_getmem(void *dest, const void *source, + size_t nelems, int pe) { + const char *src_typed = reinterpret_cast(source); + uint64_t L_offset = + const_cast(src_typed) - Wrk_Sync_buffer_bases_[my_pe]; + memcpy(dest, Wrk_Sync_buffer_bases_[pe] + L_offset, nelems); +} + +__device__ void IPCContext::internal_putmem_wg(void *dest, const void *source, + size_t nelems, int pe) { + uint64_t L_offset = + reinterpret_cast(dest) - Wrk_Sync_buffer_bases_[my_pe]; + memcpy_wg(Wrk_Sync_buffer_bases_[pe] + L_offset, + const_cast(source), nelems); + __syncthreads(); +} + +__device__ void IPCContext::internal_getmem_wg(void *dest, const void *source, + size_t nelems, int pe) { + const char *src_typed = reinterpret_cast(source); + uint64_t L_offset = + const_cast(src_typed) - Wrk_Sync_buffer_bases_[my_pe]; + memcpy_wg(dest, Wrk_Sync_buffer_bases_[pe] + L_offset, nelems); + __syncthreads(); +} + +__device__ void IPCContext::internal_putmem_wave(void *dest, + const void *source, size_t nelems, int pe) { + uint64_t L_offset = + reinterpret_cast(dest) - Wrk_Sync_buffer_bases_[my_pe]; + memcpy_wave(Wrk_Sync_buffer_bases_[pe] + L_offset, + const_cast(source), nelems); +} + +__device__ void IPCContext::internal_getmem_wave(void *dest, + const void *source, size_t nelems, int pe) { + const char *src_typed = reinterpret_cast(source); + uint64_t L_offset = + const_cast(src_typed) - Wrk_Sync_buffer_bases_[my_pe]; + memcpy_wave(dest, Wrk_Sync_buffer_bases_[pe] + L_offset, + nelems); +} + } // namespace rocshmem diff --git a/src/ipc/context_ipc_device.hpp b/src/ipc/context_ipc_device.hpp index 065d37cec4..2a7aaebd62 100644 --- a/src/ipc/context_ipc_device.hpp +++ b/src/ipc/context_ipc_device.hpp @@ -239,6 +239,26 @@ class IPCContext : public Context { __device__ void internal_atomic_barrier(int pe, int PE_start, int stride, int n_pes, int64_t *pSync); + //internal functions used by collectives routines to write/read to + //work/sync buffers + __device__ void internal_putmem(void *dest, const void *source, + size_t nelems, int pe); + + __device__ void internal_getmem(void *dest, const void *source, + size_t nelems, int pe); + + __device__ void internal_putmem_wg(void *dest, const void *source, + size_t nelems, int pe); + + __device__ void internal_getmem_wg(void *dest, const void *source, + size_t nelems, int pe); + + __device__ void internal_putmem_wave(void *dest, const void *source, + size_t nelems, int pe); + + __device__ void internal_getmem_wave(void *dest, const void *source, + size_t nelems, int pe); + //Temporary scratchpad memory used by internal barrier algorithms. int64_t *barrier_sync{nullptr}; @@ -252,6 +272,12 @@ class IPCContext : public Context { //Buffer to perform Atomic store to enforce memory ordering int *fence_pool{nullptr}; + /** + * @brief Array containing the addresses of the work/sync buffer bases + * of other PEs + */ + char **Wrk_Sync_buffer_bases_{nullptr}; + public: //TODO(Avinash): //Make tinfo private variable, it requires changes to the context diff --git a/src/ipc/context_ipc_device_coll.cpp b/src/ipc/context_ipc_device_coll.cpp index f128d76d3f..7d7f39919c 100644 --- a/src/ipc/context_ipc_device_coll.cpp +++ b/src/ipc/context_ipc_device_coll.cpp @@ -46,12 +46,12 @@ __device__ void IPCContext::internal_direct_barrier(int pe, int PE_start, // Announce to other PEs that all have reached for (size_t i = 1, j = PE_start + stride; i < n_pes; ++i, j += stride) { - put_nbi(&pSync[0], &flag_val, 1, j); + internal_putmem(&pSync[0], &flag_val, sizeof(*pSync), j); } } else { // Mark current PE offset as reached size_t pe_offset = (pe - PE_start) / stride; - put_nbi(&pSync[pe_offset], &flag_val, 1, PE_start); + internal_putmem(&pSync[pe_offset], &flag_val, sizeof(*pSync), PE_start); #if defined(__gfx90a__) __threadfence_system(); #endif /* __gfx90a__ */ @@ -71,7 +71,7 @@ __device__ void IPCContext::internal_atomic_barrier(int pe, int PE_start, threadfence_system(); for (size_t i = 1, j = PE_start + stride; i < n_pes; ++i, j += stride) { - put_nbi(&pSync[0], &flag_val, 1, j); + internal_putmem(&pSync[0], &flag_val, sizeof(*pSync), j); } } else { amo_add(&pSync[0], flag_val, PE_start);