diff --git a/tests/catch/stress/stream/CMakeLists.txt b/tests/catch/stress/stream/CMakeLists.txt index 357c3e5e36..ae6685da58 100644 --- a/tests/catch/stress/stream/CMakeLists.txt +++ b/tests/catch/stress/stream/CMakeLists.txt @@ -1,8 +1,10 @@ # Common Tests - Test independent of all platforms set(TEST_SRC Stress_hipStreamCreate.cc + streamEnqueue.cc ) hip_add_exe_to_target(NAME stream TEST_SRC ${TEST_SRC} - TEST_TARGET_NAME stress_test) + TEST_TARGET_NAME stress_test + COMPILE_OPTIONS -std=c++14) diff --git a/tests/catch/stress/stream/streamEnqueue.cc b/tests/catch/stress/stream/streamEnqueue.cc new file mode 100644 index 0000000000..c44ff96afc --- /dev/null +++ b/tests/catch/stress/stream/streamEnqueue.cc @@ -0,0 +1,231 @@ +/* +Copyright (c) 2022 Advanced Micro Devices, Inc. All rights reserved. +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. 
+*/
+
+#include <hip_test_common.hh>
+
+#include <algorithm>
+#include <atomic>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <random>
+#include <thread>
+#include <vector>
+
+// Atomically adds val to ptr[index].
+__global__ void addVal(unsigned long long* ptr, size_t index, unsigned long long val) {
+  atomicAdd(ptr + index, val);
+}
+
+// Copy constructible wrapper around std::atomic so that it can be stored in standard containers.
+// A copy is a load() from the source followed by a store() into the destination; the copy as a
+// whole is therefore not a single atomic operation.
+template <typename T> struct AtomicWrap {
+  std::atomic<T> data;
+
+  AtomicWrap() : data() {}
+
+  AtomicWrap(T i) : data(i) {}
+
+  AtomicWrap(const std::atomic<T>& a) : data(a.load()) {}
+
+  AtomicWrap(const AtomicWrap& other) : data(other.data.load()) {}
+
+  AtomicWrap& operator=(const AtomicWrap& other) {
+    data.store(other.data.load());
+    return *this;
+  }
+};
+
+// Have multiple threads enqueue commands on a single stream.
+// Validate at the end that all commands have completed successfully.
+TEST_CASE("Stress_StreamEnqueue_DifferentThreads") {
+  auto hwThreads = std::thread::hardware_concurrency();
+  hwThreads = (hwThreads >= 2) ? hwThreads : 2;  // Run at least 2 threads
+
+  // One host-side tally per slot; mirrors the device buffer below
+  std::vector<AtomicWrap<unsigned long long>> hostData(hwThreads, 0);
+
+  unsigned long long* dPtr{nullptr};
+  HIP_CHECK(hipMalloc(&dPtr, sizeof(unsigned long long) * hwThreads));
+  REQUIRE(dPtr != nullptr);
+
+  HIP_CHECK(hipMemset(dPtr, 0, sizeof(unsigned long long) * hwThreads));
+
+  constexpr size_t maxWork = 10000;
+  constexpr unsigned long long maxVal = 10;
+
+  // Each worker owns its engine and distributions. Their call operators mutate internal state,
+  // so sharing a single instance between threads without a lock would be a data race.
+  auto enqueueKernelThread = [&](hipStream_t stream, std::mt19937::result_type seed) {
+    std::mt19937 engine(seed);
+    std::uniform_int_distribution<size_t> genIndex(0, hwThreads - 1);
+    std::uniform_int_distribution<size_t> genWork(0, maxWork);
+    std::uniform_int_distribution<unsigned long long> genVal(0, maxVal);
+
+    const size_t iter = genWork(engine);  // Amount of work done by this thread
+    for (size_t i = 0; i < iter; i++) {
+      const size_t index = genIndex(engine);           // Slot to add to
+      const unsigned long long val = genVal(engine);   // Value to add to the slot
+      hostData[index].data += val;                     // Replicate it on host
+      addVal<<<1, 1, 0, stream>>>(dPtr, index, val);   // And on device
+    }
+  };
+
+  hipStream_t stream{};
+  HIP_CHECK(hipStreamCreate(&stream));
+
+  std::random_device device;
+  std::vector<std::thread> threadPool{};
+  threadPool.reserve(hwThreads);
+
+  // Launch work; seeds are drawn on this thread so std::random_device is never shared
+  for (size_t i = 0; i < hwThreads; i++) {
+    threadPool.emplace_back(enqueueKernelThread, stream, device());
+  }
+
+  // Wait for all enqueues to be issued
+  for (auto& worker : threadPool) {
+    worker.join();
+  }
+
+  // Drain the stream explicitly so that validation does not depend on how hipStreamDestroy
+  // treats work that is still in flight.
+  HIP_CHECK(hipStreamSynchronize(stream));
+  HIP_CHECK(hipStreamDestroy(stream));
+
+  auto hPtr = std::make_unique<unsigned long long[]>(hwThreads);
+  HIP_CHECK(
+      hipMemcpy(hPtr.get(), dPtr, sizeof(unsigned long long) * hwThreads, hipMemcpyDeviceToHost));
+
+  HIP_CHECK(hipFree(dPtr));
+
+  // Validate that CPU and GPU have the same results
+  for (size_t i = 0; i < hwThreads; i++) {
+    INFO("Check for Index " << i);
+    REQUIRE(hostData[i].data.load() == hPtr[i]);
+  }
+}
+
+// Atomically adds val to dPtr[threadIdx.x]; threads whose index is not below size do nothing.
+__global__ void doOperation(int* dPtr, size_t size, int val) {
+  auto i = threadIdx.x;
+  if (i < size) {
+    atomicAdd(dPtr + i, val);
+  }
+}
+
+// Allocate multiple streams per device, each with its own device buffer.
+// Worker threads pick streams at random and enqueue kernels on them; at the end every element
+// of a stream's device buffer must equal the host-side tally kept for that stream.
+TEST_CASE("Stress_StreamEnqueue_DifferentThreads_MultiGPU") {
+  int deviceCount{0};
+  HIP_CHECK(hipGetDeviceCount(&deviceCount));
+  REQUIRE(deviceCount > 0);
+
+  // Skip the test if devices less than 2
+  if (deviceCount <= 1) {
+    HipTest::HIP_SKIP_TEST("Skipping because devices <= 1");
+    return;
+  }
+
+  constexpr size_t streamPerGPU{3};  // Streams per gpu
+
+  std::vector<hipStream_t> streamPool{};
+  streamPool.reserve(deviceCount * streamPerGPU);
+
+  std::map<hipStream_t, int*> streamToDeviceMemory;            // Map of stream and device memory
+  std::map<hipStream_t, AtomicWrap<int>> streamToHostMemory;   // Map of stream and host result
+  std::map<hipStream_t, int> streamToDeviceIndex;  // Map of stream and device it was created on
+
+  constexpr size_t size = 1024;
+
+  for (int i = 0; i < deviceCount; i++) {
+    HIP_CHECK(hipSetDevice(i));
+
+    for (size_t j = 0; j < streamPerGPU; j++) {
+      hipStream_t stream{nullptr};
+      HIP_CHECK(hipStreamCreate(&stream));
+      REQUIRE(stream != nullptr);
+      streamPool.push_back(stream);
+
+      int* dPtr{nullptr};
+      HIP_CHECK(hipMalloc(&dPtr, sizeof(int) * size));
+      REQUIRE(dPtr != nullptr);
+      HIP_CHECK(hipMemset(dPtr, 0, sizeof(int) * size));
+
+      streamToDeviceMemory[stream] = dPtr;  // All streams work on exclusive memory
+
+      streamToHostMemory[stream] = AtomicWrap<int>(0);  // CPU result
+
+      streamToDeviceIndex[stream] = i;  // Capture device id for stream
+    }
+  }
+
+  constexpr int maxVal = 5;  // Signed on purpose: the distribution below spans [-maxVal, maxVal]
+  constexpr size_t maxWorkPerThread = 10000;
+
+#if HT_NVIDIA
+  std::mutex ness;  // On nvidia, current device needs to match stream's device
+#endif
+
+  // Set by a worker that had to bail out early; checked once everything is cleaned up
+  std::atomic<bool> setDeviceFailed{false};
+
+  // The maps are only read from here on. Lookups use at() rather than operator[] because
+  // operator[] is a potentially inserting, non-const access and must not run concurrently.
+  auto enqueueKernelThread = [&](std::mt19937::result_type seed) {
+    // Per-thread generator state: engines and distributions are not safe to share unlocked
+    std::mt19937 engine(seed);
+    std::uniform_int_distribution<int> genVal(-maxVal, maxVal);
+    std::uniform_int_distribution<size_t> genStream(0, streamPool.size() - 1);
+
+    for (size_t i = 0; i < maxWorkPerThread; i++) {
+#if HT_NVIDIA
+      std::unique_lock<std::mutex> lock(ness);  // Lock on creation
+#endif
+      hipStream_t stream = streamPool[genStream(engine)];  // Get a random stream
+
+      // TODO use HIP_CHECK_THREAD when PR#2664 is merged
+      if (hipSuccess != hipSetDevice(streamToDeviceIndex.at(stream))) {
+        // Host and device tallies stay consistent when we stop here, so the comparison below
+        // would still pass; record the failure so the test cannot succeed silently.
+        setDeviceFailed.store(true);
+        return;
+      }
+
+      int val = genVal(engine);  // Generate Value to add/sub to
+
+      streamToHostMemory.at(stream).data.fetch_add(val);  // Replicate result on CPU
+      auto dPtr = streamToDeviceMemory.at(stream);
+      doOperation<<<1, static_cast<unsigned int>(size), 0, stream>>>(dPtr, size,
+                                                                     val);  // On GPU
+    }
+  };
+
+  auto maxThreads = std::thread::hardware_concurrency();
+  maxThreads = (maxThreads >= 2) ? maxThreads : 2;  // Run at least 2 threads
+
+  std::random_device device;
+  std::vector<std::thread> threadPool{};
+  threadPool.reserve(maxThreads);
+
+  // Launch Threads; seeds are drawn on this thread so std::random_device is never shared
+  for (size_t i = 0; i < maxThreads; i++) {
+    threadPool.emplace_back(enqueueKernelThread, device());
+  }
+
+  // Wait for them to stop
+  for (auto& worker : threadPool) {
+    worker.join();
+  }
+
+  // Sync and check results
+  for (auto& i : streamPool) {
+    HIP_CHECK(hipStreamSynchronize(i));
+    int* dPtr = streamToDeviceMemory.at(i);
+    auto dResult = std::make_unique<int[]>(size);
+    HIP_CHECK(hipMemcpy(dResult.get(), dPtr, sizeof(int) * size, hipMemcpyDeviceToHost));
+    HIP_CHECK(hipFree(dPtr));
+    HIP_CHECK(hipStreamDestroy(i));
+    auto res = streamToHostMemory.at(i).data.load();
+    INFO("Matching CPU: " << res << " GPU: " << dResult[0] << " Dev Ptr: " << dPtr
+                          << " on Device: " << streamToDeviceIndex.at(i));
+    REQUIRE(std::all_of(dResult.get(), dResult.get() + size, [res](int r) { return r == res; }));
+  }
+
+  // A worker that could not select its device stopped enqueuing early
+  REQUIRE_FALSE(setDeviceFailed.load());
+}