diff --git a/projects/hip/src/hip_hcc.cpp b/projects/hip/src/hip_hcc.cpp index cc83ca0657..e93490a08e 100644 --- a/projects/hip/src/hip_hcc.cpp +++ b/projects/hip/src/hip_hcc.cpp @@ -1499,21 +1499,18 @@ hipError_t ihipStreamSynchronize(TlsData *tls, hipStream_t stream) { return e; } -bool ihipStreamCallbackHandler(hsa_signal_value_t value, void* cbArgs) { +void ihipStreamCallbackHandler(ihipStreamCallback_t* cb) { hipError_t e = hipSuccess; - ihipStreamCallback_t* cb = static_cast (cbArgs); - - if(cb->comFuture.valid()) - cb->comFuture.wait(); + // Synchronize stream + tprintf(DB_SYNC, "ihipStreamCallbackHandler wait on stream %s\n", + ToString(cb->_stream).c_str()); + GET_TLS(); + e = ihipStreamSynchronize(tls, cb->_stream); // Call registered callback function cb->_callback(cb->_stream, e, cb->_userData); - - hsa_signal_store_screlease(cb->_signal,0); - delete cb; - return false; } //--- diff --git a/projects/hip/src/hip_hcc_internal.h b/projects/hip/src/hip_hcc_internal.h index 6337edd370..8ee47eba4a 100644 --- a/projects/hip/src/hip_hcc_internal.h +++ b/projects/hip/src/hip_hcc_internal.h @@ -650,9 +650,7 @@ class ihipStreamCallback_t { : _stream(stream), _callback(callback), _userData(userData) { }; hipStream_t _stream; - hsa_signal_t _signal; hipStreamCallback_t _callback; - hc::completion_future comFuture; void* _userData; }; @@ -970,7 +968,7 @@ hipError_t hipModuleGetFunctionEx(hipFunction_t* hfunc, hipModule_t hmod, hipStream_t ihipSyncAndResolveStream(hipStream_t, bool lockAcquired = 0); hipError_t ihipStreamSynchronize(TlsData *tls, hipStream_t stream); -bool ihipStreamCallbackHandler(hsa_signal_value_t value, void* cbArgs); +void ihipStreamCallbackHandler(ihipStreamCallback_t* cb); // Stream printf functions: inline std::ostream& operator<<(std::ostream& os, const ihipStream_t& s) { diff --git a/projects/hip/src/hip_stream.cpp b/projects/hip/src/hip_stream.cpp index bf3289bb49..e3e4975b7e 100644 --- a/projects/hip/src/hip_stream.cpp +++ b/projects/hip/src/hip_stream.cpp @@ -254,137 +254,14 @@ hipError_t hipStreamGetPriority(hipStream_t stream, int* priority) { //--- -void setCallbackPacket(hsa_queue_t* queue, - uint64_t& index, uint64_t& nextIndex, - hsa_barrier_and_packet_t** barrier1, - hsa_barrier_and_packet_t** barrier2){ - - uint64_t tempIndex = 0; - uint32_t mask = queue->size - 1; - hsa_barrier_and_packet_t* tempBarrier; - - // Check for empty packets - do{ - tempIndex = hsa_queue_load_write_index_scacquire(queue); - tempBarrier = &(((hsa_barrier_and_packet_t*)(queue->base_address))[tempIndex & mask]); - }while(!(tempBarrier->header & HSA_PACKET_TYPE_INVALID)); - - // Reserve two packets for two barriers - index = hsa_queue_add_write_index_scacquire(queue, 2); - - if(index > mask) - { - index = 0; - nextIndex = 1; - } - else if(index == mask) - nextIndex = 0; - else - nextIndex = index + 1; - - tempBarrier = new hsa_barrier_and_packet_t; - memset(tempBarrier, 0, sizeof(hsa_barrier_and_packet_t)); - tempBarrier->header = HSA_PACKET_TYPE_INVALID; - - // Barrier 1 - *barrier1 = &(((hsa_barrier_and_packet_t*)(queue->base_address))[index & mask]); - memcpy(*barrier1,tempBarrier,sizeof(hsa_barrier_and_packet_t)); - - // Barrier 2 - *barrier2 = &(((hsa_barrier_and_packet_t*)(queue->base_address))[nextIndex & mask]); - memcpy(*barrier2,tempBarrier,sizeof(hsa_barrier_and_packet_t)); - - delete tempBarrier; -} - hipError_t hipStreamAddCallback(hipStream_t stream, hipStreamCallback_t callback, void* userData, unsigned int flags) { - HIP_INIT_API(hipStreamAddCallback, stream, callback, userData, flags); hipError_t e = hipSuccess; - if(stream == hipStreamNull) - { - ihipCtx_t* device = ihipGetTlsDefaultCtx(); - stream = device->_defaultStream; - } - - stream = ihipSyncAndResolveStream(stream); - - // Lock the stream - LockedAccessor_StreamCrit_t crit(stream->criticalData()); - - // Device synchronization - hc::completion_future marker = crit->_av.create_marker(hc::system_scope); - - // 1. Lock the queue - hsa_queue_t* lockedQ = static_cast (crit->_av.acquire_locked_hsa_queue()); - - if(lockedQ == nullptr) - { - // No queue attached to stream hence exiting early - return ihipLogStatus(hipErrorMissingConfiguration); - } - - // 2. Allocate a singals - hsa_signal_t signal; - hsa_status_t status = hsa_signal_create(1, 0, NULL, &signal); - - if(status != HSA_STATUS_SUCCESS) - { - crit->_av.release_locked_hsa_queue(); - return ihipLogStatus(hipErrorInvalidValue); - } - - hsa_signal_t depSignal; - status = hsa_signal_create(1, 0, NULL, &depSignal); - - if(status != HSA_STATUS_SUCCESS) - { - crit->_av.release_locked_hsa_queue(); - return ihipLogStatus(hipErrorInvalidValue); - } - - // 3. Store callback details, will destroy allocation in callback handler + // Create a thread in detached mode to handle callback ihipStreamCallback_t* cb = new ihipStreamCallback_t(stream, callback, userData); - if(cb == nullptr) - { - crit->_av.release_locked_hsa_queue(); - return ihipLogStatus(hipErrorMemoryAllocation); - } - cb->_signal = depSignal; - cb->comFuture = marker ; - - // 4. Create barrier packets - uint64_t index ; - uint64_t nextIndex; - - hsa_barrier_and_packet_t* barrier; - hsa_barrier_and_packet_t* depBarrier; - - setCallbackPacket(lockedQ, index, nextIndex, &barrier, &depBarrier); - - barrier->completion_signal = signal; - - depBarrier->dep_signal[0] = depSignal; - - uint16_t header = (HSA_PACKET_TYPE_BARRIER_AND << HSA_PACKET_HEADER_TYPE)| 1 << HSA_PACKET_HEADER_BARRIER; - - // 5. Update packet header, - // Intentionally updated second barrier header before first in order to avoid race - depBarrier->header = header; - barrier->header = header; - - // 6. Trigger the doorbell - nextIndex = nextIndex + 1; - hsa_queue_store_write_index_screlease(lockedQ, nextIndex); - hsa_signal_store_relaxed(lockedQ->doorbell_signal, index+1); - - // 7. Release queue - crit->_av.release_locked_hsa_queue(); - - // 8. Register signal callback - hsa_amd_signal_async_handler(signal, HSA_SIGNAL_CONDITION_EQ, 0, ihipStreamCallbackHandler, cb); + std::thread(ihipStreamCallbackHandler, cb).detach(); return ihipLogStatus(e); } diff --git a/projects/hip/tests/src/runtimeApi/stream/StreamAddCallback.cpp b/projects/hip/tests/src/runtimeApi/stream/StreamAddCallback.cpp deleted file mode 100644 index e6492c7ce2..0000000000 --- a/projects/hip/tests/src/runtimeApi/stream/StreamAddCallback.cpp +++ /dev/null @@ -1,145 +0,0 @@ -#include -#include -#include -#include "test_common.h" -#include - -/* HIT_START - * BUILD: %t %s ../../test_common.cpp NVCC_OPTIONS -std=c++11 - * TEST: %t - * HIT_END - */ - -enum class ExecState -{ - EXEC_NOT_STARTED, - EXEC_STARTED, - EXEC_CB_STARTED, - EXEC_CB_FINISHED, - EXEC_FINISHED -}; - -struct UserData -{ - size_t size; - int* ptr; -}; - -// Global variable to check exection order -std::atomic gData(ExecState::EXEC_NOT_STARTED); - - -void myCallback(hipStream_t stream, hipError_t status, void* user_data) -{ - if(gData.load() != ExecState::EXEC_STARTED) - return; // Error hence return early - - gData.store(ExecState::EXEC_CB_STARTED); - - UserData* data = reinterpret_cast(user_data); - printf("Callback started\n"); - - sleep(1); - - printf("Callback ending.\n"); - gData.store(ExecState::EXEC_CB_FINISHED); -} - -bool test(int count) -{ - printf("\n============ Test iteration %d =============\n",count); - // Stream - hipStream_t stream; - bool result = true; - - gData.store(ExecState::EXEC_STARTED); - - HIPCHECK(hipStreamCreate(&stream)); - - // Array size - size_t size = 10000; - - // Device array - int *data = NULL; - HIPCHECK(hipMalloc((void**)&data, sizeof(int) * size)); - - // Initialize device array to -1 - HIPCHECK(hipMemset(data, -1, sizeof(int) * size)); - - // Host array - int *host = NULL; - HIPCHECK(hipHostMalloc((void**)&host, sizeof(int) * size)); - - // Print host ptr address - printf("In main thread\n"); - - // Initialize user_data for callback - UserData arg; - arg.size = size; - arg.ptr = host; - - // Synchronize device - HIPCHECK(hipDeviceSynchronize()); - - // Asynchronous copy from device to host - HIPCHECK(hipMemcpyAsync(host, data, sizeof(int) * size, hipMemcpyDeviceToHost, stream)); - - // Asynchronous memset on device - HIPCHECK(hipMemsetAsync(data, 0, sizeof(int) * size, stream)); - - // Add callback - should happen after hipMemsetAsync() - HIPCHECK(hipStreamAddCallback(stream, myCallback, &arg, 0)); - - printf("Will wait in main thread until callback completes\n"); - - //This should synchronize the stream (including the callback) - HIPCHECK(hipStreamSynchronize(stream)); - - if(gData.load() != ExecState::EXEC_CB_FINISHED) - { - std::cout<<"Callback is not finished\n"; - return false; - } - printf("Callback completed will resume main thread execution\n"); - - if(host[size/2] != -1) - { - // Print some host data that just got copied - printf("Pseudo host data printing (should be -1): %d\n", host[size/2]); - result = false; - } - - HIPCHECK(hipMemcpy(host, data, sizeof(int)*size, hipMemcpyDeviceToHost)); - - if(host[size-1] != 0) - { - printf("Pseudo host data printing (should be 0): %d\n", host[size-1]); - result = false; - } - - HIPCHECK(hipFree(data)); - HIPCHECK(hipHostFree(host)); - HIPCHECK(hipStreamDestroy(stream)); - - gData.store(ExecState::EXEC_FINISHED); - return result; -} - -int main() -{ - // Test involves multithreading hence running multiple times - // to make sure consitency in the behavior - bool status = true; - - for(int i=0; i < 10; i++){ - status = test(i+1); - if(status == false) - { - failed("Test Failed!\n"); - break; - } - } - - if(status == true) passed(); - return 0; -} diff --git a/projects/hip/tests/src/runtimeApi/stream/hipStreamAddCallbackCrossStream.cpp b/projects/hip/tests/src/runtimeApi/stream/hipStreamAddCallbackCrossStream.cpp deleted file mode 100644 index b2fb168fbc..0000000000 --- a/projects/hip/tests/src/runtimeApi/stream/hipStreamAddCallbackCrossStream.cpp +++ /dev/null @@ -1,133 +0,0 @@ -/* - Copyright (c) 2019-present Advanced Micro Devices, Inc. All rights reserved. - Permission is hereby granted, free of charge, to any person obtaining a copy - of this software and associated documentation files (the "Software"), to deal - in the Software without restriction, including without limitation the rights - to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - copies of the Software, and to permit persons to whom the Software is - furnished to do so, subject to the following conditions: - The above copyright notice and this permission notice shall be included in - all copies or substantial portions of the Software. - THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - THE SOFTWARE. -*/ - -/* HIT_START - * BUILD: %t %s ../../test_common.cpp NVCC_OPTIONS -std=c++11 - * TEST: %t - * HIT_END -*/ -#include "test_common.h" -#include -#include -#include - - -//Globals -const int workloadCount = 1000000; -std::mutex gMutx; -bool callbackCompleted = false; -std::condition_variable condVar; - -// Device function -__global__ void increment(int *data,int N) -{ - int i = blockIdx.x*blockDim.x + threadIdx.x; - - if(i < N) - data[i] = 1 + data[i]; -} - -struct USER_DATA -{ - int* result; // Data received from device - int* copyOfOriginalData; // Copy of initial data which will be used for validation -}; - -// Callback -void callback(hipStream_t event, hipError_t status, void *userData) -{ - USER_DATA *data = (USER_DATA *)userData; - - if(!(data == nullptr || data->result == nullptr || data->copyOfOriginalData == nullptr)) - { - for(int i=0;iresult[i] != data->copyOfOriginalData[i]+1) - { - std::cout<<"Error value : "<result[i]<<"| Expected value :"<copyOfOriginalData[i]+1<result = hResultData; - inputParam->copyOfOriginalData = hData; - - HIPCHECK(hipStreamCreate(&stream)); - HIPCHECK(hipStreamAddCallback(stream,callback,inputParam,0)); - - // Wait for stream add callback to complete - std::unique_lock l(gMutx); - - while(!callbackCompleted) - condVar.wait(l); - - // Will destroy device memory hence explicite hipFree is not needed - HIPCHECK(hipDeviceReset()); - - free(hData); - free(hResultData); - passed(); -} diff --git a/projects/hip/tests/src/runtimeApi/stream/hipStreamAddCallbackMultiThread.cpp b/projects/hip/tests/src/runtimeApi/stream/hipStreamAddCallbackMultiThread.cpp deleted file mode 100644 index 3babf5869e..0000000000 --- a/projects/hip/tests/src/runtimeApi/stream/hipStreamAddCallbackMultiThread.cpp +++ /dev/null @@ -1,178 +0,0 @@ -/* - Copyright (c) 2019-present Advanced Micro Devices, Inc. All rights reserved. - Permission is hereby granted, free of charge, to any person obtaining a copy - of this software and associated documentation files (the "Software"), to deal - in the Software without restriction, including without limitation the rights - to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - copies of the Software, and to permit persons to whom the Software is - furnished to do so, subject to the following conditions: - The above copyright notice and this permission notice shall be included in - all copies or substantial portions of the Software. - THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - THE SOFTWARE. -*/ - -/* HIT_START - * BUILD: %t %s ../../test_common.cpp NVCC_OPTIONS -std=c++11 - * TEST: %t - * HIT_END -*/ - -#include -#include -#include -#include -#include "test_common.h" - -// Will indicate completion of callback added as part of hipStreamAddCallback -struct signal -{ - int completedThreads; - std::mutex mu; - std::condition_variable cv; -}; - -struct workload -{ - int _workloadId; - int _deviceID; - - int *copyOf_hData; // copy of host data which will be used to validation - int *hData; // will contain host data - int *dData; // device data will be stored - hipStream_t _stream;// stream on which data will be processed - - bool success; // start will be stored -}; - -// Global data -int numWorkloads = 8; -const int perWorkloadSize = 1000000; -signal completionSignal; - -// Kernel run on device -__global__ void increment(int *data, int N) -{ - int i = blockIdx.x * blockDim.x + threadIdx.x; - - if (i < N) - data[i] = data[i]+1; -} - -/* - * Method validates processed data array with saved copy and notifies via conditional variable to all waiting threads -*/ -void Analyze(hipStream_t event, hipError_t status, void *data) -{ - HIPCHECK(status); - - workload *W = (workload *) data; - - if(W != NULL) - { - W->success = true; - - for (int i=0; i< perWorkloadSize; ++i) - { - W->success &= (W->copyOf_hData[i] == (W->hData[i]+1)); - if(!W->success) - { - std::cout<<"\nExpected Data :"<<(W->hData[i]+1)<<" Current Data :"<copyOf_hData[i]< guard(completionSignal.mu); - completionSignal.completedThreads += 1; - - completionSignal.cv.notify_all(); -} - -/* - * Thread routine to launch workloads into separate stream - */ -void LaunchWorkload(void *args) -{ - workload *W = (workload *) args; - - if (W == nullptr) return; - - std::srand(std::time(nullptr)); - - HIPCHECK(hipSetDevice(W->_deviceID)); - - // Allocate memory - HIPCHECK(hipMalloc(&W->dData,perWorkloadSize*sizeof(int))); - - size_t s = perWorkloadSize*sizeof(int); - HIPCHECK(hipMemset(W->dData,0,s)) - - W->hData = (int *) malloc(perWorkloadSize*sizeof(int)); - W->copyOf_hData = (int *) malloc(perWorkloadSize*sizeof(int)); - - // Initialize host array - for(int i =0;ihData[i] = W->copyOf_hData[i] = std::rand() % perWorkloadSize; - } - - HIPCHECK(hipStreamCreate(&W->_stream)); - - dim3 block(256); - dim3 grid((perWorkloadSize + block.x-1) / block.x); - - HIPCHECK(hipMemcpyAsync(W->dData,W->hData,perWorkloadSize*sizeof(int),hipMemcpyHostToDevice,W->_stream)); - - hipLaunchKernelGGL(increment, grid, block,0, W->_stream,W->dData, perWorkloadSize); - - HIPCHECK(hipMemcpyAsync(W->copyOf_hData,W->dData,perWorkloadSize*sizeof(int),hipMemcpyDeviceToHost,W->_stream)); - - HIPCHECK(hipStreamAddCallback(W->_stream, Analyze, W,0)) -} - -int main(int argc, char* argv[]) { - - int numDevice = 0; - - HIPCHECK(hipGetDeviceCount(&numDevice)); - - std::thread workerThread[numWorkloads]; - - workload *workloads; - workloads = (workload *) malloc(numWorkloads * sizeof(workload)); - - for(int i =0;i l(completionSignal.mu); - while(completionSignal.completedThreads != numWorkloads) - { - completionSignal.cv.wait(l); - } - - //clean-up - hipDeviceReset(); - free(workloads); - - passed(); - return 0; -}