From 07ee1f07d8bd31275ddfa1fa202d53a399ea89d3 Mon Sep 17 00:00:00 2001 From: Maneesh Gupta Date: Fri, 5 Oct 2018 16:23:08 +0530 Subject: [PATCH] Implementation for stream priority - Requires ROCm 1.9.x or higher - Requires HCC with PR#886 merged Change-Id: Id7c95ea091ee610e80c9ad815f1cb989cba570ca --- .../include/hip/hcc_detail/hip_runtime_api.h | 71 +++++- .../include/hip/nvcc_detail/hip_runtime_api.h | 14 ++ hipamd/src/hip_stream.cpp | 47 +++- .../stream/hipStreamCreateWithPriority.cpp | 220 ++++++++++++++++++ 4 files changed, 339 insertions(+), 13 deletions(-) create mode 100644 hipamd/tests/src/runtimeApi/stream/hipStreamCreateWithPriority.cpp diff --git a/hipamd/include/hip/hcc_detail/hip_runtime_api.h b/hipamd/include/hip/hcc_detail/hip_runtime_api.h index 573ae39af9..60609fd135 100644 --- a/hipamd/include/hip/hcc_detail/hip_runtime_api.h +++ b/hipamd/include/hip/hcc_detail/hip_runtime_api.h @@ -590,8 +590,6 @@ const char* hipGetErrorString(hipError_t hipError); * * The following Stream APIs are not (yet) supported in HIP: * - cudaStreamAttachMemAsync - * - cudaStreamCreateWithPriority - * - cudaStreamGetPriority */ @@ -609,7 +607,7 @@ const char* hipGetErrorString(hipError_t hipError); * * @return #hipSuccess, #hipErrorInvalidValue * - * @see hipStreamCreateWithFlags, hipStreamSynchronize, hipStreamWaitEvent, hipStreamDestroy + * @see hipStreamCreateWithFlags, hipStreamCreateWithPriority, hipStreamSynchronize, hipStreamWaitEvent, hipStreamDestroy */ hipError_t hipStreamCreate(hipStream_t* stream); @@ -628,12 +626,50 @@ hipError_t hipStreamCreate(hipStream_t* stream); * stream. See #hipStreamDefault, #hipStreamNonBlocking. * * - * @see hipStreamCreate, hipStreamSynchronize, hipStreamWaitEvent, hipStreamDestroy + * @see hipStreamCreate, hipStreamCreateWithPriority, hipStreamSynchronize, hipStreamWaitEvent, hipStreamDestroy */ hipError_t hipStreamCreateWithFlags(hipStream_t* stream, unsigned int flags); +/** + * @brief Create an asynchronous stream with the specified priority. + * + * @param[in, out] stream Pointer to new stream + * @param[in ] flags to control stream creation. + * @param[in ] priority of the stream. Lower numbers represent higher priorities. + * @return #hipSuccess, #hipErrorInvalidValue + * + * Create a new asynchronous stream with the specified priority. @p stream returns an opaque handle + * that can be used to reference the newly created stream in subsequent hipStream* commands. The + * stream is allocated on the heap and will remain allocated even if the handle goes out-of-scope. + * To release the memory used by the stream, applicaiton must call hipStreamDestroy. Flags controls + * behavior of the stream. See #hipStreamDefault, #hipStreamNonBlocking. + * + * + * @see hipStreamCreate, hipStreamSynchronize, hipStreamWaitEvent, hipStreamDestroy + */ + +hipError_t hipStreamCreateWithPriority(hipStream_t* stream, unsigned int flags, int priority); + + +/** + * @brief Returns numerical values that correspond to the least and greatest stream priority. + * + * @param[in, out] leastPriority pointer in which value corresponding to least priority is returned. + * @param[in, out] greatestPriority pointer in which value corresponding to greatest priority is returned. + * + * Returns in *leastPriority and *greatestPriority the numerical values that correspond to the least + * and greatest stream priority respectively. Stream priorities follow a convention where lower numbers + * imply greater priorities. The range of meaningful stream priorities is given by + * [*greatestPriority, *leastPriority]. If the user attempts to create a stream with a priority value + * that is outside the the meaningful range as specified by this API, the priority is automatically + * clamped to within the valid range. + */ + +hipError_t hipDeviceGetStreamPriorityRange(int* leastPriority, int* greatestPriority); + + /** * @brief Destroys the specified stream. * @@ -649,7 +685,7 @@ hipError_t hipStreamCreateWithFlags(hipStream_t* stream, unsigned int flags); * The queue may be destroyed while some commands are still inflight, or may wait for all commands * queued to the stream before destroying it. * - * @see hipStreamCreate, hipStreamCreateWithFlags, hipStreamQuery, hipStreamWaitEvent, + * @see hipStreamCreate, hipStreamCreateWithFlags, hipStreamCreateWithPriority, hipStreamQuery, hipStreamWaitEvent, * hipStreamSynchronize */ hipError_t hipStreamDestroy(hipStream_t stream); @@ -667,7 +703,7 @@ hipError_t hipStreamDestroy(hipStream_t stream); * host threads are sending work to the stream, the status may change immediately after the function * is called. It is typically used for debug. * - * @see hipStreamCreate, hipStreamCreateWithFlags, hipStreamWaitEvent, hipStreamSynchronize, + * @see hipStreamCreate, hipStreamCreateWithFlags, hipStreamCreateWithPriority, hipStreamWaitEvent, hipStreamSynchronize, * hipStreamDestroy */ hipError_t hipStreamQuery(hipStream_t stream); @@ -689,7 +725,7 @@ hipError_t hipStreamQuery(hipStream_t stream); * This command honors the hipDeviceLaunchBlocking flag, which controls whether the wait is active * or blocking. * - * @see hipStreamCreate, hipStreamCreateWithFlags, hipStreamWaitEvent, hipStreamDestroy + * @see hipStreamCreate, hipStreamCreateWithFlags, hipStreamCreateWithPriority, hipStreamWaitEvent, hipStreamDestroy * */ hipError_t hipStreamSynchronize(hipStream_t stream); @@ -712,7 +748,7 @@ hipError_t hipStreamSynchronize(hipStream_t stream); * does not impliciy wait for commands in the default stream to complete, even if the specified * stream is created with hipStreamNonBlocking = 0. * - * @see hipStreamCreate, hipStreamCreateWithFlags, hipStreamSynchronize, hipStreamDestroy + * @see hipStreamCreate, hipStreamCreateWithFlags, hipStreamCreateWithPriority, hipStreamSynchronize, hipStreamDestroy */ hipError_t hipStreamWaitEvent(hipStream_t stream, hipEvent_t event, unsigned int flags); @@ -732,6 +768,23 @@ hipError_t hipStreamWaitEvent(hipStream_t stream, hipEvent_t event, unsigned int */ hipError_t hipStreamGetFlags(hipStream_t stream, unsigned int* flags); + +/** + * @brief Query the priority of a stream. + * + * @param[in] stream stream to be queried + * @param[in,out] priority Pointer to an unsigned integer in which the stream's priority is returned + * @return #hipSuccess, #hipErrorInvalidValue, #hipErrorInvalidResourceHandle + * + * @returns #hipSuccess #hipErrorInvalidValue #hipErrorInvalidResourceHandle + * + * Query the priority of a stream. The priority is returned in in priority. + * + * @see hipStreamCreateWithFlags + */ +hipError_t hipStreamGetPriority(hipStream_t stream, int* priority); + + /** * Stream CallBack struct */ @@ -749,7 +802,7 @@ typedef void (*hipStreamCallback_t)(hipStream_t stream, hipError_t status, void* * @return #hipSuccess, #hipErrorInvalidResourceHandle, #hipErrorNotSupported * * @see hipStreamCreate, hipStreamCreateWithFlags, hipStreamQuery, hipStreamSynchronize, - * hipStreamWaitEvent, hipStreamDestroy + * hipStreamWaitEvent, hipStreamDestroy, hipStreamCreateWithPriority * */ hipError_t hipStreamAddCallback(hipStream_t stream, hipStreamCallback_t callback, void* userData, diff --git a/hipamd/include/hip/nvcc_detail/hip_runtime_api.h b/hipamd/include/hip/nvcc_detail/hip_runtime_api.h index 55a3794846..e8b2f7d317 100644 --- a/hipamd/include/hip/nvcc_detail/hip_runtime_api.h +++ b/hipamd/include/hip/nvcc_detail/hip_runtime_api.h @@ -890,6 +890,13 @@ inline static hipError_t hipStreamCreateWithFlags(hipStream_t* stream, unsigned return hipCUDAErrorTohipError(cudaStreamCreateWithFlags(stream, flags)); } +inline static hipError_t hipStreamCreateWithPriority(hipStream_t* stream, unsigned int flags, int priority) { + return hipCUDAErrorTohipError(cudaStreamCreateWithPriority(stream, flags, priority)); +} + +inline static hipError_t hipDeviceGetStreamPriorityRange(int* leastPriority, int* greatestPriority) { + return hipCUDAErrorTohipError(cudaDeviceGetStreamPriorityRange(leastPriority, greatestPriority)); +} inline static hipError_t hipStreamCreate(hipStream_t* stream) { return hipCUDAErrorTohipError(cudaStreamCreate(stream)); @@ -903,6 +910,13 @@ inline static hipError_t hipStreamDestroy(hipStream_t stream) { return hipCUDAErrorTohipError(cudaStreamDestroy(stream)); } +inline static hipError_t hipStreamGetFlags(hipStream_t stream, unsigned int *flags) { + return hipCUDAErrorTohipError(cudaStreamGetFlags(stream, flags)); +} + +inline static hipError_t hipStreamGetPriority(hipStream_t stream, int *priority) { + return hipCUDAErrorTohipError(cudaStreamGetPriority(stream, priority)); +} inline static hipError_t hipStreamWaitEvent(hipStream_t stream, hipEvent_t event, unsigned int flags) { diff --git a/hipamd/src/hip_stream.cpp b/hipamd/src/hip_stream.cpp index 4170e33d63..1e239cb2fc 100644 --- a/hipamd/src/hip_stream.cpp +++ b/hipamd/src/hip_stream.cpp @@ -31,9 +31,15 @@ THE SOFTWARE. //------------------------------------------------------------------------------------------------- // Stream // +enum queue_priority +{ + priority_high = Kalmar::priority_high, + priority_normal = Kalmar::priority_normal, + priority_low = Kalmar::priority_low +}; //--- -hipError_t ihipStreamCreate(hipStream_t* stream, unsigned int flags) { +hipError_t ihipStreamCreate(hipStream_t* stream, unsigned int flags, int priority) { ihipCtx_t* ctx = ihipGetTlsDefaultCtx(); hipError_t e = hipSuccess; @@ -53,7 +59,7 @@ hipError_t ihipStreamCreate(hipStream_t* stream, unsigned int flags) { // Obtain mutex access to the device critical data, release by destructor LockedAccessor_CtxCrit_t ctxCrit(ctx->criticalData()); - auto istream = new ihipStream_t(ctx, acc.create_view(), flags); + auto istream = new ihipStream_t(ctx, acc.create_view(Kalmar::execute_in_order, Kalmar::queuing_mode_automatic, (Kalmar::queue_priority)priority), flags); ctxCrit->addStream(istream); *stream = istream; @@ -73,16 +79,33 @@ hipError_t ihipStreamCreate(hipStream_t* stream, unsigned int flags) { hipError_t hipStreamCreateWithFlags(hipStream_t* stream, unsigned int flags) { HIP_INIT_API(stream, flags); - return ihipLogStatus(ihipStreamCreate(stream, flags)); + return ihipLogStatus(ihipStreamCreate(stream, flags, priority_normal)); } //--- hipError_t hipStreamCreate(hipStream_t* stream) { HIP_INIT_API(stream); - return ihipLogStatus(ihipStreamCreate(stream, hipStreamDefault)); + return ihipLogStatus(ihipStreamCreate(stream, hipStreamDefault, priority_normal)); } +//--- +hipError_t hipStreamCreateWithPriority(hipStream_t* stream, unsigned int flags, int priority) { + HIP_INIT_API(stream, flags, priority); + + // clamp priority to range [priority_high:priority_low] + priority = (priority < priority_high ? priority_high : (priority > priority_low ? priority_low : priority)); + return ihipLogStatus(ihipStreamCreate(stream, flags, priority)); +} + +//--- +hipError_t hipDeviceGetStreamPriorityRange(int* leastPriority, int* greatestPriority) { + HIP_INIT_API(leastPriority, greatestPriority); + + if (leastPriority != NULL) *leastPriority = priority_low; + if (greatestPriority != NULL) *greatestPriority = priority_high; + return ihipLogStatus(hipSuccess); +} hipError_t hipStreamWaitEvent(hipStream_t stream, hipEvent_t event, unsigned int flags) { HIP_INIT_SPECIAL_API(TRACE_SYNC, stream, event, flags); @@ -191,6 +214,22 @@ hipError_t hipStreamGetFlags(hipStream_t stream, unsigned int* flags) { } +//-- +hipError_t hipStreamGetPriority(hipStream_t stream, int* priority) { + HIP_INIT_API(stream, priority); + + if (priority == NULL) { + return ihipLogStatus(hipErrorInvalidValue); + } else if (stream == hipStreamNull) { + return ihipLogStatus(hipErrorInvalidResourceHandle); + } else { + LockedAccessor_StreamCrit_t crit(stream->_criticalData); + *priority = crit->_av.get_queue_priority(); + return ihipLogStatus(hipSuccess); + } +} + + //--- hipError_t hipStreamAddCallback(hipStream_t stream, hipStreamCallback_t callback, void* userData, unsigned int flags) { diff --git a/hipamd/tests/src/runtimeApi/stream/hipStreamCreateWithPriority.cpp b/hipamd/tests/src/runtimeApi/stream/hipStreamCreateWithPriority.cpp new file mode 100644 index 0000000000..582da67d85 --- /dev/null +++ b/hipamd/tests/src/runtimeApi/stream/hipStreamCreateWithPriority.cpp @@ -0,0 +1,220 @@ +/* +Copyright (c) 2015-2016 Advanced Micro Devices, Inc. All rights reserved. +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +/* HIT_START + * BUILD: %t %s ../../test_common.cpp + * RUN: %t + * HIT_END + */ + +#include "hip/hip_runtime.h" +#include "test_common.h" + +#define MEMCPYSIZE 64*1024*1024 +#define NUMITERS 2 +#define GRIDSIZE 1024 +#define BLOCKSIZE 256 + +// helper rountine to initialize memory +template +void mem_init(T* buf, size_t n) +{ + for (int i = 0; i < n; i++) + { + buf[i] = i; + } +} + +// kernel to copy n elements from src to dst +template +__global__ void memcpy_kernel(T* dst, T* src, size_t n) +{ + int num = gridDim.x * blockDim.x; + int id = blockDim.x * blockIdx.x + threadIdx.x; + + for (int i = id; i < n; i += num) + { + dst[i] = src[i]; + } +} + +template +void runTest() +{ + size_t size = NUMITERS*MEMCPYSIZE; + + // get the range of priorities available + #define OP(x) \ + int priority_##x; \ + bool enable_priority_##x = false; + OP(low) + OP(normal) + OP(high) + #undef OP + HIPCHECK(hipDeviceGetStreamPriorityRange(&priority_low, &priority_high)); + printf("HIP stream priority range - low: %d to high: %d\n", priority_low, priority_high); + + // Check if priorities are indeed supported + if ((priority_low - priority_high) == 0) { passed(); } + + // Enable/disable priorities based on number of available priority levels + enable_priority_low = true; + enable_priority_high = true; + if ((priority_low - priority_high) > 1) enable_priority_normal = true; + if (enable_priority_normal) priority_normal = ((priority_low - priority_high) / 2); + + // create streams with highest and lowest available priorities + #define OP(x) \ + hipStream_t stream_##x; \ + if (enable_priority_##x) { \ + HIPCHECK(hipStreamCreateWithPriority(&stream_##x, hipStreamDefault, priority_##x)); \ + } + OP(low) + OP(normal) + OP(high) + #undef OP + + // allocate and initialise host source and destination buffers + #define OP(x) \ + T* src_h_##x; \ + T* dst_h_##x; \ + if (enable_priority_##x) { \ + src_h_##x = (T*)malloc(size); \ + if (src_h_##x == NULL) { printf("src_h_%s malloc failed!\n", #x); exit(-1); } \ + mem_init(src_h_##x, (size / sizeof(T))); \ + dst_h_##x = (T*)malloc(size); \ + if (dst_h_##x == NULL) { printf("dst_h_%s malloc failed!\n", #x); exit(-1); } \ + memset(dst_h_##x, 0, size); \ + } + OP(low) + OP(normal) + OP(high) + #undef OP + + // allocate and initialize device source and destination buffers + #define OP(x) \ + T* src_d_##x; \ + T* dst_d_##x; \ + if (enable_priority_##x) { \ + HIPCHECK(hipMalloc(&src_d_##x, size)); \ + HIPCHECK(hipMemcpy(src_d_##x, src_h_##x, size, hipMemcpyHostToDevice)); \ + HIPCHECK(hipMalloc(&dst_d_##x, size)); \ + } + OP(low) + OP(normal) + OP(high) + #undef OP + + // create events for measuring time spent in kernel execution + #define OP(x) \ + hipEvent_t event_start_##x; \ + hipEvent_t event_end_##x; \ + if (enable_priority_##x) { \ + HIPCHECK(hipEventCreate(&event_start_##x)); \ + HIPCHECK(hipEventCreate(&event_end_##x)); \ + } + OP(low) + OP(normal) + OP(high) + #undef OP + + // record start events for each of the priority streams + #define OP(x) \ + if (enable_priority_##x) { \ + HIPCHECK(hipEventRecord(event_start_##x, stream_##x)); \ + } + OP(low) + OP(normal) + OP(high) + #undef OP + + // launch kernels repeatedly on each of the prioritiy streams + for (int i = 0; i < size; i += MEMCPYSIZE) + { + int j = i / sizeof(T); + #define OP(x) \ + if (enable_priority_##x) { \ + hipLaunchKernelGGL((memcpy_kernel), dim3(GRIDSIZE), dim3(BLOCKSIZE), 0, stream_##x, dst_d_##x + j, src_d_##x + j, (MEMCPYSIZE / sizeof(T))); \ + } + OP(low) + OP(normal) + OP(high) + #undef OP + } + + // record end events for each of the priority streams + #define OP(x) \ + if (enable_priority_##x) { \ + HIPCHECK(hipEventRecord(event_end_##x, stream_##x)); \ + } + OP(low) + OP(normal) + OP(high) + #undef OP + + // synchronize events for each of the priority streams + #define OP(x) \ + if (enable_priority_##x) { \ + HIPCHECK(hipEventSynchronize(event_end_##x)); \ + } + OP(low) + OP(normal) + OP(high) + #undef OP + + // compute time spent for memcpy in each stream + #define OP(x) \ + float time_spent_##x; \ + if (enable_priority_##x) { \ + HIPCHECK(hipEventElapsedTime(&time_spent_##x, event_start_##x, event_end_##x)); \ + printf("time spent for memcpy in %6s priority stream: %.3lf ms\n", #x, time_spent_##x); \ + } + OP(low) + OP(normal) + OP(high) + #undef OP + + // sanity check + #define OP(x) \ + if (enable_priority_##x) { \ + HIPCHECK(hipMemcpy(dst_h_##x, dst_d_##x, size, hipMemcpyDeviceToHost)); \ + if (memcmp(dst_h_##x, src_h_##x, size) != 0) { printf("memcmp for %s failed!\n", #x); exit(-1); } \ + } + OP(low) + OP(normal) + OP(high) + #undef OP + + // validate that stream priorities are working as expected + #define OP(x, y) \ + if (enable_priority_##x && enable_priority_##y) { \ + if (time_spent_##x < time_spent_##y) { printf("FAILED!"); exit(-1); } \ + } + OP(low, normal) + OP(normal, high) + OP(low, high) + #undef OP + passed(); +} + +int main(int argc, char **argv) +{ + HipTest::parseStandardArguments(argc, argv, false); + runTest(); +}