From b29fbf736dc76414c41336fae2724348bd2caaad Mon Sep 17 00:00:00 2001 From: Ben Sander Date: Mon, 9 Jan 2017 17:19:40 -0600 Subject: [PATCH] Add HIP_MAX_QUEUES feature. Includes some tricky manipulation of the locks for contexts and streams. issue is that stealing a stream requires we lock the context to walk the streams to find a victim. To avoid deadlock, we can't have a stream locked when we lock the context. This implementation releases the stream lock, then acquires the context and selects the victim. A more stable implemenation might be to copy the stream list from a context so that a lock is not required to walk all streams. Smart shared_ptr could be used to prevent the streams from being deallocated during the walk. --- hipamd/src/hip_hcc.cpp | 78 +++++++++++++++++++++++++++------------ hipamd/src/hip_hcc.h | 18 ++++----- hipamd/src/hip_memory.cpp | 3 ++ hipamd/src/hip_stream.cpp | 11 +++++- 4 files changed, 76 insertions(+), 34 deletions(-) diff --git a/hipamd/src/hip_hcc.cpp b/hipamd/src/hip_hcc.cpp index 3bb3d6d128..e5b7937e25 100644 --- a/hipamd/src/hip_hcc.cpp +++ b/hipamd/src/hip_hcc.cpp @@ -265,11 +265,37 @@ ihipStream_t::~ihipStream_t() } +inline void ihipStream_t::ensureHaveQueue(LockedAccessor_StreamCrit_t &streamCrit) +{ + if (HIP_MAX_QUEUES && !streamCrit->_hasQueue) { + + // To avoid deadlock, we have to release the stream lock before acquiring context lock. + // Else we can get hung if another thread has the context lock is trying to get lock for this stream. + // We lock it again below. + streamCrit->munlock(); + + // Obtain mutex access to the device critical data, release by destructor + LockedAccessor_CtxCrit_t ctxCrit(this->_ctx->criticalData()); + // TODO + auto needyCritPtr = this->_criticalData.mlock(); + + // Second test to ensure we still need to steal the queue - another thread may have + // snuck in here and already solved the issue. + if (!needyCritPtr->_hasQueue) { + needyCritPtr->_av = this->_ctx->stealActiveQueue(ctxCrit, this); + } + + streamCrit->_hasQueue = true; + } + assert(streamCrit->_hasQueue); +} + + //Wait for all kernel and data copy commands in this stream to complete. //This signature should be used in routines that already have locked the stream mutex -void ihipStream_t::wait(LockedAccessor_StreamCrit_t &crit, bool assertQueueEmpty) +void ihipStream_t::wait(LockedAccessor_StreamCrit_t &crit) { - if (! assertQueueEmpty) { + if (crit->_hasQueue) { tprintf (DB_SYNC, "%s wait for queue-empty..\n", ToString(this).c_str()); hc::hcWaitMode waitMode = hc::hcWaitModeActive; @@ -294,6 +320,8 @@ void ihipStream_t::wait(LockedAccessor_StreamCrit_t &crit, bool assertQueueEmpty } crit->_av.wait(waitMode); + } else { + tprintf (DB_SYNC, "%s wait for queue empty (done since stream has no physical queue).\n", ToString(this).c_str()); } crit->_kernelCnt = 0; @@ -301,11 +329,11 @@ void ihipStream_t::wait(LockedAccessor_StreamCrit_t &crit, bool assertQueueEmpty //--- //Wait for all kernel and data copy commands in this stream to complete. -void ihipStream_t::locked_wait(bool assertQueueEmpty) +void ihipStream_t::locked_wait() { LockedAccessor_StreamCrit_t crit(_criticalData); - wait(crit, assertQueueEmpty); + wait(crit); }; @@ -314,6 +342,8 @@ void ihipStream_t::locked_waitEvent(hipEvent_t event) { LockedAccessor_StreamCrit_t crit(_criticalData); + this->ensureHaveQueue(crit); + crit->_av.create_blocking_marker(event->_marker); } @@ -324,6 +354,7 @@ void ihipStream_t::locked_recordEvent(hipEvent_t event) // Lock the stream to prevent simultaneous access LockedAccessor_StreamCrit_t crit(_criticalData); + this->ensureHaveQueue(crit); event->_marker = crit->_av.create_marker(); } @@ -361,19 +392,11 @@ LockedAccessor_StreamCrit_t ihipStream_t::lockopen_preKernelCommand() this->wait(crit); crit->_kernelCnt = 0; } - crit->_kernelCnt++; - if (HIP_MAX_QUEUES && !crit->_hasQueue) { - // Obtain mutex access to the device critical data, release by destructor - LockedAccessor_CtxCrit_t ctxCrit(this->_ctx->criticalData()); - crit->_av = this->_ctx->stealActiveQueue(ctxCrit, this); - crit->_hasQueue = true; - } + this->ensureHaveQueue(crit); + - - assert(crit->_hasQueue); - return crit; } @@ -896,16 +919,18 @@ std::string ihipCtx_t::toString() const }; -hc::accelerator_view -ihipCtx_t::stealActiveQueue(LockedAccessor_CtxCrit_t &ctxCrit, ihipStream_t *needyStream ) +hc::accelerator_view +ihipCtx_t::stealActiveQueue(LockedAccessor_CtxCrit_t &ctxCrit, ihipStream_t *needyStream) { // TODO - review handling if queue can't be found. while (1) { + for (auto iter=ctxCrit->streams().begin(); iter != ctxCrit->streams().end(); iter++) { if (*iter != needyStream) { auto victimCritPtr = (*iter)->_criticalData.mtry_lock(); if (victimCritPtr) { + // try-lock succeeded: if (victimCritPtr->_hasQueue && (victimCritPtr->_kernelCnt == 0)) { victimCritPtr->_hasQueue = false; @@ -913,16 +938,16 @@ ihipCtx_t::stealActiveQueue(LockedAccessor_CtxCrit_t &ctxCrit, ihipStream_t *nee tprintf(DB_SYNC, " stealActiveQueue from victim:%s to needy:%s\n", ToString(*iter).c_str(), ToString(needyStream).c_str()); + hc::accelerator_view av = victimCritPtr->_av; + // TODO - cleanup to remove forced setting to N - hc::accelerator_view av = victimCritPtr->_av; uint64_t *p = (uint64_t*)(&victimCritPtr->_av); *p = 0; // damage the victim av so attempt to use it will fault. (*iter)->_criticalData.munlock(); - return av; - } else { - (*iter)->_criticalData.munlock(); - } + return av; + } + (*iter)->_criticalData.munlock(); } } } @@ -935,7 +960,8 @@ ihipCtx_t::createOrStealQueue(LockedAccessor_CtxCrit_t &ctxCrit) { if (HIP_MAX_QUEUES && (ctxCrit->streams().size() >= HIP_MAX_QUEUES)) { // Steal a queue from an existing stream: - return this->stealActiveQueue (ctxCrit, nullptr); + hc::accelerator_view av = this->stealActiveQueue (ctxCrit, nullptr); + return av; } else { // Create a new view return getWriteableDevice()->_acc.create_view(); @@ -1838,6 +1864,7 @@ void ihipStream_t::locked_copySync(void* dst, const void* src, size_t sizeBytes, src, srcPtrInfo._hostPointer, srcPtrInfo._devicePointer, srcPtrInfo._sizeBytes, srcPtrInfo._appId, srcTracked, srcPtrInfo._isInDeviceMem); + this->ensureHaveQueue(crit); #if USE_COPY_EXT_V2 crit->_av.copy_ext(src, dst, sizeBytes, hcCopyDir, srcPtrInfo, dstPtrInfo, copyDevice ? ©Device->getDevice()->_acc : nullptr, forceUnpinnedCopy); @@ -1902,6 +1929,8 @@ void ihipStream_t::locked_copyAsync(void* dst, const void* src, size_t sizeBytes // Perform fast asynchronous copy - we know copyDevice != NULL based on check above try { + this->ensureHaveQueue(crit); + if (HIP_FORCE_SYNC_COPY) { #if USE_COPY_EXT_V2 crit->_av.copy_ext (src, dst, sizeBytes, hcCopyDir, srcPtrInfo, dstPtrInfo, ©Device->getDevice()->_acc, forceUnpinnedCopy); @@ -1928,6 +1957,8 @@ void ihipStream_t::locked_copyAsync(void* dst, const void* src, size_t sizeBytes } else { LockedAccessor_StreamCrit_t crit(_criticalData); + + this->ensureHaveQueue(crit); #if USE_COPY_EXT_V2 crit->_av.copy_ext(src, dst, sizeBytes, hcCopyDir, srcPtrInfo, dstPtrInfo, copyDevice ? ©Device->getDevice()->_acc : nullptr, forceUnpinnedCopy); #else @@ -1985,6 +2016,7 @@ hipError_t hipHccGetAccelerator(int deviceId, hc::accelerator *acc) //--- +// Warning - with HIP_MAX_QUEUES!=0 there is no mechanism to prevent accelerator_view from being re-assigned... hipError_t hipHccGetAcceleratorView(hipStream_t stream, hc::accelerator_view **av) { HIP_INIT_API(stream, av); @@ -1994,7 +2026,7 @@ hipError_t hipHccGetAcceleratorView(hipStream_t stream, hc::accelerator_view **a stream = device->_defaultStream; } - *av = stream->locked_getAv(); + *av = stream->locked_getAv(); // TODO - review. hipError_t err = hipSuccess; return ihipLogStatus(err); diff --git a/hipamd/src/hip_hcc.h b/hipamd/src/hip_hcc.h index 876e5df816..e19ce63263 100644 --- a/hipamd/src/hip_hcc.h +++ b/hipamd/src/hip_hcc.h @@ -358,21 +358,21 @@ public: _autoUnlock(autoUnlock) { - tprintf(DB_SYNC, "lock criticalData=%p for %s\n", _criticalData, ToString(_criticalData->_parent).c_str()); + tprintf(DB_SYNC, "locking criticalData=%p for %s..\n", _criticalData, ToString(_criticalData->_parent).c_str()); _criticalData->_mutex.lock(); }; ~LockedAccessor() { if (_autoUnlock) { - tprintf(DB_SYNC, "auto-unlock criticalData=%p for %s\n", _criticalData, ToString(_criticalData->_parent).c_str()); + tprintf(DB_SYNC, "auto-unlocking criticalData=%p for %s...\n", _criticalData, ToString(_criticalData->_parent).c_str()); _criticalData->_mutex.unlock(); } } void unlock() { - tprintf(DB_SYNC, "unlock criticalData=%p for %s\n", _criticalData, ToString(_criticalData->_parent).c_str()); + tprintf(DB_SYNC, "unlocking criticalData=%p for %s...\n", _criticalData, ToString(_criticalData->_parent).c_str()); _criticalData->_mutex.unlock(); } @@ -416,13 +416,13 @@ public: ihipStreamCriticalBase_t * mlock() { LockedBase::lock(); return this;}; void munlock() { - tprintf(DB_SYNC, "munlock criticalData=%p for %s\n", this, ToString(this->_parent).c_str()); + tprintf(DB_SYNC, "munlocking criticalData=%p for %s...\n", this, ToString(this->_parent).c_str()); LockedBase::unlock(); }; ihipStreamCriticalBase_t * mtry_lock() { bool gotLock = LockedBase::try_lock() ; - tprintf(DB_SYNC, "mtry_lock=%d criticalData=%p for %s\n", gotLock, this, ToString(this->_parent).c_str()); + tprintf(DB_SYNC, "mtry_locking=%d criticalData=%p for %s...\n", gotLock, this, ToString(this->_parent).c_str()); return gotLock ? this: nullptr; }; @@ -476,7 +476,7 @@ public: void lockclose_postKernelCommand(const char *kernelName, hc::accelerator_view *av); - void locked_wait(bool assertQueueEmpty=false); + void locked_wait(); hc::accelerator_view* locked_getAv() { LockedAccessor_StreamCrit_t crit(_criticalData); return &(crit->_av); }; @@ -487,7 +487,7 @@ public: //--- // Use this if we already have the stream critical data mutex: - void wait(LockedAccessor_StreamCrit_t &crit, bool assertQueueEmpty=false); + void wait(LockedAccessor_StreamCrit_t &crit); void launchModuleKernel(hc::accelerator_view av, hsa_signal_t signal, uint32_t blockDimX, uint32_t blockDimY, uint32_t blockDimZ, @@ -502,6 +502,7 @@ public: const ihipDevice_t * getDevice() const; ihipCtx_t * getCtx() const; + void ensureHaveQueue(LockedAccessor_StreamCrit_t &streamCrit); public: //--- @@ -693,8 +694,7 @@ public: // Functions: void locked_syncDefaultStream(bool waitOnSelf); // Will allocate a queue and assign it to the needyStream: - hc::accelerator_view stealActiveQueue(LockedAccessor_CtxCrit_t &ctxCrit, - ihipStream_t *needyStream); + hc::accelerator_view stealActiveQueue(LockedAccessor_CtxCrit_t &ctxCrit, ihipStream_t *needyStream); hc::accelerator_view createOrStealQueue(LockedAccessor_CtxCrit_t &ctxCrit); ihipCtxCritical_t &criticalData() { return _criticalData; }; diff --git a/hipamd/src/hip_memory.cpp b/hipamd/src/hip_memory.cpp index 5bc77cf543..372d295b89 100644 --- a/hipamd/src/hip_memory.cpp +++ b/hipamd/src/hip_memory.cpp @@ -813,6 +813,8 @@ hipError_t hipMemsetAsync(void* dst, int value, size_t sizeBytes, hipStream_t s if (stream) { auto crit = stream->lockopen_preKernelCommand(); + stream->ensureHaveQueue(crit); + hc::completion_future cf ; if ((sizeBytes & 0x3) == 0) { @@ -863,6 +865,7 @@ hipError_t hipMemset(void* dst, int value, size_t sizeBytes ) if (stream) { auto crit = stream->lockopen_preKernelCommand(); + stream->ensureHaveQueue(crit); hc::completion_future cf ; if ((sizeBytes & 0x3) == 0) { diff --git a/hipamd/src/hip_stream.cpp b/hipamd/src/hip_stream.cpp index 8641f72265..aae412160f 100644 --- a/hipamd/src/hip_stream.cpp +++ b/hipamd/src/hip_stream.cpp @@ -48,6 +48,7 @@ hipError_t ihipStreamCreate(hipStream_t *stream, unsigned int flags) { // Obtain mutex access to the device critical data, release by destructor LockedAccessor_CtxCrit_t ctxCrit(ctx->criticalData()); + auto istream = new ihipStream_t(ctx, ctx->createOrStealQueue(ctxCrit), flags); ctxCrit->addStream(istream); @@ -124,8 +125,14 @@ hipError_t hipStreamQuery(hipStream_t stream) stream = device->_defaultStream; } - LockedAccessor_StreamCrit_t crit(stream->_criticalData); - int pendingOps = crit->_av.get_pending_async_ops(); + int pendingOps = 0; + + { + LockedAccessor_StreamCrit_t crit(stream->_criticalData); + if (crit->_hasQueue) { + pendingOps = crit->_av.get_pending_async_ops(); + } + } hipError_t e = (pendingOps > 0) ? hipErrorNotReady : hipSuccess;