From 630ef59d7bebcf2efadc925dd041aa3cd437ad0d Mon Sep 17 00:00:00 2001 From: Ben Sander Date: Mon, 28 Mar 2016 21:41:47 -0500 Subject: [PATCH] Tweak thread-safe implementation. introduce LockedAccessor option so destructor does not unlock. Allows locks to exist across function boundaries, required for hipLaunchKernel macro which has several unusual requirements. (including C comppatibility, must use variadic macro, more). [ROCm/hip commit: 86358637242b6acad0fae778053f3e05d807d8ec] --- projects/hip/include/hcc_detail/hip_hcc.h | 43 ++++++++----- projects/hip/include/hip_runtime_api.h | 2 +- projects/hip/src/hip_hcc.cpp | 73 ++++++++++++----------- projects/hip/src/hip_memory.cpp | 4 +- 4 files changed, 67 insertions(+), 55 deletions(-) diff --git a/projects/hip/include/hcc_detail/hip_hcc.h b/projects/hip/include/hcc_detail/hip_hcc.h index a64cdba6ef..ee510efe47 100644 --- a/projects/hip/include/hcc_detail/hip_hcc.h +++ b/projects/hip/include/hcc_detail/hip_hcc.h @@ -295,14 +295,24 @@ template class LockedAccessor { public: - LockedAccessor(T &criticalData) : _criticalData(&criticalData) + LockedAccessor(T &criticalData, bool autoUnlock=true) : + _criticalData(&criticalData), + _autoUnlock(autoUnlock) + { _criticalData->_mutex.lock(); }; ~LockedAccessor() { - _criticalData->_mutex.unlock(); + if (_autoUnlock) { + _criticalData->_mutex.unlock(); + } + } + + void unlock() + { + _criticalData->_mutex.unlock(); } // Syntactic sugar so -> can be used to get the underlying type. @@ -310,6 +320,7 @@ public: private: T *_criticalData; + bool _autoUnlock; }; @@ -368,7 +379,7 @@ public: typedef ihipStreamCriticalBase_t ihipStreamCritical_t; -typedef LockedAccessor Locked_ihipStreamCritical_t; +typedef LockedAccessor LockedAccessor_StreamCrit_t; @@ -381,30 +392,30 @@ typedef uint64_t SeqNum_t ; ~ihipStream_t(); // kind is hipMemcpyKind - void copySync (Locked_ihipStreamCritical_t &crit, void* dst, const void* src, size_t sizeBytes, unsigned kind); + void copySync (LockedAccessor_StreamCrit_t &crit, void* dst, const void* src, size_t sizeBytes, unsigned kind); void locked_copySync (void* dst, const void* src, size_t sizeBytes, unsigned kind); void copyAsync(void* dst, const void* src, size_t sizeBytes, unsigned kind); //--- // Thread-safe accessors - these acquire / release mutex: - bool preKernelCommand(); - void postKernelCommand(hc::completion_future &kernel_future); + bool lockopen_preKernelCommand(); + void lockclose_postKernelCommand(hc::completion_future &kernel_future); - int preCopyCommand(Locked_ihipStreamCritical_t &crit, ihipSignal_t *lastCopy, hsa_signal_t *waitSignal, ihipCommand_t copyType); + int preCopyCommand(LockedAccessor_StreamCrit_t &crit, ihipSignal_t *lastCopy, hsa_signal_t *waitSignal, ihipCommand_t copyType); void locked_reclaimSignals(SIGSEQNUM sigNum); void locked_wait(bool assertQueueEmpty=false); + SIGSEQNUM locked_lastCopySeqId() {LockedAccessor_StreamCrit_t crit(_criticalData); return lastCopySeqId(crit); }; // Use this if we already have the stream critical data mutex: - void wait(Locked_ihipStreamCritical_t &crit, bool assertQueueEmpty=false); + void wait(LockedAccessor_StreamCrit_t &crit, bool assertQueueEmpty=false); - SIGSEQNUM locked_lastCopySeqId() {Locked_ihipStreamCritical_t crit(_criticalData); return lastCopySeqId(crit); }; // Non-threadsafe accessors - must be protected by high-level stream lock with accessor passed to function. - SIGSEQNUM lastCopySeqId(Locked_ihipStreamCritical_t &crit) { return crit->_last_copy_signal ? crit->_last_copy_signal->_sig_id : 0; }; - ihipSignal_t * allocSignal(Locked_ihipStreamCritical_t &crit); + SIGSEQNUM lastCopySeqId (LockedAccessor_StreamCrit_t &crit) { return crit->_last_copy_signal ? crit->_last_copy_signal->_sig_id : 0; }; + ihipSignal_t * allocSignal (LockedAccessor_StreamCrit_t &crit); //-- Non-racy accessors: @@ -412,19 +423,19 @@ typedef uint64_t SeqNum_t ; ihipDevice_t * getDevice() const; - +public: //--- - //Member vars - these are set at initialization: + //Public member vars - these are set at initialization and never change: SeqNum_t _id; // monotonic sequence ID hc::accelerator_view _av; unsigned _flags; -private: +private: // Critical Data. THis MUST be accessed through LockedAccessor_StreamCrit_t ihipStreamCritical_t _criticalData; private: void enqueueBarrier(hsa_queue_t* queue, ihipSignal_t *depSignal); - void waitCopy(Locked_ihipStreamCritical_t &crit, ihipSignal_t *signal); + void waitCopy(LockedAccessor_StreamCrit_t &crit, ihipSignal_t *signal); // The unsigned return is hipMemcpyKind unsigned resolveMemcpyDirection(bool srcInDeviceMem, bool dstInDeviceMem); @@ -502,7 +513,7 @@ private: typedef ihipDeviceCriticalBase_t ihipDeviceCritical_t; // This type is used by functions that need access to the critical device structures. -typedef LockedAccessor Locked_ihipDeviceCritical_t; +typedef LockedAccessor LockedAccessor_DeviceCrit_t; diff --git a/projects/hip/include/hip_runtime_api.h b/projects/hip/include/hip_runtime_api.h index cd0d841d97..3a0a4b399a 100644 --- a/projects/hip/include/hip_runtime_api.h +++ b/projects/hip/include/hip_runtime_api.h @@ -222,7 +222,7 @@ static inline hipError_t hipMalloc ( T** devPtr, size_t size) // Provide an override to automatically typecast the pointer type from void**, and also provide a default for the flags. template -static inline hipError_t hipHostMalloc( T** ptr, size_t size, unsigned int flags = 0) +static inline hipError_t hipHostMalloc( T** ptr, size_t size, unsigned int flags = hipHostMallocDefault) { return hipHostMalloc((void**)ptr, size, flags); } diff --git a/projects/hip/src/hip_hcc.cpp b/projects/hip/src/hip_hcc.cpp index fccec004c9..cf1cb3953a 100644 --- a/projects/hip/src/hip_hcc.cpp +++ b/projects/hip/src/hip_hcc.cpp @@ -144,7 +144,7 @@ ihipStream_t::~ihipStream_t() //--- void ihipStream_t::locked_reclaimSignals(SIGSEQNUM sigNum) { - Locked_ihipStreamCritical_t crit(_criticalData); + LockedAccessor_StreamCrit_t crit(_criticalData); tprintf(DB_SIGNAL, "reclaim signal #%lu\n", sigNum); // Mark all signals older and including this one as available for re-allocation. @@ -153,7 +153,7 @@ void ihipStream_t::locked_reclaimSignals(SIGSEQNUM sigNum) //--- -void ihipStream_t::waitCopy(Locked_ihipStreamCritical_t &crit, ihipSignal_t *signal) +void ihipStream_t::waitCopy(LockedAccessor_StreamCrit_t &crit, ihipSignal_t *signal) { hsa_signal_wait_acquire(signal->_hsa_signal, HSA_SIGNAL_CONDITION_LT, 1, UINT64_MAX, HSA_WAIT_STATE_ACTIVE); @@ -169,7 +169,7 @@ void ihipStream_t::waitCopy(Locked_ihipStreamCritical_t &crit, ihipSignal_t *sig //Wait for all kernel and data copy commands in this stream to complete. //This signature should be used in routines that already have locked the stream mutex -void ihipStream_t::wait(Locked_ihipStreamCritical_t &crit, bool assertQueueEmpty) +void ihipStream_t::wait(LockedAccessor_StreamCrit_t &crit, bool assertQueueEmpty) { if (! assertQueueEmpty) { tprintf (DB_SYNC, "stream %p wait for queue-empty..\n", this); @@ -190,7 +190,7 @@ void ihipStream_t::wait(Locked_ihipStreamCritical_t &crit, bool assertQueueEmpty //Wait for all kernel and data copy commands in this stream to complete. void ihipStream_t::locked_wait(bool assertQueueEmpty) { - Locked_ihipStreamCritical_t crit(_criticalData); + LockedAccessor_StreamCrit_t crit(_criticalData); wait(crit, assertQueueEmpty); @@ -213,7 +213,7 @@ ihipDevice_t * ihipStream_t::getDevice() const // Allocate a new signal from the signal pool. // Returned signals have value of 0. // Signals are intended for use in this stream and are always reclaimed "in-order". -ihipSignal_t *ihipStream_t::allocSignal(Locked_ihipStreamCritical_t &crit) +ihipSignal_t *ihipStream_t::allocSignal(LockedAccessor_StreamCrit_t &crit) { int numToScan = crit->_signalPool.size(); do { @@ -289,33 +289,32 @@ void ihipStream_t::enqueueBarrier(hsa_queue_t* queue, ihipSignal_t *depSignal) //into the stream to mimic CUDA stream semantics. (some hardware uses separate //queues for data commands and kernel commands, and no implicit ordering is provided). // -bool ihipStream_t::preKernelCommand() +bool ihipStream_t::lockopen_preKernelCommand() { - ihipStreamCritical_t *critData =_criticalData.mlock();// will be unlocked in postKernelCommand + LockedAccessor_StreamCrit_t crit(_criticalData, false/*no unlock at destruction*/); bool addedSync = false; // If switching command types, we need to add a barrier packet to synchronize things. - if (critData->_last_command_type != ihipCommandKernel) { - if (critData->_last_copy_signal) { + if (crit->_last_command_type != ihipCommandKernel) { + if (crit->_last_copy_signal) { addedSync = true; hsa_queue_t * q = (hsa_queue_t*)_av.get_hsa_queue(); if (HIP_DISABLE_HW_KERNEL_DEP == 0) { - this->enqueueBarrier(q, critData->_last_copy_signal); + this->enqueueBarrier(q, crit->_last_copy_signal); tprintf (DB_SYNC, "stream %p switch %s to %s (barrier pkt inserted with wait on #%lu)\n", - this, ihipCommandName[critData->_last_command_type], ihipCommandName[ihipCommandKernel], critData->_last_copy_signal->_sig_id) + this, ihipCommandName[crit->_last_command_type], ihipCommandName[ihipCommandKernel], crit->_last_copy_signal->_sig_id) } else if (HIP_DISABLE_HW_KERNEL_DEP>0) { tprintf (DB_SYNC, "stream %p switch %s to %s (HOST wait for previous...)\n", - this, ihipCommandName[critData->_last_command_type], ihipCommandName[ihipCommandKernel]); - assert(0); // Fix/enable next line. TODO - //this->waitCopy(critData, critData->_last_copy_signal); + this, ihipCommandName[crit->_last_command_type], ihipCommandName[ihipCommandKernel]); + this->waitCopy(crit, crit->_last_copy_signal); } else if (HIP_DISABLE_HW_KERNEL_DEP==-1) { tprintf (DB_SYNC, "stream %p switch %s to %s (IGNORE dependency)\n", - this, ihipCommandName[critData->_last_command_type], ihipCommandName[ihipCommandKernel]); + this, ihipCommandName[crit->_last_command_type], ihipCommandName[ihipCommandKernel]); } } - critData->_last_command_type = ihipCommandKernel; + crit->_last_command_type = ihipCommandKernel; } return addedSync; @@ -323,12 +322,13 @@ bool ihipStream_t::preKernelCommand() //--- -void ihipStream_t::postKernelCommand(hc::completion_future &kernelFuture) +// Must be called after kernel finishes, this releases the lock on the stream so other commands can submit. +void ihipStream_t::lockclose_postKernelCommand(hc::completion_future &kernelFuture) { - // We locked _criticalData in the preKernelCommand() so OK to access here: + // We locked _criticalData in the lockopen_preKernelCommand() so OK to access here: _criticalData._last_kernel_future = kernelFuture; - _criticalData.unlock(); // paired with lock from preKernelCommand + _criticalData.unlock(); // paired with lock from lockopen_preKernelCommand. }; @@ -336,7 +336,7 @@ void ihipStream_t::postKernelCommand(hc::completion_future &kernelFuture) //--- // Called whenever a copy command is set to the stream. // Examines the last command sent to this stream and returns a signal to wait on, if required. -int ihipStream_t::preCopyCommand(Locked_ihipStreamCritical_t &crit, ihipSignal_t *lastCopy, hsa_signal_t *waitSignal, ihipCommand_t copyType) +int ihipStream_t::preCopyCommand(LockedAccessor_StreamCrit_t &crit, ihipSignal_t *lastCopy, hsa_signal_t *waitSignal, ihipCommand_t copyType) { int needSync = 0; @@ -395,10 +395,10 @@ int ihipStream_t::preCopyCommand(Locked_ihipStreamCritical_t &crit, ihipSignal_t void ihipDevice_t::locked_reset() { // Obtain mutex access to the device critical data, release by destructor - Locked_ihipDeviceCritical_t l(_criticalData); + LockedAccessor_DeviceCrit_t crit(_criticalData); // Reset and remove streams: - l->streams().clear(); + crit->streams().clear(); // Reset and release all memory stored in the tracker: am_memtracker_reset(_acc); @@ -685,11 +685,11 @@ hipError_t ihipDevice_t::getProperties(hipDeviceProp_t* prop) // If waitOnSelf is set, this additionally waits for the default stream to empty. void ihipDevice_t::locked_syncDefaultStream(bool waitOnSelf) { - Locked_ihipDeviceCritical_t l(_criticalData); + LockedAccessor_DeviceCrit_t crit(_criticalData); tprintf(DB_SYNC, "syncDefaultStream\n"); - for (auto streamI=l->const_streams().begin(); streamI!=l->const_streams().end(); streamI++) { + for (auto streamI=crit->const_streams().begin(); streamI!=crit->const_streams().end(); streamI++) { ihipStream_t *stream = *streamI; // Don't wait for streams that have "opted-out" of syncing with NULL stream. @@ -708,18 +708,18 @@ void ihipDevice_t::locked_syncDefaultStream(bool waitOnSelf) //--- void ihipDevice_t::locked_addStream(ihipStream_t *s) { - Locked_ihipDeviceCritical_t l(_criticalData); + LockedAccessor_DeviceCrit_t crit(_criticalData); - l->streams().push_back(s); - s->_id = l->incStreamId(); + crit->streams().push_back(s); + s->_id = crit->incStreamId(); } //--- void ihipDevice_t::locked_removeStream(ihipStream_t *s) { - Locked_ihipDeviceCritical_t l(_criticalData); + LockedAccessor_DeviceCrit_t crit(_criticalData); - l->streams().remove(s); + crit->streams().remove(s); } @@ -727,10 +727,10 @@ void ihipDevice_t::locked_removeStream(ihipStream_t *s) //Heavyweight synchronization that waits on all streams, ignoring hipStreamNonBlocking flag. void ihipDevice_t::locked_waitAllStreams() { - Locked_ihipDeviceCritical_t l(_criticalData); + LockedAccessor_DeviceCrit_t crit(_criticalData); tprintf(DB_SYNC, "waitAllStream\n"); - for (auto streamI=l->const_streams().begin(); streamI!=l->const_streams().end(); streamI++) { + for (auto streamI=crit->const_streams().begin(); streamI!=crit->const_streams().end(); streamI++) { (*streamI)->locked_wait(); } } @@ -990,7 +990,8 @@ hipStream_t ihipPreLaunchKernel(hipStream_t stream, hc::accelerator_view **av) std::call_once(hip_initialized, ihipInit); stream = ihipSyncAndResolveStream(stream); - stream->preKernelCommand(); + + stream->lockopen_preKernelCommand(); *av = &stream->_av; @@ -1002,7 +1003,7 @@ hipStream_t ihipPreLaunchKernel(hipStream_t stream, hc::accelerator_view **av) //Called after kernel finishes execution. void ihipPostLaunchKernel(hipStream_t stream, hc::completion_future &kernelFuture) { - stream->postKernelCommand(kernelFuture); + stream->lockclose_postKernelCommand(kernelFuture); if (HIP_LAUNCH_BLOCKING) { tprintf(DB_SYNC, " stream:%p LAUNCH_BLOCKING for kernel completion\n", stream); } @@ -1106,7 +1107,7 @@ void ihipStream_t::setCopyAgents(unsigned kind, ihipCommand_t *commandType, hsa_ } -void ihipStream_t::copySync(Locked_ihipStreamCritical_t &crit, void* dst, const void* src, size_t sizeBytes, unsigned kind) +void ihipStream_t::copySync(LockedAccessor_StreamCrit_t &crit, void* dst, const void* src, size_t sizeBytes, unsigned kind) { ihipDevice_t *device = this->getDevice(); @@ -1213,7 +1214,7 @@ void ihipStream_t::copySync(Locked_ihipStreamCritical_t &crit, void* dst, const // Sync copy that acquires lock: void ihipStream_t::locked_copySync(void* dst, const void* src, size_t sizeBytes, unsigned kind) { - Locked_ihipStreamCritical_t crit (_criticalData); + LockedAccessor_StreamCrit_t crit (_criticalData); copySync(crit, dst, src, sizeBytes, kind); } @@ -1221,7 +1222,7 @@ void ihipStream_t::locked_copySync(void* dst, const void* src, size_t sizeBytes, void ihipStream_t::copyAsync(void* dst, const void* src, size_t sizeBytes, unsigned kind) { - Locked_ihipStreamCritical_t crit(_criticalData); + LockedAccessor_StreamCrit_t crit(_criticalData); ihipDevice_t *device = this->getDevice(); diff --git a/projects/hip/src/hip_memory.cpp b/projects/hip/src/hip_memory.cpp index 12ad9451da..4d75a1d7a1 100644 --- a/projects/hip/src/hip_memory.cpp +++ b/projects/hip/src/hip_memory.cpp @@ -363,9 +363,9 @@ hipError_t hipMemsetAsync(void* dst, int value, size_t sizeBytes, hipStream_t s hipError_t e = hipSuccess; stream = ihipSyncAndResolveStream(stream); - stream->preKernelCommand(); if (stream) { + stream->lockopen_preKernelCommand(); hc::completion_future cf ; @@ -389,7 +389,7 @@ hipError_t hipMemsetAsync(void* dst, int value, size_t sizeBytes, hipStream_t s } } - stream->postKernelCommand(cf); + stream->lockclose_postKernelCommand(cf); if (HIP_LAUNCH_BLOCKING) {