Add fine grained memory buffers for work/sync arrays

* Add interanl put_mem/get_mem{_wave, _wg} functions to read/write to work/sync arrays
* Add condition check to ensure all MPI processes are on the same compute node for IPC conduit


[ROCm/rocshmem commit: 6685d0ab60]
Tá an tiomantas seo le fáil i:
avinashkethineedi
2024-10-17 21:54:58 +00:00
tuismitheoir 5b47cf482d
tiomantas fbcba80cd3
D'athraigh 5 comhad le 262 breiseanna agus 27 scriosta
+134 -24
Féach ar an gComhad
@@ -66,6 +66,12 @@ IPCBackend::IPCBackend(MPI_Comm comm)
initIPC();
/**
* Check if num_pes == ipcImpl.shm_size)
* All the PEs must be with in a node for IPC conduit
*/
assert(num_pes == ipcImpl.shm_size);
auto *bp{ipc_backend_proxy.get()};
bp->heap_ptr = &heap;
@@ -84,7 +90,7 @@ IPCBackend::IPCBackend(MPI_Comm comm)
setup_team_world();
TeamInfo *tinfo = team_tracker.get_team_world()->tinfo_wrt_world;
init_wrk_sync_buffer();
roc_shmem_collective_init();
@@ -92,6 +98,8 @@ IPCBackend::IPCBackend(MPI_Comm comm)
teams_init();
TeamInfo *tinfo = team_tracker.get_team_world()->tinfo_wrt_world;
default_context_proxy_ = IPCDefaultContextProxyT(this, tinfo);
setup_ctxs();
@@ -119,6 +127,7 @@ IPCBackend::~IPCBackend() {
* and team world
*/
teams_destroy();
cleanup_wrk_sync_buffer();
auto *team_world{team_tracker.get_team_world()};
team_world->~Team();
CHECK_HIP(hipFree(team_world));
@@ -281,23 +290,111 @@ void IPCBackend::global_exit(int status) {
}
void IPCBackend::teams_destroy() {
roc_shmem_free(barrier_pSync_pool);
roc_shmem_free(reduce_pSync_pool);
roc_shmem_free(bcast_pSync_pool);
roc_shmem_free(alltoall_pSync_pool);
roc_shmem_free(pWrk_pool);
roc_shmem_free(pAta_pool);
free(pool_bitmask_);
free(reduced_bitmask_);
}
void IPCBackend::init_wrk_sync_buffer() {
/**
* calcualte work/sync buffer size
*/
auto max_num_teams{team_tracker.get_max_num_teams()};
/**
* size of barrier sync
*/
Wrk_Sync_buffer_size_ += sizeof(*barrier_sync) * ROC_SHMEM_BARRIER_SYNC_SIZE;
/**
* Size of sync arrays for the teams
*/
Wrk_Sync_buffer_size_ += sizeof(long) * max_num_teams *
(ROC_SHMEM_BARRIER_SYNC_SIZE +
ROC_SHMEM_REDUCE_SYNC_SIZE +
ROC_SHMEM_BCAST_SYNC_SIZE +
ROC_SHMEM_ALLTOALL_SYNC_SIZE);
/**
* Size of work arrays for the teams
* Accommodate largest possible data type for pWrk
*/
Wrk_Sync_buffer_size_ += sizeof(double) * max_num_teams *
(ROC_SHMEM_REDUCE_MIN_WRKDATA_SIZE +
ROC_SHMEM_ATA_MAX_WRKDATA_SIZE);
/**
* Size of fence array
*/
Wrk_Sync_buffer_size_ += sizeof(int) * num_pes;
/**
* Allocate a buffer of size Wrk_Sync_buffer_size_, using fine-grained
* memory allocator
*/
fine_grained_allocator_.allocate((void**)&Wrk_Sync_buffer_ptr_,
Wrk_Sync_buffer_size_);
assert(Wrk_Sync_buffer_ptr_);
temp_Wrk_Sync_buff_ptr_ = Wrk_Sync_buffer_ptr_;
/*
* Allocate a c-array to hold the IPC handles
*/
hipIpcMemHandle_t *ipc_handle = reinterpret_cast<hipIpcMemHandle_t*>(
malloc(num_pes * sizeof(hipIpcMemHandle_t)));
/*
* Call into the hip runtime to get an IPC handle for the allocated
* Wrk_Sync_buffer_ and store that IPC handle
*/
CHECK_HIP(hipIpcGetMemHandle(&ipc_handle[my_pe], Wrk_Sync_buffer_ptr_));
/*
* all-to-all exchange with each PE to share the IPC handles.
*/
MPI_Allgather(MPI_IN_PLACE, sizeof(hipIpcMemHandle_t), MPI_CHAR,
ipc_handle, sizeof(hipIpcMemHandle_t), MPI_CHAR, thread_comm);
/*
* Allocate device-side fine grained memory to hold IPC addresses of
* work/sync buffers
*/
fine_grained_allocator_.allocate(
reinterpret_cast<void**>(&Wrk_Sync_buffer_bases_),
num_pes * sizeof(char*));
assert(Wrk_Sync_buffer_bases_);
/*
* For all local processing elements, initialize the device-side array
* with the IPC work/sync buffer addresses.
*/
for (size_t i = 0; i < num_pes; i++) {
if (i != my_pe) {
CHECK_HIP(hipIpcOpenMemHandle(
reinterpret_cast<void**>(&Wrk_Sync_buffer_bases_[i]),
ipc_handle[i],
hipIpcMemLazyEnablePeerAccess));
} else {
Wrk_Sync_buffer_bases_[i] = Wrk_Sync_buffer_ptr_;
}
}
}
void IPCBackend::cleanup_wrk_sync_buffer() {
for (size_t i = 0; i < num_pes; i++) {
if (i != my_pe) {
CHECK_HIP(hipIpcCloseMemHandle(Wrk_Sync_buffer_bases_[i]));
}
}
fine_grained_allocator_.deallocate(Wrk_Sync_buffer_bases_);
fine_grained_allocator_.deallocate(Wrk_Sync_buffer_ptr_);
}
void IPCBackend::setup_fence_buffer() {
/*
* Allocate heap space for fence
* Allocate memory for fence
*/
fence_pool = reinterpret_cast<int *>(roc_shmem_malloc(
sizeof(int) * num_pes));
fence_pool = reinterpret_cast<int *>(temp_Wrk_Sync_buff_ptr_);
temp_Wrk_Sync_buff_ptr_ += sizeof(int) * num_pes;
}
void IPCBackend::roc_shmem_collective_init() {
@@ -306,7 +403,8 @@ void IPCBackend::roc_shmem_collective_init() {
*/
size_t one_sync_size_bytes{sizeof(*barrier_sync)};
size_t sync_size_bytes{one_sync_size_bytes * ROC_SHMEM_BARRIER_SYNC_SIZE};
heap.malloc(reinterpret_cast<void **>(&barrier_sync), sync_size_bytes);
barrier_sync = reinterpret_cast<int64_t*>(temp_Wrk_Sync_buff_ptr_);
temp_Wrk_Sync_buff_ptr_ += sync_size_bytes;
/*
* Initialize the barrier synchronization array with default values.
@@ -327,20 +425,32 @@ void IPCBackend::teams_init() {
* Allocate pools for the teams sync and work arrary from the SHEAP.
*/
auto max_num_teams{team_tracker.get_max_num_teams()};
barrier_pSync_pool = reinterpret_cast<long *>(roc_shmem_malloc(
sizeof(long) * ROC_SHMEM_BARRIER_SYNC_SIZE * max_num_teams));
reduce_pSync_pool = reinterpret_cast<long *>(roc_shmem_malloc(
sizeof(long) * ROC_SHMEM_REDUCE_SYNC_SIZE * max_num_teams));
bcast_pSync_pool = reinterpret_cast<long *>(roc_shmem_malloc(
sizeof(long) * ROC_SHMEM_BCAST_SYNC_SIZE * max_num_teams));
alltoall_pSync_pool = reinterpret_cast<long *>(roc_shmem_malloc(
sizeof(long) * ROC_SHMEM_ALLTOALL_SYNC_SIZE * max_num_teams));
barrier_pSync_pool = reinterpret_cast<long *>(temp_Wrk_Sync_buff_ptr_);
temp_Wrk_Sync_buff_ptr_ += sizeof(long) * ROC_SHMEM_BARRIER_SYNC_SIZE
* max_num_teams;
reduce_pSync_pool = reinterpret_cast<long *>(temp_Wrk_Sync_buff_ptr_);
temp_Wrk_Sync_buff_ptr_ += sizeof(long) * ROC_SHMEM_REDUCE_SYNC_SIZE
* max_num_teams;
bcast_pSync_pool = reinterpret_cast<long *>(temp_Wrk_Sync_buff_ptr_);
temp_Wrk_Sync_buff_ptr_ += sizeof(long) * ROC_SHMEM_BCAST_SYNC_SIZE
* max_num_teams;
alltoall_pSync_pool = reinterpret_cast<long *>(temp_Wrk_Sync_buff_ptr_);
temp_Wrk_Sync_buff_ptr_ += sizeof(long) * ROC_SHMEM_BCAST_SYNC_SIZE
* max_num_teams;
/* Accommodating for largest possible data type for pWrk */
pWrk_pool = roc_shmem_malloc(
sizeof(double) * ROC_SHMEM_REDUCE_MIN_WRKDATA_SIZE * max_num_teams);
pAta_pool = roc_shmem_malloc(sizeof(double) * ROC_SHMEM_ATA_MAX_WRKDATA_SIZE *
max_num_teams);
pWrk_pool = reinterpret_cast<void *>(temp_Wrk_Sync_buff_ptr_);
temp_Wrk_Sync_buff_ptr_ += sizeof(double) * ROC_SHMEM_REDUCE_MIN_WRKDATA_SIZE
* max_num_teams;
pAta_pool = reinterpret_cast<void *>(temp_Wrk_Sync_buff_ptr_);
temp_Wrk_Sync_buff_ptr_ += sizeof(double) * ROC_SHMEM_ATA_MAX_WRKDATA_SIZE
* max_num_teams;
/**
* Initialize the sync arrays in the pool with default values.
+47
Féach ar an gComhad
@@ -114,6 +114,13 @@ class IPCBackend : public Backend {
*/
void team_destroy(roc_shmem_team_t team) override;
/**
* @brief Accessor for work/sync bases
*
* @return Vector containing the addresses of the work/sync bases
*/
char** get_wrk_sync_bases() { return Wrk_Sync_buffer_bases_; }
/**
* @brief Handle to device memory fields.
*/
@@ -263,6 +270,46 @@ class IPCBackend : public Backend {
*/
int bitmask_size_{-1};
/**
* Fine grained memory allocator for buffers used in collectives Routines
*/
HIPDefaultFinegrainedAllocator fine_grained_allocator_ {};
/**
* @brief Collective routines work/sync buffer size
*/
size_t Wrk_Sync_buffer_size_{};
/**
* @brief Collective routines work/sync buffer base ptr
*/
char* const Wrk_Sync_buffer_ptr_{nullptr};
/**
* @brief Temporary buffer pointer pointing to the same address as
* Wrk_Sync_buffer_ptr_, used to calculate the starting addresses of
* different work and sync buffers.
*/
char *temp_Wrk_Sync_buff_ptr_{nullptr};
/**
* @brief Array containing the addresses of the work/sync buffer bases
* of other PEs
*/
char** Wrk_Sync_buffer_bases_{nullptr};
/**
* @brief Initialize memory required for work/sync buffers and open IPC
* handle on PE's Wrk_Sync_buffer_ptr.
*/
void init_wrk_sync_buffer();
/**
* @brief Close IPC memory handles for work/sync buffers and deallocate
* work/sync buffer.
*/
void cleanup_wrk_sync_buffer();
};
} // namespace rocshmem
+52
Féach ar an gComhad
@@ -48,6 +48,7 @@ __host__ IPCContext::IPCContext(Backend *b)
g_ret = bp->g_ret;
atomic_base_ptr = bp->atomic_ret->atomic_base_ptr;
fence_pool = backend->fence_pool;
Wrk_Sync_buffer_bases_ = backend->get_wrk_sync_bases();
orders_.store = detail::atomic::rocshmem_memory_order::memory_order_seq_cst;
@@ -165,4 +166,55 @@ __device__ void IPCContext::getmem_nbi_wave(void *dest, const void *source,
getmem_wave(dest, source, nelems, pe);
}
__device__ void IPCContext::internal_putmem(void *dest, const void *source,
size_t nelems, int pe) {
uint64_t L_offset =
reinterpret_cast<char *>(dest) - Wrk_Sync_buffer_bases_[my_pe];
memcpy(Wrk_Sync_buffer_bases_[pe] + L_offset,
const_cast<void *>(source), nelems);
}
__device__ void IPCContext::internal_getmem(void *dest, const void *source,
size_t nelems, int pe) {
const char *src_typed = reinterpret_cast<const char *>(source);
uint64_t L_offset =
const_cast<char *>(src_typed) - Wrk_Sync_buffer_bases_[my_pe];
memcpy(dest, Wrk_Sync_buffer_bases_[pe] + L_offset, nelems);
}
__device__ void IPCContext::internal_putmem_wg(void *dest, const void *source,
size_t nelems, int pe) {
uint64_t L_offset =
reinterpret_cast<char *>(dest) - Wrk_Sync_buffer_bases_[my_pe];
memcpy_wg(Wrk_Sync_buffer_bases_[pe] + L_offset,
const_cast<void *>(source), nelems);
__syncthreads();
}
__device__ void IPCContext::internal_getmem_wg(void *dest, const void *source,
size_t nelems, int pe) {
const char *src_typed = reinterpret_cast<const char *>(source);
uint64_t L_offset =
const_cast<char *>(src_typed) - Wrk_Sync_buffer_bases_[my_pe];
memcpy_wg(dest, Wrk_Sync_buffer_bases_[pe] + L_offset, nelems);
__syncthreads();
}
__device__ void IPCContext::internal_putmem_wave(void *dest,
const void *source, size_t nelems, int pe) {
uint64_t L_offset =
reinterpret_cast<char *>(dest) - Wrk_Sync_buffer_bases_[my_pe];
memcpy_wave(Wrk_Sync_buffer_bases_[pe] + L_offset,
const_cast<void *>(source), nelems);
}
__device__ void IPCContext::internal_getmem_wave(void *dest,
const void *source, size_t nelems, int pe) {
const char *src_typed = reinterpret_cast<const char *>(source);
uint64_t L_offset =
const_cast<char *>(src_typed) - Wrk_Sync_buffer_bases_[my_pe];
memcpy_wave(dest, Wrk_Sync_buffer_bases_[pe] + L_offset,
nelems);
}
} // namespace rocshmem
+26
Féach ar an gComhad
@@ -239,6 +239,26 @@ class IPCContext : public Context {
__device__ void internal_atomic_barrier(int pe, int PE_start, int stride,
int n_pes, int64_t *pSync);
//internal functions used by collectives routines to write/read to
//work/sync buffers
__device__ void internal_putmem(void *dest, const void *source,
size_t nelems, int pe);
__device__ void internal_getmem(void *dest, const void *source,
size_t nelems, int pe);
__device__ void internal_putmem_wg(void *dest, const void *source,
size_t nelems, int pe);
__device__ void internal_getmem_wg(void *dest, const void *source,
size_t nelems, int pe);
__device__ void internal_putmem_wave(void *dest, const void *source,
size_t nelems, int pe);
__device__ void internal_getmem_wave(void *dest, const void *source,
size_t nelems, int pe);
//Temporary scratchpad memory used by internal barrier algorithms.
int64_t *barrier_sync{nullptr};
@@ -252,6 +272,12 @@ class IPCContext : public Context {
//Buffer to perform Atomic store to enforce memory ordering
int *fence_pool{nullptr};
/**
* @brief Array containing the addresses of the work/sync buffer bases
* of other PEs
*/
char **Wrk_Sync_buffer_bases_{nullptr};
public:
//TODO(Avinash):
//Make tinfo private variable, it requires changes to the context
@@ -46,12 +46,12 @@ __device__ void IPCContext::internal_direct_barrier(int pe, int PE_start,
// Announce to other PEs that all have reached
for (size_t i = 1, j = PE_start + stride; i < n_pes; ++i, j += stride) {
put_nbi(&pSync[0], &flag_val, 1, j);
internal_putmem(&pSync[0], &flag_val, sizeof(*pSync), j);
}
} else {
// Mark current PE offset as reached
size_t pe_offset = (pe - PE_start) / stride;
put_nbi(&pSync[pe_offset], &flag_val, 1, PE_start);
internal_putmem(&pSync[pe_offset], &flag_val, sizeof(*pSync), PE_start);
#if defined(__gfx90a__)
__threadfence_system();
#endif /* __gfx90a__ */
@@ -71,7 +71,7 @@ __device__ void IPCContext::internal_atomic_barrier(int pe, int PE_start,
threadfence_system();
for (size_t i = 1, j = PE_start + stride; i < n_pes; ++i, j += stride) {
put_nbi(&pSync[0], &flag_val, 1, j);
internal_putmem(&pSync[0], &flag_val, sizeof(*pSync), j);
}
} else {
amo_add<int64_t>(&pSync[0], flag_val, PE_start);