diff --git a/projects/hip/src/hip_hcc.cpp b/projects/hip/src/hip_hcc.cpp index afc57e4b64..377ce0aad4 100644 --- a/projects/hip/src/hip_hcc.cpp +++ b/projects/hip/src/hip_hcc.cpp @@ -1497,18 +1497,21 @@ hipError_t ihipStreamSynchronize(TlsData *tls, hipStream_t stream) { return e; } -void ihipStreamCallbackHandler(ihipStreamCallback_t* cb) { +bool ihipStreamCallbackHandler(hsa_signal_value_t value, void* cbArgs) { hipError_t e = hipSuccess; - // Synchronize stream - tprintf(DB_SYNC, "ihipStreamCallbackHandler wait on stream %s\n", - ToString(cb->_stream).c_str()); - GET_TLS(); - e = ihipStreamSynchronize(tls, cb->_stream); + ihipStreamCallback_t* cb = static_cast (cbArgs); + + if(cb->comFuture.valid()) + cb->comFuture.wait(); // Call registered callback function cb->_callback(cb->_stream, e, cb->_userData); + + hsa_signal_store_screlease(cb->_signal,0); + delete cb; + return false; } //--- diff --git a/projects/hip/src/hip_hcc_internal.h b/projects/hip/src/hip_hcc_internal.h index 8ee47eba4a..6337edd370 100644 --- a/projects/hip/src/hip_hcc_internal.h +++ b/projects/hip/src/hip_hcc_internal.h @@ -650,7 +650,9 @@ class ihipStreamCallback_t { : _stream(stream), _callback(callback), _userData(userData) { }; hipStream_t _stream; + hsa_signal_t _signal; hipStreamCallback_t _callback; + hc::completion_future comFuture; void* _userData; }; @@ -968,7 +970,7 @@ hipError_t hipModuleGetFunctionEx(hipFunction_t* hfunc, hipModule_t hmod, hipStream_t ihipSyncAndResolveStream(hipStream_t, bool lockAcquired = 0); hipError_t ihipStreamSynchronize(TlsData *tls, hipStream_t stream); -void ihipStreamCallbackHandler(ihipStreamCallback_t* cb); +bool ihipStreamCallbackHandler(hsa_signal_value_t value, void* cbArgs); // Stream printf functions: inline std::ostream& operator<<(std::ostream& os, const ihipStream_t& s) { diff --git a/projects/hip/src/hip_stream.cpp b/projects/hip/src/hip_stream.cpp index e3e4975b7e..bf3289bb49 100644 --- a/projects/hip/src/hip_stream.cpp +++ b/projects/hip/src/hip_stream.cpp @@ -254,14 +254,137 @@ hipError_t hipStreamGetPriority(hipStream_t stream, int* priority) { //--- +void setCallbackPacket(hsa_queue_t* queue, + uint64_t& index, uint64_t& nextIndex, + hsa_barrier_and_packet_t** barrier1, + hsa_barrier_and_packet_t** barrier2){ + + uint64_t tempIndex = 0; + uint32_t mask = queue->size - 1; + hsa_barrier_and_packet_t* tempBarrier; + + // Check for empty packets + do{ + tempIndex = hsa_queue_load_write_index_scacquire(queue); + tempBarrier = &(((hsa_barrier_and_packet_t*)(queue->base_address))[tempIndex & mask]); + }while(!(tempBarrier->header & HSA_PACKET_TYPE_INVALID)); + + // Reserve two packets for two barriers + index = hsa_queue_add_write_index_scacquire(queue, 2); + + if(index > mask) + { + index = 0; + nextIndex = 1; + } + else if(index == mask) + nextIndex = 0; + else + nextIndex = index + 1; + + tempBarrier = new hsa_barrier_and_packet_t; + memset(tempBarrier, 0, sizeof(hsa_barrier_and_packet_t)); + tempBarrier->header = HSA_PACKET_TYPE_INVALID; + + // Barrier 1 + *barrier1 = &(((hsa_barrier_and_packet_t*)(queue->base_address))[index & mask]); + memcpy(*barrier1,tempBarrier,sizeof(hsa_barrier_and_packet_t)); + + // Barrier 2 + *barrier2 = &(((hsa_barrier_and_packet_t*)(queue->base_address))[nextIndex & mask]); + memcpy(*barrier2,tempBarrier,sizeof(hsa_barrier_and_packet_t)); + + delete tempBarrier; +} + hipError_t hipStreamAddCallback(hipStream_t stream, hipStreamCallback_t callback, void* userData, unsigned int flags) { + HIP_INIT_API(hipStreamAddCallback, stream, callback, userData, flags); hipError_t e = hipSuccess; - // Create a thread in detached mode to handle callback + if(stream == hipStreamNull) + { + ihipCtx_t* device = ihipGetTlsDefaultCtx(); + stream = device->_defaultStream; + } + + stream = ihipSyncAndResolveStream(stream); + + // Lock the stream + LockedAccessor_StreamCrit_t crit(stream->criticalData()); + + // Device synchronization + hc::completion_future marker = crit->_av.create_marker(hc::system_scope); + + // 1. Lock the queue + hsa_queue_t* lockedQ = static_cast (crit->_av.acquire_locked_hsa_queue()); + + if(lockedQ == nullptr) + { + // No queue attached to stream hence exiting early + return ihipLogStatus(hipErrorMissingConfiguration); + } + + // 2. Allocate a singals + hsa_signal_t signal; + hsa_status_t status = hsa_signal_create(1, 0, NULL, &signal); + + if(status != HSA_STATUS_SUCCESS) + { + crit->_av.release_locked_hsa_queue(); + return ihipLogStatus(hipErrorInvalidValue); + } + + hsa_signal_t depSignal; + status = hsa_signal_create(1, 0, NULL, &depSignal); + + if(status != HSA_STATUS_SUCCESS) + { + crit->_av.release_locked_hsa_queue(); + return ihipLogStatus(hipErrorInvalidValue); + } + + // 3. Store callback details, will destroy allocation in callback handler ihipStreamCallback_t* cb = new ihipStreamCallback_t(stream, callback, userData); - std::thread(ihipStreamCallbackHandler, cb).detach(); + if(cb == nullptr) + { + crit->_av.release_locked_hsa_queue(); + return ihipLogStatus(hipErrorMemoryAllocation); + } + cb->_signal = depSignal; + cb->comFuture = marker ; + + // 4. Create barrier packets + uint64_t index ; + uint64_t nextIndex; + + hsa_barrier_and_packet_t* barrier; + hsa_barrier_and_packet_t* depBarrier; + + setCallbackPacket(lockedQ, index, nextIndex, &barrier, &depBarrier); + + barrier->completion_signal = signal; + + depBarrier->dep_signal[0] = depSignal; + + uint16_t header = (HSA_PACKET_TYPE_BARRIER_AND << HSA_PACKET_HEADER_TYPE)| 1 << HSA_PACKET_HEADER_BARRIER; + + // 5. Update packet header, + // Intentionally updated second barrier header before first in order to avoid race + depBarrier->header = header; + barrier->header = header; + + // 6. Trigger the doorbell + nextIndex = nextIndex + 1; + hsa_queue_store_write_index_screlease(lockedQ, nextIndex); + hsa_signal_store_relaxed(lockedQ->doorbell_signal, index+1); + + // 7. Release queue + crit->_av.release_locked_hsa_queue(); + + // 8. Register signal callback + hsa_amd_signal_async_handler(signal, HSA_SIGNAL_CONDITION_EQ, 0, ihipStreamCallbackHandler, cb); return ihipLogStatus(e); } diff --git a/projects/hip/tests/src/runtimeApi/stream/StreamAddCallback.cpp b/projects/hip/tests/src/runtimeApi/stream/StreamAddCallback.cpp new file mode 100644 index 0000000000..e6492c7ce2 --- /dev/null +++ b/projects/hip/tests/src/runtimeApi/stream/StreamAddCallback.cpp @@ -0,0 +1,145 @@ +#include +#include +#include +#include "test_common.h" +#include + +/* HIT_START + * BUILD: %t %s ../../test_common.cpp NVCC_OPTIONS -std=c++11 + * TEST: %t + * HIT_END + */ + +enum class ExecState +{ + EXEC_NOT_STARTED, + EXEC_STARTED, + EXEC_CB_STARTED, + EXEC_CB_FINISHED, + EXEC_FINISHED +}; + +struct UserData +{ + size_t size; + int* ptr; +}; + +// Global variable to check exection order +std::atomic gData(ExecState::EXEC_NOT_STARTED); + + +void myCallback(hipStream_t stream, hipError_t status, void* user_data) +{ + if(gData.load() != ExecState::EXEC_STARTED) + return; // Error hence return early + + gData.store(ExecState::EXEC_CB_STARTED); + + UserData* data = reinterpret_cast(user_data); + printf("Callback started\n"); + + sleep(1); + + printf("Callback ending.\n"); + gData.store(ExecState::EXEC_CB_FINISHED); +} + +bool test(int count) +{ + printf("\n============ Test iteration %d =============\n",count); + // Stream + hipStream_t stream; + bool result = true; + + gData.store(ExecState::EXEC_STARTED); + + HIPCHECK(hipStreamCreate(&stream)); + + // Array size + size_t size = 10000; + + // Device array + int *data = NULL; + HIPCHECK(hipMalloc((void**)&data, sizeof(int) * size)); + + // Initialize device array to -1 + HIPCHECK(hipMemset(data, -1, sizeof(int) * size)); + + // Host array + int *host = NULL; + HIPCHECK(hipHostMalloc((void**)&host, sizeof(int) * size)); + + // Print host ptr address + printf("In main thread\n"); + + // Initialize user_data for callback + UserData arg; + arg.size = size; + arg.ptr = host; + + // Synchronize device + HIPCHECK(hipDeviceSynchronize()); + + // Asynchronous copy from device to host + HIPCHECK(hipMemcpyAsync(host, data, sizeof(int) * size, hipMemcpyDeviceToHost, stream)); + + // Asynchronous memset on device + HIPCHECK(hipMemsetAsync(data, 0, sizeof(int) * size, stream)); + + // Add callback - should happen after hipMemsetAsync() + HIPCHECK(hipStreamAddCallback(stream, myCallback, &arg, 0)); + + printf("Will wait in main thread until callback completes\n"); + + //This should synchronize the stream (including the callback) + HIPCHECK(hipStreamSynchronize(stream)); + + if(gData.load() != ExecState::EXEC_CB_FINISHED) + { + std::cout<<"Callback is not finished\n"; + return false; + } + printf("Callback completed will resume main thread execution\n"); + + if(host[size/2] != -1) + { + // Print some host data that just got copied + printf("Pseudo host data printing (should be -1): %d\n", host[size/2]); + result = false; + } + + HIPCHECK(hipMemcpy(host, data, sizeof(int)*size, hipMemcpyDeviceToHost)); + + if(host[size-1] != 0) + { + printf("Pseudo host data printing (should be 0): %d\n", host[size-1]); + result = false; + } + + HIPCHECK(hipFree(data)); + HIPCHECK(hipHostFree(host)); + HIPCHECK(hipStreamDestroy(stream)); + + gData.store(ExecState::EXEC_FINISHED); + return result; +} + +int main() +{ + // Test involves multithreading hence running multiple times + // to make sure consitency in the behavior + bool status = true; + + for(int i=0; i < 10; i++){ + status = test(i+1); + if(status == false) + { + failed("Test Failed!\n"); + break; + } + } + + if(status == true) passed(); + return 0; +} diff --git a/projects/hip/tests/src/runtimeApi/stream/hipStreamAddCallbackCrossStream.cpp b/projects/hip/tests/src/runtimeApi/stream/hipStreamAddCallbackCrossStream.cpp new file mode 100644 index 0000000000..b2fb168fbc --- /dev/null +++ b/projects/hip/tests/src/runtimeApi/stream/hipStreamAddCallbackCrossStream.cpp @@ -0,0 +1,133 @@ +/* + Copyright (c) 2019-present Advanced Micro Devices, Inc. All rights reserved. + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + The above copyright notice and this permission notice shall be included in + all copies or substantial portions of the Software. + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + THE SOFTWARE. +*/ + +/* HIT_START + * BUILD: %t %s ../../test_common.cpp NVCC_OPTIONS -std=c++11 + * TEST: %t + * HIT_END +*/ +#include "test_common.h" +#include +#include +#include + + +//Globals +const int workloadCount = 1000000; +std::mutex gMutx; +bool callbackCompleted = false; +std::condition_variable condVar; + +// Device function +__global__ void increment(int *data,int N) +{ + int i = blockIdx.x*blockDim.x + threadIdx.x; + + if(i < N) + data[i] = 1 + data[i]; +} + +struct USER_DATA +{ + int* result; // Data received from device + int* copyOfOriginalData; // Copy of initial data which will be used for validation +}; + +// Callback +void callback(hipStream_t event, hipError_t status, void *userData) +{ + USER_DATA *data = (USER_DATA *)userData; + + if(!(data == nullptr || data->result == nullptr || data->copyOfOriginalData == nullptr)) + { + for(int i=0;iresult[i] != data->copyOfOriginalData[i]+1) + { + std::cout<<"Error value : "<result[i]<<"| Expected value :"<copyOfOriginalData[i]+1<result = hResultData; + inputParam->copyOfOriginalData = hData; + + HIPCHECK(hipStreamCreate(&stream)); + HIPCHECK(hipStreamAddCallback(stream,callback,inputParam,0)); + + // Wait for stream add callback to complete + std::unique_lock l(gMutx); + + while(!callbackCompleted) + condVar.wait(l); + + // Will destroy device memory hence explicite hipFree is not needed + HIPCHECK(hipDeviceReset()); + + free(hData); + free(hResultData); + passed(); +} diff --git a/projects/hip/tests/src/runtimeApi/stream/hipStreamAddCallbackMultiThread.cpp b/projects/hip/tests/src/runtimeApi/stream/hipStreamAddCallbackMultiThread.cpp new file mode 100644 index 0000000000..3babf5869e --- /dev/null +++ b/projects/hip/tests/src/runtimeApi/stream/hipStreamAddCallbackMultiThread.cpp @@ -0,0 +1,178 @@ +/* + Copyright (c) 2019-present Advanced Micro Devices, Inc. All rights reserved. + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + The above copyright notice and this permission notice shall be included in + all copies or substantial portions of the Software. + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + THE SOFTWARE. +*/ + +/* HIT_START + * BUILD: %t %s ../../test_common.cpp NVCC_OPTIONS -std=c++11 + * TEST: %t + * HIT_END +*/ + +#include +#include +#include +#include +#include "test_common.h" + +// Will indicate completion of callback added as part of hipStreamAddCallback +struct signal +{ + int completedThreads; + std::mutex mu; + std::condition_variable cv; +}; + +struct workload +{ + int _workloadId; + int _deviceID; + + int *copyOf_hData; // copy of host data which will be used to validation + int *hData; // will contain host data + int *dData; // device data will be stored + hipStream_t _stream;// stream on which data will be processed + + bool success; // start will be stored +}; + +// Global data +int numWorkloads = 8; +const int perWorkloadSize = 1000000; +signal completionSignal; + +// Kernel run on device +__global__ void increment(int *data, int N) +{ + int i = blockIdx.x * blockDim.x + threadIdx.x; + + if (i < N) + data[i] = data[i]+1; +} + +/* + * Method validates processed data array with saved copy and notifies via conditional variable to all waiting threads +*/ +void Analyze(hipStream_t event, hipError_t status, void *data) +{ + HIPCHECK(status); + + workload *W = (workload *) data; + + if(W != NULL) + { + W->success = true; + + for (int i=0; i< perWorkloadSize; ++i) + { + W->success &= (W->copyOf_hData[i] == (W->hData[i]+1)); + if(!W->success) + { + std::cout<<"\nExpected Data :"<<(W->hData[i]+1)<<" Current Data :"<copyOf_hData[i]< guard(completionSignal.mu); + completionSignal.completedThreads += 1; + + completionSignal.cv.notify_all(); +} + +/* + * Thread routine to launch workloads into separate stream + */ +void LaunchWorkload(void *args) +{ + workload *W = (workload *) args; + + if (W == nullptr) return; + + std::srand(std::time(nullptr)); + + HIPCHECK(hipSetDevice(W->_deviceID)); + + // Allocate memory + HIPCHECK(hipMalloc(&W->dData,perWorkloadSize*sizeof(int))); + + size_t s = perWorkloadSize*sizeof(int); + HIPCHECK(hipMemset(W->dData,0,s)) + + W->hData = (int *) malloc(perWorkloadSize*sizeof(int)); + W->copyOf_hData = (int *) malloc(perWorkloadSize*sizeof(int)); + + // Initialize host array + for(int i =0;ihData[i] = W->copyOf_hData[i] = std::rand() % perWorkloadSize; + } + + HIPCHECK(hipStreamCreate(&W->_stream)); + + dim3 block(256); + dim3 grid((perWorkloadSize + block.x-1) / block.x); + + HIPCHECK(hipMemcpyAsync(W->dData,W->hData,perWorkloadSize*sizeof(int),hipMemcpyHostToDevice,W->_stream)); + + hipLaunchKernelGGL(increment, grid, block,0, W->_stream,W->dData, perWorkloadSize); + + HIPCHECK(hipMemcpyAsync(W->copyOf_hData,W->dData,perWorkloadSize*sizeof(int),hipMemcpyDeviceToHost,W->_stream)); + + HIPCHECK(hipStreamAddCallback(W->_stream, Analyze, W,0)) +} + +int main(int argc, char* argv[]) { + + int numDevice = 0; + + HIPCHECK(hipGetDeviceCount(&numDevice)); + + std::thread workerThread[numWorkloads]; + + workload *workloads; + workloads = (workload *) malloc(numWorkloads * sizeof(workload)); + + for(int i =0;i l(completionSignal.mu); + while(completionSignal.completedThreads != numWorkloads) + { + completionSignal.cv.wait(l); + } + + //clean-up + hipDeviceReset(); + free(workloads); + + passed(); + return 0; +}