Add HIP_MAX_QUEUES feature.

Includes some tricky manipulation of the locks for contexts and streams.
issue is that stealing a stream requires we lock the context to
walk the streams to find a victim.  To avoid deadlock, we can't
have a stream locked when we lock the context.  This implementation
releases the stream lock, then acquires the context and selects the
victim.
A more stable implemenation might be to copy the stream list
from a context so that a lock is not required to walk all streams.
Smart shared_ptr could be used to prevent the streams from being
deallocated during the walk.
Tá an tiomantas seo le fáil i:
Ben Sander
2017-01-09 17:19:40 -06:00
tuismitheoir c9f5fe34e6
tiomantas b29fbf736d
D'athraigh 4 comhad le 76 breiseanna agus 34 scriosta
+55 -23
Féach ar an gComhad
@@ -265,11 +265,37 @@ ihipStream_t::~ihipStream_t()
}
inline void ihipStream_t::ensureHaveQueue(LockedAccessor_StreamCrit_t &streamCrit)
{
if (HIP_MAX_QUEUES && !streamCrit->_hasQueue) {
// To avoid deadlock, we have to release the stream lock before acquiring context lock.
// Else we can get hung if another thread has the context lock is trying to get lock for this stream.
// We lock it again below.
streamCrit->munlock();
// Obtain mutex access to the device critical data, release by destructor
LockedAccessor_CtxCrit_t ctxCrit(this->_ctx->criticalData());
// TODO
auto needyCritPtr = this->_criticalData.mlock();
// Second test to ensure we still need to steal the queue - another thread may have
// snuck in here and already solved the issue.
if (!needyCritPtr->_hasQueue) {
needyCritPtr->_av = this->_ctx->stealActiveQueue(ctxCrit, this);
}
streamCrit->_hasQueue = true;
}
assert(streamCrit->_hasQueue);
}
//Wait for all kernel and data copy commands in this stream to complete.
//This signature should be used in routines that already have locked the stream mutex
void ihipStream_t::wait(LockedAccessor_StreamCrit_t &crit, bool assertQueueEmpty)
void ihipStream_t::wait(LockedAccessor_StreamCrit_t &crit)
{
if (! assertQueueEmpty) {
if (crit->_hasQueue) {
tprintf (DB_SYNC, "%s wait for queue-empty..\n", ToString(this).c_str());
hc::hcWaitMode waitMode = hc::hcWaitModeActive;
@@ -294,6 +320,8 @@ void ihipStream_t::wait(LockedAccessor_StreamCrit_t &crit, bool assertQueueEmpty
}
crit->_av.wait(waitMode);
} else {
tprintf (DB_SYNC, "%s wait for queue empty (done since stream has no physical queue).\n", ToString(this).c_str());
}
crit->_kernelCnt = 0;
@@ -301,11 +329,11 @@ void ihipStream_t::wait(LockedAccessor_StreamCrit_t &crit, bool assertQueueEmpty
//---
//Wait for all kernel and data copy commands in this stream to complete.
void ihipStream_t::locked_wait(bool assertQueueEmpty)
void ihipStream_t::locked_wait()
{
LockedAccessor_StreamCrit_t crit(_criticalData);
wait(crit, assertQueueEmpty);
wait(crit);
};
@@ -314,6 +342,8 @@ void ihipStream_t::locked_waitEvent(hipEvent_t event)
{
LockedAccessor_StreamCrit_t crit(_criticalData);
this->ensureHaveQueue(crit);
crit->_av.create_blocking_marker(event->_marker);
}
@@ -324,6 +354,7 @@ void ihipStream_t::locked_recordEvent(hipEvent_t event)
// Lock the stream to prevent simultaneous access
LockedAccessor_StreamCrit_t crit(_criticalData);
this->ensureHaveQueue(crit);
event->_marker = crit->_av.create_marker();
}
@@ -361,19 +392,11 @@ LockedAccessor_StreamCrit_t ihipStream_t::lockopen_preKernelCommand()
this->wait(crit);
crit->_kernelCnt = 0;
}
crit->_kernelCnt++;
if (HIP_MAX_QUEUES && !crit->_hasQueue) {
// Obtain mutex access to the device critical data, release by destructor
LockedAccessor_CtxCrit_t ctxCrit(this->_ctx->criticalData());
crit->_av = this->_ctx->stealActiveQueue(ctxCrit, this);
crit->_hasQueue = true;
}
this->ensureHaveQueue(crit);
assert(crit->_hasQueue);
return crit;
}
@@ -896,16 +919,18 @@ std::string ihipCtx_t::toString() const
};
hc::accelerator_view
ihipCtx_t::stealActiveQueue(LockedAccessor_CtxCrit_t &ctxCrit, ihipStream_t *needyStream )
hc::accelerator_view
ihipCtx_t::stealActiveQueue(LockedAccessor_CtxCrit_t &ctxCrit, ihipStream_t *needyStream)
{
// TODO - review handling if queue can't be found.
while (1) {
for (auto iter=ctxCrit->streams().begin(); iter != ctxCrit->streams().end(); iter++) {
if (*iter != needyStream) {
auto victimCritPtr = (*iter)->_criticalData.mtry_lock();
if (victimCritPtr) {
// try-lock succeeded:
if (victimCritPtr->_hasQueue && (victimCritPtr->_kernelCnt == 0)) {
victimCritPtr->_hasQueue = false;
@@ -913,16 +938,16 @@ ihipCtx_t::stealActiveQueue(LockedAccessor_CtxCrit_t &ctxCrit, ihipStream_t *nee
tprintf(DB_SYNC, " stealActiveQueue from victim:%s to needy:%s\n",
ToString(*iter).c_str(), ToString(needyStream).c_str());
hc::accelerator_view av = victimCritPtr->_av;
// TODO - cleanup to remove forced setting to N
hc::accelerator_view av = victimCritPtr->_av;
uint64_t *p = (uint64_t*)(&victimCritPtr->_av);
*p = 0; // damage the victim av so attempt to use it will fault.
(*iter)->_criticalData.munlock();
return av;
} else {
(*iter)->_criticalData.munlock();
}
return av;
}
(*iter)->_criticalData.munlock();
}
}
}
@@ -935,7 +960,8 @@ ihipCtx_t::createOrStealQueue(LockedAccessor_CtxCrit_t &ctxCrit)
{
if (HIP_MAX_QUEUES && (ctxCrit->streams().size() >= HIP_MAX_QUEUES)) {
// Steal a queue from an existing stream:
return this->stealActiveQueue (ctxCrit, nullptr);
hc::accelerator_view av = this->stealActiveQueue (ctxCrit, nullptr);
return av;
} else {
// Create a new view
return getWriteableDevice()->_acc.create_view();
@@ -1838,6 +1864,7 @@ void ihipStream_t::locked_copySync(void* dst, const void* src, size_t sizeBytes,
src, srcPtrInfo._hostPointer, srcPtrInfo._devicePointer, srcPtrInfo._sizeBytes,
srcPtrInfo._appId, srcTracked, srcPtrInfo._isInDeviceMem);
this->ensureHaveQueue(crit);
#if USE_COPY_EXT_V2
crit->_av.copy_ext(src, dst, sizeBytes, hcCopyDir, srcPtrInfo, dstPtrInfo, copyDevice ? &copyDevice->getDevice()->_acc : nullptr, forceUnpinnedCopy);
@@ -1902,6 +1929,8 @@ void ihipStream_t::locked_copyAsync(void* dst, const void* src, size_t sizeBytes
// Perform fast asynchronous copy - we know copyDevice != NULL based on check above
try {
this->ensureHaveQueue(crit);
if (HIP_FORCE_SYNC_COPY) {
#if USE_COPY_EXT_V2
crit->_av.copy_ext (src, dst, sizeBytes, hcCopyDir, srcPtrInfo, dstPtrInfo, &copyDevice->getDevice()->_acc, forceUnpinnedCopy);
@@ -1928,6 +1957,8 @@ void ihipStream_t::locked_copyAsync(void* dst, const void* src, size_t sizeBytes
} else {
LockedAccessor_StreamCrit_t crit(_criticalData);
this->ensureHaveQueue(crit);
#if USE_COPY_EXT_V2
crit->_av.copy_ext(src, dst, sizeBytes, hcCopyDir, srcPtrInfo, dstPtrInfo, copyDevice ? &copyDevice->getDevice()->_acc : nullptr, forceUnpinnedCopy);
#else
@@ -1985,6 +2016,7 @@ hipError_t hipHccGetAccelerator(int deviceId, hc::accelerator *acc)
//---
// Warning - with HIP_MAX_QUEUES!=0 there is no mechanism to prevent accelerator_view from being re-assigned...
hipError_t hipHccGetAcceleratorView(hipStream_t stream, hc::accelerator_view **av)
{
HIP_INIT_API(stream, av);
@@ -1994,7 +2026,7 @@ hipError_t hipHccGetAcceleratorView(hipStream_t stream, hc::accelerator_view **a
stream = device->_defaultStream;
}
*av = stream->locked_getAv();
*av = stream->locked_getAv(); // TODO - review.
hipError_t err = hipSuccess;
return ihipLogStatus(err);
+9 -9
Féach ar an gComhad
@@ -358,21 +358,21 @@ public:
_autoUnlock(autoUnlock)
{
tprintf(DB_SYNC, "lock criticalData=%p for %s\n", _criticalData, ToString(_criticalData->_parent).c_str());
tprintf(DB_SYNC, "locking criticalData=%p for %s..\n", _criticalData, ToString(_criticalData->_parent).c_str());
_criticalData->_mutex.lock();
};
~LockedAccessor()
{
if (_autoUnlock) {
tprintf(DB_SYNC, "auto-unlock criticalData=%p for %s\n", _criticalData, ToString(_criticalData->_parent).c_str());
tprintf(DB_SYNC, "auto-unlocking criticalData=%p for %s...\n", _criticalData, ToString(_criticalData->_parent).c_str());
_criticalData->_mutex.unlock();
}
}
void unlock()
{
tprintf(DB_SYNC, "unlock criticalData=%p for %s\n", _criticalData, ToString(_criticalData->_parent).c_str());
tprintf(DB_SYNC, "unlocking criticalData=%p for %s...\n", _criticalData, ToString(_criticalData->_parent).c_str());
_criticalData->_mutex.unlock();
}
@@ -416,13 +416,13 @@ public:
ihipStreamCriticalBase_t<StreamMutex> * mlock() { LockedBase<MUTEX_TYPE>::lock(); return this;};
void munlock() {
tprintf(DB_SYNC, "munlock criticalData=%p for %s\n", this, ToString(this->_parent).c_str());
tprintf(DB_SYNC, "munlocking criticalData=%p for %s...\n", this, ToString(this->_parent).c_str());
LockedBase<MUTEX_TYPE>::unlock();
};
ihipStreamCriticalBase_t<StreamMutex> * mtry_lock() {
bool gotLock = LockedBase<MUTEX_TYPE>::try_lock() ;
tprintf(DB_SYNC, "mtry_lock=%d criticalData=%p for %s\n", gotLock, this, ToString(this->_parent).c_str());
tprintf(DB_SYNC, "mtry_locking=%d criticalData=%p for %s...\n", gotLock, this, ToString(this->_parent).c_str());
return gotLock ? this: nullptr;
};
@@ -476,7 +476,7 @@ public:
void lockclose_postKernelCommand(const char *kernelName, hc::accelerator_view *av);
void locked_wait(bool assertQueueEmpty=false);
void locked_wait();
hc::accelerator_view* locked_getAv() { LockedAccessor_StreamCrit_t crit(_criticalData); return &(crit->_av); };
@@ -487,7 +487,7 @@ public:
//---
// Use this if we already have the stream critical data mutex:
void wait(LockedAccessor_StreamCrit_t &crit, bool assertQueueEmpty=false);
void wait(LockedAccessor_StreamCrit_t &crit);
void launchModuleKernel(hc::accelerator_view av, hsa_signal_t signal,
uint32_t blockDimX, uint32_t blockDimY, uint32_t blockDimZ,
@@ -502,6 +502,7 @@ public:
const ihipDevice_t * getDevice() const;
ihipCtx_t * getCtx() const;
void ensureHaveQueue(LockedAccessor_StreamCrit_t &streamCrit);
public:
//---
@@ -693,8 +694,7 @@ public: // Functions:
void locked_syncDefaultStream(bool waitOnSelf);
// Will allocate a queue and assign it to the needyStream:
hc::accelerator_view stealActiveQueue(LockedAccessor_CtxCrit_t &ctxCrit,
ihipStream_t *needyStream);
hc::accelerator_view stealActiveQueue(LockedAccessor_CtxCrit_t &ctxCrit, ihipStream_t *needyStream);
hc::accelerator_view createOrStealQueue(LockedAccessor_CtxCrit_t &ctxCrit);
ihipCtxCritical_t &criticalData() { return _criticalData; };
+3
Féach ar an gComhad
@@ -813,6 +813,8 @@ hipError_t hipMemsetAsync(void* dst, int value, size_t sizeBytes, hipStream_t s
if (stream) {
auto crit = stream->lockopen_preKernelCommand();
stream->ensureHaveQueue(crit);
hc::completion_future cf ;
if ((sizeBytes & 0x3) == 0) {
@@ -863,6 +865,7 @@ hipError_t hipMemset(void* dst, int value, size_t sizeBytes )
if (stream) {
auto crit = stream->lockopen_preKernelCommand();
stream->ensureHaveQueue(crit);
hc::completion_future cf ;
if ((sizeBytes & 0x3) == 0) {
+9 -2
Féach ar an gComhad
@@ -48,6 +48,7 @@ hipError_t ihipStreamCreate(hipStream_t *stream, unsigned int flags)
{
// Obtain mutex access to the device critical data, release by destructor
LockedAccessor_CtxCrit_t ctxCrit(ctx->criticalData());
auto istream = new ihipStream_t(ctx, ctx->createOrStealQueue(ctxCrit), flags);
ctxCrit->addStream(istream);
@@ -124,8 +125,14 @@ hipError_t hipStreamQuery(hipStream_t stream)
stream = device->_defaultStream;
}
LockedAccessor_StreamCrit_t crit(stream->_criticalData);
int pendingOps = crit->_av.get_pending_async_ops();
int pendingOps = 0;
{
LockedAccessor_StreamCrit_t crit(stream->_criticalData);
if (crit->_hasQueue) {
pendingOps = crit->_av.get_pending_async_ops();
}
}
hipError_t e = (pendingOps > 0) ? hipErrorNotReady : hipSuccess;