Add AtomicWFQueue implementation and tests (#62)

* feat: Add AtomicWFQueue implementation
  - Implemented wavefront-safe atomic FIFO queue ensuring first-come, first-serve order
  - Added efficient synchronization using atomics
  - Enhanced `dequeue` to wait until an element is available

* test: Add GTest for AtomicWFQueue
  - Implemented unit tests for AtomicWFQueue using GoogleTest framework
  - Added tests for `enqueue`, `dequeue`, and edge cases
  - Ensured synchronization behavior and correctness under concurrent conditions

* Add assert in `enqueue` and update atomics
  - Added an assert in the `enqueue` function to ensure it fails if the queue is full

[ROCm/rocshmem commit: b84b5638cf]
Este commit está contenido en:
Avinash Kethineedi
2025-03-25 00:45:19 -05:00
cometido por GitHub
padre baca5fd7a1
commit 370e2dda09
Se han modificado 5 ficheros con 891 adiciones y 0 borrados
@@ -0,0 +1,358 @@
/******************************************************************************
* Copyright (c) 2024 Advanced Micro Devices, Inc. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*****************************************************************************/
#ifndef LIBRARY_SRC_CONTAINERS_ATOMIC_WF_QUEUE_HPP_
#define LIBRARY_SRC_CONTAINERS_ATOMIC_WF_QUEUE_HPP_
#include <hip/hip_runtime.h>
#include "../memory/hip_allocator.hpp"
#include "../sync/abql_block_mutex.hpp"
#include "../src/util.hpp"
namespace rocshmem {
/*****************************************************************************
****************************** ATOMIC WF QUEUE ******************************
*****************************************************************************/
/**
* @brief Fixed-capacity circular FIFO queue intended for use by GPU wavefronts.
*
* Storage is allocated on the host with `allocate_queue` and may be pre-filled
* with `push`. On the device, `enqueue` and `dequeue` are each serialized by
* their own ticket mutex, and only the lowest active lane of the calling
* wavefront touches the queue state.
*
* @tparam TYPE Element type stored in the queue.
* @tparam ALLOCATOR Allocator used for the queue storage and the mutex proxies.
*/
template <typename TYPE, typename ALLOCATOR = HIPDefaultFinegrainedAllocator>
class AtomicWFQueue {
using MutexProxyType = ABQLBlockMutexProxy<ALLOCATOR>;
using MutexType = ABQLBlockMutex;
/**
* @brief A lock guard for ticket-based locks that follows the design of
* `std::lock_guard`.
*
* @tparam MUTEX The type of the ticket-based mutex to lock.
*/
template <typename MUTEX>
struct TicketLockGuard {
/**
* @brief Constructs the `TicketLockGuard` and locks the mutex.
*
* The ticket returned by `lock()` is kept so it can be handed back to
* `unlock()` in the destructor.
*
* @param m Mutex to take ownership of.
*/
__device__ explicit TicketLockGuard(MUTEX& m) : mutex_{m} {
ticket_ = mutex_.lock();
// Fence issued right after the lock is taken; presumably keeps the
// protected accesses from being observed before the lock is held.
__threadfence();
}
/**
* @brief Lock guards are not copyable
*/
__device__ TicketLockGuard(const TicketLockGuard&) = delete;
/**
* @brief Lock guards are not moveable
*/
__device__ TicketLockGuard(TicketLockGuard&&) = delete;
/**
* @brief Destructor that unlocks the mutex.
*/
__device__ ~TicketLockGuard() {
// Fence issued before the lock is released; presumably makes the
// protected writes visible before another holder can proceed.
__threadfence();
mutex_.unlock(ticket_);
}
private:
using TicketT = uint64_t;
MUTEX& mutex_;
TicketT ticket_;
};
public:
/**
* @brief Construct a new AtomicWFQueue object
*
* No queue storage is allocated here; call `allocate_queue` before use.
*
* @param allocator Allocator to use for allocating internal structures of the
* AtomicWFQueue.
*/
explicit AtomicWFQueue(const ALLOCATOR& allocator = ALLOCATOR());
/**
* @brief Destroy the AtomicWFQueue object
*/
~AtomicWFQueue();
/**
* @brief Enqueues an element into the AtomicWFQueue.
*
* This function inserts the specified value at the position indicated by
* the `tail_` of the AtomicWFQueue and increases the AtomicWFQueue size
* by one. The enqueue operation follows a first-come, first-serve
* execution order.
*
* @note Only the lowest active lane of the calling wavefront performs the
* insertion; the other lanes return without touching the queue. The queue is
* asserted to be non-full.
*
* @param val The value to be inserted into the AtomicWFQueue.
*/
__device__ void enqueue(const TYPE& val);
/**
* @brief Dequeues an element from the AtomicWFQueue.
*
* This function dequeues the element pointed to by the `head_` of the
* AtomicWFQueue and decreases the AtomicWFQueue size by one. If the
* AtomicWFQueue is empty, the function waits until an element becomes
* available. The dequeue operation follows a first-come, first-serve
* execution order.
*
* @note Only the lowest active lane removes an element. The value is then
* shared with the other active lanes through `broadcast_lds`, and every lane
* adds its active logical lane ID to the value it returns, so `TYPE` must
* support `+=` with an unsigned integer.
*
* @return The dequeued element plus the caller's active logical lane ID.
*/
__device__ TYPE dequeue();
/**
* @brief Inserts a new element at the end of the AtomicWFQueue.
*
* This function adds the specified value to the end of the AtomicWFQueue,
* updating the `tail_` and `curr_size_` accordingly. It is intended for
* initializing the AtomicWFQueue with initial values. If the queue is
* already full, the value is not stored and a message is written to
* `std::cerr`.
*
* @note This function is not thread-safe and should only be used during
* the AtomicWFQueue initialization phase or in scenarios where thread
* safety is not a concern.
*
* @param val The value to be inserted into the AtomicWFQueue.
*/
__host__ void push(const TYPE& val);
/**
* @brief Allocates and initializes the AtomicWFQueue.
*
* This function allocates memory for the AtomicWFQueue with the specified
* size and initializes the AtomicWFQueue's head, tail, current size, and
* maximum size variables to their appropriate starting values.
*
* @param size The maximum number of elements the AtomicWFQueue can hold.
*/
__host__ void allocate_queue(unsigned int size);
/**
* @brief Deallocates the AtomicWFQueue and resets its internal variables.
*
* This function frees the memory allocated for the AtomicWFQueue and resets
* the AtomicWFQueue's internal variables such as head, tail, current size,
* and maximum size to their default or zero-initialized values.
*/
__host__ void deallocate_queue();
/**
* @brief Retrieves the logical lane ID of the calling thread.
*
* This function returns the active logical lane ID of the current thread
* within the wavefront, i.e. the number of active lanes with a lower
* physical lane ID than the caller.
*
* @return The logical lane ID of the active thread within the wavefront.
*/
__device__ unsigned int active_logical_lane_id();
/**
* @brief Broadcasts a value to other threads in the wavefront.
*
* A caller with `lowest_active` set to true publishes `val` into a
* per-wavefront shared-memory slot; every caller returns the content of that
* slot.
*
* @param lowest_active If true, the caller is the lane that supplies the
* value to broadcast.
* @param val The value to be broadcasted.
*
* @return The broadcasted value received by each thread in the wavefront.
*/
__device__ TYPE broadcast_lds(bool lowest_active, TYPE val);
/**
* @brief Retrieves the maximum capacity of the AtomicWFQueue.
*
* This function returns the total size of the AtomicWFQueue, representing
* the maximum number of elements it can hold.
*
* @return The maximum capacity of the AtomicWFQueue.
*/
__host__ __device__ int get_queue_size() {
return size_;
}
/**
* @brief Retrieves the current number of elements in the AtomicWFQueue.
*
* This function returns the current size of the AtomicWFQueue, representing
* the total number of elements currently stored. The value is read with a
* plain (non-atomic) load.
*
* @return The current number of elements in the AtomicWFQueue.
*/
__host__ __device__ int get_curr_size() {
return curr_size_;
}
/**
* @brief Retrieves the tail index of the AtomicWFQueue.
*
* This function returns the current index of the tail in the
* AtomicWFQueue, which represents the position where the next
* element will be enqueued.
*
* @return The index of the tail in the AtomicWFQueue.
*/
__host__ __device__ int get_tail() {
return tail_;
}
/**
* @brief Retrieves the head index of the AtomicWFQueue.
*
* This function returns the current index of the head in the
* AtomicWFQueue, which represents the position of the next element
* to be dequeued.
*
* @return The index of the head in the AtomicWFQueue.
*/
__host__ __device__ int get_head() {
return head_;
}
private:
/**
* @brief Sequentially consistent, agent-scope atomic load of an int.
*/
__device__ int atomic_load(const int* address) {
return __hip_atomic_load(address, __ATOMIC_SEQ_CST,
__HIP_MEMORY_SCOPE_AGENT);
}
/**
* @brief Sequentially consistent, agent-scope atomic store of an int.
*/
__device__ void atomic_store(int* address, const int val) {
__hip_atomic_store(address, val, __ATOMIC_SEQ_CST,
__HIP_MEMORY_SCOPE_AGENT);
}
/**
* @brief Sequentially consistent, agent-scope atomic add; the previous value
* is discarded.
*/
__device__ void atomic_add(int* address, const int val) {
__hip_atomic_fetch_add(address, val, __ATOMIC_SEQ_CST,
__HIP_MEMORY_SCOPE_AGENT);
}
/**
* @brief Sequentially consistent, agent-scope atomic subtract; the previous
* value is discarded.
*/
__device__ void atomic_sub(int* address, const int val) {
__hip_atomic_fetch_sub(address, val, __ATOMIC_SEQ_CST,
__HIP_MEMORY_SCOPE_AGENT);
}
/**
* @brief Checks if the AtomicWFQueue is full.
*
* This function determines whether the AtomicWFQueue has reached its
* maximum capacity. It is used to prevent overflow conditions during
* enqueue operations.
*
* @return true if the AtomicWFQueue is full, false otherwise.
*/
__device__ bool is_full() {
return atomic_load(&curr_size_) == size_;
}
/**
* @brief Checks if the AtomicWFQueue is empty.
*
* This function determines whether the AtomicWFQueue has no elements
* available for dequeue operations. It is used to prevent underflow
* conditions.
*
* @return true if the AtomicWFQueue is empty, false otherwise.
*/
__device__ bool is_empty() {
return atomic_load(&curr_size_) == 0;
}
/**
* @brief Internal memory allocator used to create internal structures of
* the AtomicWFQueue.
*/
ALLOCATOR allocator_{};
/**
* @brief Points to the index of first element in the AtomicWFQueue.
*/
int head_{};
/**
* @brief Points to the next empty slot in the AtomicWFQueue.
*/
int tail_{};
/**
* @brief Size of the AtomicWFQueue.
*/
int size_{};
/**
* @brief Current size of the AtomicWFQueue.
*/
int curr_size_{};
/**
* @brief Pointer to AtomicWFQueue memory
*/
TYPE *queue_{nullptr};
/**
* @brief Mutex protecting the AtomicWFQueue mutations during dequeue.
*/
MutexProxyType dequeue_mutex_;
/**
* @brief Mutex protecting the AtomicWFQueue mutations during enqueue.
*/
MutexProxyType enqueue_mutex_;
};
/**
 * @brief Owning wrapper that places an AtomicWFQueue inside storage obtained
 * from a DeviceProxy.
 *
 * The constructor placement-constructs a single AtomicWFQueue at the start of
 * the proxy storage; the destructor releases the queue buffer and runs the
 * queue destructor on that same object.
 *
 * NOTE(review): only the first element is constructed and destroyed even when
 * `num_elems` is greater than one - confirm whether multi-element use is
 * intended.
 *
 * @tparam ALLOCATOR Allocator handed to DeviceProxy and to the queue.
 * @tparam TYPE Element type stored in the queue.
 */
template <typename ALLOCATOR, typename TYPE>
class AtomicWFQueueProxy {
  using AtomicWFQueueT = AtomicWFQueue<TYPE, ALLOCATOR>;
  using ProxyT = DeviceProxy<ALLOCATOR, AtomicWFQueueT>;

 public:
  /**
   * @brief Returns the address of the managed queue object.
   */
  __host__ __device__ AtomicWFQueueT* get() { return proxy_.get(); }

  /**
   * @brief Acquires proxy storage and constructs the queue in place.
   *
   * @param num_elems Element count forwarded to the DeviceProxy.
   */
  AtomicWFQueueProxy(size_t num_elems = 1) : proxy_{num_elems} {
    new (proxy_.get()) AtomicWFQueueT();
  }

  AtomicWFQueueProxy(const AtomicWFQueueProxy& other) = delete;

  AtomicWFQueueProxy& operator=(const AtomicWFQueueProxy& other) = delete;

  AtomicWFQueueProxy(AtomicWFQueueProxy&& other) = default;

  AtomicWFQueueProxy& operator=(AtomicWFQueueProxy&& other) = default;

  /**
   * @brief Releases the queue buffer and destroys the queue object.
   */
  ~AtomicWFQueueProxy() {
    auto atomic_wf_queue = proxy_.get();
    // The move operations are defaulted, so this destructor also runs on
    // moved-from proxies. Skip the teardown when there is no object left to
    // tear down (assumes DeviceProxy::get() yields nullptr after a move -
    // TODO confirm).
    if (atomic_wf_queue == nullptr) {
      return;
    }
    atomic_wf_queue->deallocate_queue();
    atomic_wf_queue->~AtomicWFQueue();
  }

 private:
  ProxyT proxy_{};
};
} // namespace rocshmem
#endif // LIBRARY_SRC_CONTAINERS_ATOMIC_WF_QUEUE_HPP_
@@ -0,0 +1,163 @@
/******************************************************************************
* Copyright (c) 2024 Advanced Micro Devices, Inc. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*****************************************************************************/
#include "atomic_wf_queue.hpp"
#include <iostream>
namespace rocshmem {
/*****************************************************************************
****************************** ATOMIC WF QUEUE ******************************
*****************************************************************************/
/**
* @brief Destroys the AtomicWFQueue object.
*
* The body is intentionally empty: the queue buffer is not released here, it
* is released by an explicit call to `deallocate_queue()`.
*/
template <typename TYPE, typename ALLOCATOR>
AtomicWFQueue<TYPE, ALLOCATOR>::~AtomicWFQueue() {}
/**
 * @brief Frees the queue buffer (if any) and resets all bookkeeping to zero.
 *
 * Safe to call on a queue that was never allocated or was already
 * deallocated: the allocator is only invoked for a non-null buffer.
 */
template <typename TYPE, typename ALLOCATOR>
__host__ void AtomicWFQueue<TYPE, ALLOCATOR>::deallocate_queue() {
  if (queue_) {
    allocator_.deallocate((void*)queue_);
  }
  queue_ = nullptr;

  // Return every index and counter to the empty, zero-capacity state.
  head_ = tail_ = 0;
  size_ = curr_size_ = 0;
}
/**
 * @brief Constructs an empty, unallocated queue.
 *
 * The initializers are listed in the order the members are declared in the
 * class (allocator_, head_, tail_, size_, curr_size_). Members are always
 * initialized in declaration order, so writing the list any other way only
 * produces a -Wreorder diagnostic and misleads the reader.
 *
 * @param allocator Allocator copied into the queue for later buffer
 * allocation.
 */
template <typename TYPE, typename ALLOCATOR>
AtomicWFQueue<TYPE, ALLOCATOR>::AtomicWFQueue(const ALLOCATOR& allocator)
    : allocator_{allocator}, head_{0}, tail_{0}, size_{0}, curr_size_{0} {}
/**
 * @brief Allocates storage for `size` elements and resets the queue to empty.
 *
 * If the queue already owns a buffer, that buffer is released first so that
 * repeated calls do not leak the previous allocation.
 *
 * @param size Maximum number of elements the queue can hold.
 */
template <typename TYPE, typename ALLOCATOR>
__host__ void AtomicWFQueue<TYPE, ALLOCATOR>::allocate_queue(
    unsigned int size) {
  // Drop any earlier buffer; overwriting queue_ would otherwise leak it.
  if (queue_ != nullptr) {
    allocator_.deallocate((void*)queue_);
    queue_ = nullptr;
  }

  // The capacity is stored as int; make the conversion explicit.
  size_ = static_cast<int>(size);
  head_ = 0;
  tail_ = 0;
  curr_size_ = 0;

  allocator_.allocate(reinterpret_cast<void**>(&queue_),
                      sizeof(TYPE) * static_cast<size_t>(size));
}
/**
 * @brief Host-side, non-thread-safe append used to pre-fill the queue.
 *
 * When the queue already holds `size_` elements the value is discarded and a
 * diagnostic is written to std::cerr.
 *
 * @param val Value to store at the current tail position.
 */
template <typename TYPE, typename ALLOCATOR>
__host__ void AtomicWFQueue<TYPE, ALLOCATOR>::push(const TYPE& val) {
  // No room left: report it and leave the queue untouched.
  if (!(curr_size_ < size_)) {
    std::cerr << "AtomicWfQueue is full: " << curr_size_
              << " elements" << std::endl;
    return;
  }

  const int slot = tail_;
  queue_[slot] = val;
  // Advance the tail with wrap-around.
  tail_ = (slot + 1) % size_;
  ++curr_size_;
}
/**
 * @brief Returns the caller's rank among the currently active lanes.
 *
 * The rank is the number of active lanes whose physical lane ID is lower than
 * the caller's, so the lowest active lane gets 0.
 *
 * @return Active logical lane ID of the calling thread.
 */
template <typename TYPE, typename ALLOCATOR>
__device__ unsigned int
AtomicWFQueue<TYPE, ALLOCATOR>::active_logical_lane_id() {
  // One bit per lane that is executing this call.
  const uint64_t active_lanes{__ballot(1)};
  const uint64_t physical_lane{__lane_id()};

  // Bits [0, physical_lane) set: selects every lane below the caller.
  const uint64_t all_ones = -1;
  const uint64_t below_caller{~(all_ones << physical_lane)};

  // Count the active lanes that precede the caller.
  unsigned int logical_lane{__popcll(active_lanes & below_caller)};
  return logical_lane;
}
/**
* @brief Shares one value among the callers of a wavefront via shared memory.
*
* A caller that passes `lowest_active == true` writes `value` into the slot
* selected by `wavefront_id` and then issues a block-scope fence. Every
* caller, writer included, returns whatever that slot holds.
*
* NOTE(review): there is no explicit barrier between the write and the reads;
* this presumably relies on the lanes of a wavefront executing together -
* confirm for the targeted hardware.
*
* @param lowest_active True for the lane that supplies the value.
* @param value Value to publish (ignored when `lowest_active` is false).
*
* @return Content of the slot for the caller's wavefront.
*/
template <typename TYPE, typename ALLOCATOR>
__device__ TYPE AtomicWFQueue<TYPE, ALLOCATOR>::broadcast_lds(
bool lowest_active, TYPE value) {
/**
* Shared array to broadcast data within each wavefront
* Max threads per block = 1024, wavefront size = 64 (in most GPUs)
* Maximum array size required = 1024/64 = 16
*/
constexpr size_t SIZE = 1024 / WF_SIZE;
__shared__ TYPE value_per_warp[SIZE];
// Slot index for this caller; presumably get_flat_block_id() is the flat
// thread index within the block - confirm in util.hpp.
auto wavefront_id {get_flat_block_id() / WF_SIZE};
if (lowest_active) {
value_per_warp[wavefront_id] = value;
// Block-scope fence after the store, before the slot is read back below.
__threadfence_block();
}
return value_per_warp[wavefront_id];
}
/**
 * @brief Appends `val` to the queue on behalf of the calling wavefront.
 *
 * Only the lowest active lane does any work; all other lanes return
 * immediately. The insertion runs under the enqueue ticket mutex, which
 * serializes enqueuing wavefronts in first-come, first-serve order.
 *
 * @param val Value to store at the current tail position.
 */
template <typename TYPE, typename ALLOCATOR>
__device__ void AtomicWFQueue<TYPE, ALLOCATOR>::enqueue(const TYPE& val) {
  // Every lane except the lowest active one has nothing to do.
  if (active_logical_lane_id() != 0) {
    return;
  }

  // Held until the end of the function; one enqueuing wavefront at a time.
  TicketLockGuard<MutexType> enqueue_lock(*enqueue_mutex_.get());

  // A full queue here means the caller broke the usage contract.
  assert(!is_full());

  const int slot = tail_;
  queue_[slot] = val;
  tail_ = (slot + 1) % size_;

  // Publish the new element by bumping the shared element count last.
  atomic_add(&curr_size_, 1);
}
/**
 * @brief Removes the front element and hands it to every active lane.
 *
 * The lowest active lane takes the dequeue ticket mutex, spins until the
 * queue is non-empty, and pops the head element. The popped value is then
 * distributed with broadcast_lds(), and each lane adds its own active logical
 * lane ID before returning, so `TYPE` must support `+=`.
 *
 * @return Front element of the queue plus the caller's active logical lane ID.
 */
template <typename TYPE, typename ALLOCATOR>
__device__ TYPE AtomicWFQueue<TYPE, ALLOCATOR>::dequeue() {
  TYPE front {TYPE()};
  const unsigned int lane {active_logical_lane_id()};
  const bool leader {lane == 0};

  if (leader) {
    // One dequeuing wavefront at a time, in first-come, first-serve order.
    TicketLockGuard<MutexType> dequeue_lock(*dequeue_mutex_.get());

    // Busy-wait (while holding the lock) until an element is available.
    while (is_empty()) {
    }

    const int slot = head_;
    front = queue_[slot];
    head_ = (slot + 1) % size_;
    atomic_sub(&curr_size_, 1);
  }

  // Every active lane picks up the leader's value.
  front = broadcast_lds(leader, front);

  // TYPE should support + operation
  front += lane;
  return front;
}
} // namespace rocshmem
@@ -92,6 +92,7 @@ target_sources(
free_list_gtest.cpp
#context_ipc_gtest.cpp
wavefront_size_gtest.cpp
atomic_wf_queue_gtest.cpp
)
if (USE_IPC)
@@ -0,0 +1,199 @@
/******************************************************************************
* Copyright (c) 2024 Advanced Micro Devices, Inc. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*****************************************************************************/
#include "atomic_wf_queue_gtest.hpp"
using namespace rocshmem;
/*****************************************************************************
******************************* Fixture Tests *******************************
*****************************************************************************/
// Lane-ID tests: each case launches one block with the given number of
// threads and expects every thread's active logical lane ID to equal its
// thread index modulo the device wavefront size (see
// AtomicWFQueueTestFixture::get_thread_lane_ids). The thread counts are
// presumably chosen to mix sizes that are and are not multiples of common
// wavefront sizes.
TEST_F(AtomicWFQueueTestFixture, active_logical_lane_ids_4) {
get_thread_lane_ids(4);
}
TEST_F(AtomicWFQueueTestFixture, active_logical_lane_ids_10) {
get_thread_lane_ids(10);
}
TEST_F(AtomicWFQueueTestFixture, active_logical_lane_ids_16) {
get_thread_lane_ids(16);
}
TEST_F(AtomicWFQueueTestFixture, active_logical_lane_ids_17) {
get_thread_lane_ids(17);
}
TEST_F(AtomicWFQueueTestFixture, active_logical_lane_ids_64) {
get_thread_lane_ids(64);
}
TEST_F(AtomicWFQueueTestFixture, active_logical_lane_ids_97) {
get_thread_lane_ids(97);
}
TEST_F(AtomicWFQueueTestFixture, active_logical_lane_ids_256) {
get_thread_lane_ids(256);
}
TEST_F(AtomicWFQueueTestFixture, active_logical_lane_ids_183) {
get_thread_lane_ids(183);
}
TEST_F(AtomicWFQueueTestFixture, active_logical_lane_ids_1024) {
get_thread_lane_ids(1024);
}
// Host-side initialization tests. The arguments are (num_blocks, block_size);
// the fixture sizes the queue at one element per wavefront, fills it with
// push(), and checks the size counters and that the tail wrapped back to the
// head (see AtomicWFQueueTestFixture::init_queue).
TEST_F(AtomicWFQueueTestFixture, init_2_64) {
init_queue(2, 64);
}
TEST_F(AtomicWFQueueTestFixture, init_64_256) {
init_queue(64, 256);
}
TEST_F(AtomicWFQueueTestFixture, init_1024_96) {
init_queue(1024, 96);
}
// Queue capacity equals the number of launched wavefronts:
// num_blocks * ceil(block_size / wf_size), with wf_size taken from the
// fixture.
TEST_F(AtomicWFQueueTestFixture, dequeue_enqueue_threads_eq_qsize_2_32) {
  const int num_blocks {2};
  const int block_size {32};
  const int wavefronts_per_block {(block_size - 1) / wf_size + 1};
  dequeue_enqueue(num_blocks, block_size, num_blocks * wavefronts_per_block);
}

TEST_F(AtomicWFQueueTestFixture, dequeue_enqueue_threads_eq_qsize_32_192) {
  const int num_blocks {32};
  const int block_size {192};
  const int wavefronts_per_block {(block_size - 1) / wf_size + 1};
  dequeue_enqueue(num_blocks, block_size, num_blocks * wavefronts_per_block);
}

TEST_F(AtomicWFQueueTestFixture, dequeue_enqueue_threads_eq_qsize_64_256) {
  const int num_blocks {64};
  const int block_size {256};
  const int wavefronts_per_block {(block_size - 1) / wf_size + 1};
  dequeue_enqueue(num_blocks, block_size, num_blocks * wavefronts_per_block);
}
// Queue capacity smaller than (or equal to) the amount of launched work, so
// wavefronts have to wait in dequeue() until another one enqueues.
// Test name suffix: <num_blocks>_<block_size>_<queue_size>.
TEST_F(AtomicWFQueueTestFixture, dequeue_enqueue_threads_ge_qsize_16_32_4) {
  dequeue_enqueue(/*num_blocks=*/16, /*block_size=*/32, /*queue_size=*/4);
}

TEST_F(AtomicWFQueueTestFixture, dequeue_enqueue_threads_ge_qsize_16_96_8) {
  dequeue_enqueue(/*num_blocks=*/16, /*block_size=*/96, /*queue_size=*/8);
}

TEST_F(AtomicWFQueueTestFixture, dequeue_enqueue_threads_ge_qsize_16_320_8) {
  dequeue_enqueue(/*num_blocks=*/16, /*block_size=*/320, /*queue_size=*/8);
}

TEST_F(AtomicWFQueueTestFixture, dequeue_enqueue_threads_ge_qsize_32_192_16) {
  dequeue_enqueue(/*num_blocks=*/32, /*block_size=*/192, /*queue_size=*/16);
}

TEST_F(AtomicWFQueueTestFixture, dequeue_enqueue_threads_ge_qsize_64_256_64) {
  dequeue_enqueue(/*num_blocks=*/64, /*block_size=*/256, /*queue_size=*/64);
}

TEST_F(AtomicWFQueueTestFixture, dequeue_enqueue_threads_ge_qsize_64_576_64) {
  dequeue_enqueue(/*num_blocks=*/64, /*block_size=*/576, /*queue_size=*/64);
}
// Queue capacity at least as large as the amount of launched work.
// Test name suffix: <num_blocks>_<block_size>_<queue_size>.
TEST_F(AtomicWFQueueTestFixture, dequeue_enqueue_qsize_ge_threads_4_8_16) {
  dequeue_enqueue(/*num_blocks=*/4, /*block_size=*/8, /*queue_size=*/16);
}

TEST_F(AtomicWFQueueTestFixture, dequeue_enqueue_qsize_ge_threads_4_96_16) {
  dequeue_enqueue(/*num_blocks=*/4, /*block_size=*/96, /*queue_size=*/16);
}

TEST_F(AtomicWFQueueTestFixture, dequeue_enqueue_qsize_ge_threads_32_192_128) {
  dequeue_enqueue(/*num_blocks=*/32, /*block_size=*/192, /*queue_size=*/128);
}

TEST_F(AtomicWFQueueTestFixture, dequeue_enqueue_qsize_ge_threads_32_320_256) {
  dequeue_enqueue(/*num_blocks=*/32, /*block_size=*/320, /*queue_size=*/256);
}

TEST_F(AtomicWFQueueTestFixture, dequeue_enqueue_qsize_ge_threads_64_16_128) {
  dequeue_enqueue(/*num_blocks=*/64, /*block_size=*/16, /*queue_size=*/128);
}

TEST_F(AtomicWFQueueTestFixture, dequeue_enqueue_qsize_ge_threads_64_96_256) {
  dequeue_enqueue(/*num_blocks=*/64, /*block_size=*/96, /*queue_size=*/256);
}

TEST_F(AtomicWFQueueTestFixture, dequeue_enqueue_qsize_ge_threads_64_320_512) {
  dequeue_enqueue(/*num_blocks=*/64, /*block_size=*/320, /*queue_size=*/512);
}

TEST_F(AtomicWFQueueTestFixture, dequeue_enqueue_qsize_ge_threads_64_576_1024) {
  dequeue_enqueue(/*num_blocks=*/64, /*block_size=*/576, /*queue_size=*/1024);
}
@@ -0,0 +1,170 @@
/******************************************************************************
* Copyright (c) 2024 Advanced Micro Devices, Inc. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*****************************************************************************/
#ifndef ROCSHMEM_ATOMIC_WF_QUEUE_GTEST_HPP
#define ROCSHMEM_ATOMIC_WF_QUEUE_GTEST_HPP
#include "../src/containers/atomic_wf_queue_impl.hpp"
#include "gtest/gtest.h"
#include "../src/memory/hip_allocator.hpp"
#include "../src/util.hpp"
#include <iostream>
namespace rocshmem {
/**
 * @brief Records each thread's active logical lane ID.
 *
 * Every thread writes the result of `active_logical_lane_id()` to
 * `device_array` at its flat thread index, so `device_array` must hold at
 * least one entry per launched thread.
 *
 * @param awf_queue Queue object whose lane-ID helper is exercised.
 * @param device_array Output array indexed by flat thread ID.
 * @param wf_size Wavefront size; kept for launch-site compatibility, not
 * used by the kernel.
 */
template <typename AWFQueue>
__global__ void wf_lane_ids(AWFQueue* awf_queue, unsigned int* device_array,
                            int wf_size) {
  // The parameter is part of the launch signature but no longer needed here.
  (void)wf_size;
  int t_id {get_flat_id()};
  device_array[t_id] = awf_queue->active_logical_lane_id();
}
/**
 * @brief Each thread dequeues a value, records it, and enqueues it again.
 *
 * The value returned by `dequeue()` is stored in `device_array` at the
 * thread's flat index and then passed back to `enqueue()`, so the queue holds
 * the same number of elements after the kernel as before.
 *
 * @param awf_queue Queue under test.
 * @param device_array Output array indexed by flat thread ID.
 */
template <typename AWFQueue>
__global__ void concurrent_enqueue_dequeue(
    AWFQueue* awf_queue,
    unsigned int* device_array) {
  const int thread_index {get_flat_id()};
  const int dequeued {awf_queue->dequeue()};
  device_array[thread_index] = dequeued;
  awf_queue->enqueue(dequeued);
}
class AtomicWFQueueTestFixture : public ::testing::Test {
public:
AtomicWFQueueTestFixture() {
awf_queue = awf_queue_proxy.get();
int device_id {};
hipDeviceProp_t device_props;
CHECK_HIP(hipGetDevice(&device_id));
CHECK_HIP(hipGetDeviceProperties(&device_props, device_id));
wf_size = device_props.warpSize;
}
~AtomicWFQueueTestFixture() {}
void get_thread_lane_ids(unsigned int num_threads = 4) {
unsigned int *device_array {nullptr};
hip_allocator_.allocate(reinterpret_cast<void**>(&device_array),
sizeof(unsigned int) * num_threads);
hipLaunchKernelGGL(wf_lane_ids, 1, num_threads, 0, nullptr,
awf_queue, device_array, wf_size);
CHECK_HIP(hipDeviceSynchronize());
hip_allocator_.deallocate(device_array);
for (unsigned int i{0}; i < num_threads; i++) {
EXPECT_EQ(device_array[i], i % wf_size);
}
}
void init_queue(int num_blocks, int block_size) {
int num_elems = num_blocks * ((block_size - 1) / wf_size + 1);
awf_queue->allocate_queue(num_elems);
for (int i{0}; i < num_elems; i++) {
awf_queue->push(i);
}
EXPECT_EQ(awf_queue->get_curr_size(), num_elems);
EXPECT_EQ(awf_queue->get_queue_size(), num_elems);
EXPECT_EQ(awf_queue->get_tail(), awf_queue->get_head());
awf_queue->deallocate_queue();
}
void verify(unsigned int *arr, int num_blocks, int block_size) {
unsigned int expected_val {};
unsigned int lane_id {};
unsigned int idx {};
for (unsigned int i{0}; i < num_blocks; i++) {
for (unsigned int j{0}; j < block_size; j++) {
idx = i * block_size + j;
lane_id = j % wf_size;
if (!lane_id) {
expected_val = arr[idx];
}
EXPECT_EQ(arr[idx], expected_val + lane_id);
}
}
}
void dequeue_enqueue(int num_blocks, int block_size, int queue_size) {
int num_threads {num_blocks * block_size};
unsigned int *device_array {nullptr};
hip_allocator_.allocate(reinterpret_cast<void**>(&device_array),
sizeof(unsigned int) * num_threads);
awf_queue->allocate_queue(queue_size);
for (int i{0}; i < queue_size; i++) {
awf_queue->push(i);
}
EXPECT_EQ(awf_queue->get_curr_size(), queue_size);
EXPECT_EQ(awf_queue->get_queue_size(), queue_size);
EXPECT_EQ(awf_queue->get_tail(), awf_queue->get_head());
hipLaunchKernelGGL(concurrent_enqueue_dequeue, num_blocks, block_size, 0,
nullptr, awf_queue, device_array);
CHECK_HIP(hipDeviceSynchronize());
EXPECT_EQ(awf_queue->get_curr_size(), queue_size);
EXPECT_EQ(awf_queue->get_tail(), awf_queue->get_head());
verify(device_array, num_blocks, block_size);
hip_allocator_.deallocate(device_array);
awf_queue->deallocate_queue();
}
protected:
AtomicWFQueueProxy<HIPAllocator, int> awf_queue_proxy{};
AtomicWFQueue<int, HIPAllocator>* awf_queue{};
/**
* @brief An allocator to create objects in device memory.
*/
HIPAllocator hip_allocator_ {};
/**
* @brief Wavefront size.
*/
int wf_size {};
};
} // namespace rocshmem
#endif // ROCSHMEM_ATOMIC_WF_QUEUE_GTEST_HPP