diff --git a/projects/rocshmem/src/containers/atomic_wf_queue.hpp b/projects/rocshmem/src/containers/atomic_wf_queue.hpp new file mode 100644 index 0000000000..aa5f0cf1f1 --- /dev/null +++ b/projects/rocshmem/src/containers/atomic_wf_queue.hpp @@ -0,0 +1,358 @@ +/****************************************************************************** + * Copyright (c) 2024 Advanced Micro Devices, Inc. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + *****************************************************************************/ + +#ifndef LIBRARY_SRC_CONTAINERS_ATOMIC_WF_QUEUE_HPP_ +#define LIBRARY_SRC_CONTAINERS_ATOMIC_WF_QUEUE_HPP_ + +#include + +#include "../memory/hip_allocator.hpp" +#include "../sync/abql_block_mutex.hpp" +#include "../src/util.hpp" + +namespace rocshmem { + +/***************************************************************************** + ******************************* WAVE FREE LIST ****************************** + *****************************************************************************/ + +template +class AtomicWFQueue { + + using MutexProxyType = ABQLBlockMutexProxy; + using MutexType = ABQLBlockMutex; + + /** + * @brief A lock guard for ticket-based locks that follows the design of + * `std::lock_guard`. + * + * @tparam MUTEX The type of the ticket-based mutex to lock. + */ + template + struct TicketLockGuard { + /** + * @brief Constructs the `TicketLockGuard` and locks the mutex. + * + * @param m Mutex to take ownership of. + */ + __device__ explicit TicketLockGuard(MUTEX& m) : mutex_{m} { + ticket_ = mutex_.lock(); + __threadfence(); + } + + /** + * @brief Lock guards are not copyable + */ + __device__ TicketLockGuard(const TicketLockGuard&) = delete; + + /** + * @brief Lock guards are not moveable + */ + __device__ TicketLockGuard(TicketLockGuard&&) = delete; + + /** + * @brief Destructor the unlocks the mutex. + */ + __device__ ~TicketLockGuard() { + __threadfence(); + mutex_.unlock(ticket_); + } + + private: + using TicketT = uint64_t; + MUTEX& mutex_; + TicketT ticket_; + }; + + public: + /** + * @brief Construct a new AtomicWFQueue object + * + * @param allocator Allocator to use for allocating internal structures of the + * AtomicWFQueue. + */ + explicit AtomicWFQueue(const ALLOCATOR& allocator = ALLOCATOR()); + + /** + * @brief Destroy the AtomicWFQueue object + */ + ~AtomicWFQueue(); + + /** + * @brief Enqueues an element into the AtomicWFQueue. + * + * This function inserts the specified value at the position indicated by + * the `tail_` of the AtomicWFQueue and increases the AtomicWFQueue size + * by one. The enqueue operation follows a first-come, first-serve + * execution order. + * + * @param val The value to be inserted into the AtomicWFQueue. + */ + __device__ void enqueue(const TYPE& val); + + /** + * @brief Dequeues an element from the AtomicWFQueue. + * + * This function dequeues the element pointed to by the `head_` of the + * AtomicWFQueue and decreases the AtomicWFQueue size by one. If the + * AtomicWFQueue is empty, the function waits until an element becomes + * available. The dequeue operation follows a first-come, first-serve + * execution order. + * + * @return The dequeued element from the AtomicWFQueue. + */ + __device__ TYPE dequeue(); + + /** + * @brief Inserts a new element at the end of the AtomicWFQueue. + * + * This function adds the specified value to the end of the AtomicWFQueue, + * updating the `tail_` and `curr_size_` accordingly. It is intended for + * initializing the AtomicWFQueue with initial values. + * + * @note This function is not thread-safe and should only be used during + * the AtomicWFQueue initialization phase or in scenarios where thread + * safety is not a concern. + * + * @param val The value to be inserted into the AtomicWFQueue. + */ + __host__ void push(const TYPE& val); + + /** + * @brief Allocates and initializes the AtomicWFQueue. + * + * This function allocates memory for the AtomicWFQueue with the specified + * size and initializes the AtomicWFQueue's head, tail, current size, and + * maximum size variables to their appropriate starting values. + * + * @param size The maximum number of elements the AtomicWFQueue can hold. + */ + __host__ void allocate_queue(unsigned int size); + + /** + * @brief Deallocates the AtomicWFQueue and resets its internal variables. + * + * This function frees the memory allocated for the AtomicWFQueue and resets + * the AtomicWFQueue's internal variables such as head, tail, current size, + * and maximum size to their default or zero-initialized values. + */ + __host__ void deallocate_queue(); + + /** + * @brief Retrieves the logical lane ID of the calling thread. + * + * This function returns the active logical lane ID of the current thread + * within the wavefront. The logical lane ID uniquely identifies + * the thread's position among active threads in the wavefront. + * + * @return The logical lane ID of the active thread within the wavefront. + */ + __device__ unsigned int active_logical_lane_id(); + + /** + * @brief Broadcasts a value to other threads in the wavefront. + * + * This function broadcasts the specified value to all active threads + * in the wavefront. If `lowest_active` is true, the value is broadcasted + * from the thread with the lowest active lane ID. + * + * @param lowest_active If true, broadcasting starts from the lowest + * active thread in the wavefront. + * @param val The value to be broadcasted. + * + * @return The broadcasted value received by each thread in the wavefront. + */ + __device__ TYPE broadcast_lds(bool lowest_active, TYPE val); + + /** + * @brief Retrieves the maximum capacity of the AtomicWFQueue. + * + * This function returns the total size of the AtomicWFQueue, representing + * the maximum number of elements it can hold. + * + * @return The maximum capacity of the AtomicWFQueue. + */ + __host__ __device__ int get_queue_size() { + return size_; + } + + /** + * @brief Retrieves the current number of elements in the AtomicWFQueue. + * + * This function returns the current size of the AtomicWFQueue, representing + * the total number of elements currently stored. + * + * @return The current number of elements in the AtomicWFQueue. + */ + __host__ __device__ int get_curr_size() { + return curr_size_; + } + + /** + * @brief Retrieves the tail index of the AtomicWFQueue. + * + * This function returns the current index of the tail in the + * AtomicWFQueue, which represents the position where the next + * element will be enqueued. + * + * @return The index of the tail in the AtomicWFQueue. + */ + __host__ __device__ int get_tail() { + return tail_; + } + + /** + * @brief Retrieves the head index of the AtomicWFQueue. + * + * This function returns the current index of the head in the + * AtomicWFQueue, which represents the position of the next element + * to be dequeued. + * + * @return The index of the head in the AtomicWFQueue. + */ + __host__ __device__ int get_head() { + return head_; + } + + private: + + __device__ int atomic_load(const int* address) { + return __hip_atomic_load(address, __ATOMIC_SEQ_CST, + __HIP_MEMORY_SCOPE_AGENT); + } + + __device__ void atomic_store(int* address, const int val) { + __hip_atomic_store(address, val, __ATOMIC_SEQ_CST, + __HIP_MEMORY_SCOPE_AGENT); + } + + __device__ void atomic_add(int* address, const int val) { + __hip_atomic_fetch_add(address, val, __ATOMIC_SEQ_CST, + __HIP_MEMORY_SCOPE_AGENT); + } + + __device__ void atomic_sub(int* address, const int val) { + __hip_atomic_fetch_sub(address, val, __ATOMIC_SEQ_CST, + __HIP_MEMORY_SCOPE_AGENT); + } + + /** + * @brief Checks if the AtomicWFQueue is full. + * + * This function determines whether the AtomicWFQueue has reached its + * maximum capacity. It is used to prevent overflow conditions during + * enqueue operations. + * + * @return true if the AtomicWFQueue is full, false otherwise. + */ + __device__ bool is_full() { + return atomic_load(&curr_size_) == size_; + } + + /** + * @brief Checks if the AtomicWFQueue is empty. + * + * This function determines whether the AtomicWFQueue has no elements + * available for dequeue operations. It is used to prevent underflow + * conditions. + * + * @return true if the AtomicWFQueue is empty, false otherwise. + */ + __device__ bool is_empty() { + return atomic_load(&curr_size_) == 0; + } + + + /** + * @brief Internal memory allocator used to create internal structures of + * the AtomicWFQueue. + */ + ALLOCATOR allocator_{}; + + /** + * @brief Points to the index of first element in the AtomicWFQueue. + */ + int head_{}; + + /** + * @brief Points to the next empty slot in the AtomicWFQueue. + */ + int tail_{}; + + /** + * @brief Size of the AtomicWFQueue. + */ + int size_{}; + + /** + * @brief Current size of the AtomicWFQueue. + */ + int curr_size_{}; + + /** + * @brief Pointer to AtomicWFQueue memory + */ + TYPE *queue_{nullptr}; + + /** + * @brief Mutex protecting the AtomicWFQueue mutations during dequeue. + */ + MutexProxyType dequeue_mutex_; + + /** + * @brief Mutex protecting the AtomicWFQueue mutations during enqueue_mutex. + */ + MutexProxyType enqueue_mutex_; +}; + +template +class AtomicWFQueueProxy { + using AtomicWFQueueT = AtomicWFQueue; + using ProxyT = DeviceProxy; + + public: + __host__ __device__ AtomicWFQueueT* get() { return proxy_.get(); } + + AtomicWFQueueProxy(size_t num_elems = 1) : proxy_{num_elems} { + new (proxy_.get()) AtomicWFQueueT(); + } + + AtomicWFQueueProxy(const AtomicWFQueueProxy& other) = delete; + + AtomicWFQueueProxy& operator=(const AtomicWFQueueProxy& other) = delete; + + AtomicWFQueueProxy(AtomicWFQueueProxy&& other) = default; + + AtomicWFQueueProxy& operator=(AtomicWFQueueProxy&& other) = default; + + ~AtomicWFQueueProxy() { + auto atomic_wf_queue = proxy_.get(); + atomic_wf_queue->deallocate_queue(); + atomic_wf_queue->~AtomicWFQueue(); + } + + private: + ProxyT proxy_{}; +}; +} // namespace rocshmem + +#endif // LIBRARY_SRC_CONTAINERS_ATOMIC_WF_QUEUE_HPP_ diff --git a/projects/rocshmem/src/containers/atomic_wf_queue_impl.hpp b/projects/rocshmem/src/containers/atomic_wf_queue_impl.hpp new file mode 100644 index 0000000000..2280580622 --- /dev/null +++ b/projects/rocshmem/src/containers/atomic_wf_queue_impl.hpp @@ -0,0 +1,163 @@ +/****************************************************************************** + * Copyright (c) 2024 Advanced Micro Devices, Inc. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + *****************************************************************************/ + +#include "atomic_wf_queue.hpp" +#include + +namespace rocshmem { + +/***************************************************************************** + ******************************* WAVE FREE LIST ****************************** + *****************************************************************************/ + +template +AtomicWFQueue::~AtomicWFQueue() {} + +template +__host__ void AtomicWFQueue::deallocate_queue() { + if (queue_ != nullptr) { + allocator_.deallocate((void*)queue_); + queue_ = nullptr; + } + size_ = 0; + curr_size_ = 0; + head_ = 0; + tail_ = 0; +} + +template +AtomicWFQueue::AtomicWFQueue(const ALLOCATOR& allocator) + : allocator_{allocator}, size_{0}, curr_size_{0}, head_{0}, tail_{0} {} + +template +__host__ void AtomicWFQueue::allocate_queue( + unsigned int size) { + + size_ = size; + head_ = 0; + tail_ = 0; + curr_size_ = 0; + allocator_.allocate(reinterpret_cast(&queue_), + sizeof(TYPE) * size_); +} + +template +__host__ void AtomicWFQueue::push(const TYPE& val) { + if (curr_size_ < size_) { + queue_[tail_] = val; + tail_ = (tail_ + 1) % size_; + curr_size_++; + } + else { + std::cerr << "AtomicWfQueue is full: " << curr_size_ + << " elements" << std::endl; + } +} + +template +__device__ unsigned int +AtomicWFQueue::active_logical_lane_id() { + uint64_t ballot{__ballot(1)}; + uint64_t my_physical_lane_id{__lane_id()}; + uint64_t all_ones_mask = -1; + uint64_t lane_mask{all_ones_mask << my_physical_lane_id}; + uint64_t inverted_mask{~lane_mask}; + uint64_t lower_active_lanes{ballot & inverted_mask}; + unsigned int my_logical_lane_id{__popcll(lower_active_lanes)}; + return my_logical_lane_id; +} + +template +__device__ TYPE AtomicWFQueue::broadcast_lds( + bool lowest_active, TYPE value) { + + /** + * Shared array to broadcast data within each wavefront + * Max threads per block = 1024, wavefront size = 64 (in most GPUs) + * Maximum array size required = 1024/64 = 16 + */ + constexpr size_t SIZE = 1024 / WF_SIZE; + __shared__ TYPE value_per_warp[SIZE]; + auto wavefront_id {get_flat_block_id() / WF_SIZE}; + if (lowest_active) { + value_per_warp[wavefront_id] = value; + __threadfence_block(); + } + return value_per_warp[wavefront_id]; +} + +template +__device__ void AtomicWFQueue::enqueue(const TYPE& val) { + unsigned int my_active_lane_id {active_logical_lane_id()}; + bool is_lowest_active_lane {my_active_lane_id == 0}; + if (is_lowest_active_lane) { + /** + * Prevents multiple wavefronts from simultaneously entering the enqueue + * operation. Ensures a first-come, first-serve execution order + */ + TicketLockGuard guard(*enqueue_mutex_.get()); + + /** + * There should always be space available. + * If the queue is full, it indicates an unexpected issue. + */ + assert(!is_full()); + + int next_tail = (tail_ + 1) % size_; + queue_[tail_] = val; + + tail_ = next_tail; + atomic_add(&curr_size_, 1); + } +} + +template +__device__ TYPE AtomicWFQueue::dequeue() { + TYPE ret_val {TYPE()}; + unsigned int my_active_lane_id {active_logical_lane_id()}; + bool is_lowest_active_lane {my_active_lane_id == 0}; + if (is_lowest_active_lane) { + /** + * Prevents multiple wavefronts from simultaneously entering the dequeue + * operation. Ensures a first-come, first-serve execution order + */ + TicketLockGuard guard(*dequeue_mutex_.get()); + + // queue is empty, wait until data is available + while (is_empty()) {} + + int next_head = (head_ + 1) % size_; + + ret_val = queue_[head_]; + + head_ = next_head; + atomic_sub(&curr_size_, 1); + } + + ret_val = broadcast_lds(is_lowest_active_lane, ret_val); + // TYPE should support + operation + ret_val += my_active_lane_id; + + return ret_val; +} + +} // namespace rocshmem diff --git a/projects/rocshmem/tests/unit_tests/CMakeLists.txt b/projects/rocshmem/tests/unit_tests/CMakeLists.txt index fc09ff9a7f..76054b5abf 100644 --- a/projects/rocshmem/tests/unit_tests/CMakeLists.txt +++ b/projects/rocshmem/tests/unit_tests/CMakeLists.txt @@ -92,6 +92,7 @@ target_sources( free_list_gtest.cpp #context_ipc_gtest.cpp wavefront_size_gtest.cpp + atomic_wf_queue_gtest.cpp ) if (USE_IPC) diff --git a/projects/rocshmem/tests/unit_tests/atomic_wf_queue_gtest.cpp b/projects/rocshmem/tests/unit_tests/atomic_wf_queue_gtest.cpp new file mode 100644 index 0000000000..69b8a00000 --- /dev/null +++ b/projects/rocshmem/tests/unit_tests/atomic_wf_queue_gtest.cpp @@ -0,0 +1,199 @@ +/****************************************************************************** + * Copyright (c) 2024 Advanced Micro Devices, Inc. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + *****************************************************************************/ + +#include "atomic_wf_queue_gtest.hpp" + +using namespace rocshmem; + +/***************************************************************************** + ******************************* Fixture Tests ******************************* + *****************************************************************************/ + +TEST_F(AtomicWFQueueTestFixture, active_logical_lane_ids_4) { + get_thread_lane_ids(4); +} + +TEST_F(AtomicWFQueueTestFixture, active_logical_lane_ids_10) { + get_thread_lane_ids(10); +} + +TEST_F(AtomicWFQueueTestFixture, active_logical_lane_ids_16) { + get_thread_lane_ids(16); +} + +TEST_F(AtomicWFQueueTestFixture, active_logical_lane_ids_17) { + get_thread_lane_ids(17); +} + +TEST_F(AtomicWFQueueTestFixture, active_logical_lane_ids_64) { + get_thread_lane_ids(64); +} + +TEST_F(AtomicWFQueueTestFixture, active_logical_lane_ids_97) { + get_thread_lane_ids(97); +} + +TEST_F(AtomicWFQueueTestFixture, active_logical_lane_ids_256) { + get_thread_lane_ids(256); +} + +TEST_F(AtomicWFQueueTestFixture, active_logical_lane_ids_183) { + get_thread_lane_ids(183); +} + +TEST_F(AtomicWFQueueTestFixture, active_logical_lane_ids_1024) { + get_thread_lane_ids(1024); +} + +TEST_F(AtomicWFQueueTestFixture, init_2_64) { + init_queue(2, 64); +} + +TEST_F(AtomicWFQueueTestFixture, init_64_256) { + init_queue(64, 256); +} + +TEST_F(AtomicWFQueueTestFixture, init_1024_96) { + init_queue(1024, 96); +} + +TEST_F(AtomicWFQueueTestFixture, dequeue_enqueue_threads_eq_qsize_2_32) { + int num_blocks {2}; + int block_size {32}; + int wf_size {this->wf_size}; + int queue_size {num_blocks * ((block_size - 1) / wf_size + 1)}; + dequeue_enqueue(num_blocks, block_size, queue_size); +} + +TEST_F(AtomicWFQueueTestFixture, dequeue_enqueue_threads_eq_qsize_32_192) { + int num_blocks {32}; + int block_size {192}; + int wf_size {this->wf_size}; + int queue_size {num_blocks * ((block_size - 1) / wf_size + 1)}; + dequeue_enqueue(num_blocks, block_size, queue_size); +} + +TEST_F(AtomicWFQueueTestFixture, dequeue_enqueue_threads_eq_qsize_64_256) { + int num_blocks {64}; + int block_size {256}; + int wf_size {this->wf_size}; + int queue_size {num_blocks * ((block_size - 1) / wf_size + 1)}; + dequeue_enqueue(num_blocks, block_size, queue_size); +} + +TEST_F(AtomicWFQueueTestFixture, dequeue_enqueue_threads_ge_qsize_16_32_4) { + int num_blocks {16}; + int block_size {32}; + int queue_size {4}; + dequeue_enqueue(num_blocks, block_size, queue_size); +} + +TEST_F(AtomicWFQueueTestFixture, dequeue_enqueue_threads_ge_qsize_16_96_8) { + int num_blocks {16}; + int block_size {96}; + int queue_size {8}; + dequeue_enqueue(num_blocks, block_size, queue_size); +} + +TEST_F(AtomicWFQueueTestFixture, dequeue_enqueue_threads_ge_qsize_16_320_8) { + int num_blocks {16}; + int block_size {320}; + int queue_size {8}; + dequeue_enqueue(num_blocks, block_size, queue_size); +} + +TEST_F(AtomicWFQueueTestFixture, dequeue_enqueue_threads_ge_qsize_32_192_16) { + int num_blocks {32}; + int block_size {192}; + int queue_size {16}; + dequeue_enqueue(num_blocks, block_size, queue_size); +} + +TEST_F(AtomicWFQueueTestFixture, dequeue_enqueue_threads_ge_qsize_64_256_64) { + int num_blocks {64}; + int block_size {256}; + int queue_size {64}; + dequeue_enqueue(num_blocks, block_size, queue_size); +} + +TEST_F(AtomicWFQueueTestFixture, dequeue_enqueue_threads_ge_qsize_64_576_64) { + int num_blocks {64}; + int block_size {576}; + int queue_size {64}; + dequeue_enqueue(num_blocks, block_size, queue_size); +} + +TEST_F(AtomicWFQueueTestFixture, dequeue_enqueue_qsize_ge_threads_4_8_16) { + int num_blocks {4}; + int block_size {8}; + int queue_size {16}; + dequeue_enqueue(num_blocks, block_size, queue_size); +} + +TEST_F(AtomicWFQueueTestFixture, dequeue_enqueue_qsize_ge_threads_4_96_16) { + int num_blocks {4}; + int block_size {96}; + int queue_size {16}; + dequeue_enqueue(num_blocks, block_size, queue_size); +} + +TEST_F(AtomicWFQueueTestFixture, dequeue_enqueue_qsize_ge_threads_32_192_128) { + int num_blocks {32}; + int block_size {192}; + int queue_size {128}; + dequeue_enqueue(num_blocks, block_size, queue_size); +} + +TEST_F(AtomicWFQueueTestFixture, dequeue_enqueue_qsize_ge_threads_32_320_256) { + int num_blocks {32}; + int block_size {320}; + int queue_size {256}; + dequeue_enqueue(num_blocks, block_size, queue_size); +} + +TEST_F(AtomicWFQueueTestFixture, dequeue_enqueue_qsize_ge_threads_64_16_128) { + int num_blocks {64}; + int block_size {16}; + int queue_size {128}; + dequeue_enqueue(num_blocks, block_size, queue_size); +} + +TEST_F(AtomicWFQueueTestFixture, dequeue_enqueue_qsize_ge_threads_64_96_256) { + int num_blocks {64}; + int block_size {96}; + int queue_size {256}; + dequeue_enqueue(num_blocks, block_size, queue_size); +} + +TEST_F(AtomicWFQueueTestFixture, dequeue_enqueue_qsize_ge_threads_64_320_512) { + int num_blocks {64}; + int block_size {320}; + int queue_size {512}; + dequeue_enqueue(num_blocks, block_size, queue_size); +} + +TEST_F(AtomicWFQueueTestFixture, dequeue_enqueue_qsize_ge_threads_64_576_1024) { + int num_blocks {64}; + int block_size {576}; + int queue_size {1024}; + dequeue_enqueue(num_blocks, block_size, queue_size); +} \ No newline at end of file diff --git a/projects/rocshmem/tests/unit_tests/atomic_wf_queue_gtest.hpp b/projects/rocshmem/tests/unit_tests/atomic_wf_queue_gtest.hpp new file mode 100644 index 0000000000..2fdb8f86ec --- /dev/null +++ b/projects/rocshmem/tests/unit_tests/atomic_wf_queue_gtest.hpp @@ -0,0 +1,170 @@ +/****************************************************************************** + * Copyright (c) 2024 Advanced Micro Devices, Inc. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + *****************************************************************************/ + +#ifndef ROCSHMEM_ATOMIC_WF_QUEUE_GTEST_HPP +#define ROCSHMEM_ATOMIC_WF_QUEUE_GTEST_HPP + +#include "../src/containers/atomic_wf_queue_impl.hpp" +#include "gtest/gtest.h" +#include "../src/memory/hip_allocator.hpp" +#include "../src/util.hpp" +#include + +namespace rocshmem { + +template +__global__ void wf_lane_ids(AWFQueue* awf_queue, unsigned int* device_array, + int wf_size) { + + int t_id {get_flat_id()}; + int lane_id {t_id % wf_size}; + device_array[t_id] = awf_queue->active_logical_lane_id(); +} + +template +__global__ void concurrent_enqueue_dequeue( + AWFQueue* awf_queue, + unsigned int* device_array) { + + int t_id {get_flat_id()}; + int val {awf_queue->dequeue()}; + device_array[t_id] = val; + awf_queue->enqueue(val); +} + +class AtomicWFQueueTestFixture : public ::testing::Test { + public: + AtomicWFQueueTestFixture() { + awf_queue = awf_queue_proxy.get(); + + int device_id {}; + hipDeviceProp_t device_props; + CHECK_HIP(hipGetDevice(&device_id)); + CHECK_HIP(hipGetDeviceProperties(&device_props, device_id)); + + wf_size = device_props.warpSize; + } + + ~AtomicWFQueueTestFixture() {} + + void get_thread_lane_ids(unsigned int num_threads = 4) { + + unsigned int *device_array {nullptr}; + + hip_allocator_.allocate(reinterpret_cast(&device_array), + sizeof(unsigned int) * num_threads); + + + hipLaunchKernelGGL(wf_lane_ids, 1, num_threads, 0, nullptr, + awf_queue, device_array, wf_size); + + CHECK_HIP(hipDeviceSynchronize()); + + hip_allocator_.deallocate(device_array); + + for (unsigned int i{0}; i < num_threads; i++) { + EXPECT_EQ(device_array[i], i % wf_size); + } + } + + void init_queue(int num_blocks, int block_size) { + int num_elems = num_blocks * ((block_size - 1) / wf_size + 1); + + awf_queue->allocate_queue(num_elems); + + for (int i{0}; i < num_elems; i++) { + awf_queue->push(i); + } + + EXPECT_EQ(awf_queue->get_curr_size(), num_elems); + EXPECT_EQ(awf_queue->get_queue_size(), num_elems); + EXPECT_EQ(awf_queue->get_tail(), awf_queue->get_head()); + + awf_queue->deallocate_queue(); + } + + void verify(unsigned int *arr, int num_blocks, int block_size) { + unsigned int expected_val {}; + unsigned int lane_id {}; + unsigned int idx {}; + for (unsigned int i{0}; i < num_blocks; i++) { + for (unsigned int j{0}; j < block_size; j++) { + idx = i * block_size + j; + lane_id = j % wf_size; + if (!lane_id) { + expected_val = arr[idx]; + } + EXPECT_EQ(arr[idx], expected_val + lane_id); + } + } + } + + void dequeue_enqueue(int num_blocks, int block_size, int queue_size) { + + int num_threads {num_blocks * block_size}; + unsigned int *device_array {nullptr}; + + hip_allocator_.allocate(reinterpret_cast(&device_array), + sizeof(unsigned int) * num_threads); + + awf_queue->allocate_queue(queue_size); + + for (int i{0}; i < queue_size; i++) { + awf_queue->push(i); + } + + EXPECT_EQ(awf_queue->get_curr_size(), queue_size); + EXPECT_EQ(awf_queue->get_queue_size(), queue_size); + EXPECT_EQ(awf_queue->get_tail(), awf_queue->get_head()); + + hipLaunchKernelGGL(concurrent_enqueue_dequeue, num_blocks, block_size, 0, + nullptr, awf_queue, device_array); + CHECK_HIP(hipDeviceSynchronize()); + + EXPECT_EQ(awf_queue->get_curr_size(), queue_size); + EXPECT_EQ(awf_queue->get_tail(), awf_queue->get_head()); + + verify(device_array, num_blocks, block_size); + + hip_allocator_.deallocate(device_array); + awf_queue->deallocate_queue(); + } + + protected: + + AtomicWFQueueProxy awf_queue_proxy{}; + AtomicWFQueue* awf_queue{}; + + /** + * @brief An allocator to create objects in device memory. + */ + HIPAllocator hip_allocator_ {}; + + /** + * @brief Wavefront size. + */ + int wf_size {}; +}; + +} // namespace rocshmem + +#endif // ROCSHMEM_ATOMIC_WF_QUEUE_GTEST_HPP