From 530ab9434aa9e1ea02fcc5f728c20ac75b1f6b4e Mon Sep 17 00:00:00 2001 From: Ben Sander Date: Sat, 26 Mar 2016 10:46:20 -0500 Subject: [PATCH] Make ihipDevice_t thread-safe. Move critical data into separate class and protect with LockedAccessor wrapper class. For device, the streams list is the critical data since it is modified when streams are created or destroyed. The streams list is accessed in several places including when synchronizing across all streams on the device (i.e., from the default stream). Other device data is set once by the device constructor and is not critical, so it is not protected. All functions which acquire the LockedAccessor are now named with the "locked_" prefix. --- include/hcc_detail/hip_hcc.h | 88 +++++++++++++++++-- src/hip_device.cpp | 14 ++- src/hip_event.cpp | 4 +- src/hip_hcc.cpp | 54 +++++++++--- src/hip_memory.cpp | 2 +- src/hip_stream.cpp | 10 ++- tests/src/CMakeLists.txt | 9 +- tests/src/hipThreadSafeDevice.cpp | 137 ++++++++++++++++++++++++++++++ 8 files changed, 279 insertions(+), 39 deletions(-) create mode 100644 tests/src/hipThreadSafeDevice.cpp diff --git a/include/hcc_detail/hip_hcc.h b/include/hcc_detail/hip_hcc.h index 9000602ff1..409573d2dc 100644 --- a/include/hcc_detail/hip_hcc.h +++ b/include/hcc_detail/hip_hcc.h @@ -63,8 +63,8 @@ extern int HIP_DISABLE_HW_COPY_DEP; extern thread_local int tls_defaultDevice; extern thread_local hipError_t tls_lastHipError; -struct ihipStream_t; -struct ihipDevice_t; +class ihipStream_t; +class ihipDevice_t; // Color defs for debug messages: @@ -86,6 +86,9 @@ struct ihipDevice_t; // Stream functions will acquire a mutex before entering critical sections. #define STREAM_THREAD_SAFE 1 + +#define DEVICE_THREAD_SAFE 1 + // If FORCE_COPY_DEP=1 , HIP runtime will add // synchronization for copy commands in the same stream, regardless of command type. // If FORCE_COPY_DEP=0 data copies of the same kind (H2H, H2D, D2H, D2D) are assumed to be implicitly ordered. 
@@ -376,12 +379,76 @@ struct ihipEvent_t { } ; +//--- +// Protects access to the member _data with a lock acquired on construction/destruction. +// T must contain a _mutex field which meets the BasicLockable requirements (lock/unlock) +template +class LockedAccessor +{ +public: + LockedAccessor(T &data) : _data(&data) + { + _data->_mutex.lock(); + }; + + ~LockedAccessor() + { + _data->_mutex.unlock(); + } + + // Syntactic sugar so -> can be used to get the underlying type. + T *operator->() { return _data; }; + +private: + T *_data; +}; + + + +//--- +// Data that must be protected with thread-safe access +// All members are private - this class must be accessed through friend LockedAccessor which +// will lock the mutex on construction and unlock on destruction. +// +// MUTEX_TYPE is template argument so can easily convert to FakeMutex for performance or stress testing. +template +class ihipDeviceCriticalBase_t +{ +public: + friend class LockedAccessor; + + std::list &streams() { return _streams; }; + const std::list &const_streams() const { return _streams; }; + +private: + std::list _streams; // streams associated with this device. + MUTEX_TYPE _mutex; +}; + +// Typedefs for common definitions. +#if DEVICE_THREAD_SAFE +typedef ihipDeviceCriticalBase_t ihipDeviceCritical_t; // Use real mutex +#else +#warning "Device thread-safe disabled" +typedef ihipDeviceCriticalBase_t ihipDeviceCritical_t; // Fake mutex, for testing +#endif + +typedef LockedAccessor Locked_ihipDeviceCritical_t; //------------------------------------------------------------------------------------------------- -struct ihipDevice_t +// Functions which read or write the critical data are named locked_. +// ihipDevice_t does not use recursive locks so the ihip implementation must avoid calling a locked_ function from within a locked_ function. +// External functions which call several locked_ functions will acquire and release the lock for each function. 
if this occurs in +// performance-sensitive code we may want to refactor by adding non-locked functions and creating a new locked_ member function to call them all. +class ihipDevice_t { +public: // Functions: + void locked_addStream(ihipStream_t *s); + void locked_removeStream(ihipStream_t *s); + +public: // Data, set at initialization: unsigned _device_index; // index into g_devices. hipDeviceProp_t _props; // saved device properties. @@ -392,7 +459,6 @@ struct ihipDevice_t // NULL has special synchronization properties with other streams. ihipStream_t *_default_stream; - std::list _streams; // streams associated with this device. unsigned _compute_units; @@ -403,20 +469,24 @@ struct ihipDevice_t unsigned _device_flags; public: - void init(unsigned device_index, hc::accelerator acc, unsigned flags); + void locked_init(unsigned device_index, hc::accelerator acc, unsigned flags); ~ihipDevice_t(); - void reset(); hipError_t getProperties(hipDeviceProp_t* prop); - void waitAllStreams(); - void syncDefaultStream(bool waitOnSelf); + void locked_reset(); + void locked_waitAllStreams(); + void locked_syncDefaultStream(bool waitOnSelf); private: + // Members of _protected data MUST be accessed through the LockedAccessor. + // Search for LockedAccessor for examples; do not access _criticalData directly. + ihipDeviceCritical_t _criticalData; }; -// Global initialization. + +// Global variable definition: extern std::once_flag hip_initialized; extern ihipDevice_t *g_devices; // Array of all non-emulated (ie GPU) accelerators in the system. extern bool g_visible_device; // Set the flag when HIP_VISIBLE_DEVICES is set diff --git a/src/hip_device.cpp b/src/hip_device.cpp index b819673967..c935f20d34 100644 --- a/src/hip_device.cpp +++ b/src/hip_device.cpp @@ -150,8 +150,7 @@ hipError_t hipDeviceSynchronize(void) { HIP_INIT_API(); - ihipGetTlsDefaultDevice()->waitAllStreams(); // ignores non-blocking streams, this waits for all activity to finish. 
- + ihipGetTlsDefaultDevice()->locked_waitAllStreams(); // ignores non-blocking streams, this waits for all activity to finish. return ihipLogStatus(hipSuccess); } @@ -174,15 +173,12 @@ hipError_t hipDeviceReset(void) if (device) { //--- - //Wait for pending activity to complete? - //TODO - check if this is required behavior: - for (auto streamI=device->_streams.begin(); streamI!=device->_streams.end(); streamI++) { - ihipStream_t *stream = *streamI; - stream->wait(); - } + //Wait for pending activity to complete? TODO - check if this is required behavior: + + device->locked_waitAllStreams(); // Release device resources (streams and memory): - device->reset(); + device->locked_reset(); } return ihipLogStatus(hipSuccess); diff --git a/src/hip_event.cpp b/src/hip_event.cpp index 27232875b9..66a7c7c7fc 100644 --- a/src/hip_event.cpp +++ b/src/hip_event.cpp @@ -67,7 +67,7 @@ hipError_t hipEventRecord(hipEvent_t event, hipStream_t stream) // TODO-HCC fix this - is CUDA this conservative or still uses device timestamps? // TODO-HCC can we use barrier or event marker to implement better solution? 
ihipDevice_t *device = ihipGetTlsDefaultDevice(); - device->syncDefaultStream(true); + device->locked_syncDefaultStream(true); eh->_timestamp = hc::get_system_ticks(); eh->_state = hipEventStatusRecorded; @@ -117,7 +117,7 @@ hipError_t hipEventSynchronize(hipEvent_t event) return ihipLogStatus(hipSuccess); } else if (eh->_stream == NULL) { ihipDevice_t *device = ihipGetTlsDefaultDevice(); - device->syncDefaultStream(true); + device->locked_syncDefaultStream(true); return ihipLogStatus(hipSuccess); } else { #if __hcc_workweek__ >= 16033 diff --git a/src/hip_hcc.cpp b/src/hip_hcc.cpp index 1c93ae48dd..cb216919e2 100644 --- a/src/hip_hcc.cpp +++ b/src/hip_hcc.cpp @@ -145,6 +145,8 @@ ihipStream_t::ihipStream_t(unsigned device_index, hc::accelerator_view av, SeqNu ihipStream_t::~ihipStream_t() { _signalPool.clear(); + // Hack to catch memory issues, in particular accesses to _acc after it has been destroyed. + //memset (&_av, 0x0, sizeof(hc::accelerator_view&)); } @@ -348,6 +350,8 @@ int ihipStream_t::preCopyCommand(ihipSignal_t *lastCopy, hsa_signal_t *waitSigna hsa_signal_t *hsaSignal = (static_cast (_last_kernel_future.get_native_handle())); if (hsaSignal) { *waitSignal = * hsaSignal; + } else { + assert(0); // if NULL signal, and we return 1, hsa_amd_memory_copy_async will fail. Confirm this never happens. } } else if (_last_copy_signal) { needSync = 1; @@ -383,10 +387,13 @@ int ihipStream_t::preCopyCommand(ihipSignal_t *lastCopy, hsa_signal_t *waitSigna // //Reset the device - this is called from hipDeviceReset. //Device may be reset multiple times, and may be reset after init. 
-void ihipDevice_t::reset() +void ihipDevice_t::locked_reset() { + // Obtain mutex access to the device critical data, release by destructor + Locked_ihipDeviceCritical_t l(_criticalData); + // Reset and remove streams: - _streams.clear(); + l->streams().clear(); // Reset and release all memory stored in the tracker: am_memtracker_reset(_acc); @@ -395,7 +402,7 @@ void ihipDevice_t::reset() //--- -void ihipDevice_t::init(unsigned device_index, hc::accelerator acc, unsigned flags) +void ihipDevice_t::locked_init(unsigned device_index, hc::accelerator acc, unsigned flags) { _stream_id = 0; @@ -416,8 +423,12 @@ void ihipDevice_t::init(unsigned device_index, hc::accelerator acc, unsigned fla getProperties(&_props); - _default_stream = new ihipStream_t(device_index, acc.get_default_view(), _stream_id++, hipStreamDefault); - this->_streams.push_back(_default_stream); + { + Locked_ihipDeviceCritical_t l(_criticalData); + _default_stream = new ihipStream_t(device_index, acc.get_default_view(), _stream_id++, hipStreamDefault); + l->streams().push_back(_default_stream); + } + tprintf(DB_SYNC, "created device with default_stream=%p\n", _default_stream); @@ -670,11 +681,13 @@ hipError_t ihipDevice_t::getProperties(hipDeviceProp_t* prop) // Implement "default" stream syncronization // This waits for all other streams to drain before continuing. // If waitOnSelf is set, this additionally waits for the default stream to empty. -void ihipDevice_t::syncDefaultStream(bool waitOnSelf) +void ihipDevice_t::locked_syncDefaultStream(bool waitOnSelf) { + Locked_ihipDeviceCritical_t l(_criticalData); + tprintf(DB_SYNC, "syncDefaultStream\n"); - for (auto streamI=_streams.begin(); streamI!=_streams.end(); streamI++) { + for (auto streamI=l->const_streams().begin(); streamI!=l->const_streams().end(); streamI++) { ihipStream_t *stream = *streamI; // Don't wait for streams that have "opted-out" of syncing with NULL stream. 
@@ -690,13 +703,31 @@ void ihipDevice_t::syncDefaultStream(bool waitOnSelf) } } +//--- +void ihipDevice_t::locked_addStream(ihipStream_t *s) +{ + Locked_ihipDeviceCritical_t l(_criticalData); + + l->streams().push_back(s); +} + +//--- +void ihipDevice_t::locked_removeStream(ihipStream_t *s) +{ + Locked_ihipDeviceCritical_t l(_criticalData); + + l->streams().remove(s); +} + //--- //Heavyweight synchronization that waits on all streams, ignoring hipStreamNonBlocking flag. -void ihipDevice_t::waitAllStreams() +void ihipDevice_t::locked_waitAllStreams() { + Locked_ihipDeviceCritical_t l(_criticalData); + tprintf(DB_SYNC, "waitAllStream\n"); - for (auto streamI=_streams.begin(); streamI!=_streams.end(); streamI++) { + for (auto streamI=l->const_streams().begin(); streamI!=l->const_streams().end(); streamI++) { (*streamI)->wait(); } } @@ -864,7 +895,7 @@ void ihipInit() //If device is not in visible devices list, ignore continue; } - g_devices[g_deviceCnt].init(g_deviceCnt, accs[i], hipDeviceMapHost); + g_devices[g_deviceCnt].locked_init(g_deviceCnt, accs[i], hipDeviceMapHost); g_deviceCnt++; } } @@ -933,7 +964,7 @@ hipStream_t ihipSyncAndResolveStream(hipStream_t stream) ihipDevice_t *device = ihipGetTlsDefaultDevice(); #ifndef HIP_API_PER_THREAD_DEFAULT_STREAM - device->syncDefaultStream(false); + device->locked_syncDefaultStream(false); #endif return device->_default_stream; } else { @@ -1302,6 +1333,7 @@ hipError_t hipHccGetAcceleratorView(hipStream_t stream, hc::accelerator_view **a // TODO - describe naming convention. ihip _. No accessors. No early returns from functions. Set status to success at top, only set error codes in implementation. No tabs. // Caps convention _ or camelCase // if { } +// Should use ihip* data structures inside code rather than app-facing hip. For example, use ihipDevice_t (rather than hipDevice_t), ihipStream_t (rather than hipStream_t). 
// TODO - describe MT strategy // //// TODO - add identifier numbers for streams and devices to help with debugging. diff --git a/src/hip_memory.cpp b/src/hip_memory.cpp index 46fbd38fd5..c633d23611 100644 --- a/src/hip_memory.cpp +++ b/src/hip_memory.cpp @@ -459,7 +459,7 @@ hipError_t hipFree(void* ptr) hipError_t hipStatus = hipErrorInvalidDevicePointer; // Synchronize to ensure all work has finished. - ihipGetTlsDefaultDevice()->waitAllStreams(); // ignores non-blocking streams, this waits for all activity to finish. + ihipGetTlsDefaultDevice()->locked_waitAllStreams(); // ignores non-blocking streams, this waits for all activity to finish. if (ptr) { hc::accelerator acc; diff --git a/src/hip_stream.cpp b/src/hip_stream.cpp index 6dbf7458ce..784fa9224c 100644 --- a/src/hip_stream.cpp +++ b/src/hip_stream.cpp @@ -42,7 +42,9 @@ hipError_t hipStreamCreateWithFlags(hipStream_t *stream, unsigned int flags) //This matches CUDA stream behavior: auto istream = new ihipStream_t(device->_device_index, acc.create_view(), device->_stream_id, flags); - device->_streams.push_back(istream); + + device->locked_addStream(istream); + *stream = istream; tprintf(DB_SYNC, "hipStreamCreate, stream=%p\n", *stream); @@ -82,7 +84,7 @@ hipError_t hipStreamSynchronize(hipStream_t stream) if (stream == NULL) { ihipDevice_t *device = ihipGetTlsDefaultDevice(); - device->syncDefaultStream(true/*waitOnSelf*/); + device->locked_syncDefaultStream(true/*waitOnSelf*/); } else { stream->wait(); e = hipSuccess; @@ -106,7 +108,7 @@ hipError_t hipStreamDestroy(hipStream_t stream) //--- Drain the stream: if (stream == NULL) { ihipDevice_t *device = ihipGetTlsDefaultDevice(); - device->syncDefaultStream(true/*waitOnSelf*/); + device->locked_syncDefaultStream(true/*waitOnSelf*/); } else { stream->wait(); e = hipSuccess; @@ -115,7 +117,7 @@ hipError_t hipStreamDestroy(hipStream_t stream) ihipDevice_t *device = stream->getDevice(); if (device) { - device->_streams.remove(stream); + 
device->locked_removeStream(stream); delete stream; } else { e = hipErrorInvalidResourceHandle; diff --git a/tests/src/CMakeLists.txt b/tests/src/CMakeLists.txt index acd2060647..4288624ccf 100644 --- a/tests/src/CMakeLists.txt +++ b/tests/src/CMakeLists.txt @@ -9,7 +9,6 @@ set (HIP_Unit_Test_VERSION_MAJOR 1) set (HIP_Unit_Test_VERSION_MINOR 0) set(HIP_PATH $ENV{HIP_PATH}) -MESSAGE("HIP_PATH=" ${HIP_PATH}) if (NOT DEFINED HIP_PATH) set (HIP_PATH ../..) endif() @@ -39,12 +38,13 @@ if (${HIP_PLATFORM} STREQUAL "hcc") #These includes are used for all files. #Include HIP and HC since the tests need both of these: - #Note below HSA path is surgically included only where necessary. include_directories(${HIP_PATH}/include) - include_directories(${HSA_PATH}/include) # This will create a subdir "hip_hcc" in the test build directory # Any changes to hip_hcc source will be detected and force the library and then the tests to be rebuilt. + add_subdirectory(${HIP_PATH} build.hip_hcc) + link_directories(${CMAKE_CURRENT_BINARY_DIR}/build.hip_hcc) # search the local hip_hcc for libhip_hcc.a + elseif (${HIP_PLATFORM} STREQUAL "nvcc") MESSAGE ("HIP_PLATFORM=nvcc") @@ -156,6 +156,8 @@ make_hip_executable (hipFuncGetDevice hipFuncGetDevice.cpp) make_hip_executable (hipFuncSetDevice hipFuncSetDevice.cpp) make_hip_executable (hipFuncDeviceSynchronize hipFuncDeviceSynchronize.cpp) +make_hip_executable (hipThreadSafeDevice hipThreadSafeDevice.cpp) + make_test(hip_ballot " " ) make_test(hip_anyall " " ) make_test(hip_popc " " ) @@ -196,5 +198,6 @@ make_test(hipFuncSetDeviceFlags " ") make_test(hipFuncGetDevice " ") make_test(hipFuncSetDevice " ") make_test(hipFuncDeviceSynchronize " ") +make_test (hipThreadSafeDevice " ") make_hipify_test(specialFunc.cu ) diff --git a/tests/src/hipThreadSafeDevice.cpp b/tests/src/hipThreadSafeDevice.cpp new file mode 100644 index 0000000000..fca994cba8 --- /dev/null +++ b/tests/src/hipThreadSafeDevice.cpp @@ -0,0 +1,137 @@ +#include +#include 
"test_common.h" + + +// Create a lot of streams and then destroy 'em. +void createThenDestroyStreams(int iterations, int burstSize) +{ + hipStream_t *streams = new hipStream_t[burstSize]; + + for (int i=0; i