diff --git a/include/hcc_detail/hip_hcc.h b/include/hcc_detail/hip_hcc.h index 9000602ff1..409573d2dc 100644 --- a/include/hcc_detail/hip_hcc.h +++ b/include/hcc_detail/hip_hcc.h @@ -63,8 +63,8 @@ extern int HIP_DISABLE_HW_COPY_DEP; extern thread_local int tls_defaultDevice; extern thread_local hipError_t tls_lastHipError; -struct ihipStream_t; -struct ihipDevice_t; +class ihipStream_t; +class ihipDevice_t; // Color defs for debug messages: @@ -86,6 +86,9 @@ struct ihipDevice_t; // Stream functions will acquire a mutex before entering critical sections. #define STREAM_THREAD_SAFE 1 + +#define DEVICE_THREAD_SAFE 1 + // If FORCE_COPY_DEP=1 , HIP runtime will add // synchronization for copy commands in the same stream, regardless of command type. // If FORCE_COPY_DEP=0 data copies of the same kind (H2H, H2D, D2H, D2D) are assumed to be implicitly ordered. @@ -376,12 +379,76 @@ struct ihipEvent_t { } ; +//--- +// Protects access to the member _data with a lock acquired on contruction/destruction. +// T must contain a _mutex field which meets the BasicLockable requirements (lock/unlock) +template +class LockedAccessor +{ +public: + LockedAccessor(T &data) : _data(&data) + { + _data->_mutex.lock(); + }; + + ~LockedAccessor() + { + _data->_mutex.unlock(); + } + + // Syntactic sugar so -> can be used to get the underlying type. + T *operator->() { return _data; }; + +private: + T *_data; +}; + + + +//--- +// Data that must be protected with thread-safe access +// All members are private - this class must be accessed through friend LockedAccessor which +// will lock the mutex on construction and unlock on destruction. +// +// MUTEX_TYPE is template argument so can easily convert to FakeMutex for performance or stress testing. +template +class ihipDeviceCriticalBase_t +{ +public: + friend class LockedAccessor; + + std::list &streams() { return _streams; }; + const std::list &const_streams() const { return _streams; }; + +private: + std::list _streams; // streams associated with this device. + MUTEX_TYPE _mutex; +}; + +// Typedefs for common definitions. +#if DEVICE_THREAD_SAFE +typedef ihipDeviceCriticalBase_t ihipDeviceCritical_t; // Use real mutex +#else +#warning "Device thread-safe disabled" +typedef ihipDeviceCriticalBase_t ihipDeviceCritical_t; // Fake mutex, for testing +#endif + +typedef LockedAccessor Locked_ihipDeviceCritical_t; //------------------------------------------------------------------------------------------------- -struct ihipDevice_t +// Functions which read or write the critical data are named locked_. +// ihipDevice_t does not use recursive locks so the ihip implementation must avoid calling a locked_ function from within a locked_ function. +// External functions which call several locked_ functions will acquire and release the lock for each function. if this occurs in +// performance-sensitive code we may want to refactor by adding non-locked functions and creating a new locked_ member function to call them all. +class ihipDevice_t { +public: // Functions: + void locked_addStream(ihipStream_t *s); + void locked_removeStream(ihipStream_t *s); + +public: // Data, set at initialization: unsigned _device_index; // index into g_devices. hipDeviceProp_t _props; // saved device properties. @@ -392,7 +459,6 @@ struct ihipDevice_t // NULL has special synchronization properties with other streams. ihipStream_t *_default_stream; - std::list _streams; // streams associated with this device. unsigned _compute_units; @@ -403,20 +469,24 @@ struct ihipDevice_t unsigned _device_flags; public: - void init(unsigned device_index, hc::accelerator acc, unsigned flags); + void locked_init(unsigned device_index, hc::accelerator acc, unsigned flags); ~ihipDevice_t(); - void reset(); hipError_t getProperties(hipDeviceProp_t* prop); - void waitAllStreams(); - void syncDefaultStream(bool waitOnSelf); + void locked_reset(); + void locked_waitAllStreams(); + void locked_syncDefaultStream(bool waitOnSelf); private: + // Members of _protected data MUST be accessed through the LockedAccessor. + // Search for LockedAccessor for examples; do not access _criticalData directly. + ihipDeviceCritical_t _criticalData; }; -// Global initialization. + +// Global variable definition: extern std::once_flag hip_initialized; extern ihipDevice_t *g_devices; // Array of all non-emulated (ie GPU) accelerators in the system. extern bool g_visible_device; // Set the flag when HIP_VISIBLE_DEVICES is set diff --git a/src/hip_device.cpp b/src/hip_device.cpp index b819673967..c935f20d34 100644 --- a/src/hip_device.cpp +++ b/src/hip_device.cpp @@ -150,8 +150,7 @@ hipError_t hipDeviceSynchronize(void) { HIP_INIT_API(); - ihipGetTlsDefaultDevice()->waitAllStreams(); // ignores non-blocking streams, this waits for all activity to finish. - + ihipGetTlsDefaultDevice()->locked_waitAllStreams(); // ignores non-blocking streams, this waits for all activity to finish. return ihipLogStatus(hipSuccess); } @@ -174,15 +173,12 @@ hipError_t hipDeviceReset(void) if (device) { //--- - //Wait for pending activity to complete? - //TODO - check if this is required behavior: - for (auto streamI=device->_streams.begin(); streamI!=device->_streams.end(); streamI++) { - ihipStream_t *stream = *streamI; - stream->wait(); - } + //Wait for pending activity to complete? TODO - check if this is required behavior: + + device->locked_waitAllStreams(); // Release device resources (streams and memory): - device->reset(); + device->locked_reset(); } return ihipLogStatus(hipSuccess); diff --git a/src/hip_event.cpp b/src/hip_event.cpp index 27232875b9..66a7c7c7fc 100644 --- a/src/hip_event.cpp +++ b/src/hip_event.cpp @@ -67,7 +67,7 @@ hipError_t hipEventRecord(hipEvent_t event, hipStream_t stream) // TODO-HCC fix this - is CUDA this conservative or still uses device timestamps? // TODO-HCC can we use barrier or event marker to implement better solution? ihipDevice_t *device = ihipGetTlsDefaultDevice(); - device->syncDefaultStream(true); + device->locked_syncDefaultStream(true); eh->_timestamp = hc::get_system_ticks(); eh->_state = hipEventStatusRecorded; @@ -117,7 +117,7 @@ hipError_t hipEventSynchronize(hipEvent_t event) return ihipLogStatus(hipSuccess); } else if (eh->_stream == NULL) { ihipDevice_t *device = ihipGetTlsDefaultDevice(); - device->syncDefaultStream(true); + device->locked_syncDefaultStream(true); return ihipLogStatus(hipSuccess); } else { #if __hcc_workweek__ >= 16033 diff --git a/src/hip_hcc.cpp b/src/hip_hcc.cpp index 1c93ae48dd..cb216919e2 100644 --- a/src/hip_hcc.cpp +++ b/src/hip_hcc.cpp @@ -145,6 +145,8 @@ ihipStream_t::ihipStream_t(unsigned device_index, hc::accelerator_view av, SeqNu ihipStream_t::~ihipStream_t() { _signalPool.clear(); + // Hack to catch memory issues, in particular accesses to _acc after it has been destroyed. + //memset (&_av, 0x0, sizeof(hc::accelerator_view&)); } @@ -348,6 +350,8 @@ int ihipStream_t::preCopyCommand(ihipSignal_t *lastCopy, hsa_signal_t *waitSigna hsa_signal_t *hsaSignal = (static_cast (_last_kernel_future.get_native_handle())); if (hsaSignal) { *waitSignal = * hsaSignal; + } else { + assert(0); // if NULL signal, and we return 1, hsa_amd_memory_copy_async will fail. Confirm this never happens. } } else if (_last_copy_signal) { needSync = 1; @@ -383,10 +387,13 @@ int ihipStream_t::preCopyCommand(ihipSignal_t *lastCopy, hsa_signal_t *waitSigna // //Reset the device - this is called from hipDeviceReset. //Device may be reset multiple times, and may be reset after init. -void ihipDevice_t::reset() +void ihipDevice_t::locked_reset() { + // Obtain mutex access to the device critical data, release by destructor + Locked_ihipDeviceCritical_t l(_criticalData); + // Reset and remove streams: - _streams.clear(); + l->streams().clear(); // Reset and release all memory stored in the tracker: am_memtracker_reset(_acc); @@ -395,7 +402,7 @@ void ihipDevice_t::reset() //--- -void ihipDevice_t::init(unsigned device_index, hc::accelerator acc, unsigned flags) +void ihipDevice_t::locked_init(unsigned device_index, hc::accelerator acc, unsigned flags) { _stream_id = 0; @@ -416,8 +423,12 @@ void ihipDevice_t::init(unsigned device_index, hc::accelerator acc, unsigned fla getProperties(&_props); - _default_stream = new ihipStream_t(device_index, acc.get_default_view(), _stream_id++, hipStreamDefault); - this->_streams.push_back(_default_stream); + { + Locked_ihipDeviceCritical_t l(_criticalData); + _default_stream = new ihipStream_t(device_index, acc.get_default_view(), _stream_id++, hipStreamDefault); + l->streams().push_back(_default_stream); + } + tprintf(DB_SYNC, "created device with default_stream=%p\n", _default_stream); @@ -670,11 +681,13 @@ hipError_t ihipDevice_t::getProperties(hipDeviceProp_t* prop) // Implement "default" stream syncronization // This waits for all other streams to drain before continuing. // If waitOnSelf is set, this additionally waits for the default stream to empty. -void ihipDevice_t::syncDefaultStream(bool waitOnSelf) +void ihipDevice_t::locked_syncDefaultStream(bool waitOnSelf) { + Locked_ihipDeviceCritical_t l(_criticalData); + tprintf(DB_SYNC, "syncDefaultStream\n"); - for (auto streamI=_streams.begin(); streamI!=_streams.end(); streamI++) { + for (auto streamI=l->const_streams().begin(); streamI!=l->const_streams().end(); streamI++) { ihipStream_t *stream = *streamI; // Don't wait for streams that have "opted-out" of syncing with NULL stream. @@ -690,13 +703,31 @@ void ihipDevice_t::syncDefaultStream(bool waitOnSelf) } } +//--- +void ihipDevice_t::locked_addStream(ihipStream_t *s) +{ + Locked_ihipDeviceCritical_t l(_criticalData); + + l->streams().push_back(s); +} + +//--- +void ihipDevice_t::locked_removeStream(ihipStream_t *s) +{ + Locked_ihipDeviceCritical_t l(_criticalData); + + l->streams().remove(s); +} + //--- //Heavyweight synchronization that waits on all streams, ignoring hipStreamNonBlocking flag. -void ihipDevice_t::waitAllStreams() +void ihipDevice_t::locked_waitAllStreams() { + Locked_ihipDeviceCritical_t l(_criticalData); + tprintf(DB_SYNC, "waitAllStream\n"); - for (auto streamI=_streams.begin(); streamI!=_streams.end(); streamI++) { + for (auto streamI=l->const_streams().begin(); streamI!=l->const_streams().end(); streamI++) { (*streamI)->wait(); } } @@ -864,7 +895,7 @@ void ihipInit() //If device is not in visible devices list, ignore continue; } - g_devices[g_deviceCnt].init(g_deviceCnt, accs[i], hipDeviceMapHost); + g_devices[g_deviceCnt].locked_init(g_deviceCnt, accs[i], hipDeviceMapHost); g_deviceCnt++; } } @@ -933,7 +964,7 @@ hipStream_t ihipSyncAndResolveStream(hipStream_t stream) ihipDevice_t *device = ihipGetTlsDefaultDevice(); #ifndef HIP_API_PER_THREAD_DEFAULT_STREAM - device->syncDefaultStream(false); + device->locked_syncDefaultStream(false); #endif return device->_default_stream; } else { @@ -1302,6 +1333,7 @@ hipError_t hipHccGetAcceleratorView(hipStream_t stream, hc::accelerator_view **a // TODO - describe naming convention. ihip _. No accessors. No early returns from functions. Set status to success at top, only set error codes in implementation. No tabs. // Caps convention _ or camelCase // if { } +// Should use ihip* data structures inside code rather than app-facing hip. For example, use ihipDevice_t (rather than hipDevice_t), ihipStream_t (rather than hipStream_t). // TODO - describe MT strategy // //// TODO - add identifier numbers for streams and devices to help with debugging. diff --git a/src/hip_memory.cpp b/src/hip_memory.cpp index 46fbd38fd5..c633d23611 100644 --- a/src/hip_memory.cpp +++ b/src/hip_memory.cpp @@ -459,7 +459,7 @@ hipError_t hipFree(void* ptr) hipError_t hipStatus = hipErrorInvalidDevicePointer; // Synchronize to ensure all work has finished. - ihipGetTlsDefaultDevice()->waitAllStreams(); // ignores non-blocking streams, this waits for all activity to finish. + ihipGetTlsDefaultDevice()->locked_waitAllStreams(); // ignores non-blocking streams, this waits for all activity to finish. if (ptr) { hc::accelerator acc; diff --git a/src/hip_stream.cpp b/src/hip_stream.cpp index 6dbf7458ce..784fa9224c 100644 --- a/src/hip_stream.cpp +++ b/src/hip_stream.cpp @@ -42,7 +42,9 @@ hipError_t hipStreamCreateWithFlags(hipStream_t *stream, unsigned int flags) //This matches CUDA stream behavior: auto istream = new ihipStream_t(device->_device_index, acc.create_view(), device->_stream_id, flags); - device->_streams.push_back(istream); + + device->locked_addStream(istream); + *stream = istream; tprintf(DB_SYNC, "hipStreamCreate, stream=%p\n", *stream); @@ -82,7 +84,7 @@ hipError_t hipStreamSynchronize(hipStream_t stream) if (stream == NULL) { ihipDevice_t *device = ihipGetTlsDefaultDevice(); - device->syncDefaultStream(true/*waitOnSelf*/); + device->locked_syncDefaultStream(true/*waitOnSelf*/); } else { stream->wait(); e = hipSuccess; @@ -106,7 +108,7 @@ hipError_t hipStreamDestroy(hipStream_t stream) //--- Drain the stream: if (stream == NULL) { ihipDevice_t *device = ihipGetTlsDefaultDevice(); - device->syncDefaultStream(true/*waitOnSelf*/); + device->locked_syncDefaultStream(true/*waitOnSelf*/); } else { stream->wait(); e = hipSuccess; @@ -115,7 +117,7 @@ hipError_t hipStreamDestroy(hipStream_t stream) ihipDevice_t *device = stream->getDevice(); if (device) { - device->_streams.remove(stream); + device->locked_removeStream(stream); delete stream; } else { e = hipErrorInvalidResourceHandle; diff --git a/tests/src/CMakeLists.txt b/tests/src/CMakeLists.txt index acd2060647..4288624ccf 100644 --- a/tests/src/CMakeLists.txt +++ b/tests/src/CMakeLists.txt @@ -9,7 +9,6 @@ set (HIP_Unit_Test_VERSION_MAJOR 1) set (HIP_Unit_Test_VERSION_MINOR 0) set(HIP_PATH $ENV{HIP_PATH}) -MESSAGE("HIP_PATH=" ${HIP_PATH}) if (NOT DEFINED HIP_PATH) set (HIP_PATH ../..) endif() @@ -39,12 +38,13 @@ if (${HIP_PLATFORM} STREQUAL "hcc") #These includes are used for all files. #Include HIP and HC since the tests need both of these: - #Note below HSA path is surgically included only where necessary. include_directories(${HIP_PATH}/include) - include_directories(${HSA_PATH}/include) # This will create a subdir "hip_hcc" in the test build directory # Any changes to hip_hcc source will be detected and force the library and then the tests to be rebuilt. + add_subdirectory(${HIP_PATH} build.hip_hcc) + link_directories(${CMAKE_CURRENT_BINARY_DIR}/build.hip_hcc) # search the local hip_hcc for libhip_hcc.a + elseif (${HIP_PLATFORM} STREQUAL "nvcc") MESSAGE ("HIP_PLATFORM=nvcc") @@ -156,6 +156,8 @@ make_hip_executable (hipFuncGetDevice hipFuncGetDevice.cpp) make_hip_executable (hipFuncSetDevice hipFuncSetDevice.cpp) make_hip_executable (hipFuncDeviceSynchronize hipFuncDeviceSynchronize.cpp) +make_hip_executable (hipThreadSafeDevice hipThreadSafeDevice.cpp) + make_test(hip_ballot " " ) make_test(hip_anyall " " ) make_test(hip_popc " " ) @@ -196,5 +198,6 @@ make_test(hipFuncSetDeviceFlags " ") make_test(hipFuncGetDevice " ") make_test(hipFuncSetDevice " ") make_test(hipFuncDeviceSynchronize " ") +make_test (hipThreadSafeDevice " ") make_hipify_test(specialFunc.cu ) diff --git a/tests/src/hipThreadSafeDevice.cpp b/tests/src/hipThreadSafeDevice.cpp new file mode 100644 index 0000000000..fca994cba8 --- /dev/null +++ b/tests/src/hipThreadSafeDevice.cpp @@ -0,0 +1,137 @@ +#include +#include "test_common.h" + + +// Create a lot of streams and then destroy 'em. +void createThenDestroyStreams(int iterations, int burstSize) +{ + hipStream_t *streams = new hipStream_t[burstSize]; + + for (int i=0; i