Tweak thread-safe implementation.

Introduce a LockedAccessor option so the destructor does not unlock.
This allows locks to persist across function boundaries, which is required
for the hipLaunchKernel macro, which has several unusual requirements
(including C compatibility, the need to use a variadic macro, and more).


[ROCm/hip commit: 8635863724]
This commit is contained in:
Ben Sander
2016-03-28 21:41:47 -05:00
والد 9e3ac64c54
کامیت 630ef59d7b
4فایلهای تغییر یافته به همراه67 افزوده شده و 55 حذف شده
@@ -295,14 +295,24 @@ template<typename T>
class LockedAccessor
{
public:
LockedAccessor(T &criticalData) : _criticalData(&criticalData)
LockedAccessor(T &criticalData, bool autoUnlock=true) :
_criticalData(&criticalData),
_autoUnlock(autoUnlock)
{
_criticalData->_mutex.lock();
};
~LockedAccessor()
{
_criticalData->_mutex.unlock();
if (_autoUnlock) {
_criticalData->_mutex.unlock();
}
}
void unlock()
{
_criticalData->_mutex.unlock();
}
// Syntactic sugar so -> can be used to get the underlying type.
@@ -310,6 +320,7 @@ public:
private:
T *_criticalData;
bool _autoUnlock;
};
@@ -368,7 +379,7 @@ public:
typedef ihipStreamCriticalBase_t<StreamMutex> ihipStreamCritical_t;
typedef LockedAccessor<ihipStreamCritical_t> Locked_ihipStreamCritical_t;
typedef LockedAccessor<ihipStreamCritical_t> LockedAccessor_StreamCrit_t;
@@ -381,30 +392,30 @@ typedef uint64_t SeqNum_t ;
~ihipStream_t();
// kind is hipMemcpyKind
void copySync (Locked_ihipStreamCritical_t &crit, void* dst, const void* src, size_t sizeBytes, unsigned kind);
void copySync (LockedAccessor_StreamCrit_t &crit, void* dst, const void* src, size_t sizeBytes, unsigned kind);
void locked_copySync (void* dst, const void* src, size_t sizeBytes, unsigned kind);
void copyAsync(void* dst, const void* src, size_t sizeBytes, unsigned kind);
//---
// Thread-safe accessors - these acquire / release mutex:
bool preKernelCommand();
void postKernelCommand(hc::completion_future &kernel_future);
bool lockopen_preKernelCommand();
void lockclose_postKernelCommand(hc::completion_future &kernel_future);
int preCopyCommand(Locked_ihipStreamCritical_t &crit, ihipSignal_t *lastCopy, hsa_signal_t *waitSignal, ihipCommand_t copyType);
int preCopyCommand(LockedAccessor_StreamCrit_t &crit, ihipSignal_t *lastCopy, hsa_signal_t *waitSignal, ihipCommand_t copyType);
void locked_reclaimSignals(SIGSEQNUM sigNum);
void locked_wait(bool assertQueueEmpty=false);
SIGSEQNUM locked_lastCopySeqId() {LockedAccessor_StreamCrit_t crit(_criticalData); return lastCopySeqId(crit); };
// Use this if we already have the stream critical data mutex:
void wait(Locked_ihipStreamCritical_t &crit, bool assertQueueEmpty=false);
void wait(LockedAccessor_StreamCrit_t &crit, bool assertQueueEmpty=false);
SIGSEQNUM locked_lastCopySeqId() {Locked_ihipStreamCritical_t crit(_criticalData); return lastCopySeqId(crit); };
// Non-threadsafe accessors - must be protected by high-level stream lock with accessor passed to function.
SIGSEQNUM lastCopySeqId(Locked_ihipStreamCritical_t &crit) { return crit->_last_copy_signal ? crit->_last_copy_signal->_sig_id : 0; };
ihipSignal_t * allocSignal(Locked_ihipStreamCritical_t &crit);
SIGSEQNUM lastCopySeqId (LockedAccessor_StreamCrit_t &crit) { return crit->_last_copy_signal ? crit->_last_copy_signal->_sig_id : 0; };
ihipSignal_t * allocSignal (LockedAccessor_StreamCrit_t &crit);
//-- Non-racy accessors:
@@ -412,19 +423,19 @@ typedef uint64_t SeqNum_t ;
ihipDevice_t * getDevice() const;
public:
//---
//Member vars - these are set at initialization:
//Public member vars - these are set at initialization and never change:
SeqNum_t _id; // monotonic sequence ID
hc::accelerator_view _av;
unsigned _flags;
private:
private: // Critical Data. This MUST be accessed through LockedAccessor_StreamCrit_t
ihipStreamCritical_t _criticalData;
private:
void enqueueBarrier(hsa_queue_t* queue, ihipSignal_t *depSignal);
void waitCopy(Locked_ihipStreamCritical_t &crit, ihipSignal_t *signal);
void waitCopy(LockedAccessor_StreamCrit_t &crit, ihipSignal_t *signal);
// The unsigned return is hipMemcpyKind
unsigned resolveMemcpyDirection(bool srcInDeviceMem, bool dstInDeviceMem);
@@ -502,7 +513,7 @@ private:
typedef ihipDeviceCriticalBase_t<DeviceMutex> ihipDeviceCritical_t;
// This type is used by functions that need access to the critical device structures.
typedef LockedAccessor<ihipDeviceCritical_t> Locked_ihipDeviceCritical_t;
typedef LockedAccessor<ihipDeviceCritical_t> LockedAccessor_DeviceCrit_t;
@@ -222,7 +222,7 @@ static inline hipError_t hipMalloc ( T** devPtr, size_t size)
// Provide an override to automatically typecast the pointer type from void**, and also provide a default for the flags.
template<class T>
static inline hipError_t hipHostMalloc( T** ptr, size_t size, unsigned int flags = 0)
static inline hipError_t hipHostMalloc( T** ptr, size_t size, unsigned int flags = hipHostMallocDefault)
{
return hipHostMalloc((void**)ptr, size, flags);
}
+37 -36
مشاهده پرونده
@@ -144,7 +144,7 @@ ihipStream_t::~ihipStream_t()
//---
void ihipStream_t::locked_reclaimSignals(SIGSEQNUM sigNum)
{
Locked_ihipStreamCritical_t crit(_criticalData);
LockedAccessor_StreamCrit_t crit(_criticalData);
tprintf(DB_SIGNAL, "reclaim signal #%lu\n", sigNum);
// Mark all signals older and including this one as available for re-allocation.
@@ -153,7 +153,7 @@ void ihipStream_t::locked_reclaimSignals(SIGSEQNUM sigNum)
//---
void ihipStream_t::waitCopy(Locked_ihipStreamCritical_t &crit, ihipSignal_t *signal)
void ihipStream_t::waitCopy(LockedAccessor_StreamCrit_t &crit, ihipSignal_t *signal)
{
hsa_signal_wait_acquire(signal->_hsa_signal, HSA_SIGNAL_CONDITION_LT, 1, UINT64_MAX, HSA_WAIT_STATE_ACTIVE);
@@ -169,7 +169,7 @@ void ihipStream_t::waitCopy(Locked_ihipStreamCritical_t &crit, ihipSignal_t *sig
//Wait for all kernel and data copy commands in this stream to complete.
//This signature should be used in routines that already have locked the stream mutex
void ihipStream_t::wait(Locked_ihipStreamCritical_t &crit, bool assertQueueEmpty)
void ihipStream_t::wait(LockedAccessor_StreamCrit_t &crit, bool assertQueueEmpty)
{
if (! assertQueueEmpty) {
tprintf (DB_SYNC, "stream %p wait for queue-empty..\n", this);
@@ -190,7 +190,7 @@ void ihipStream_t::wait(Locked_ihipStreamCritical_t &crit, bool assertQueueEmpty
//Wait for all kernel and data copy commands in this stream to complete.
void ihipStream_t::locked_wait(bool assertQueueEmpty)
{
Locked_ihipStreamCritical_t crit(_criticalData);
LockedAccessor_StreamCrit_t crit(_criticalData);
wait(crit, assertQueueEmpty);
@@ -213,7 +213,7 @@ ihipDevice_t * ihipStream_t::getDevice() const
// Allocate a new signal from the signal pool.
// Returned signals have value of 0.
// Signals are intended for use in this stream and are always reclaimed "in-order".
ihipSignal_t *ihipStream_t::allocSignal(Locked_ihipStreamCritical_t &crit)
ihipSignal_t *ihipStream_t::allocSignal(LockedAccessor_StreamCrit_t &crit)
{
int numToScan = crit->_signalPool.size();
do {
@@ -289,33 +289,32 @@ void ihipStream_t::enqueueBarrier(hsa_queue_t* queue, ihipSignal_t *depSignal)
//into the stream to mimic CUDA stream semantics. (some hardware uses separate
//queues for data commands and kernel commands, and no implicit ordering is provided).
//
bool ihipStream_t::preKernelCommand()
bool ihipStream_t::lockopen_preKernelCommand()
{
ihipStreamCritical_t *critData =_criticalData.mlock();// will be unlocked in postKernelCommand
LockedAccessor_StreamCrit_t crit(_criticalData, false/*no unlock at destruction*/);
bool addedSync = false;
// If switching command types, we need to add a barrier packet to synchronize things.
if (critData->_last_command_type != ihipCommandKernel) {
if (critData->_last_copy_signal) {
if (crit->_last_command_type != ihipCommandKernel) {
if (crit->_last_copy_signal) {
addedSync = true;
hsa_queue_t * q = (hsa_queue_t*)_av.get_hsa_queue();
if (HIP_DISABLE_HW_KERNEL_DEP == 0) {
this->enqueueBarrier(q, critData->_last_copy_signal);
this->enqueueBarrier(q, crit->_last_copy_signal);
tprintf (DB_SYNC, "stream %p switch %s to %s (barrier pkt inserted with wait on #%lu)\n",
this, ihipCommandName[critData->_last_command_type], ihipCommandName[ihipCommandKernel], critData->_last_copy_signal->_sig_id)
this, ihipCommandName[crit->_last_command_type], ihipCommandName[ihipCommandKernel], crit->_last_copy_signal->_sig_id)
} else if (HIP_DISABLE_HW_KERNEL_DEP>0) {
tprintf (DB_SYNC, "stream %p switch %s to %s (HOST wait for previous...)\n",
this, ihipCommandName[critData->_last_command_type], ihipCommandName[ihipCommandKernel]);
assert(0); // Fix/enable next line. TODO
//this->waitCopy(critData, critData->_last_copy_signal);
this, ihipCommandName[crit->_last_command_type], ihipCommandName[ihipCommandKernel]);
this->waitCopy(crit, crit->_last_copy_signal);
} else if (HIP_DISABLE_HW_KERNEL_DEP==-1) {
tprintf (DB_SYNC, "stream %p switch %s to %s (IGNORE dependency)\n",
this, ihipCommandName[critData->_last_command_type], ihipCommandName[ihipCommandKernel]);
this, ihipCommandName[crit->_last_command_type], ihipCommandName[ihipCommandKernel]);
}
}
critData->_last_command_type = ihipCommandKernel;
crit->_last_command_type = ihipCommandKernel;
}
return addedSync;
@@ -323,12 +322,13 @@ bool ihipStream_t::preKernelCommand()
//---
void ihipStream_t::postKernelCommand(hc::completion_future &kernelFuture)
// Must be called after kernel finishes; this releases the lock on the stream so other commands can submit.
void ihipStream_t::lockclose_postKernelCommand(hc::completion_future &kernelFuture)
{
// We locked _criticalData in the preKernelCommand() so OK to access here:
// We locked _criticalData in the lockopen_preKernelCommand() so OK to access here:
_criticalData._last_kernel_future = kernelFuture;
_criticalData.unlock(); // paired with lock from preKernelCommand
_criticalData.unlock(); // paired with lock from lockopen_preKernelCommand.
};
@@ -336,7 +336,7 @@ void ihipStream_t::postKernelCommand(hc::completion_future &kernelFuture)
//---
// Called whenever a copy command is sent to the stream.
// Examines the last command sent to this stream and returns a signal to wait on, if required.
int ihipStream_t::preCopyCommand(Locked_ihipStreamCritical_t &crit, ihipSignal_t *lastCopy, hsa_signal_t *waitSignal, ihipCommand_t copyType)
int ihipStream_t::preCopyCommand(LockedAccessor_StreamCrit_t &crit, ihipSignal_t *lastCopy, hsa_signal_t *waitSignal, ihipCommand_t copyType)
{
int needSync = 0;
@@ -395,10 +395,10 @@ int ihipStream_t::preCopyCommand(Locked_ihipStreamCritical_t &crit, ihipSignal_t
void ihipDevice_t::locked_reset()
{
// Obtain mutex access to the device critical data, release by destructor
Locked_ihipDeviceCritical_t l(_criticalData);
LockedAccessor_DeviceCrit_t crit(_criticalData);
// Reset and remove streams:
l->streams().clear();
crit->streams().clear();
// Reset and release all memory stored in the tracker:
am_memtracker_reset(_acc);
@@ -685,11 +685,11 @@ hipError_t ihipDevice_t::getProperties(hipDeviceProp_t* prop)
// If waitOnSelf is set, this additionally waits for the default stream to empty.
void ihipDevice_t::locked_syncDefaultStream(bool waitOnSelf)
{
Locked_ihipDeviceCritical_t l(_criticalData);
LockedAccessor_DeviceCrit_t crit(_criticalData);
tprintf(DB_SYNC, "syncDefaultStream\n");
for (auto streamI=l->const_streams().begin(); streamI!=l->const_streams().end(); streamI++) {
for (auto streamI=crit->const_streams().begin(); streamI!=crit->const_streams().end(); streamI++) {
ihipStream_t *stream = *streamI;
// Don't wait for streams that have "opted-out" of syncing with NULL stream.
@@ -708,18 +708,18 @@ void ihipDevice_t::locked_syncDefaultStream(bool waitOnSelf)
//---
void ihipDevice_t::locked_addStream(ihipStream_t *s)
{
Locked_ihipDeviceCritical_t l(_criticalData);
LockedAccessor_DeviceCrit_t crit(_criticalData);
l->streams().push_back(s);
s->_id = l->incStreamId();
crit->streams().push_back(s);
s->_id = crit->incStreamId();
}
//---
void ihipDevice_t::locked_removeStream(ihipStream_t *s)
{
Locked_ihipDeviceCritical_t l(_criticalData);
LockedAccessor_DeviceCrit_t crit(_criticalData);
l->streams().remove(s);
crit->streams().remove(s);
}
@@ -727,10 +727,10 @@ void ihipDevice_t::locked_removeStream(ihipStream_t *s)
//Heavyweight synchronization that waits on all streams, ignoring hipStreamNonBlocking flag.
void ihipDevice_t::locked_waitAllStreams()
{
Locked_ihipDeviceCritical_t l(_criticalData);
LockedAccessor_DeviceCrit_t crit(_criticalData);
tprintf(DB_SYNC, "waitAllStream\n");
for (auto streamI=l->const_streams().begin(); streamI!=l->const_streams().end(); streamI++) {
for (auto streamI=crit->const_streams().begin(); streamI!=crit->const_streams().end(); streamI++) {
(*streamI)->locked_wait();
}
}
@@ -990,7 +990,8 @@ hipStream_t ihipPreLaunchKernel(hipStream_t stream, hc::accelerator_view **av)
std::call_once(hip_initialized, ihipInit);
stream = ihipSyncAndResolveStream(stream);
stream->preKernelCommand();
stream->lockopen_preKernelCommand();
*av = &stream->_av;
@@ -1002,7 +1003,7 @@ hipStream_t ihipPreLaunchKernel(hipStream_t stream, hc::accelerator_view **av)
//Called after kernel finishes execution.
void ihipPostLaunchKernel(hipStream_t stream, hc::completion_future &kernelFuture)
{
stream->postKernelCommand(kernelFuture);
stream->lockclose_postKernelCommand(kernelFuture);
if (HIP_LAUNCH_BLOCKING) {
tprintf(DB_SYNC, " stream:%p LAUNCH_BLOCKING for kernel completion\n", stream);
}
@@ -1106,7 +1107,7 @@ void ihipStream_t::setCopyAgents(unsigned kind, ihipCommand_t *commandType, hsa_
}
void ihipStream_t::copySync(Locked_ihipStreamCritical_t &crit, void* dst, const void* src, size_t sizeBytes, unsigned kind)
void ihipStream_t::copySync(LockedAccessor_StreamCrit_t &crit, void* dst, const void* src, size_t sizeBytes, unsigned kind)
{
ihipDevice_t *device = this->getDevice();
@@ -1213,7 +1214,7 @@ void ihipStream_t::copySync(Locked_ihipStreamCritical_t &crit, void* dst, const
// Sync copy that acquires lock:
void ihipStream_t::locked_copySync(void* dst, const void* src, size_t sizeBytes, unsigned kind)
{
Locked_ihipStreamCritical_t crit (_criticalData);
LockedAccessor_StreamCrit_t crit (_criticalData);
copySync(crit, dst, src, sizeBytes, kind);
}
@@ -1221,7 +1222,7 @@ void ihipStream_t::locked_copySync(void* dst, const void* src, size_t sizeBytes,
void ihipStream_t::copyAsync(void* dst, const void* src, size_t sizeBytes, unsigned kind)
{
Locked_ihipStreamCritical_t crit(_criticalData);
LockedAccessor_StreamCrit_t crit(_criticalData);
ihipDevice_t *device = this->getDevice();
@@ -363,9 +363,9 @@ hipError_t hipMemsetAsync(void* dst, int value, size_t sizeBytes, hipStream_t s
hipError_t e = hipSuccess;
stream = ihipSyncAndResolveStream(stream);
stream->preKernelCommand();
if (stream) {
stream->lockopen_preKernelCommand();
hc::completion_future cf ;
@@ -389,7 +389,7 @@ hipError_t hipMemsetAsync(void* dst, int value, size_t sizeBytes, hipStream_t s
}
}
stream->postKernelCommand(cf);
stream->lockclose_postKernelCommand(cf);
if (HIP_LAUNCH_BLOCKING) {