Revert [HIP] Fixed hipStreamAddCallback (#1674)

This reverts commit fa1e44aa0e.
Addresses SWDEV#212675.

[ROCm/hip commit: 153a959280]
This commit is contained in:
Sarbojit2019
2019-11-20 11:55:46 +05:30
committed by Maneesh Gupta
parent 492248ef12
commit 6679cd3998
6 changed files with 9 additions and 593 deletions
+6 -9
View File
@@ -1499,21 +1499,18 @@ hipError_t ihipStreamSynchronize(TlsData *tls, hipStream_t stream) {
return e;
}
bool ihipStreamCallbackHandler(hsa_signal_value_t value, void* cbArgs) {
void ihipStreamCallbackHandler(ihipStreamCallback_t* cb) {
hipError_t e = hipSuccess;
ihipStreamCallback_t* cb = static_cast<ihipStreamCallback_t*> (cbArgs);
if(cb->comFuture.valid())
cb->comFuture.wait();
// Synchronize stream
tprintf(DB_SYNC, "ihipStreamCallbackHandler wait on stream %s\n",
ToString(cb->_stream).c_str());
GET_TLS();
e = ihipStreamSynchronize(tls, cb->_stream);
// Call registered callback function
cb->_callback(cb->_stream, e, cb->_userData);
hsa_signal_store_screlease(cb->_signal,0);
delete cb;
return false;
}
//---
+1 -3
View File
@@ -650,9 +650,7 @@ class ihipStreamCallback_t {
: _stream(stream), _callback(callback), _userData(userData) {
};
hipStream_t _stream;
hsa_signal_t _signal;
hipStreamCallback_t _callback;
hc::completion_future comFuture;
void* _userData;
};
@@ -970,7 +968,7 @@ hipError_t hipModuleGetFunctionEx(hipFunction_t* hfunc, hipModule_t hmod,
hipStream_t ihipSyncAndResolveStream(hipStream_t, bool lockAcquired = 0);
hipError_t ihipStreamSynchronize(TlsData *tls, hipStream_t stream);
bool ihipStreamCallbackHandler(hsa_signal_value_t value, void* cbArgs);
void ihipStreamCallbackHandler(ihipStreamCallback_t* cb);
// Stream printf functions:
inline std::ostream& operator<<(std::ostream& os, const ihipStream_t& s) {
+2 -125
View File
@@ -254,137 +254,14 @@ hipError_t hipStreamGetPriority(hipStream_t stream, int* priority) {
//---
void setCallbackPacket(hsa_queue_t* queue,
uint64_t& index, uint64_t& nextIndex,
hsa_barrier_and_packet_t** barrier1,
hsa_barrier_and_packet_t** barrier2){
uint64_t tempIndex = 0;
uint32_t mask = queue->size - 1;
hsa_barrier_and_packet_t* tempBarrier;
// Check for empty packets
do{
tempIndex = hsa_queue_load_write_index_scacquire(queue);
tempBarrier = &(((hsa_barrier_and_packet_t*)(queue->base_address))[tempIndex & mask]);
}while(!(tempBarrier->header & HSA_PACKET_TYPE_INVALID));
// Reserve two packets for two barriers
index = hsa_queue_add_write_index_scacquire(queue, 2);
if(index > mask)
{
index = 0;
nextIndex = 1;
}
else if(index == mask)
nextIndex = 0;
else
nextIndex = index + 1;
tempBarrier = new hsa_barrier_and_packet_t;
memset(tempBarrier, 0, sizeof(hsa_barrier_and_packet_t));
tempBarrier->header = HSA_PACKET_TYPE_INVALID;
// Barrier 1
*barrier1 = &(((hsa_barrier_and_packet_t*)(queue->base_address))[index & mask]);
memcpy(*barrier1,tempBarrier,sizeof(hsa_barrier_and_packet_t));
// Barrier 2
*barrier2 = &(((hsa_barrier_and_packet_t*)(queue->base_address))[nextIndex & mask]);
memcpy(*barrier2,tempBarrier,sizeof(hsa_barrier_and_packet_t));
delete tempBarrier;
}
hipError_t hipStreamAddCallback(hipStream_t stream, hipStreamCallback_t callback, void* userData,
unsigned int flags) {
HIP_INIT_API(hipStreamAddCallback, stream, callback, userData, flags);
hipError_t e = hipSuccess;
if(stream == hipStreamNull)
{
ihipCtx_t* device = ihipGetTlsDefaultCtx();
stream = device->_defaultStream;
}
stream = ihipSyncAndResolveStream(stream);
// Lock the stream
LockedAccessor_StreamCrit_t crit(stream->criticalData());
// Device synchronization
hc::completion_future marker = crit->_av.create_marker(hc::system_scope);
// 1. Lock the queue
hsa_queue_t* lockedQ = static_cast<hsa_queue_t*> (crit->_av.acquire_locked_hsa_queue());
if(lockedQ == nullptr)
{
// No queue attached to stream hence exiting early
return ihipLogStatus(hipErrorMissingConfiguration);
}
// 2. Allocate a singals
hsa_signal_t signal;
hsa_status_t status = hsa_signal_create(1, 0, NULL, &signal);
if(status != HSA_STATUS_SUCCESS)
{
crit->_av.release_locked_hsa_queue();
return ihipLogStatus(hipErrorInvalidValue);
}
hsa_signal_t depSignal;
status = hsa_signal_create(1, 0, NULL, &depSignal);
if(status != HSA_STATUS_SUCCESS)
{
crit->_av.release_locked_hsa_queue();
return ihipLogStatus(hipErrorInvalidValue);
}
// 3. Store callback details, will destroy allocation in callback handler
// Create a thread in detached mode to handle callback
ihipStreamCallback_t* cb = new ihipStreamCallback_t(stream, callback, userData);
if(cb == nullptr)
{
crit->_av.release_locked_hsa_queue();
return ihipLogStatus(hipErrorMemoryAllocation);
}
cb->_signal = depSignal;
cb->comFuture = marker ;
// 4. Create barrier packets
uint64_t index ;
uint64_t nextIndex;
hsa_barrier_and_packet_t* barrier;
hsa_barrier_and_packet_t* depBarrier;
setCallbackPacket(lockedQ, index, nextIndex, &barrier, &depBarrier);
barrier->completion_signal = signal;
depBarrier->dep_signal[0] = depSignal;
uint16_t header = (HSA_PACKET_TYPE_BARRIER_AND << HSA_PACKET_HEADER_TYPE)| 1 << HSA_PACKET_HEADER_BARRIER;
// 5. Update packet header,
// Intentionally updated second barrier header before first in order to avoid race
depBarrier->header = header;
barrier->header = header;
// 6. Trigger the doorbell
nextIndex = nextIndex + 1;
hsa_queue_store_write_index_screlease(lockedQ, nextIndex);
hsa_signal_store_relaxed(lockedQ->doorbell_signal, index+1);
// 7. Release queue
crit->_av.release_locked_hsa_queue();
// 8. Register signal callback
hsa_amd_signal_async_handler(signal, HSA_SIGNAL_CONDITION_EQ, 0, ihipStreamCallbackHandler, cb);
std::thread(ihipStreamCallbackHandler, cb).detach();
return ihipLogStatus(e);
}
@@ -1,145 +0,0 @@
#include <stdio.h>
#include <hip/hip_runtime.h>
#include <unistd.h>
#include "test_common.h"
#include <atomic>
/* HIT_START
* BUILD: %t %s ../../test_common.cpp NVCC_OPTIONS -std=c++11
* TEST: %t
* HIT_END
*/
enum class ExecState
{
EXEC_NOT_STARTED,
EXEC_STARTED,
EXEC_CB_STARTED,
EXEC_CB_FINISHED,
EXEC_FINISHED
};
struct UserData
{
size_t size;
int* ptr;
};
// Global variable to check exection order
std::atomic<ExecState> gData(ExecState::EXEC_NOT_STARTED);
void myCallback(hipStream_t stream, hipError_t status, void* user_data)
{
if(gData.load() != ExecState::EXEC_STARTED)
return; // Error hence return early
gData.store(ExecState::EXEC_CB_STARTED);
UserData* data = reinterpret_cast<UserData*>(user_data);
printf("Callback started\n");
sleep(1);
printf("Callback ending.\n");
gData.store(ExecState::EXEC_CB_FINISHED);
}
bool test(int count)
{
printf("\n============ Test iteration %d =============\n",count);
// Stream
hipStream_t stream;
bool result = true;
gData.store(ExecState::EXEC_STARTED);
HIPCHECK(hipStreamCreate(&stream));
// Array size
size_t size = 10000;
// Device array
int *data = NULL;
HIPCHECK(hipMalloc((void**)&data, sizeof(int) * size));
// Initialize device array to -1
HIPCHECK(hipMemset(data, -1, sizeof(int) * size));
// Host array
int *host = NULL;
HIPCHECK(hipHostMalloc((void**)&host, sizeof(int) * size));
// Print host ptr address
printf("In main thread\n");
// Initialize user_data for callback
UserData arg;
arg.size = size;
arg.ptr = host;
// Synchronize device
HIPCHECK(hipDeviceSynchronize());
// Asynchronous copy from device to host
HIPCHECK(hipMemcpyAsync(host, data, sizeof(int) * size, hipMemcpyDeviceToHost, stream));
// Asynchronous memset on device
HIPCHECK(hipMemsetAsync(data, 0, sizeof(int) * size, stream));
// Add callback - should happen after hipMemsetAsync()
HIPCHECK(hipStreamAddCallback(stream, myCallback, &arg, 0));
printf("Will wait in main thread until callback completes\n");
//This should synchronize the stream (including the callback)
HIPCHECK(hipStreamSynchronize(stream));
if(gData.load() != ExecState::EXEC_CB_FINISHED)
{
std::cout<<"Callback is not finished\n";
return false;
}
printf("Callback completed will resume main thread execution\n");
if(host[size/2] != -1)
{
// Print some host data that just got copied
printf("Pseudo host data printing (should be -1): %d\n", host[size/2]);
result = false;
}
HIPCHECK(hipMemcpy(host, data, sizeof(int)*size, hipMemcpyDeviceToHost));
if(host[size-1] != 0)
{
printf("Pseudo host data printing (should be 0): %d\n", host[size-1]);
result = false;
}
HIPCHECK(hipFree(data));
HIPCHECK(hipHostFree(host));
HIPCHECK(hipStreamDestroy(stream));
gData.store(ExecState::EXEC_FINISHED);
return result;
}
int main()
{
// Test involves multithreading hence running multiple times
// to make sure consitency in the behavior
bool status = true;
for(int i=0; i < 10; i++){
status = test(i+1);
if(status == false)
{
failed("Test Failed!\n");
break;
}
}
if(status == true) passed();
return 0;
}
@@ -1,133 +0,0 @@
/*
Copyright (c) 2019-present Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
/* HIT_START
* BUILD: %t %s ../../test_common.cpp NVCC_OPTIONS -std=c++11
* TEST: %t
* HIT_END
*/
#include "test_common.h"
#include <mutex>
#include <condition_variable>
#include <stdlib.h>
//Globals
const int workloadCount = 1000000;
std::mutex gMutx;
bool callbackCompleted = false;
std::condition_variable condVar;
// Device function
__global__ void increment(int *data,int N)
{
int i = blockIdx.x*blockDim.x + threadIdx.x;
if(i < N)
data[i] = 1 + data[i];
}
struct USER_DATA
{
int* result; // Data received from device
int* copyOfOriginalData; // Copy of initial data which will be used for validation
};
// Callback
void callback(hipStream_t event, hipError_t status, void *userData)
{
USER_DATA *data = (USER_DATA *)userData;
if(!(data == nullptr || data->result == nullptr || data->copyOfOriginalData == nullptr))
{
for(int i=0;i<workloadCount;i++)
{
if(data->result[i] != data->copyOfOriginalData[i]+1)
{
std::cout<<"Error value : "<<data->result[i]<<"| Expected value :"<<data->copyOfOriginalData[i]+1<<std::endl;
break;
}
}
}
callbackCompleted = true;
condVar.notify_all();
}
int main()
{
int *hData = nullptr;
int *dData = nullptr;
int *hResultData = nullptr;
int devCount = 0;
int size = workloadCount * sizeof(int);
// query device count
HIPCHECK(hipGetDeviceCount(&devCount));
// Allocate
// Host allocation
hData = (int *)malloc(size);
hResultData = (int *)malloc(size);
if(hData == nullptr || hResultData == nullptr)
{
HIPCHECK(hipErrorInvalidValue);
}
// Initialize host data
for(int i =0; i<workloadCount; i++)
{
hData[i] = rand()%workloadCount;
}
// Device allocation
HIPCHECK(hipMalloc(&dData,size));
HIPCHECK(hipMemcpyAsync(dData,hData,size,hipMemcpyHostToDevice,0));
dim3 block(256);
dim3 grid((workloadCount + block.x-1) / block.x);
hipLaunchKernelGGL(increment, grid, block, 0, 0, dData, workloadCount);
HIPCHECK(hipMemcpyAsync(hResultData, dData, size, hipMemcpyDeviceToHost, 0));
hipStream_t stream;
USER_DATA *inputParam = (USER_DATA*)malloc(sizeof(USER_DATA));
if(inputParam == nullptr) return 0;
inputParam->result = hResultData;
inputParam->copyOfOriginalData = hData;
HIPCHECK(hipStreamCreate(&stream));
HIPCHECK(hipStreamAddCallback(stream,callback,inputParam,0));
// Wait for stream add callback to complete
std::unique_lock<std::mutex> l(gMutx);
while(!callbackCompleted)
condVar.wait(l);
// Will destroy device memory hence explicite hipFree is not needed
HIPCHECK(hipDeviceReset());
free(hData);
free(hResultData);
passed();
}
@@ -1,178 +0,0 @@
/*
Copyright (c) 2019-present Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
/* HIT_START
* BUILD: %t %s ../../test_common.cpp NVCC_OPTIONS -std=c++11
* TEST: %t
* HIT_END
*/
#include <thread>
#include <unistd.h>
#include <mutex>
#include <condition_variable>
#include "test_common.h"
// Will indicate completion of callback added as part of hipStreamAddCallback
struct signal
{
int completedThreads;
std::mutex mu;
std::condition_variable cv;
};
struct workload
{
int _workloadId;
int _deviceID;
int *copyOf_hData; // copy of host data which will be used to validation
int *hData; // will contain host data
int *dData; // device data will be stored
hipStream_t _stream;// stream on which data will be processed
bool success; // start will be stored
};
// Global data
int numWorkloads = 8;
const int perWorkloadSize = 1000000;
signal completionSignal;
// Kernel run on device
__global__ void increment(int *data, int N)
{
int i = blockIdx.x * blockDim.x + threadIdx.x;
if (i < N)
data[i] = data[i]+1;
}
/*
* Method validates processed data array with saved copy and notifies via conditional variable to all waiting threads
*/
void Analyze(hipStream_t event, hipError_t status, void *data)
{
HIPCHECK(status);
workload *W = (workload *) data;
if(W != NULL)
{
W->success = true;
for (int i=0; i< perWorkloadSize; ++i)
{
W->success &= (W->copyOf_hData[i] == (W->hData[i]+1));
if(!W->success)
{
std::cout<<"\nExpected Data :"<<(W->hData[i]+1)<<" Current Data :"<<W->copyOf_hData[i]<<std::endl;
break;
}
}
}
std::lock_guard<std::mutex> guard(completionSignal.mu);
completionSignal.completedThreads += 1;
completionSignal.cv.notify_all();
}
/*
* Thread routine to launch workloads into separate stream
*/
void LaunchWorkload(void *args)
{
workload *W = (workload *) args;
if (W == nullptr) return;
std::srand(std::time(nullptr));
HIPCHECK(hipSetDevice(W->_deviceID));
// Allocate memory
HIPCHECK(hipMalloc(&W->dData,perWorkloadSize*sizeof(int)));
size_t s = perWorkloadSize*sizeof(int);
HIPCHECK(hipMemset(W->dData,0,s))
W->hData = (int *) malloc(perWorkloadSize*sizeof(int));
W->copyOf_hData = (int *) malloc(perWorkloadSize*sizeof(int));
// Initialize host array
for(int i =0;i<perWorkloadSize;i++)
{
W->hData[i] = W->copyOf_hData[i] = std::rand() % perWorkloadSize;
}
HIPCHECK(hipStreamCreate(&W->_stream));
dim3 block(256);
dim3 grid((perWorkloadSize + block.x-1) / block.x);
HIPCHECK(hipMemcpyAsync(W->dData,W->hData,perWorkloadSize*sizeof(int),hipMemcpyHostToDevice,W->_stream));
hipLaunchKernelGGL(increment, grid, block,0, W->_stream,W->dData, perWorkloadSize);
HIPCHECK(hipMemcpyAsync(W->copyOf_hData,W->dData,perWorkloadSize*sizeof(int),hipMemcpyDeviceToHost,W->_stream));
HIPCHECK(hipStreamAddCallback(W->_stream, Analyze, W,0))
}
int main(int argc, char* argv[]) {
int numDevice = 0;
HIPCHECK(hipGetDeviceCount(&numDevice));
std::thread workerThread[numWorkloads];
workload *workloads;
workloads = (workload *) malloc(numWorkloads * sizeof(workload));
for(int i =0;i<numWorkloads; i++)
{
workloads[i]._workloadId = i;
workloads[i]._deviceID = i%numDevice;
// launch threads
workerThread[i] = std::thread(LaunchWorkload,&workloads[i]);
}
// should wait for joining all threads
for(int i =0; i<numWorkloads; i++)
{
workerThread[i].join();
}
// wait util all callbacks are done
std::unique_lock<std::mutex> l(completionSignal.mu);
while(completionSignal.completedThreads != numWorkloads)
{
completionSignal.cv.wait(l);
}
//clean-up
hipDeviceReset();
free(workloads);
passed();
return 0;
}