Update(DeviceProxy): Dynamically Determine Memory Allocation Size & Remove Compile-Time size Calculations (#48)

* Update(DeviceProxy): Dynamically Determine Memory Allocation Size & Remove Compile-Time size Calculations

- Modified the Device proxy class to determine memory allocation size at runtime.
- Updated all classes that include the Device proxy to use dynamic memory allocation.
- Removed compile-time memory size calculations.
- Ensured the allocated number of backend queue data structures matches the number of RO device contexts.

[ROCm/rocshmem commit: eb5a38e806]
This commit is contained in:
Avinash Kethineedi
2025-02-24 15:11:46 -06:00
zatwierdzone przez GitHub
rodzic 95c4c0d428
commit 1831a1b33c
23 zmienionych plików z 302 dodań i 118 usunięć
@@ -486,9 +486,19 @@ std::string to_string(const ForwardList<TYPE, ALLOC>& list);
template <typename ALLOC, typename TYPE>
class ForwardListProxy {
using ProxyT = DeviceProxy<ALLOC, ForwardList<TYPE, ALLOC>, 1>;
using ProxyT = DeviceProxy<ALLOC, ForwardList<TYPE, ALLOC>>;
public:
ForwardList(size_t num_elems = 1) : proxy_{num_elems} {}
ForwardList(const ForwardList& other) = delete;
ForwardList& operator=(const ForwardList& other) = delete;
ForwardList(ForwardList&& other) = default;
ForwardList& operator=(ForwardList&& other) = default;
__host__ __device__ ForwardList<TYPE, ALLOC>* get() { return proxy_.get(); }
private:
@@ -262,7 +262,17 @@ class FreeListProxy {
public:
__host__ __device__ FreeListT* get() { return proxy_.get(); }
FreeListProxy() { new (proxy_.get()) FreeListT(); }
FreeListProxy(size_t num_elems = 1) : proxy_{num_elems} {
new (proxy_.get()) FreeListT();
}
FreeListProxy(const FreeListProxy& other) = delete;
FreeListProxy& operator=(const FreeListProxy& other) = delete;
FreeListProxy(FreeListProxy&& other) = default;
FreeListProxy& operator=(FreeListProxy&& other) = default;
~FreeListProxy() {
auto free_list = proxy_.get();
+21 -6
Wyświetl plik
@@ -30,21 +30,28 @@
namespace rocshmem {
template <typename ALLOCATOR, typename T, size_t SIZE_IN = 1>
template <typename ALLOCATOR, typename T>
class DeviceProxy {
public:
DeviceProxy() {
DeviceProxy() = default;
DeviceProxy(size_t num_elems) : num_elems_ {num_elems} {
/**
* @brief The allocation size of the internal memory
*
*/
size_t size_bytes = sizeof(T) * num_elems_;
/*
* Allocate memory and verify that the allocation worked.
*/
T* temp{nullptr};
allocator_.allocate(reinterpret_cast<void**>(&temp), SIZE_BYTES_);
allocator_.allocate(reinterpret_cast<void**>(&temp), size_bytes);
assert(temp);
/*
* Default memory provided by the allocation to recognizable bytes.
*/
memset(static_cast<void*>(temp), 0xBC, SIZE_BYTES_);
memset(static_cast<void*>(temp), 0xBC, size_bytes);
/*
* Pass the memory into a unique ptr for tracking.
@@ -58,6 +65,14 @@ class DeviceProxy {
ptr_ = up_.get();
}
DeviceProxy(const DeviceProxy& other) = delete;
DeviceProxy& operator=(const DeviceProxy& other) = delete;
DeviceProxy(DeviceProxy&& other) = default;
DeviceProxy& operator=(DeviceProxy&& other) = default;
/**
* @brief Return internal storage tracked by the Proxy.
*
@@ -98,9 +113,9 @@ class DeviceProxy {
T* ptr_{nullptr};
/**
* @brief The allocation size for the internal memory
* @brief Number of elements of type T to be allocated
*/
static constexpr size_t SIZE_BYTES_{sizeof(T) * SIZE_IN};
size_t num_elems_{};
};
} // namespace rocshmem
+9 -1
Wyświetl plik
@@ -36,10 +36,18 @@ class HdpProxy {
/*
* Placement new the memory which is allocated by proxy_
*/
HdpProxy() {
HdpProxy(size_t num_elems = 1) : proxy_{num_elems} {
new (proxy_.get()) HdpPolicy();
}
HdpProxy(const HdpProxy& other) = delete;
HdpProxy& operator=(const HdpProxy& other) = delete;
HdpProxy(HdpProxy&& other) = default;
HdpProxy& operator=(HdpProxy&& other) = default;
/*
* Since placement new is called in the constructor, then
* delete must be called manually.
@@ -221,7 +221,7 @@ void IPCBackend::create_new_team([[maybe_unused]] Team *parent_team,
/**
* Allocate device-side memory for team_world and
* construct a GPU_IB team in it
* construct a IPC team in it
*/
GPUIBTeam *new_team_obj;
CHECK_HIP(hipMalloc(&new_team_obj, sizeof(IPCTeam)));
@@ -41,8 +41,9 @@ class IPCDefaultContextProxy {
/*
* Placement new the memory which is allocated by proxy_
*/
explicit IPCDefaultContextProxy(IPCBackend* backend, TeamInfo *tinfo)
: constructed_{true} {
explicit IPCDefaultContextProxy(IPCBackend* backend, TeamInfo *tinfo,
size_t num_elems = 1)
: constructed_{true}, proxy_{num_elems} {
auto ctx{proxy_.get()};
new (ctx) IPCContext(reinterpret_cast<Backend*>(backend));
ctx->tinfo = tinfo;
@@ -101,10 +101,18 @@ class NotifierProxy {
using ProxyT = DeviceProxy<ALLOCATOR, Notifier<scope>>;
public:
NotifierProxy() {
NotifierProxy(size_t num_elems = 1) : proxy_{num_elems} {
new (proxy_.get()) Notifier<scope>();
}
NotifierProxy(const NotifierProxy& other) = delete;
NotifierProxy& operator=(const NotifierProxy& other) = delete;
NotifierProxy(NotifierProxy&& other) = default;
NotifierProxy& operator=(NotifierProxy&& other) = default;
~NotifierProxy() {
proxy_.get()->~Notifier<scope>();
}
@@ -160,13 +160,23 @@ class SlabHeap {
template <typename ALLOCATOR>
class SlabHeapProxy {
using ProxyT = DeviceProxy<ALLOCATOR, SlabHeap, 1>;
using ProxyT = DeviceProxy<ALLOCATOR, SlabHeap>;
public:
/*
* Placement new the memory which is allocated by proxy_
*/
SlabHeapProxy() { new (proxy_.get()) SlabHeap(); }
SlabHeapProxy(size_t num_elems = 1) : proxy_{num_elems} {
new (proxy_.get()) SlabHeap();
}
SlabHeapProxy(const SlabHeapProxy& other) = delete;
SlabHeapProxy& operator=(const SlabHeapProxy& other) = delete;
SlabHeapProxy(SlabHeapProxy&& other) = default;
SlabHeapProxy& operator=(SlabHeapProxy&& other) = default;
/*
* Since placement new is called in the constructor, then
@@ -51,7 +51,9 @@ class BackendProxy {
/*
* Placement new the memory which is allocated by proxy_
*/
BackendProxy() { new (proxy_.get()) BackendRegister(); }
BackendProxy(size_t num_elems = 1) : proxy_{num_elems} {
new (proxy_.get()) BackendRegister();
}
/*
* Since placement new is called in the constructor, then
@@ -45,7 +45,7 @@ namespace rocshmem {
extern rocshmem_ctx_t ROCSHMEM_HOST_CTX_DEFAULT;
ROBackend::ROBackend(MPI_Comm comm)
: profiler_proxy_(MAX_NUM_BLOCKS), Backend() {
: Backend() {
type = BackendType::RO_BACKEND;
if (auto maximum_num_contexts_str = getenv("ROCSHMEM_MAX_NUM_CONTEXTS")) {
@@ -54,6 +54,18 @@ ROBackend::ROBackend(MPI_Comm comm)
}
poll_block_count_ = maximum_num_contexts_;
profiler_proxy_ = ProfilerProxyT(maximum_num_contexts_);
int device_id;
hipDeviceProp_t device_props;
CHECK_HIP(hipGetDevice(&device_id));
CHECK_HIP(hipGetDeviceProperties(&device_props, device_id));
max_wg_size_ = device_props.maxThreadsPerBlock;
queue_ = Queue(maximum_num_contexts_, max_wg_size_, queue_size_);
transport_ = new MPITransport(comm, &queue_);
num_pes = transport_->getNumPes();
my_pe = transport_->getMyPe();
@@ -68,16 +80,18 @@ ROBackend::ROBackend(MPI_Comm comm)
bp->heap_ptr = &heap;
ro_window_proxy_ = new WindowProxyT(&heap, transport_->get_world_comm());
ro_window_proxy_ = new WindowProxyT(&heap, transport_->get_world_comm(),
num_windows_);
bp->heap_window_info = ro_window_proxy_->get();
initIPC();
init_g_ret(&heap, transport_->get_world_comm(), MAX_NUM_BLOCKS, &bp->g_ret);
init_g_ret(&heap, transport_->get_world_comm(), maximum_num_contexts_, &bp->g_ret);
allocate_atomic_region(&bp->atomic_ret, MAX_NUM_BLOCKS);
allocate_atomic_region(&bp->atomic_ret, maximum_num_contexts_);
transport_->initTransport(MAX_NUM_BLOCKS, &backend_proxy);
transport_->initTransport(maximum_num_contexts_, &backend_proxy);
host_interface = transport_->host_interface;
@@ -99,7 +113,8 @@ ROBackend::ROBackend(MPI_Comm comm)
default_context_proxy_ = DefaultContextProxyT(this, tinfo);
block_handle_proxy_ = BlockHandleProxyT(bp->g_ret, bp->atomic_ret, &queue_,
&ipcImpl, hdp_proxy_.get());
&ipcImpl, hdp_proxy_.get(),
maximum_num_contexts_);
setup_ctxs();
worker_thread = std::thread(&ROBackend::ro_net_poll, this);
@@ -255,6 +255,21 @@ class ROBackend : public Backend {
* @brief Holds maximum number of contexts used in library
*/
size_t maximum_num_contexts_{1024};
/**
* @brief Holds maximum threads per work-group
*/
int max_wg_size_{};
/**
* @brief Holds the queue size for each context
*/
size_t queue_size_{512};
/**
* @brief Number of MPI windows used for device contexts in RO Backend
*/
size_t num_windows_{32};
};
} // namespace rocshmem
@@ -33,7 +33,7 @@ namespace rocshmem {
struct BlockHandle {
ROStats profiler{};
queue_element_t *queue{nullptr};
uint64_t queue_size{QUEUE_SIZE};
uint64_t queue_size{};
volatile uint64_t read_index{};
volatile uint64_t write_index{};
volatile uint64_t *host_read_index{};
@@ -53,7 +53,10 @@ class DefaultBlockHandleProxy {
DefaultBlockHandleProxy() = default;
DefaultBlockHandleProxy(char *g_ret, atomic_ret_t *atomic_ret, Queue *queue,
IpcImpl *ipc_policy, HdpPolicy *hdp_policy) {
IpcImpl *ipc_policy, HdpPolicy *hdp_policy,
size_t num_elems = 1)
: proxy_{num_elems} {
// TODO(bpotter): create a default queue for this queue descriptor
auto queue_descriptor{queue->descriptor(0)};
auto block_handle{proxy_.get()};
@@ -73,6 +76,14 @@ class DefaultBlockHandleProxy {
block_handle->lock = 0;
}
DefaultBlockHandleProxy(const DefaultBlockHandleProxy& other) = delete;
DefaultBlockHandleProxy& operator=(const DefaultBlockHandleProxy& other) = delete;
DefaultBlockHandleProxy(DefaultBlockHandleProxy&& other) = default;
DefaultBlockHandleProxy& operator=(DefaultBlockHandleProxy&& other) = default;
__host__ __device__ BlockHandle *get() { return proxy_.get(); }
private:
@@ -83,15 +94,17 @@ using DefaultBlockHandleProxyT = DefaultBlockHandleProxy<HIPAllocator>;
template <typename ALLOCATOR>
class BlockHandleProxy {
static constexpr size_t MAX_NUM_BLOCKS{65536};
using ProxyT = DeviceProxy<ALLOCATOR, BlockHandle, MAX_NUM_BLOCKS>;
using ProxyT = DeviceProxy<ALLOCATOR, BlockHandle>;
public:
BlockHandleProxy() = default;
BlockHandleProxy(char *g_ret, atomic_ret_t *atomic_ret, Queue *queue,
IpcImpl *ipc_policy, HdpPolicy *hdp_policy) {
for (size_t i{0}; i < MAX_NUM_BLOCKS; i++) {
IpcImpl *ipc_policy, HdpPolicy *hdp_policy,
size_t max_blocks)
: proxy_{max_blocks} {
for (size_t i{0}; i < max_blocks; i++) {
auto queue_descriptor{queue->descriptor(i)};
auto block_handle{&proxy_.get()[i]};
block_handle->profiler.resetStats();
@@ -111,10 +124,20 @@ class BlockHandleProxy {
}
}
BlockHandleProxy(const BlockHandleProxy& other) = delete;
BlockHandleProxy& operator=(const BlockHandleProxy& other) = delete;
BlockHandleProxy(BlockHandleProxy&& other) = default;
BlockHandleProxy& operator=(BlockHandleProxy&& other) = default;
__host__ __device__ BlockHandle *get() { return proxy_.get(); }
private:
ProxyT proxy_{};
size_t num_blocks_{};
};
using BlockHandleProxyT = BlockHandleProxy<HIPAllocator>;
@@ -42,8 +42,9 @@ class DefaultContextProxy {
/*
* Placement new the memory which is allocated by proxy_
*/
explicit DefaultContextProxy(ROBackend* backend, TeamInfo *tinfo)
: constructed_{true} {
explicit DefaultContextProxy(ROBackend* backend, TeamInfo *tinfo,
size_t num_elems = 1)
: constructed_{true}, proxy_{num_elems} {
auto ctx{proxy_.get()};
new (ctx) ROContext(reinterpret_cast<Backend*>(backend), -1);
rocshmem_ctx_t local{ctx, tinfo};
@@ -87,54 +88,6 @@ class DefaultContextProxy {
using DefaultContextProxyT = DefaultContextProxy<HIPAllocator>;
template <typename ALLOCATOR>
class BlockContextProxy {
static constexpr size_t MAX_NUM_BLOCKS{65536};
using ProxyT = DeviceProxy<ALLOCATOR, ROContext, MAX_NUM_BLOCKS>;
public:
BlockContextProxy() = default;
explicit BlockContextProxy(ROBackend* backend) : constructed_{true} {
auto* ctx{proxy_.get()};
for (size_t i{0}; i < MAX_NUM_BLOCKS; i++) {
auto ctx_p{&ctx[i]};
new (ctx_p) ROContext(reinterpret_cast<Backend*>(backend), i);
}
}
~BlockContextProxy() {
if (constructed_) {
auto* ctx{proxy_.get()};
for (size_t i{0}; i < MAX_NUM_BLOCKS; i++) {
auto ctx_p{&ctx[i]};
ctx_p->~ROContext();
}
}
}
BlockContextProxy(const BlockContextProxy& other) = delete;
BlockContextProxy& operator=(const BlockContextProxy& other) = delete;
BlockContextProxy(BlockContextProxy&& other) = default;
BlockContextProxy& operator=(BlockContextProxy&& other) = default;
__host__ __device__ Context* get() { return proxy_.get(); }
private:
/*
* @brief Memory managed by the lifetime of this object
*/
ProxyT proxy_{};
/*
* @brief denotes if an objects was constructed in proxy
*/
bool constructed_{false};
};
} // namespace rocshmem
#endif // LIBRARY_SRC_REVERSE_OFFLOAD_CONTEXT_PROXY_HPP_
@@ -50,7 +50,7 @@ __host__ ROContext::ROContext(Backend *b, size_t block_id)
auto block_base{backend->block_handle_proxy_.get()};
block_handle = &block_base[block_id];
}
ro_net_win_id = block_id % backend->ro_window_proxy_->MAX_NUM_WINDOWS;
ro_net_win_id = block_id % backend->ro_window_proxy_->get_num_MPI_windows();
ipcImpl_.ipc_bases = b->ipcImpl.ipc_bases;
ipcImpl_.shm_size = b->ipcImpl.shm_size;
@@ -51,13 +51,13 @@ typedef NullStats<RO_NUM_STATS> ROStats;
template <typename ALLOCATOR>
class ProfilerProxy {
static constexpr size_t MAX_NUM_BLOCKS{65536};
using ProxyT = DeviceProxy<ALLOCATOR, ROStats, MAX_NUM_BLOCKS>;
using ProxyT = DeviceProxy<ALLOCATOR, ROStats>;
public:
explicit ProfilerProxy(size_t num_blocks) : num_elem_{num_blocks} {
assert(num_blocks <= MAX_NUM_BLOCKS);
ProfilerProxy() = default;
explicit ProfilerProxy(size_t num_blocks)
: num_elem_{num_blocks}, proxy_{num_blocks} {
auto *stat{proxy_.get()};
assert(stat);
@@ -68,6 +68,14 @@ class ProfilerProxy {
}
}
ProfilerProxy(const ProfilerProxy& other) = delete;
ProfilerProxy& operator=(const ProfilerProxy& other) = delete;
ProfilerProxy(ProfilerProxy&& other) = default;
ProfilerProxy& operator=(ProfilerProxy&& other) = default;
~ProfilerProxy() {
auto *stat{proxy_.get()};
assert(stat);
@@ -33,8 +33,22 @@ Queue::Queue() {
}
}
Queue::Queue(size_t max_queues, size_t max_wg_size, size_t queue_size)
: max_queues_{max_queues},
max_wg_size_{max_wg_size},
queue_size_{queue_size},
queue_proxy_{max_queues, queue_size},
queue_desc_proxy_{max_queues, max_wg_size} {
gpu_queue = true;
char *value{nullptr};
if ((value = getenv("RO_NET_CPU_QUEUE")) != nullptr) {
gpu_queue = false;
}
}
uint64_t Queue::get_read_index(uint64_t queue_index) {
return descriptor(queue_index)->read_index % QUEUE_SIZE;
return descriptor(queue_index)->read_index % queue_size_;
}
void Queue::increment_read_index(uint64_t queue_index) {
@@ -92,7 +106,7 @@ void Queue::notify(int blockId, int threadId) {
}
uint64_t Queue::size() {
return QUEUE_SIZE;
return queue_size_;
}
__host__ __device__ queue_desc_t* Queue::descriptor(uint64_t index) {
@@ -35,6 +35,8 @@ class Queue {
public:
Queue();
Queue(size_t max_queues, size_t max_threads_per_block, size_t queue_size);
bool process(uint64_t queue_index, MPITransport* transport);
uint64_t get_read_index(uint64_t queue_index);
@@ -67,6 +69,12 @@ class Queue {
HdpProxy<HIPHostAllocator> hdp_proxy_{};
bool gpu_queue{false};
size_t max_queues_{};
size_t max_wg_size_{};
size_t queue_size_{};
};
} // namespace rocshmem
@@ -58,32 +58,48 @@ typedef struct queue_desc {
template <typename ALLOCATOR>
class QueueDescProxy {
static constexpr size_t MAX_NUM_BLOCKS{65536};
static constexpr size_t MAX_THREADS_PER_BLOCK{1024};
static constexpr size_t MAX_THREADS{MAX_NUM_BLOCKS * MAX_THREADS_PER_BLOCK};
using ProxyT = DeviceProxy<ALLOCATOR, queue_desc_t, MAX_NUM_BLOCKS>;
using ProxyStatusT = DeviceProxy<ALLOCATOR, char, MAX_THREADS>;
using ProxyT = DeviceProxy<ALLOCATOR, queue_desc_t>;
using ProxyStatusT = DeviceProxy<ALLOCATOR, char>;
public:
QueueDescProxy() {
QueueDescProxy() = default;
QueueDescProxy(size_t max_queues, size_t max_threads_per_queue)
: max_queues_{max_queues}, max_threads_per_queue_{max_threads_per_queue},
max_threads_{max_queues * max_threads_per_queue}, proxy_{max_queues},
proxy_status_{max_queues * max_threads_per_queue} {
auto *status{proxy_status_.get()};
size_t status_bytes{sizeof(char) * MAX_THREADS};
size_t status_bytes{sizeof(char) * max_threads_};
memset(status, 0, status_bytes);
auto *queue_descs{proxy_.get()};
for (size_t i{0}; i < MAX_NUM_BLOCKS; i++) {
for (size_t i{0}; i < max_queues_; i++) {
queue_descs[i].read_index = 0;
queue_descs[i].write_index = 0;
queue_descs[i].status = status + i * MAX_THREADS_PER_BLOCK;
queue_descs[i].status = status + i * max_threads_per_queue_;
}
}
QueueDescProxy(const QueueDescProxy& other) = delete;
QueueDescProxy& operator=(const QueueDescProxy& other) = delete;
QueueDescProxy(QueueDescProxy&& other) = default;
QueueDescProxy& operator=(QueueDescProxy&& other) = default;
__host__ __device__ queue_desc_t *get() { return proxy_.get(); }
private:
ProxyT proxy_{};
ProxyStatusT proxy_status_{};
size_t max_queues_{};
size_t max_threads_per_queue_{};
size_t max_threads_{};
};
using QueueDescProxyT = QueueDescProxy<HIPDefaultFinegrainedAllocator>;
@@ -35,8 +35,6 @@
namespace rocshmem {
constexpr size_t QUEUE_SIZE{512};
struct cacheline_t {
volatile char valid;
volatile char padding[63];
@@ -82,7 +80,17 @@ class QueueElementProxy {
using ProxyT = DeviceProxy<ALLOCATOR, queue_element_t>;
public:
QueueElementProxy() { new (proxy_.get()) queue_element_t(); }
QueueElementProxy(size_t num_elems = 1) : proxy_{num_elems} {
new (proxy_.get()) queue_element_t();
}
QueueElementProxy(const QueueElementProxy& other) = delete;
QueueElementProxy& operator=(const QueueElementProxy& other) = delete;
QueueElementProxy(QueueElementProxy&& other) = default;
QueueElementProxy& operator=(QueueElementProxy&& other) = default;
~QueueElementProxy() { proxy_.get()->~queue_element_t(); }
@@ -96,11 +104,8 @@ using QueueElementProxyT = QueueElementProxy<PosixAligned64Allocator>;
template <typename ALLOCATOR>
class QueueProxy {
static constexpr size_t MAX_NUM_BLOCKS{65536};
static constexpr size_t TOTAL_QUEUE_ELEMENTS{QUEUE_SIZE * MAX_NUM_BLOCKS};
using ProxyT = DeviceProxy<ALLOCATOR, queue_element_t *, MAX_NUM_BLOCKS>;
using ProxyPerBlockT =
DeviceProxy<ALLOCATOR, queue_element_t, TOTAL_QUEUE_ELEMENTS>;
using ProxyT = DeviceProxy<ALLOCATOR, queue_element_t *>;
using ProxyPerBlockT = DeviceProxy<ALLOCATOR, queue_element_t>;
public:
/**
@@ -109,23 +114,44 @@ class QueueProxy {
* The circular queues are indexed using the device block-id so that each
* each block has its own queue.
*/
QueueProxy() {
QueueProxy() = default;
QueueProxy(size_t max_queues, size_t queue_size)
: max_queues_{max_queues}, queue_size_{queue_size},
total_queue_elements_{queue_size * max_queues},
queue_proxy_{max_queues},
per_block_queue_proxy_{queue_size * max_queues} {
auto **queue_array{queue_proxy_.get()};
auto *per_block_queue{per_block_queue_proxy_.get()};
for (size_t i{0}; i < MAX_NUM_BLOCKS; i++) {
queue_array[i] = per_block_queue + i * QUEUE_SIZE;
for (size_t i{0}; i < max_queues_; i++) {
queue_array[i] = per_block_queue + i * queue_size;
}
size_t total_queue_element_bytes{sizeof(queue_element_t) *
TOTAL_QUEUE_ELEMENTS};
total_queue_elements_};
memset(per_block_queue, 0, total_queue_element_bytes);
}
QueueProxy(const QueueProxy& other) = delete;
QueueProxy& operator=(const QueueProxy& other) = delete;
QueueProxy(QueueProxy&& other) = default;
QueueProxy& operator=(QueueProxy&& other) = default;
__host__ __device__ queue_element_t **get() { return queue_proxy_.get(); }
private:
ProxyT queue_proxy_{};
ProxyPerBlockT per_block_queue_proxy_{};
size_t max_queues_{};
size_t queue_size_{};
size_t total_queue_elements_{};
};
using QueueProxyT = QueueProxy<HIPHostAllocator>;
@@ -39,14 +39,24 @@ class ROTeamProxy {
/*
* Placement new the memory which is allocated by proxy_
*/
ROTeamProxy(Backend* backend, MPI_Comm comm, int pe, int npes)
: my_pe_(pe), team_size_(npes) {
ROTeamProxy(Backend* backend, MPI_Comm comm, int pe, int npes,
size_t num_elems = 1)
: my_pe_(pe), team_size_(npes), proxy_{num_elems} {
MPI_Comm_dup(comm, &team_world_comm_);
new (proxy_.get()) ROTeam(backend, wrt_parent_.get(), wrt_world_.get(),
team_size_, my_pe_, team_world_comm_);
}
ROTeamProxy(const ROTeamProxy& other) = delete;
ROTeamProxy& operator=(const ROTeamProxy& other) = delete;
ROTeamProxy(ROTeamProxy&& other) = default;
ROTeamProxy& operator=(ROTeamProxy&& other) = default;
/*
* Since placement new is called in the constructor, then
* delete must be called manually.
@@ -36,10 +36,19 @@ class TeamInfoProxy {
/*
* Placement new the memory which is allocated by proxy_
*/
TeamInfoProxy(Team* parent_team, int pe_start, int stride, int size) {
TeamInfoProxy(Team* parent_team, int pe_start, int stride, int size,
size_t num_elems = 1) : proxy_{num_elems} {
new (proxy_.get()) TeamInfo(parent_team, pe_start, stride, size);
}
TeamInfoProxy(const TeamInfoProxy& other) = delete;
TeamInfoProxy& operator=(const TeamInfoProxy& other) = delete;
TeamInfoProxy(TeamInfoProxy&& other) = default;
TeamInfoProxy& operator=(TeamInfoProxy&& other) = default;
/*
* Since placement new is called in the constructor, then
* delete must be called manually.
@@ -31,25 +31,32 @@ namespace rocshmem {
template <typename ALLOCATOR>
class WindowProxy {
public:
static constexpr size_t MAX_NUM_WINDOWS{32};
private:
using ProxyT = DeviceProxy<ALLOCATOR, WindowInfo *, MAX_NUM_WINDOWS>;
using ProxyT = DeviceProxy<ALLOCATOR, WindowInfo *>;
public:
/*
* Placement new the memory which is allocated by proxy_
*/
WindowProxy(SymmetricHeap *heap, MPI_Comm comm) {
WindowProxy(SymmetricHeap *heap, MPI_Comm comm, size_t num_windows)
: num_windows_{num_windows}, proxy_{num_windows} {
auto *window_info{proxy_.get()};
for (size_t i{0}; i < MAX_NUM_WINDOWS; i++) {
for (size_t i{0}; i < num_windows_; i++) {
window_info[i] =
new WindowInfo(comm, heap->get_local_heap_base(), heap->get_size());
}
}
WindowProxy(const WindowProxy& other) = delete;
WindowProxy& operator=(const WindowProxy& other) = delete;
WindowProxy(WindowProxy&& other) = default;
WindowProxy& operator=(WindowProxy&& other) = default;
/*
* Since placement new is called in the constructor, then
* delete must be called manually.
@@ -57,7 +64,7 @@ class WindowProxy {
~WindowProxy() {
auto *window_info{proxy_.get()};
for (size_t i{0}; i < MAX_NUM_WINDOWS; i++) {
for (size_t i{0}; i < num_windows_; i++) {
delete window_info[i];
}
}
@@ -67,11 +74,17 @@ class WindowProxy {
*/
__host__ __device__ WindowInfo **get() { return proxy_.get(); }
__host__ size_t get_num_MPI_windows() { return num_windows_; }
private:
/*
* @brief Memory managed by the lifetime of this object
*/
ProxyT proxy_{};
/**
* @brief Number of MPI windows used for device contexts in RO Backend
*/
size_t num_windows_{32};
};
using WindowProxyT = WindowProxy<HostAllocator>;
@@ -111,9 +111,19 @@ class ABQLBlockMutex {
template <typename ALLOCATOR>
class ABQLBlockMutexProxy {
using ProxyT = DeviceProxy<ALLOCATOR, ABQLBlockMutex, 1>;
using ProxyT = DeviceProxy<ALLOCATOR, ABQLBlockMutex>;
public:
ABQLBlockMutexProxy(size_t num_elems = 1) : proxy_{num_elems} {}
ABQLBlockMutexProxy(const ABQLBlockMutexProxy& other) = delete;
ABQLBlockMutexProxy& operator=(const ABQLBlockMutexProxy& other) = delete;
ABQLBlockMutexProxy(ABQLBlockMutexProxy&& other) = default;
ABQLBlockMutexProxy& operator=(ABQLBlockMutexProxy&& other) = default;
__host__ __device__ ABQLBlockMutex* get() { return proxy_.get(); }
private: