Merge pull request #254 from bensander/event_thread_safe

Event thread safe

[ROCm/clr commit: 7b534885e6]
Этот коммит содержится в:
Ben Sander
2017-11-07 17:59:51 +01:00
коммит произвёл GitHub
родитель 15a6a84a81 2c95d48f87
Коммит 6f7992387b
4 изменённых файлов: 190 добавлений и 123 удалений
+97 -90
Просмотреть файл
@@ -31,12 +31,9 @@ THE SOFTWARE.
// Construct an event with the given creation flags. The lock-protected critical
// data is handed a pointer back to this event, and the bookkeeping fields are set
// to their "created, not yet recorded" values.
// NOTE(review): this listing comes from a rendered diff, so pre-change and post-change
// lines may be shown together — confirm which assignments exist in the real file.
ihipEvent_t::ihipEvent_t(unsigned flags)
: _criticalData(this)
{
_state = hipEventStatusCreated;
_stream = NULL;
_flags = flags;
_timestamp = 0;
_type = hipEventTypeIndependent;
};
@@ -45,56 +42,45 @@ ihipEvent_t::ihipEvent_t(unsigned flags)
// Bind this event to an existing completion future.
//   cf        - future to track; a copy of *cf is stored as the event's marker.
//   stream    - stream the event is associated with.
//   eventType - start/stop/independent classification stored with the event.
// The event is left in the hipEventStatusRecording state.
// NOTE(review): this listing comes from a rendered diff; the direct member writes and the
// locked writes through _criticalData look like the before/after versions — confirm.
void ihipEvent_t::attachToCompletionFuture(const hc::completion_future *cf,
hipStream_t stream, ihipEventType_t eventType)
{
_marker = *cf;
_type = eventType;
_stream = stream;
_state = hipEventStatusRecording;
// Update the shared event data through the locked accessor.
LockedAccessor_EventCrit_t crit(_criticalData);
crit->_eventData.marker(*cf);
crit->_eventData._type = eventType;
crit->_eventData._stream = stream;
crit->_eventData._state = hipEventStatusRecording;
}
// Refresh the status of an event that may still be recording and report <state, timestamp>.
// When the event is recording and its stream reports it ready, the timestamp is taken from
// the marker's end tick (independent / stop-command events) or begin tick (start-command
// events) and the state becomes hipEventStatusComplete.
// NOTE(review): this listing is a rendered diff; the old void refereshEventStatus() and the
// new pair-returning refreshEventStatus() appear interleaved below — confirm against the real file.
void ihipEvent_t::refereshEventStatus()
std::pair<hipEventStatus_t, uint64_t>
ihipEvent_t::refreshEventStatus()
{
bool isReady0 = locked_isReady();
bool isReady1;
int val = 0;
if (_state == hipEventStatusRecording) {
// TODO - use completion-future functions to obtain ticks and timestamps:
hsa_signal_t *sig = static_cast<hsa_signal_t*> (_marker.get_native_handle());
isReady1 = locked_isReady();
if (sig) {
val = hsa_signal_load_acquire(*sig);
if (val == 0) {
// Work from a snapshot of the event data; the stream query below takes its own locks.
auto ecd = locked_copyCrit();
if (ecd._state == hipEventStatusRecording) {
bool isReady1 = ecd._stream->locked_eventIsReady(this);
if (isReady1) {
// Take the event lock to publish the timestamp and the completed state.
LockedAccessor_EventCrit_t eCrit(_criticalData);
if ((_type == hipEventTypeIndependent) || (_type == hipEventTypeStopCommand)) {
_timestamp = _marker.get_end_tick();
} else if (_type == hipEventTypeStartCommand) {
_timestamp = _marker.get_begin_tick();
} else {
assert(0); // TODO - move to debug assert
_timestamp = 0;
}
_state = hipEventStatusComplete;
if ((eCrit->_eventData._type == hipEventTypeIndependent) ||
(eCrit->_eventData._type == hipEventTypeStopCommand)) {
eCrit->_eventData._timestamp = eCrit->_eventData.marker().get_end_tick();
} else if (eCrit->_eventData._type == hipEventTypeStartCommand) {
eCrit->_eventData._timestamp = eCrit->_eventData.marker().get_begin_tick();
} else {
eCrit->_eventData._timestamp = 0;
assert(0); // TODO - move to debug assert
}
eCrit->_eventData._state = hipEventStatusComplete;
// Completed path: return the values just written under the lock.
return std::pair<hipEventStatus_t, uint64_t> (eCrit->_eventData._state, eCrit->_eventData._timestamp);
}
}
}
if (_state != hipEventStatusComplete) {
//printf (" not ready isReady0=%d val=%d isReady1=%d\n", isReady0, val, isReady1);
}
// Not complete path here:
return std::pair<hipEventStatus_t, uint64_t> (ecd._state, ecd._timestamp);
}
// Ask the stream stored in _stream whether this event is ready.
// Returns whatever ihipStream_t::locked_eventIsReady reports for this event.
bool ihipEvent_t::locked_isReady()
{
    const bool ready = _stream->locked_eventIsReady(this);
    return ready;
}
// Forward a wait-for-completion request for this event to the stream stored in _stream.
//   waitMode - passed through unchanged to ihipStream_t::locked_eventWaitComplete.
void ihipEvent_t::locked_waitComplete(hc::hcWaitMode waitMode)
{
    _stream->locked_eventWaitComplete(this, waitMode);
}
hipError_t ihipEventCreate(hipEvent_t* event, unsigned flags)
@@ -136,33 +122,44 @@ hipError_t hipEventCreate(hipEvent_t* event)
return ihipLogStatus(ihipEventCreate(event, 0));
}
hipError_t hipEventRecord(hipEvent_t event, hipStream_t stream)
{
HIP_INIT_SPECIAL_API(TRACE_SYNC, event, stream);
if (event && event->_state != hipEventStatusUnitialized) {
auto ecd = event->locked_copyCrit();
if (event && ecd._state != hipEventStatusUnitialized) {
stream = ihipSyncAndResolveStream(stream);
event->_stream = stream;
if (HIP_SYNC_NULL_STREAM && stream->isDefaultStream()) {
// TODO-HIP_SYNC_NULL_STREAM : can remove this code when HIP_SYNC_NULL_STREAM = 0
//
// If default stream , then wait on all queues.
ihipCtx_t *ctx = ihipGetTlsDefaultCtx();
ctx->locked_syncDefaultStream(true, true);
event->_timestamp = hc::get_system_ticks();
event->_state = hipEventStatusComplete;
{
LockedAccessor_EventCrit_t eCrit(event->criticalData());
eCrit->_eventData.marker(hc::completion_future()); // reset event
eCrit->_eventData._stream = stream;
eCrit->_eventData._timestamp = hc::get_system_ticks();
eCrit->_eventData._state = hipEventStatusComplete;
}
return ihipLogStatus(hipSuccess);
} else {
// Clear timestamps
event->_timestamp = 0;
// Record the event in the stream:
stream->locked_recordEvent(event);
event->_state = hipEventStatusRecording;
// Keep a copy outside the critical section so we lock stream first, then event - to avoid deadlock
hc::completion_future cf = stream->locked_recordEvent(event);
{
LockedAccessor_EventCrit_t eCrit(event->criticalData());
eCrit->_eventData.marker(cf);
eCrit->_eventData._stream = stream;
eCrit->_eventData._timestamp = 0;
eCrit->_eventData._state = hipEventStatusRecording;
}
return ihipLogStatus(hipSuccess);
}
} else {
@@ -170,15 +167,13 @@ hipError_t hipEventRecord(hipEvent_t event, hipStream_t stream)
}
}
hipError_t hipEventDestroy(hipEvent_t event)
{
HIP_INIT_API(event);
if (event) {
event->_state = hipEventStatusUnitialized;
delete event;
event = NULL;
return ihipLogStatus(hipSuccess);
} else {
@@ -193,20 +188,24 @@ hipError_t hipEventSynchronize(hipEvent_t event)
if (!(event->_flags & hipEventReleaseToSystem)) {
tprintf(DB_WARN, "hipEventSynchronize on event without system-scope fence ; consider creating with hipEventReleaseToSystem\n");
}
auto ecd = event->locked_copyCrit();
if (event) {
if (event->_state == hipEventStatusUnitialized) {
if (ecd._state == hipEventStatusUnitialized) {
return ihipLogStatus(hipErrorInvalidResourceHandle);
} else if (event->_state == hipEventStatusCreated ) {
} else if (ecd._state == hipEventStatusCreated ) {
// Created but not actually recorded on any device:
return ihipLogStatus(hipSuccess);
} else if (HIP_SYNC_NULL_STREAM && (event->_stream->isDefaultStream() )) {
} else if (HIP_SYNC_NULL_STREAM && (ecd._stream->isDefaultStream() )) {
auto *ctx = ihipGetTlsDefaultCtx();
// TODO-HIP_SYNC_NULL_STREAM - can remove this code
ctx->locked_syncDefaultStream(true, true);
return ihipLogStatus(hipSuccess);
} else {
event->locked_waitComplete((event->_flags & hipEventBlockingSync) ? hc::hcWaitModeBlocked : hc::hcWaitModeActive);
ecd._stream->locked_eventWaitComplete(
ecd.marker(),
(event->_flags & hipEventBlockingSync) ?
hc::hcWaitModeBlocked : hc::hcWaitModeActive);
return ihipLogStatus(hipSuccess);
}
@@ -223,44 +222,50 @@ hipError_t hipEventElapsedTime(float *ms, hipEvent_t start, hipEvent_t stop)
*ms = 0.0f;
if ((start == nullptr) ||
(start->_flags & hipEventDisableTiming) ||
(start->_state == hipEventStatusUnitialized) || (start->_state == hipEventStatusCreated) ||
(stop == nullptr) ||
(stop->_flags & hipEventDisableTiming) ||
( stop->_state == hipEventStatusUnitialized) || ( stop->_state == hipEventStatusCreated)) {
// Both events must be at least recorded else return hipErrorInvalidResourceHandle
if ((start == nullptr) || (stop == nullptr)) {
status = hipErrorInvalidResourceHandle;
} else {
// Refresh status, if still recording...
start->refereshEventStatus();
stop->refereshEventStatus();
auto startEcd = start->locked_copyCrit();
auto stopEcd = stop->locked_copyCrit();
if ((start->_state == hipEventStatusComplete) && (stop->_state == hipEventStatusComplete)) {
// Common case, we have good information for both events.
if ((start->_flags & hipEventDisableTiming) ||
(startEcd._state == hipEventStatusUnitialized) || (startEcd._state == hipEventStatusCreated) ||
(stop->_flags & hipEventDisableTiming) ||
(stopEcd._state == hipEventStatusUnitialized) || (stopEcd._state == hipEventStatusCreated)) {
int64_t tickDiff = (stop->timestamp() - start->timestamp());
// Both events must be at least recorded else return hipErrorInvalidResourceHandle
status = hipErrorInvalidResourceHandle;
uint64_t freqHz;
hsa_system_get_info(HSA_SYSTEM_INFO_TIMESTAMP_FREQUENCY, &freqHz);
if (freqHz) {
*ms = ((double)(tickDiff) / (double)(freqHz)) * 1000.0f;
status = hipSuccess;
} else {
* ms = 0.0f;
status = hipErrorInvalidValue;
}
// Refresh status, if still recording...
auto startStatus = start->refreshEventStatus(); // pair < state, timestamp >
auto stopStatus = stop->refreshEventStatus(); // pair < state, timestamp >
if ((startStatus.first == hipEventStatusComplete) && (stopStatus.first == hipEventStatusComplete)) {
// Common case, we have good information for both events. 'second' is the timestamp:
int64_t tickDiff = (stopStatus.second - startStatus.second);
uint64_t freqHz;
hsa_system_get_info(HSA_SYSTEM_INFO_TIMESTAMP_FREQUENCY, &freqHz);
if (freqHz) {
*ms = ((double)(tickDiff) / (double)(freqHz)) * 1000.0f;
status = hipSuccess;
} else {
* ms = 0.0f;
status = hipErrorInvalidValue;
}
} else if ((start->_state == hipEventStatusRecording) ||
(stop->_state == hipEventStatusRecording)) {
} else if ((startStatus.first == hipEventStatusRecording) ||
(stopStatus.first == hipEventStatusRecording)) {
status = hipErrorNotReady;
} else {
status = hipErrorNotReady;
} else {
assert(0);
}
}
}
@@ -275,7 +280,9 @@ hipError_t hipEventQuery(hipEvent_t event)
tprintf(DB_WARN, "hipEventQuery on event without system-scope fence ; consider creating with hipEventReleaseToSystem\n");
}
if ((event->_state == hipEventStatusRecording) && !event->locked_isReady()) {
auto ecd = event->locked_copyCrit();
if ((ecd._state == hipEventStatusRecording) && !ecd._stream->locked_eventIsReady(event)) {
return ihipLogStatus(hipErrorNotReady);
} else {
return ihipLogStatus(hipSuccess);
+12 -9
Просмотреть файл
@@ -339,12 +339,11 @@ void ihipStream_t::locked_wait()
// Causes current stream to wait for specified event to complete:
// Note this does not provide any kind of host serialization.
// The wait is expressed by creating a blocking marker on this stream's accelerator_view
// that depends on the event's marker.
// NOTE(review): this listing is a rendered diff; the hipEvent_t and ihipEventData_t& signatures
// (and their matching marker calls) look like the before/after versions — confirm.
void ihipStream_t::locked_streamWaitEvent(hipEvent_t event)
void ihipStream_t::locked_streamWaitEvent(ihipEventData_t &ecd)
{
// Access the stream's critical data (including _av) through the locked accessor.
LockedAccessor_StreamCrit_t crit(_criticalData);
crit->_av.create_blocking_marker(event->marker(), hc::accelerator_scope);
crit->_av.create_blocking_marker(ecd.marker(), hc::accelerator_scope);
}
@@ -352,24 +351,28 @@ void ihipStream_t::locked_streamWaitEvent(hipEvent_t event)
// Note this does not provide any kind of host serialization.
// Report whether the event's marker is ready (is_ready() on its completion future).
// NOTE(review): this listing is a rendered diff; the `crit` / `event->marker()` lines and the
// `scrit` / `ecrit` lines look like the before/after versions — confirm against the real file.
bool ihipStream_t::locked_eventIsReady(hipEvent_t event)
{
// Event query that returns "Complete" may cause HCC to manipulate
// internal queue state so lock the stream's queue here.
LockedAccessor_StreamCrit_t crit(_criticalData);
LockedAccessor_StreamCrit_t scrit(_criticalData);
return (event->marker().is_ready());
// Stream accessor is created first, then the event accessor.
LockedAccessor_EventCrit_t ecrit(event->criticalData());
return (ecrit->_eventData.marker().is_ready());
}
// Wait on a marker using the requested wait mode while holding the stream's locked accessor.
// NOTE(review): this listing is a rendered diff; the hipEvent_t and completion_future& signatures
// (and their matching wait calls) look like the before/after versions — confirm.
void ihipStream_t::locked_eventWaitComplete(hipEvent_t event, hc::hcWaitMode waitMode)
// Waiting on event can cause HCC to reclaim stream resources - so need to lock the stream.
void ihipStream_t::locked_eventWaitComplete(hc::completion_future &marker, hc::hcWaitMode waitMode)
{
LockedAccessor_StreamCrit_t crit(_criticalData);
event->marker().wait(waitMode);
marker.wait(waitMode);
}
// Create a marker in this stream.
// Save state in the event so it can track the status of the event.
void ihipStream_t::locked_recordEvent(hipEvent_t event)
hc::completion_future ihipStream_t::locked_recordEvent(hipEvent_t event)
{
// Lock the stream to prevent simultaneous access
LockedAccessor_StreamCrit_t crit(_criticalData);
@@ -385,7 +388,7 @@ void ihipStream_t::locked_recordEvent(hipEvent_t event)
scopeFlag = HIP_EVENT_SYS_RELEASE ? hc::system_scope : hc::accelerator_scope;
}
event->marker(crit->_av.create_marker(scopeFlag));
return crit->_av.create_marker(scopeFlag);
};
//=============================================================================
+73 -21
Просмотреть файл
@@ -137,6 +137,7 @@ extern std::vector<ProfTrigger> g_dbStopTriggers;
class ihipStream_t;
class ihipDevice_t;
class ihipCtx_t;
struct ihipEventData_t;
// Color defs for debug messages:
#define KNRM "\x1B[0m"
@@ -152,10 +153,12 @@ extern const char *API_COLOR;
extern const char *API_COLOR_END;
// If set, thread-safety is enforced on all stream functions.
// Stream functions will acquire a mutex before entering critical sections.
#define STREAM_THREAD_SAFE 1
// If set, thread-safety is enforced on all event/stream/ctx/device functions.
// Can disable for performance or functional experiments - in this case
// the code uses a dummy "no-op" mutex.
#define EVENT_THREAD_SAFE 1
#define STREAM_THREAD_SAFE 1
#define CTX_THREAD_SAFE 1
@@ -390,6 +393,12 @@ class FakeMutex
void unlock() { }
};
// Mutex type used for per-event critical data.
// With EVENT_THREAD_SAFE set this is a real std::mutex; otherwise the no-op FakeMutex is
// substituted and a build warning names the setting that was disabled.
#if EVENT_THREAD_SAFE
typedef std::mutex EventMutex;
#else
#warning "Event thread-safe disabled"
typedef FakeMutex EventMutex;
#endif
#if STREAM_THREAD_SAFE
typedef std::mutex StreamMutex;
@@ -540,11 +549,11 @@ public:
// Return the address of this stream's accelerator_view, read through the locked accessor.
// NOTE(review): the accessor goes out of scope on return, so the pointer is used by the
// caller without it — confirm callers do not need the lock.
hc::accelerator_view* locked_getAv() {
    LockedAccessor_StreamCrit_t streamCrit(_criticalData);
    hc::accelerator_view* av = &(streamCrit->_av);
    return av;
}
void locked_streamWaitEvent(hipEvent_t event);
void locked_recordEvent(hipEvent_t event);
void locked_streamWaitEvent(ihipEventData_t & event);
hc::completion_future locked_recordEvent(hipEvent_t event);
bool locked_eventIsReady(hipEvent_t event);
void locked_eventWaitComplete(hipEvent_t event, hc::hcWaitMode waitMode);
void locked_eventWaitComplete(hc::completion_future &marker, hc::hcWaitMode waitMode);
// Expose this stream's critical-data object by reference.
ihipStreamCritical_t &criticalData() { return _criticalData; };
@@ -628,32 +637,76 @@ enum ihipEventType_t {
hipEventTypeStopCommand,
};
struct ihipEventData_t
{
ihipEventData_t() {
_state = hipEventStatusCreated;
_stream = NULL;
_timestamp = 0;
_type = hipEventTypeIndependent;
};
void marker(const hc::completion_future & marker) { _marker = marker; };
hc::completion_future & marker() { return _marker; }
uint64_t timestamp() const { return _timestamp; } ;
ihipEventType_t type() const { return _type; };
ihipEventType_t _type;
hipEventStatus_t _state;
hipStream_t _stream; // Stream where the event is recorded. Null stream is resolved to actual stream when recorded
uint64_t _timestamp; // store timestamp, may be set on host or by marker.
private:
hc::completion_future _marker;
};
//=============================================================================
//class ihipEventCriticalBase_t
// Lock-protected container for an event's mutable state.
// MUTEX_TYPE selects the mutex implementation supplied to the (privately inherited) LockedBase.
template <typename MUTEX_TYPE>
class ihipEventCriticalBase_t : LockedBase<MUTEX_TYPE>
{
public:
    // parentEvent is stored as-is; this class does not dereference it in the code shown here.
    ihipEventCriticalBase_t(const ihipEvent_t *parentEvent)
        : _parent(parentEvent)
    {
    }

    ~ihipEventCriticalBase_t() {}

    // All mutable event state lives in one struct so a snapshot is a single copy
    // (reduces lock contention and helps preserve correct lock order).
    ihipEventData_t _eventData;

private:
    const ihipEvent_t *_parent;

    // presumably lets LockedAccessor reach the privately inherited LockedBase — confirm
    friend class LockedAccessor<ihipEventCriticalBase_t>;
};
// Concrete event critical-section type (bound to EventMutex) and the accessor type
// used to reach its contents.
typedef ihipEventCriticalBase_t<EventMutex> ihipEventCritical_t;
typedef LockedAccessor<ihipEventCritical_t> LockedAccessor_EventCrit_t;
// internal hip event structure.
// Holds the creation flags plus a lock-protected ihipEventCritical_t with the event's
// mutable state (see locked_copyCrit / criticalData below).
// NOTE(review): this listing is a rendered diff; members that read the event's own
// _marker/_state/_stream/_type/_timestamp and members that go through _criticalData look
// like the before/after versions of the class — confirm against the real file.
class ihipEvent_t {
public:
ihipEvent_t(unsigned flags);
void attachToCompletionFuture(const hc::completion_future *cf, hipStream_t stream, ihipEventType_t eventType);
void refereshEventStatus();
hc::completion_future & marker() { return _marker; }
void marker(hc::completion_future cf) { _marker = cf; };
std::pair<hipEventStatus_t, uint64_t> refreshEventStatus(); // returns pair <state, timestamp>
bool locked_isReady();
void locked_waitComplete(hc::hcWaitMode waitMode);
uint64_t timestamp() const { return _timestamp; } ;
ihipEventType_t type() const { return _type; };
// Return a copy of the critical state. The critical data is locked during the copy.
ihipEventData_t locked_copyCrit() {
LockedAccessor_EventCrit_t crit(_criticalData);
return _criticalData._eventData;
};
// Expose the critical-data object so callers can construct their own LockedAccessor_EventCrit_t on it.
ihipEventCritical_t &criticalData() { return _criticalData; };
public:
hipEventStatus_t _state;
hipStream_t _stream; // Stream where the event is recorded. Null stream is resolved to actual stream when recorded
// Creation flags as passed to the constructor.
unsigned _flags;
private:
hc::completion_future _marker;
ihipEventType_t _type;
uint64_t _timestamp; // store timestamp, may be set on host or by marker.
ihipEventCritical_t _criticalData;
friend hipError_t hipEventRecord(hipEvent_t event, hipStream_t stream);
} ;
@@ -671,7 +724,6 @@ public:
};
~ihipDeviceCriticalBase_t() {
}
// Contexts:
+8 -3
Просмотреть файл
@@ -93,18 +93,23 @@ hipError_t hipStreamWaitEvent(hipStream_t stream, hipEvent_t event, unsigned int
hipError_t e = hipSuccess;
auto ecd = event->locked_copyCrit();
if (event == nullptr) {
e = hipErrorInvalidResourceHandle;
} else if (event->_state != hipEventStatusUnitialized) {
} else if ((ecd._state != hipEventStatusUnitialized) &&
(ecd._state != hipEventStatusCreated)) {
if (HIP_SYNC_STREAM_WAIT || (HIP_SYNC_NULL_STREAM && (stream == 0))) {
// conservative wait on host for the specified event to complete:
event->locked_waitComplete((event->_flags & hipEventBlockingSync) ? hc::hcWaitModeBlocked : hc::hcWaitModeActive);
// return _stream->locked_eventWaitComplete(this, waitMode);
//
ecd._stream->locked_eventWaitComplete(ecd.marker(), (event->_flags & hipEventBlockingSync) ? hc::hcWaitModeBlocked : hc::hcWaitModeActive);
} else {
stream = ihipSyncAndResolveStream(stream);
// This will use create_blocking_marker to wait on the specified queue.
stream->locked_streamWaitEvent(event);
stream->locked_streamWaitEvent(ecd);
}
} // else event not recorded, return immediately and don't create marker.