Add HIP_MAX_QUEUES feature.
Includes some tricky manipulation of the locks for contexts and streams.
issue is that stealing a stream requires we lock the context to
walk the streams to find a victim. To avoid deadlock, we can't
have a stream locked when we lock the context. This implementation
releases the stream lock, then acquires the context and selects the
victim.
A more stable implemenation might be to copy the stream list
from a context so that a lock is not required to walk all streams.
Smart shared_ptr could be used to prevent the streams from being
deallocated during the walk.
[ROCm/hip commit: a3e0012567]
This commit is contained in:
@@ -265,11 +265,37 @@ ihipStream_t::~ihipStream_t()
|
||||
}
|
||||
|
||||
|
||||
inline void ihipStream_t::ensureHaveQueue(LockedAccessor_StreamCrit_t &streamCrit)
|
||||
{
|
||||
if (HIP_MAX_QUEUES && !streamCrit->_hasQueue) {
|
||||
|
||||
// To avoid deadlock, we have to release the stream lock before acquiring context lock.
|
||||
// Else we can get hung if another thread has the context lock is trying to get lock for this stream.
|
||||
// We lock it again below.
|
||||
streamCrit->munlock();
|
||||
|
||||
// Obtain mutex access to the device critical data, release by destructor
|
||||
LockedAccessor_CtxCrit_t ctxCrit(this->_ctx->criticalData());
|
||||
// TODO
|
||||
auto needyCritPtr = this->_criticalData.mlock();
|
||||
|
||||
// Second test to ensure we still need to steal the queue - another thread may have
|
||||
// snuck in here and already solved the issue.
|
||||
if (!needyCritPtr->_hasQueue) {
|
||||
needyCritPtr->_av = this->_ctx->stealActiveQueue(ctxCrit, this);
|
||||
}
|
||||
|
||||
streamCrit->_hasQueue = true;
|
||||
}
|
||||
assert(streamCrit->_hasQueue);
|
||||
}
|
||||
|
||||
|
||||
//Wait for all kernel and data copy commands in this stream to complete.
|
||||
//This signature should be used in routines that already have locked the stream mutex
|
||||
void ihipStream_t::wait(LockedAccessor_StreamCrit_t &crit, bool assertQueueEmpty)
|
||||
void ihipStream_t::wait(LockedAccessor_StreamCrit_t &crit)
|
||||
{
|
||||
if (! assertQueueEmpty) {
|
||||
if (crit->_hasQueue) {
|
||||
tprintf (DB_SYNC, "%s wait for queue-empty..\n", ToString(this).c_str());
|
||||
hc::hcWaitMode waitMode = hc::hcWaitModeActive;
|
||||
|
||||
@@ -294,6 +320,8 @@ void ihipStream_t::wait(LockedAccessor_StreamCrit_t &crit, bool assertQueueEmpty
|
||||
}
|
||||
|
||||
crit->_av.wait(waitMode);
|
||||
} else {
|
||||
tprintf (DB_SYNC, "%s wait for queue empty (done since stream has no physical queue).\n", ToString(this).c_str());
|
||||
}
|
||||
|
||||
crit->_kernelCnt = 0;
|
||||
@@ -301,11 +329,11 @@ void ihipStream_t::wait(LockedAccessor_StreamCrit_t &crit, bool assertQueueEmpty
|
||||
|
||||
//---
|
||||
//Wait for all kernel and data copy commands in this stream to complete.
|
||||
void ihipStream_t::locked_wait(bool assertQueueEmpty)
|
||||
void ihipStream_t::locked_wait()
|
||||
{
|
||||
LockedAccessor_StreamCrit_t crit(_criticalData);
|
||||
|
||||
wait(crit, assertQueueEmpty);
|
||||
wait(crit);
|
||||
|
||||
};
|
||||
|
||||
@@ -314,6 +342,8 @@ void ihipStream_t::locked_waitEvent(hipEvent_t event)
|
||||
{
|
||||
LockedAccessor_StreamCrit_t crit(_criticalData);
|
||||
|
||||
this->ensureHaveQueue(crit);
|
||||
|
||||
crit->_av.create_blocking_marker(event->_marker);
|
||||
}
|
||||
|
||||
@@ -324,6 +354,7 @@ void ihipStream_t::locked_recordEvent(hipEvent_t event)
|
||||
// Lock the stream to prevent simultaneous access
|
||||
LockedAccessor_StreamCrit_t crit(_criticalData);
|
||||
|
||||
this->ensureHaveQueue(crit);
|
||||
event->_marker = crit->_av.create_marker();
|
||||
}
|
||||
|
||||
@@ -361,19 +392,11 @@ LockedAccessor_StreamCrit_t ihipStream_t::lockopen_preKernelCommand()
|
||||
this->wait(crit);
|
||||
crit->_kernelCnt = 0;
|
||||
}
|
||||
crit->_kernelCnt++;
|
||||
|
||||
if (HIP_MAX_QUEUES && !crit->_hasQueue) {
|
||||
// Obtain mutex access to the device critical data, release by destructor
|
||||
LockedAccessor_CtxCrit_t ctxCrit(this->_ctx->criticalData());
|
||||
crit->_av = this->_ctx->stealActiveQueue(ctxCrit, this);
|
||||
crit->_hasQueue = true;
|
||||
}
|
||||
this->ensureHaveQueue(crit);
|
||||
|
||||
|
||||
|
||||
|
||||
assert(crit->_hasQueue);
|
||||
|
||||
return crit;
|
||||
}
|
||||
|
||||
@@ -896,16 +919,18 @@ std::string ihipCtx_t::toString() const
|
||||
};
|
||||
|
||||
|
||||
hc::accelerator_view
|
||||
ihipCtx_t::stealActiveQueue(LockedAccessor_CtxCrit_t &ctxCrit, ihipStream_t *needyStream )
|
||||
hc::accelerator_view
|
||||
ihipCtx_t::stealActiveQueue(LockedAccessor_CtxCrit_t &ctxCrit, ihipStream_t *needyStream)
|
||||
{
|
||||
|
||||
// TODO - review handling if queue can't be found.
|
||||
while (1) {
|
||||
|
||||
for (auto iter=ctxCrit->streams().begin(); iter != ctxCrit->streams().end(); iter++) {
|
||||
if (*iter != needyStream) {
|
||||
auto victimCritPtr = (*iter)->_criticalData.mtry_lock();
|
||||
if (victimCritPtr) {
|
||||
// try-lock succeeded:
|
||||
if (victimCritPtr->_hasQueue && (victimCritPtr->_kernelCnt == 0)) {
|
||||
|
||||
victimCritPtr->_hasQueue = false;
|
||||
@@ -913,16 +938,16 @@ ihipCtx_t::stealActiveQueue(LockedAccessor_CtxCrit_t &ctxCrit, ihipStream_t *nee
|
||||
tprintf(DB_SYNC, " stealActiveQueue from victim:%s to needy:%s\n",
|
||||
ToString(*iter).c_str(), ToString(needyStream).c_str());
|
||||
|
||||
hc::accelerator_view av = victimCritPtr->_av;
|
||||
|
||||
// TODO - cleanup to remove forced setting to N
|
||||
hc::accelerator_view av = victimCritPtr->_av;
|
||||
uint64_t *p = (uint64_t*)(&victimCritPtr->_av);
|
||||
*p = 0; // damage the victim av so attempt to use it will fault.
|
||||
|
||||
(*iter)->_criticalData.munlock();
|
||||
return av;
|
||||
} else {
|
||||
(*iter)->_criticalData.munlock();
|
||||
}
|
||||
return av;
|
||||
}
|
||||
(*iter)->_criticalData.munlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -935,7 +960,8 @@ ihipCtx_t::createOrStealQueue(LockedAccessor_CtxCrit_t &ctxCrit)
|
||||
{
|
||||
if (HIP_MAX_QUEUES && (ctxCrit->streams().size() >= HIP_MAX_QUEUES)) {
|
||||
// Steal a queue from an existing stream:
|
||||
return this->stealActiveQueue (ctxCrit, nullptr);
|
||||
hc::accelerator_view av = this->stealActiveQueue (ctxCrit, nullptr);
|
||||
return av;
|
||||
} else {
|
||||
// Create a new view
|
||||
return getWriteableDevice()->_acc.create_view();
|
||||
@@ -1838,6 +1864,7 @@ void ihipStream_t::locked_copySync(void* dst, const void* src, size_t sizeBytes,
|
||||
src, srcPtrInfo._hostPointer, srcPtrInfo._devicePointer, srcPtrInfo._sizeBytes,
|
||||
srcPtrInfo._appId, srcTracked, srcPtrInfo._isInDeviceMem);
|
||||
|
||||
this->ensureHaveQueue(crit);
|
||||
|
||||
#if USE_COPY_EXT_V2
|
||||
crit->_av.copy_ext(src, dst, sizeBytes, hcCopyDir, srcPtrInfo, dstPtrInfo, copyDevice ? ©Device->getDevice()->_acc : nullptr, forceUnpinnedCopy);
|
||||
@@ -1902,6 +1929,8 @@ void ihipStream_t::locked_copyAsync(void* dst, const void* src, size_t sizeBytes
|
||||
|
||||
// Perform fast asynchronous copy - we know copyDevice != NULL based on check above
|
||||
try {
|
||||
this->ensureHaveQueue(crit);
|
||||
|
||||
if (HIP_FORCE_SYNC_COPY) {
|
||||
#if USE_COPY_EXT_V2
|
||||
crit->_av.copy_ext (src, dst, sizeBytes, hcCopyDir, srcPtrInfo, dstPtrInfo, ©Device->getDevice()->_acc, forceUnpinnedCopy);
|
||||
@@ -1928,6 +1957,8 @@ void ihipStream_t::locked_copyAsync(void* dst, const void* src, size_t sizeBytes
|
||||
|
||||
} else {
|
||||
LockedAccessor_StreamCrit_t crit(_criticalData);
|
||||
|
||||
this->ensureHaveQueue(crit);
|
||||
#if USE_COPY_EXT_V2
|
||||
crit->_av.copy_ext(src, dst, sizeBytes, hcCopyDir, srcPtrInfo, dstPtrInfo, copyDevice ? ©Device->getDevice()->_acc : nullptr, forceUnpinnedCopy);
|
||||
#else
|
||||
@@ -1985,6 +2016,7 @@ hipError_t hipHccGetAccelerator(int deviceId, hc::accelerator *acc)
|
||||
|
||||
|
||||
//---
|
||||
// Warning - with HIP_MAX_QUEUES!=0 there is no mechanism to prevent accelerator_view from being re-assigned...
|
||||
hipError_t hipHccGetAcceleratorView(hipStream_t stream, hc::accelerator_view **av)
|
||||
{
|
||||
HIP_INIT_API(stream, av);
|
||||
@@ -1994,7 +2026,7 @@ hipError_t hipHccGetAcceleratorView(hipStream_t stream, hc::accelerator_view **a
|
||||
stream = device->_defaultStream;
|
||||
}
|
||||
|
||||
*av = stream->locked_getAv();
|
||||
*av = stream->locked_getAv(); // TODO - review.
|
||||
|
||||
hipError_t err = hipSuccess;
|
||||
return ihipLogStatus(err);
|
||||
|
||||
@@ -358,21 +358,21 @@ public:
|
||||
_autoUnlock(autoUnlock)
|
||||
|
||||
{
|
||||
tprintf(DB_SYNC, "lock criticalData=%p for %s\n", _criticalData, ToString(_criticalData->_parent).c_str());
|
||||
tprintf(DB_SYNC, "locking criticalData=%p for %s..\n", _criticalData, ToString(_criticalData->_parent).c_str());
|
||||
_criticalData->_mutex.lock();
|
||||
};
|
||||
|
||||
~LockedAccessor()
|
||||
{
|
||||
if (_autoUnlock) {
|
||||
tprintf(DB_SYNC, "auto-unlock criticalData=%p for %s\n", _criticalData, ToString(_criticalData->_parent).c_str());
|
||||
tprintf(DB_SYNC, "auto-unlocking criticalData=%p for %s...\n", _criticalData, ToString(_criticalData->_parent).c_str());
|
||||
_criticalData->_mutex.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
void unlock()
|
||||
{
|
||||
tprintf(DB_SYNC, "unlock criticalData=%p for %s\n", _criticalData, ToString(_criticalData->_parent).c_str());
|
||||
tprintf(DB_SYNC, "unlocking criticalData=%p for %s...\n", _criticalData, ToString(_criticalData->_parent).c_str());
|
||||
_criticalData->_mutex.unlock();
|
||||
}
|
||||
|
||||
@@ -416,13 +416,13 @@ public:
|
||||
ihipStreamCriticalBase_t<StreamMutex> * mlock() { LockedBase<MUTEX_TYPE>::lock(); return this;};
|
||||
|
||||
void munlock() {
|
||||
tprintf(DB_SYNC, "munlock criticalData=%p for %s\n", this, ToString(this->_parent).c_str());
|
||||
tprintf(DB_SYNC, "munlocking criticalData=%p for %s...\n", this, ToString(this->_parent).c_str());
|
||||
LockedBase<MUTEX_TYPE>::unlock();
|
||||
};
|
||||
|
||||
ihipStreamCriticalBase_t<StreamMutex> * mtry_lock() {
|
||||
bool gotLock = LockedBase<MUTEX_TYPE>::try_lock() ;
|
||||
tprintf(DB_SYNC, "mtry_lock=%d criticalData=%p for %s\n", gotLock, this, ToString(this->_parent).c_str());
|
||||
tprintf(DB_SYNC, "mtry_locking=%d criticalData=%p for %s...\n", gotLock, this, ToString(this->_parent).c_str());
|
||||
return gotLock ? this: nullptr;
|
||||
};
|
||||
|
||||
@@ -476,7 +476,7 @@ public:
|
||||
void lockclose_postKernelCommand(const char *kernelName, hc::accelerator_view *av);
|
||||
|
||||
|
||||
void locked_wait(bool assertQueueEmpty=false);
|
||||
void locked_wait();
|
||||
|
||||
hc::accelerator_view* locked_getAv() { LockedAccessor_StreamCrit_t crit(_criticalData); return &(crit->_av); };
|
||||
|
||||
@@ -487,7 +487,7 @@ public:
|
||||
//---
|
||||
|
||||
// Use this if we already have the stream critical data mutex:
|
||||
void wait(LockedAccessor_StreamCrit_t &crit, bool assertQueueEmpty=false);
|
||||
void wait(LockedAccessor_StreamCrit_t &crit);
|
||||
|
||||
void launchModuleKernel(hc::accelerator_view av, hsa_signal_t signal,
|
||||
uint32_t blockDimX, uint32_t blockDimY, uint32_t blockDimZ,
|
||||
@@ -502,6 +502,7 @@ public:
|
||||
const ihipDevice_t * getDevice() const;
|
||||
ihipCtx_t * getCtx() const;
|
||||
|
||||
void ensureHaveQueue(LockedAccessor_StreamCrit_t &streamCrit);
|
||||
|
||||
public:
|
||||
//---
|
||||
@@ -693,8 +694,7 @@ public: // Functions:
|
||||
void locked_syncDefaultStream(bool waitOnSelf);
|
||||
|
||||
// Will allocate a queue and assign it to the needyStream:
|
||||
hc::accelerator_view stealActiveQueue(LockedAccessor_CtxCrit_t &ctxCrit,
|
||||
ihipStream_t *needyStream);
|
||||
hc::accelerator_view stealActiveQueue(LockedAccessor_CtxCrit_t &ctxCrit, ihipStream_t *needyStream);
|
||||
hc::accelerator_view createOrStealQueue(LockedAccessor_CtxCrit_t &ctxCrit);
|
||||
|
||||
ihipCtxCritical_t &criticalData() { return _criticalData; };
|
||||
|
||||
@@ -813,6 +813,8 @@ hipError_t hipMemsetAsync(void* dst, int value, size_t sizeBytes, hipStream_t s
|
||||
if (stream) {
|
||||
auto crit = stream->lockopen_preKernelCommand();
|
||||
|
||||
stream->ensureHaveQueue(crit);
|
||||
|
||||
hc::completion_future cf ;
|
||||
|
||||
if ((sizeBytes & 0x3) == 0) {
|
||||
@@ -863,6 +865,7 @@ hipError_t hipMemset(void* dst, int value, size_t sizeBytes )
|
||||
if (stream) {
|
||||
auto crit = stream->lockopen_preKernelCommand();
|
||||
|
||||
stream->ensureHaveQueue(crit);
|
||||
hc::completion_future cf ;
|
||||
|
||||
if ((sizeBytes & 0x3) == 0) {
|
||||
|
||||
@@ -48,6 +48,7 @@ hipError_t ihipStreamCreate(hipStream_t *stream, unsigned int flags)
|
||||
{
|
||||
// Obtain mutex access to the device critical data, release by destructor
|
||||
LockedAccessor_CtxCrit_t ctxCrit(ctx->criticalData());
|
||||
|
||||
auto istream = new ihipStream_t(ctx, ctx->createOrStealQueue(ctxCrit), flags);
|
||||
|
||||
ctxCrit->addStream(istream);
|
||||
@@ -124,8 +125,14 @@ hipError_t hipStreamQuery(hipStream_t stream)
|
||||
stream = device->_defaultStream;
|
||||
}
|
||||
|
||||
LockedAccessor_StreamCrit_t crit(stream->_criticalData);
|
||||
int pendingOps = crit->_av.get_pending_async_ops();
|
||||
int pendingOps = 0;
|
||||
|
||||
{
|
||||
LockedAccessor_StreamCrit_t crit(stream->_criticalData);
|
||||
if (crit->_hasQueue) {
|
||||
pendingOps = crit->_av.get_pending_async_ops();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
hipError_t e = (pendingOps > 0) ? hipErrorNotReady : hipSuccess;
|
||||
|
||||
Reference in New Issue
Block a user