Merge pull request #41 from avinashkethineedi/collective_routine_buffers
Fine grained memory buffers for work/sync arrays
Dieser Commit ist enthalten in:
+134
-24
@@ -66,6 +66,12 @@ IPCBackend::IPCBackend(MPI_Comm comm)
|
||||
|
||||
initIPC();
|
||||
|
||||
/**
|
||||
* Check that num_pes == ipcImpl.shm_size.
|
||||
* All the PEs must be within a single node for the IPC conduit.
|
||||
*/
|
||||
assert(num_pes == ipcImpl.shm_size);
|
||||
|
||||
auto *bp{ipc_backend_proxy.get()};
|
||||
|
||||
bp->heap_ptr = &heap;
|
||||
@@ -84,7 +90,7 @@ IPCBackend::IPCBackend(MPI_Comm comm)
|
||||
|
||||
setup_team_world();
|
||||
|
||||
TeamInfo *tinfo = team_tracker.get_team_world()->tinfo_wrt_world;
|
||||
init_wrk_sync_buffer();
|
||||
|
||||
roc_shmem_collective_init();
|
||||
|
||||
@@ -92,6 +98,8 @@ IPCBackend::IPCBackend(MPI_Comm comm)
|
||||
|
||||
teams_init();
|
||||
|
||||
TeamInfo *tinfo = team_tracker.get_team_world()->tinfo_wrt_world;
|
||||
|
||||
default_context_proxy_ = IPCDefaultContextProxyT(this, tinfo);
|
||||
|
||||
setup_ctxs();
|
||||
@@ -119,6 +127,7 @@ IPCBackend::~IPCBackend() {
|
||||
* and team world
|
||||
*/
|
||||
teams_destroy();
|
||||
cleanup_wrk_sync_buffer();
|
||||
auto *team_world{team_tracker.get_team_world()};
|
||||
team_world->~Team();
|
||||
CHECK_HIP(hipFree(team_world));
|
||||
@@ -281,23 +290,111 @@ void IPCBackend::global_exit(int status) {
|
||||
}
|
||||
|
||||
/**
 * @brief Release the host-side bookkeeping owned by the teams machinery.
 *
 * The team pools (barrier/reduce/bcast/alltoall pSync, pWrk and pAta) are
 * sub-ranges of the fine-grained work/sync buffer handed out in
 * teams_init(), so they must not be passed to roc_shmem_free(). The buffer
 * is released as a whole by cleanup_wrk_sync_buffer().
 */
void IPCBackend::teams_destroy() {
  free(pool_bitmask_);
  free(reduced_bitmask_);
}
|
||||
|
||||
void IPCBackend::init_wrk_sync_buffer() {
|
||||
/**
|
||||
* calcualte work/sync buffer size
|
||||
*/
|
||||
auto max_num_teams{team_tracker.get_max_num_teams()};
|
||||
|
||||
/**
|
||||
* size of barrier sync
|
||||
*/
|
||||
Wrk_Sync_buffer_size_ += sizeof(*barrier_sync) * ROC_SHMEM_BARRIER_SYNC_SIZE;
|
||||
|
||||
/**
|
||||
* Size of sync arrays for the teams
|
||||
*/
|
||||
Wrk_Sync_buffer_size_ += sizeof(long) * max_num_teams *
|
||||
(ROC_SHMEM_BARRIER_SYNC_SIZE +
|
||||
ROC_SHMEM_REDUCE_SYNC_SIZE +
|
||||
ROC_SHMEM_BCAST_SYNC_SIZE +
|
||||
ROC_SHMEM_ALLTOALL_SYNC_SIZE);
|
||||
|
||||
/**
|
||||
* Size of work arrays for the teams
|
||||
* Accommodate largest possible data type for pWrk
|
||||
*/
|
||||
Wrk_Sync_buffer_size_ += sizeof(double) * max_num_teams *
|
||||
(ROC_SHMEM_REDUCE_MIN_WRKDATA_SIZE +
|
||||
ROC_SHMEM_ATA_MAX_WRKDATA_SIZE);
|
||||
|
||||
/**
|
||||
* Size of fence array
|
||||
*/
|
||||
Wrk_Sync_buffer_size_ += sizeof(int) * num_pes;
|
||||
|
||||
/**
|
||||
* Allocate a buffer of size Wrk_Sync_buffer_size_, using fine-grained
|
||||
* memory allocator
|
||||
*/
|
||||
fine_grained_allocator_.allocate((void**)&Wrk_Sync_buffer_ptr_,
|
||||
Wrk_Sync_buffer_size_);
|
||||
assert(Wrk_Sync_buffer_ptr_);
|
||||
temp_Wrk_Sync_buff_ptr_ = Wrk_Sync_buffer_ptr_;
|
||||
|
||||
/*
|
||||
* Allocate a c-array to hold the IPC handles
|
||||
*/
|
||||
hipIpcMemHandle_t *ipc_handle = reinterpret_cast<hipIpcMemHandle_t*>(
|
||||
malloc(num_pes * sizeof(hipIpcMemHandle_t)));
|
||||
|
||||
/*
|
||||
* Call into the hip runtime to get an IPC handle for the allocated
|
||||
* Wrk_Sync_buffer_ and store that IPC handle
|
||||
*/
|
||||
CHECK_HIP(hipIpcGetMemHandle(&ipc_handle[my_pe], Wrk_Sync_buffer_ptr_));
|
||||
|
||||
/*
|
||||
* all-to-all exchange with each PE to share the IPC handles.
|
||||
*/
|
||||
MPI_Allgather(MPI_IN_PLACE, sizeof(hipIpcMemHandle_t), MPI_CHAR,
|
||||
ipc_handle, sizeof(hipIpcMemHandle_t), MPI_CHAR, thread_comm);
|
||||
|
||||
/*
|
||||
* Allocate device-side fine grained memory to hold IPC addresses of
|
||||
* work/sync buffers
|
||||
*/
|
||||
fine_grained_allocator_.allocate(
|
||||
reinterpret_cast<void**>(&Wrk_Sync_buffer_bases_),
|
||||
num_pes * sizeof(char*));
|
||||
assert(Wrk_Sync_buffer_bases_);
|
||||
|
||||
/*
|
||||
* For all local processing elements, initialize the device-side array
|
||||
* with the IPC work/sync buffer addresses.
|
||||
*/
|
||||
for (size_t i = 0; i < num_pes; i++) {
|
||||
if (i != my_pe) {
|
||||
CHECK_HIP(hipIpcOpenMemHandle(
|
||||
reinterpret_cast<void**>(&Wrk_Sync_buffer_bases_[i]),
|
||||
ipc_handle[i],
|
||||
hipIpcMemLazyEnablePeerAccess));
|
||||
} else {
|
||||
Wrk_Sync_buffer_bases_[i] = Wrk_Sync_buffer_ptr_;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void IPCBackend::cleanup_wrk_sync_buffer() {
|
||||
for (size_t i = 0; i < num_pes; i++) {
|
||||
if (i != my_pe) {
|
||||
CHECK_HIP(hipIpcCloseMemHandle(Wrk_Sync_buffer_bases_[i]));
|
||||
}
|
||||
}
|
||||
fine_grained_allocator_.deallocate(Wrk_Sync_buffer_bases_);
|
||||
fine_grained_allocator_.deallocate(Wrk_Sync_buffer_ptr_);
|
||||
}
|
||||
|
||||
/**
 * @brief Reserve the fence array inside the work/sync buffer.
 *
 * fence_pool is pointed at the current carve position of the work/sync
 * buffer and that position is advanced by one int per PE. No symmetric-heap
 * allocation is made: a roc_shmem_malloc() here would be overwritten by the
 * assignment below and leaked.
 */
void IPCBackend::setup_fence_buffer() {
  fence_pool = reinterpret_cast<int *>(temp_Wrk_Sync_buff_ptr_);
  temp_Wrk_Sync_buff_ptr_ += sizeof(int) * num_pes;
}
|
||||
|
||||
void IPCBackend::roc_shmem_collective_init() {
|
||||
@@ -306,7 +403,8 @@ void IPCBackend::roc_shmem_collective_init() {
|
||||
*/
|
||||
size_t one_sync_size_bytes{sizeof(*barrier_sync)};
|
||||
size_t sync_size_bytes{one_sync_size_bytes * ROC_SHMEM_BARRIER_SYNC_SIZE};
|
||||
heap.malloc(reinterpret_cast<void **>(&barrier_sync), sync_size_bytes);
|
||||
barrier_sync = reinterpret_cast<int64_t*>(temp_Wrk_Sync_buff_ptr_);
|
||||
temp_Wrk_Sync_buff_ptr_ += sync_size_bytes;
|
||||
|
||||
/*
|
||||
* Initialize the barrier synchronization array with default values.
|
||||
@@ -327,20 +425,32 @@ void IPCBackend::teams_init() {
|
||||
* Allocate pools for the teams' sync and work arrays from the fine-grained work/sync buffer.
|
||||
*/
|
||||
auto max_num_teams{team_tracker.get_max_num_teams()};
|
||||
barrier_pSync_pool = reinterpret_cast<long *>(roc_shmem_malloc(
|
||||
sizeof(long) * ROC_SHMEM_BARRIER_SYNC_SIZE * max_num_teams));
|
||||
reduce_pSync_pool = reinterpret_cast<long *>(roc_shmem_malloc(
|
||||
sizeof(long) * ROC_SHMEM_REDUCE_SYNC_SIZE * max_num_teams));
|
||||
bcast_pSync_pool = reinterpret_cast<long *>(roc_shmem_malloc(
|
||||
sizeof(long) * ROC_SHMEM_BCAST_SYNC_SIZE * max_num_teams));
|
||||
alltoall_pSync_pool = reinterpret_cast<long *>(roc_shmem_malloc(
|
||||
sizeof(long) * ROC_SHMEM_ALLTOALL_SYNC_SIZE * max_num_teams));
|
||||
|
||||
barrier_pSync_pool = reinterpret_cast<long *>(temp_Wrk_Sync_buff_ptr_);
|
||||
temp_Wrk_Sync_buff_ptr_ += sizeof(long) * ROC_SHMEM_BARRIER_SYNC_SIZE
|
||||
* max_num_teams;
|
||||
|
||||
reduce_pSync_pool = reinterpret_cast<long *>(temp_Wrk_Sync_buff_ptr_);
|
||||
temp_Wrk_Sync_buff_ptr_ += sizeof(long) * ROC_SHMEM_REDUCE_SYNC_SIZE
|
||||
* max_num_teams;
|
||||
|
||||
bcast_pSync_pool = reinterpret_cast<long *>(temp_Wrk_Sync_buff_ptr_);
|
||||
temp_Wrk_Sync_buff_ptr_ += sizeof(long) * ROC_SHMEM_BCAST_SYNC_SIZE
|
||||
* max_num_teams;
|
||||
|
||||
alltoall_pSync_pool = reinterpret_cast<long *>(temp_Wrk_Sync_buff_ptr_);
|
||||
temp_Wrk_Sync_buff_ptr_ += sizeof(long) * ROC_SHMEM_BCAST_SYNC_SIZE
|
||||
* max_num_teams;
|
||||
|
||||
/* Accommodating for largest possible data type for pWrk */
|
||||
pWrk_pool = roc_shmem_malloc(
|
||||
sizeof(double) * ROC_SHMEM_REDUCE_MIN_WRKDATA_SIZE * max_num_teams);
|
||||
pAta_pool = roc_shmem_malloc(sizeof(double) * ROC_SHMEM_ATA_MAX_WRKDATA_SIZE *
|
||||
max_num_teams);
|
||||
pWrk_pool = reinterpret_cast<void *>(temp_Wrk_Sync_buff_ptr_);
|
||||
temp_Wrk_Sync_buff_ptr_ += sizeof(double) * ROC_SHMEM_REDUCE_MIN_WRKDATA_SIZE
|
||||
* max_num_teams;
|
||||
|
||||
|
||||
pAta_pool = reinterpret_cast<void *>(temp_Wrk_Sync_buff_ptr_);
|
||||
temp_Wrk_Sync_buff_ptr_ += sizeof(double) * ROC_SHMEM_ATA_MAX_WRKDATA_SIZE
|
||||
* max_num_teams;
|
||||
|
||||
/**
|
||||
* Initialize the sync arrays in the pool with default values.
|
||||
|
||||
@@ -114,6 +114,13 @@ class IPCBackend : public Backend {
|
||||
*/
|
||||
void team_destroy(roc_shmem_team_t team) override;
|
||||
|
||||
/**
|
||||
* @brief Accessor for work/sync bases
|
||||
*
|
||||
* @return Array (one entry per PE) holding the work/sync buffer base addresses
|
||||
*/
|
||||
char** get_wrk_sync_bases() { return Wrk_Sync_buffer_bases_; }
|
||||
|
||||
/**
|
||||
* @brief Handle to device memory fields.
|
||||
*/
|
||||
@@ -263,6 +270,46 @@ class IPCBackend : public Backend {
|
||||
*/
|
||||
int bitmask_size_{-1};
|
||||
|
||||
/**
|
||||
* Fine-grained memory allocator for the buffers used by the collective routines
|
||||
*/
|
||||
HIPDefaultFinegrainedAllocator fine_grained_allocator_ {};
|
||||
|
||||
/**
|
||||
* @brief Collective routines work/sync buffer size
|
||||
*/
|
||||
size_t Wrk_Sync_buffer_size_{};
|
||||
|
||||
/**
 * @brief Collective routines work/sync buffer base ptr
 *
 * Must be a mutable pointer: init_wrk_sync_buffer() passes the address of
 * this member to the fine-grained allocator, which stores the allocation
 * address through it. Writing through a cast to a const-qualified member
 * would be undefined behaviour.
 */
char* Wrk_Sync_buffer_ptr_{nullptr};
|
||||
|
||||
/**
|
||||
* @brief Temporary buffer pointer pointing to the same address as
|
||||
* Wrk_Sync_buffer_ptr_, used to calculate the starting addresses of
|
||||
* different work and sync buffers.
|
||||
*/
|
||||
char *temp_Wrk_Sync_buff_ptr_{nullptr};
|
||||
|
||||
/**
|
||||
* @brief Array containing the addresses of the work/sync buffer bases
|
||||
* of other PEs
|
||||
*/
|
||||
char** Wrk_Sync_buffer_bases_{nullptr};
|
||||
|
||||
/**
|
||||
* @brief Initialize memory required for work/sync buffers and open IPC
|
||||
* handle on PE's Wrk_Sync_buffer_ptr.
|
||||
*/
|
||||
void init_wrk_sync_buffer();
|
||||
|
||||
/**
|
||||
* @brief Close IPC memory handles for work/sync buffers and deallocate
|
||||
* work/sync buffer.
|
||||
*/
|
||||
void cleanup_wrk_sync_buffer();
|
||||
|
||||
};
|
||||
|
||||
} // namespace rocshmem
|
||||
|
||||
@@ -48,6 +48,7 @@ __host__ IPCContext::IPCContext(Backend *b)
|
||||
g_ret = bp->g_ret;
|
||||
atomic_base_ptr = bp->atomic_ret->atomic_base_ptr;
|
||||
fence_pool = backend->fence_pool;
|
||||
Wrk_Sync_buffer_bases_ = backend->get_wrk_sync_bases();
|
||||
|
||||
orders_.store = detail::atomic::rocshmem_memory_order::memory_order_seq_cst;
|
||||
|
||||
@@ -165,4 +166,55 @@ __device__ void IPCContext::getmem_nbi_wave(void *dest, const void *source,
|
||||
getmem_wave(dest, source, nelems, pe);
|
||||
}
|
||||
|
||||
/**
 * Write into PE `pe`'s work/sync buffer.
 *
 * `dest` is an address inside this PE's work/sync buffer; its offset from
 * the local base is applied to the base recorded for `pe`, and `nelems`
 * bytes are copied there from `source`.
 */
__device__ void IPCContext::internal_putmem(void *dest, const void *source,
                                            size_t nelems, int pe) {
  char *local_base = Wrk_Sync_buffer_bases_[my_pe];
  uint64_t offset = reinterpret_cast<char *>(dest) - local_base;
  char *remote_dest = Wrk_Sync_buffer_bases_[pe] + offset;
  memcpy(remote_dest, const_cast<void *>(source), nelems);
}
|
||||
|
||||
/**
 * Read from PE `pe`'s work/sync buffer.
 *
 * `source` is an address inside this PE's work/sync buffer; its offset from
 * the local base is applied to the base recorded for `pe`, and `nelems`
 * bytes are copied from there into `dest`.
 */
__device__ void IPCContext::internal_getmem(void *dest, const void *source,
                                            size_t nelems, int pe) {
  const char *local_src = static_cast<const char *>(source);
  uint64_t offset = local_src - Wrk_Sync_buffer_bases_[my_pe];
  char *remote_src = Wrk_Sync_buffer_bases_[pe] + offset;
  memcpy(dest, remote_src, nelems);
}
|
||||
|
||||
/**
 * Variant of internal_putmem that performs the copy with memcpy_wg and then
 * calls __syncthreads().
 *
 * The offset of `dest` from this PE's work/sync base is applied to the base
 * recorded for `pe`; `nelems` is forwarded unchanged to memcpy_wg.
 */
__device__ void IPCContext::internal_putmem_wg(void *dest, const void *source,
                                               size_t nelems, int pe) {
  char *local_base = Wrk_Sync_buffer_bases_[my_pe];
  uint64_t offset = reinterpret_cast<char *>(dest) - local_base;
  char *remote_dest = Wrk_Sync_buffer_bases_[pe] + offset;
  memcpy_wg(remote_dest, const_cast<void *>(source), nelems);
  __syncthreads();
}
|
||||
|
||||
/**
 * Variant of internal_getmem that performs the copy with memcpy_wg and then
 * calls __syncthreads().
 *
 * The offset of `source` from this PE's work/sync base is applied to the
 * base recorded for `pe`; `nelems` is forwarded unchanged to memcpy_wg.
 */
__device__ void IPCContext::internal_getmem_wg(void *dest, const void *source,
                                               size_t nelems, int pe) {
  const char *local_src = static_cast<const char *>(source);
  uint64_t offset = local_src - Wrk_Sync_buffer_bases_[my_pe];
  char *remote_src = Wrk_Sync_buffer_bases_[pe] + offset;
  memcpy_wg(dest, remote_src, nelems);
  __syncthreads();
}
|
||||
|
||||
/**
 * Variant of internal_putmem that performs the copy with memcpy_wave.
 *
 * The offset of `dest` from this PE's work/sync base is applied to the base
 * recorded for `pe`; `nelems` is forwarded unchanged to memcpy_wave.
 */
__device__ void IPCContext::internal_putmem_wave(void *dest,
                                                 const void *source,
                                                 size_t nelems, int pe) {
  char *local_base = Wrk_Sync_buffer_bases_[my_pe];
  uint64_t offset = reinterpret_cast<char *>(dest) - local_base;
  char *remote_dest = Wrk_Sync_buffer_bases_[pe] + offset;
  memcpy_wave(remote_dest, const_cast<void *>(source), nelems);
}
|
||||
|
||||
/**
 * Variant of internal_getmem that performs the copy with memcpy_wave.
 *
 * The offset of `source` from this PE's work/sync base is applied to the
 * base recorded for `pe`; `nelems` is forwarded unchanged to memcpy_wave.
 */
__device__ void IPCContext::internal_getmem_wave(void *dest,
                                                 const void *source,
                                                 size_t nelems, int pe) {
  const char *local_src = static_cast<const char *>(source);
  uint64_t offset = local_src - Wrk_Sync_buffer_bases_[my_pe];
  char *remote_src = Wrk_Sync_buffer_bases_[pe] + offset;
  memcpy_wave(dest, remote_src, nelems);
}
|
||||
|
||||
} // namespace rocshmem
|
||||
|
||||
@@ -239,6 +239,26 @@ class IPCContext : public Context {
|
||||
__device__ void internal_atomic_barrier(int pe, int PE_start, int stride,
|
||||
int n_pes, int64_t *pSync);
|
||||
|
||||
//internal functions used by collectives routines to write/read to
|
||||
//work/sync buffers
|
||||
__device__ void internal_putmem(void *dest, const void *source,
|
||||
size_t nelems, int pe);
|
||||
|
||||
__device__ void internal_getmem(void *dest, const void *source,
|
||||
size_t nelems, int pe);
|
||||
|
||||
__device__ void internal_putmem_wg(void *dest, const void *source,
|
||||
size_t nelems, int pe);
|
||||
|
||||
__device__ void internal_getmem_wg(void *dest, const void *source,
|
||||
size_t nelems, int pe);
|
||||
|
||||
__device__ void internal_putmem_wave(void *dest, const void *source,
|
||||
size_t nelems, int pe);
|
||||
|
||||
__device__ void internal_getmem_wave(void *dest, const void *source,
|
||||
size_t nelems, int pe);
|
||||
|
||||
//Temporary scratchpad memory used by internal barrier algorithms.
|
||||
int64_t *barrier_sync{nullptr};
|
||||
|
||||
@@ -252,6 +272,12 @@ class IPCContext : public Context {
|
||||
//Buffer to perform Atomic store to enforce memory ordering
|
||||
int *fence_pool{nullptr};
|
||||
|
||||
/**
|
||||
* @brief Array containing the addresses of the work/sync buffer bases
|
||||
* of other PEs
|
||||
*/
|
||||
char **Wrk_Sync_buffer_bases_{nullptr};
|
||||
|
||||
public:
|
||||
//TODO(Avinash):
|
||||
//Make tinfo private variable, it requires changes to the context
|
||||
|
||||
@@ -46,12 +46,12 @@ __device__ void IPCContext::internal_direct_barrier(int pe, int PE_start,
|
||||
|
||||
// Announce to other PEs that all have reached
|
||||
for (size_t i = 1, j = PE_start + stride; i < n_pes; ++i, j += stride) {
|
||||
put_nbi(&pSync[0], &flag_val, 1, j);
|
||||
internal_putmem(&pSync[0], &flag_val, sizeof(*pSync), j);
|
||||
}
|
||||
} else {
|
||||
// Mark current PE offset as reached
|
||||
size_t pe_offset = (pe - PE_start) / stride;
|
||||
put_nbi(&pSync[pe_offset], &flag_val, 1, PE_start);
|
||||
internal_putmem(&pSync[pe_offset], &flag_val, sizeof(*pSync), PE_start);
|
||||
#if defined(__gfx90a__)
|
||||
__threadfence_system();
|
||||
#endif /* __gfx90a__ */
|
||||
@@ -71,7 +71,7 @@ __device__ void IPCContext::internal_atomic_barrier(int pe, int PE_start,
|
||||
threadfence_system();
|
||||
|
||||
for (size_t i = 1, j = PE_start + stride; i < n_pes; ++i, j += stride) {
|
||||
put_nbi(&pSync[0], &flag_val, 1, j);
|
||||
internal_putmem(&pSync[0], &flag_val, sizeof(*pSync), j);
|
||||
}
|
||||
} else {
|
||||
amo_add<int64_t>(&pSync[0], flag_val, PE_start);
|
||||
|
||||
In neuem Issue referenzieren
Einen Benutzer sperren