SWDEV-567852 - Clean-up HIP events (#2708)

* SWDEV-567852 - Clean-up HIP events

Removed unused fields, optimized memory allocation, improved encapsulation, modernized with C++11 auto, and added documentation.
This commit is contained in:
German Andryeyev
2026-01-28 13:34:07 -05:00
committad av GitHub
förälder 9de4a2ebb1
incheckning a5ada1e6e3
4 ändrade filer med 323 tillägg och 289 borttagningar
+191 -180
Visa fil
@@ -32,28 +32,24 @@ namespace hip {
static std::shared_mutex eventSetLock{};
static std::unordered_set<hipEvent_t> eventSet;
// ================================================================================================
bool Event::ready() {
if (event_->status() != CL_COMPLETE) {
event_->notifyCmdQueue();
}
// Check HW status of the ROCclr event. Note: not all ROCclr modes support HW status
bool ready = CheckHwEvent();
if (!ready) {
ready = (event_->status() == CL_COMPLETE);
if (CheckHwEvent() || event_->status() == CL_COMPLETE) {
return true;
}
return ready;
event_->notifyCmdQueue();
return false;
}
// ================================================================================================
bool EventDD::ready() {
// Check HW status of the ROCclr event. Note: not all ROCclr modes support HW status
bool ready = CheckHwEvent();
// FIXME: Remove status check entirely
if (!ready) {
ready = (event_->status() == CL_COMPLETE);
}
return ready;
return CheckHwEvent() || (event_->status() == CL_COMPLETE);
}
// ================================================================================================
hipError_t Event::query() {
amd::ScopedLock lock(lock_);
@@ -74,12 +70,13 @@ hipError_t Event::synchronize() {
return hipSuccess;
}
auto hip_device = g_devices[deviceId()];
// Check HW status of the ROCclr event. Note: not all ROCclr modes support HW status
static constexpr bool kWaitCompletion = true;
amd::SyncPolicy policy =
constexpr bool kWaitCompletion = true;
const amd::SyncPolicy policy =
(flags_ == hipEventBlockingSync) ? amd::SyncPolicy::Blocking : amd::SyncPolicy::Auto;
if (!hip_device->devices()[0]->IsHwEventReady(*event_, kWaitCompletion, policy)) {
const amd::Device* device = g_devices[deviceId()]->devices()[0];
if (!device->IsHwEventReady(*event_, kWaitCompletion, policy)) {
event_->awaitCompletion();
}
return hipSuccess;
@@ -88,65 +85,62 @@ hipError_t Event::synchronize() {
// ================================================================================================
// Forwards to awaitCompletion() on the underlying event_ and returns its result.
// NOTE(review): event_ is dereferenced without a null check here — confirm callers guarantee it.
bool Event::awaitEventCompletion() { return event_->awaitCompletion(); }
// ================================================================================================
bool EventDD::awaitEventCompletion() {
amd::SyncPolicy policy =
constexpr bool kWaitCompletion = true;
const amd::SyncPolicy policy =
(flags_ == hipEventBlockingSync) ? amd::SyncPolicy::Blocking : amd::SyncPolicy::Auto;
return g_devices[deviceId()]->devices()[0]->IsHwEventReady(*event_, true, policy);
return g_devices[deviceId()]->devices()[0]->IsHwEventReady(*event_, kWaitCompletion, policy);
}
// ================================================================================================
hipError_t Event::elapsedTime(Event& eStop, float& ms) {
amd::ScopedLock startLock(lock_);
// Handle same event case
if (this == &eStop) {
if (event_ == nullptr || (flags_ & hipEventDisableTiming)) {
return hipErrorInvalidHandle;
}
ms = 0.f;
if (event_ == nullptr) {
return hipErrorInvalidHandle;
}
if (flags_ & hipEventDisableTiming) {
return hipErrorInvalidHandle;
}
if (!ready()) {
return hipErrorNotReady;
}
return hipSuccess;
return ready() ? hipSuccess : hipErrorNotReady;
}
amd::ScopedLock stopLock(eStop.lock());
// Validate events
if (event_ == nullptr || eStop.event() == nullptr) {
return hipErrorInvalidHandle;
}
if ((flags_ | eStop.flags_) & hipEventDisableTiming) {
return hipErrorInvalidHandle;
}
if (!ready() || !eStop.ready()) {
return hipErrorNotReady;
}
constexpr float kNsToMs = 1.0f / 1000000.0f;
if (event_ == eStop.event_) {
// Events are the same, which indicates the stream is empty and likely
// eventRecord is called on another stream. For such cases insert and measure a
// marker.
amd::Command* command = new amd::Marker(*event_->command().queue(), kMarkerDisableFlush);
// eventRecord is called on another stream. For such cases insert and measure a marker.
auto* command = new amd::Marker(*event_->command().queue(), kMarkerDisableFlush);
command->enqueue();
command->awaitCompletion();
ms = static_cast<float>(static_cast<int64_t>(command->event().profilingInfo().end_) -
time(false)) /
1000000.f;
time(false)) * kNsToMs;
command->release();
} else {
// Note: with direct dispatch eStop.ready() relies on HW event, but CPU status can be delayed.
// Hence for now make sure CPU status is updated by calling awaitCompletion();
awaitEventCompletion();
eStop.awaitEventCompletion();
ms = static_cast<float>(eStop.time(false) - time(false)) / 1000000.f;
ms = static_cast<float>(eStop.time(false) - time(false)) * kNsToMs;
}
return hipSuccess;
}
// ================================================================================================
int64_t Event::time(bool getStartTs) const {
assert(event_ != nullptr);
if (getStartTs) {
@@ -156,51 +150,51 @@ int64_t Event::time(bool getStartTs) const {
}
}
// ================================================================================================
int64_t EventDD::time(bool getStartTs) const {
uint64_t start = 0, end = 0;
assert(event_ != nullptr);
uint64_t start = 0, end = 0;
g_devices[deviceId()]->devices()[0]->getHwEventTime(*event_, &start, &end);
// FIXME: This is only needed if the command had to wait CL_COMPLETE status
if (start == 0 || end == 0) {
// Select the requested timestamp and fallback to CPU profiling if not available
const uint64_t timestamp = getStartTs ? start : end;
if (timestamp == 0) {
return Event::time(getStartTs);
}
if (getStartTs) {
return static_cast<int64_t>(start);
} else {
return static_cast<int64_t>(end);
}
return static_cast<int64_t>(timestamp);
}
// ================================================================================================
hipError_t Event::streamWaitCommand(amd::Command*& command, hip::Stream* stream) {
amd::Command::EventWaitList eventWaitList;
if (event_ != nullptr) {
eventWaitList.push_back(event_);
}
const amd::Command::EventWaitList eventWaitList =
(event_ != nullptr) ? amd::Command::EventWaitList{event_} : amd::Command::EventWaitList{};
command = new amd::Marker(*stream, kMarkerDisableFlush, eventWaitList);
if (command == nullptr) {
return hipErrorOutOfMemory;
}
// Since we only need to have a dependency on an existing event,
// we may not need to flush any caches.
command->setCommandEntryScope(amd::Device::kCacheStateIgnore);
if (command == NULL) {
return hipErrorOutOfMemory;
}
return hipSuccess;
}
// ================================================================================================
hipError_t Event::streamWait(hip::Stream* stream, uint flags) {
// Access to event_ object must be lock protected
amd::ScopedLock lock(lock_);
// Early return if event is not recorded, same stream, or already ready
if ((event_ == nullptr) || (event_->command().queue() == stream) || ready()) {
return hipSuccess;
}
if (!event_->notifyCmdQueue()) {
return hipErrorLaunchOutOfResources;
}
amd::Command* command;
hipError_t status = streamWaitCommand(command, stream);
if (status != hipSuccess) {
if (const auto status = streamWaitCommand(command, stream); status != hipSuccess) {
return status;
}
command->enqueue();
command->release();
return hipSuccess;
@@ -209,33 +203,38 @@ hipError_t Event::streamWait(hip::Stream* stream, uint flags) {
// ================================================================================================
hipError_t Event::recordCommand(amd::Command*& command, amd::HostQueue* stream, uint32_t ext_flags,
bool batch_flush) {
if (command == nullptr) {
int32_t releaseFlags =
((ext_flags == 0) ? flags_ : ext_flags) &
(hipEventReleaseToDevice | hipEventReleaseToSystem | hipEventDisableSystemFence);
if (releaseFlags & hipEventDisableSystemFence) {
releaseFlags = amd::Device::kCacheStateIgnore;
} else {
releaseFlags = amd::Device::kCacheStateInvalid;
}
// Always submit an EventMarker.
constexpr bool kMarkerTs = true;
command =
new hip::EventMarker(*stream, !kMarkerDisableFlush, kMarkerTs, releaseFlags, batch_flush);
if (command != nullptr) {
return hipSuccess;
}
const auto flags = (ext_flags == 0) ? flags_ : ext_flags;
const auto releaseFlags = [&]() {
if (flags & hipEventDisableSystemFence) {
return amd::Device::kCacheStateIgnore;
}
return amd::Device::kCacheStateInvalid;
}();
constexpr bool kMarkerTs = true;
constexpr bool kFlushCache = false;
command = new hip::EventMarker(*stream, kFlushCache, kMarkerTs, releaseFlags, batch_flush);
return hipSuccess;
}
// ================================================================================================
hipError_t Event::enqueueRecordCommand(hip::Stream* stream, amd::Command* command) {
command->enqueue();
if (event_ == &command->event()) {
amd::Event& new_event = command->event();
if (event_ == &new_event) {
return hipSuccess;
}
if (event_ != nullptr) {
event_->release();
}
event_ = &command->event();
event_ = &new_event;
return hipSuccess;
}
@@ -244,73 +243,69 @@ hipError_t Event::enqueueRecordCommand(hip::Stream* stream, amd::Command* comman
hipError_t Event::addMarker(hip::Stream* hip_stream, amd::Command* command, bool batch_flush) {
// Keep the lock always at the beginning of this to avoid a race. SWDEV-277847
amd::ScopedLock lock(lock_);
hipError_t status = recordCommand(command, hip_stream, 0, batch_flush);
if (status != hipSuccess) {
return hipSuccess;
if (const auto status = recordCommand(command, hip_stream, 0, batch_flush);
status != hipSuccess) {
return status;
}
status = enqueueRecordCommand(hip_stream, command);
return status;
return enqueueRecordCommand(hip_stream, command);
}
// ================================================================================================
bool isValid(hipEvent_t event) {
// NULL event is always valid
if (event == nullptr) {
return true;
}
std::shared_lock lock(eventSetLock);
if (eventSet.find(event) == eventSet.end()) {
return false;
}
return true;
return eventSet.find(event) != eventSet.end();
}
// ================================================================================================
hipError_t ihipEventCreateWithFlags(hipEvent_t* event, unsigned flags) {
unsigned supportedFlags = hipEventDefault | hipEventBlockingSync | hipEventDisableTiming |
hipEventReleaseToDevice | hipEventReleaseToSystem |
hipEventInterprocess | hipEventDisableSystemFence;
hipError_t ihipEventCreateWithFlags(hipEvent_t* event, uint32_t flags) {
// Define supported event flags
constexpr uint32_t kSupportedFlags = hipEventDefault | hipEventBlockingSync |
hipEventDisableTiming | hipEventReleaseToDevice |
hipEventReleaseToSystem | hipEventInterprocess |
hipEventDisableSystemFence;
constexpr uint32_t kReleaseFlags = (hipEventReleaseToDevice | hipEventReleaseToSystem |
hipEventDisableSystemFence);
const unsigned releaseFlags =
(hipEventReleaseToDevice | hipEventReleaseToSystem | hipEventDisableSystemFence);
// can't set any unsupported flags.
// can set only one of the release flags.
// if hipEventInterprocess flag is set, then hipEventDisableTiming flag also must be set
const bool illegalFlags = (flags & ~supportedFlags) || ([](unsigned int num) {
unsigned int bitcount;
for (bitcount = 0; num; bitcount++) {
num &= num - 1;
}
return bitcount;
}(flags & releaseFlags) > 1) ||
((flags & hipEventInterprocess) && !(flags & hipEventDisableTiming));
if (!illegalFlags) {
hip::Event* e = nullptr;
if (flags & hipEventInterprocess) {
e = new hip::IPCEvent();
} else {
if (AMD_DIRECT_DISPATCH) {
e = new hip::EventDD(flags);
} else {
e = new hip::Event(flags);
}
// Helper to count set bits for validating multiple release flags
constexpr auto countBits = [](uint32_t num) {
uint32_t count = 0;
while (num) {
num &= num - 1;
++count;
}
// The app might have used a combination of flags, e.g. hipEventInterprocess|hipEventDisableTiming.
// However, based on the hipEventInterprocess flag, IPCEvent creates the event with
// JUST hipEventInterprocess, and hence the actual hipEventInterprocess|hipEventDisableTiming
// combination gets suppressed to hipEventInterprocess.
e->flags_ = flags;
if (e == nullptr) {
return hipErrorOutOfMemory;
}
*event = reinterpret_cast<hipEvent_t>(e);
std::unique_lock lock(hip::eventSetLock);
hip::eventSet.insert(*event);
} else {
return count;
};
// Validate flags: no unsupported flags, max one release flag,
// interprocess requires disable timing
if ((flags & ~kSupportedFlags) || (countBits(flags & kReleaseFlags) > 1) ||
((flags & hipEventInterprocess) && !(flags & hipEventDisableTiming))) {
return hipErrorInvalidValue;
}
// Create appropriate event type based on flags
hip::Event* e = nullptr;
if (flags & hipEventInterprocess) {
e = new hip::IPCEvent(flags);
} else if (AMD_DIRECT_DISPATCH) {
e = new hip::EventDD(flags);
} else {
e = new hip::Event(flags);
}
if (e == nullptr) {
return hipErrorOutOfMemory;
}
*event = reinterpret_cast<hipEvent_t>(e);
// Register event in global set
std::unique_lock lock(hip::eventSetLock);
hip::eventSet.insert(*event);
return hipSuccess;
}
@@ -349,14 +344,13 @@ hipError_t hipEventDestroy(hipEvent_t event) {
return hipErrorContextIsDestroyed;
}
hip::Event* e = reinterpret_cast<hip::Event*>(event);
// It is possible that the stream was destroyed before this event
hipStream_t s = e->GetCaptureStream();
if (hip::isValid(s)) {
if (s != nullptr && s != hipStreamLegacy) {
reinterpret_cast<hip::Stream*>(e->GetCaptureStream())->EraseCaptureEvent(event);
}
auto* e = reinterpret_cast<hip::Event*>(event);
// Handle capture stream cleanup (stream might be destroyed first)
hipStream_t stream = e->GetCaptureStream();
if (hip::isValid(stream) && stream != nullptr && stream != hipStreamLegacy) {
reinterpret_cast<hip::Stream*>(stream)->EraseCaptureEvent(event);
}
delete e;
HIP_RETURN(hipSuccess);
}
@@ -365,16 +359,16 @@ hipError_t hipEventDestroy(hipEvent_t event) {
hipError_t hipEventElapsedTime(float* ms, hipEvent_t start, hipEvent_t stop) {
HIP_INIT_API(hipEventElapsedTime, ms, start, stop);
// Validate parameters
if (ms == nullptr) {
HIP_RETURN(hipErrorInvalidValue);
}
if (start == nullptr || stop == nullptr) {
HIP_RETURN(hipErrorInvalidHandle);
}
hip::Event* eStart = reinterpret_cast<hip::Event*>(start);
hip::Event* eStop = reinterpret_cast<hip::Event*>(stop);
auto* const eStart = reinterpret_cast<hip::Event*>(start);
auto* const eStop = reinterpret_cast<hip::Event*>(stop);
if (eStart->deviceId() != eStop->deviceId()) {
HIP_RETURN(hipErrorInvalidResourceHandle);
@@ -384,52 +378,58 @@ hipError_t hipEventElapsedTime(float* ms, hipEvent_t start, hipEvent_t stop) {
}
// ================================================================================================
hipError_t hipEventRecord_common(hipEvent_t event, hipStream_t stream, unsigned int flags) {
if (!(flags == hipEventRecordDefault || flags == hipEventRecordExternal)) {
hipError_t hipEventRecord_common(hipEvent_t event, hipStream_t stream, uint32_t flags) {
// Validate flags and event
if (flags != hipEventRecordDefault && flags != hipEventRecordExternal) {
return hipErrorInvalidValue;
}
hipError_t status = hipSuccess;
if (event == nullptr) {
return hipErrorInvalidHandle;
}
getStreamPerThread(stream);
hip::Event* e = reinterpret_cast<hip::Event*>(event);
hip::Stream* s = reinterpret_cast<hip::Stream*>(stream);
hip::Stream* hip_stream = hip::getStream(stream);
if (hipStream_t lastCaptureStream = e->GetCaptureStream()) {
if (hip::isValid(lastCaptureStream)) {
if ((lastCaptureStream != nullptr) && (lastCaptureStream != hipStreamLegacy)) {
reinterpret_cast<hip::Stream*>(e->GetCaptureStream())->EraseCaptureEvent(event);
}
}
auto* const e = reinterpret_cast<hip::Event*>(event);
auto* const hip_stream = hip::getStream(stream);
if (hip_stream == nullptr) {
return hipErrorInvalidValue;
}
// Clean up previous capture stream association
hipStream_t lastCaptureStream = e->GetCaptureStream();
if (hip::isValid(lastCaptureStream) && lastCaptureStream != nullptr &&
lastCaptureStream != hipStreamLegacy) {
reinterpret_cast<hip::Stream*>(lastCaptureStream)->EraseCaptureEvent(event);
}
e->SetCaptureStream(stream);
if ((stream != nullptr && stream != hipStreamLegacy) &&
(s->GetCaptureStatus() == hipStreamCaptureStatusActive)) {
// Handle stream capture mode
if (stream != nullptr && stream != hipStreamLegacy &&
hip_stream->GetCaptureStatus() == hipStreamCaptureStatusActive) {
ClPrint(amd::LOG_INFO, amd::LOG_CODE,
"[hipGraph] Current capture node EventRecord on stream : %p, Event %p", stream, event);
s->SetCaptureEvent(event);
std::vector<hip::GraphNode*> lastCapturedNodes = s->GetLastCapturedNodes();
hip_stream->SetCaptureEvent(event);
const auto& lastCapturedNodes = hip_stream->GetLastCapturedNodes();
e->SetNodesPrevToRecorded(lastCapturedNodes);
if (flags == hipEventRecordExternal) {
hip::GraphNode* node = new hip::GraphEventRecordNode(reinterpret_cast<hipEvent_t>(e));
hipError_t status = hip::ihipGraphAddNode(
node, reinterpret_cast<hip::Graph*>(s->GetCaptureGraph()),
reinterpret_cast<hip::GraphNode* const*>(s->GetLastCapturedNodes().data()),
s->GetLastCapturedNodes().size(), false);
auto* const node = new hip::GraphEventRecordNode(event);
const auto status = hip::ihipGraphAddNode(node, hip_stream->GetCaptureGraph(),
lastCapturedNodes.data(),
lastCapturedNodes.size(), false);
if (status != hipSuccess) {
ClPrint(amd::LOG_ERROR, amd::LOG_CODE, "hipEventRecord add external event node failed");
return status;
}
s->SetLastCapturedNode(node);
hip_stream->SetLastCapturedNode(node);
}
} else {
if (e->deviceId() != hip_stream->DeviceId()) {
return hipErrorInvalidResourceHandle;
}
status = e->addMarker(hip_stream, nullptr, !hip::Event::kBatchFlush);
return hipSuccess;
}
return status;
// Normal event recording
if (e->deviceId() != hip_stream->DeviceId()) {
return hipErrorInvalidResourceHandle;
}
return e->addMarker(hip_stream, nullptr, !hip::Event::kBatchFlush);
}
// ================================================================================================
@@ -446,7 +446,7 @@ hipError_t hipEventRecord_spt(hipEvent_t event, hipStream_t stream) {
}
// ================================================================================================
hipError_t hipEventRecordWithFlags(hipEvent_t event, hipStream_t stream, unsigned int flags) {
hipError_t hipEventRecordWithFlags(hipEvent_t event, hipStream_t stream, uint32_t flags) {
HIP_INIT_API(hipEventRecordWithFlags, event, stream, flags);
HIP_RETURN(hipEventRecord_common(event, stream, flags));
}
@@ -458,19 +458,24 @@ hipError_t hipEventSynchronize(hipEvent_t event) {
if (event == nullptr) {
HIP_RETURN(hipErrorInvalidHandle);
}
hip::Event* e = reinterpret_cast<hip::Event*>(event);
auto hip_stream = e->GetCaptureStream();
hip::Stream* s = reinterpret_cast<hip::Stream*>(hip_stream);
if ((hip_stream != nullptr && hip_stream != hipStreamLegacy) &&
(s->GetCaptureStatus() == hipStreamCaptureStatusActive)) {
s->SetCaptureStatus(hipStreamCaptureStatusInvalidated);
HIP_RETURN(hipErrorCapturedEvent);
auto* e = reinterpret_cast<hip::Event*>(event);
const auto hip_stream = e->GetCaptureStream();
// Check for active capture
if (hip_stream != nullptr && hip_stream != hipStreamLegacy) {
auto* s = reinterpret_cast<hip::Stream*>(hip_stream);
if (s->GetCaptureStatus() == hipStreamCaptureStatusActive) {
s->SetCaptureStatus(hipStreamCaptureStatusInvalidated);
HIP_RETURN(hipErrorCapturedEvent);
}
}
if (hip::Stream::StreamCaptureOngoing(hip_stream) == true) {
if (hip::Stream::StreamCaptureOngoing(hip_stream)) {
HIP_RETURN(hipErrorStreamCaptureUnsupported);
}
hipError_t status = e->synchronize();
const auto status = e->synchronize();
// Release freed memory for all memory pools on the device
g_devices[e->deviceId()]->ReleaseFreedMemory();
HIP_RETURN(status);
@@ -482,20 +487,26 @@ hipError_t ihipEventQuery(hipEvent_t event) {
return hipErrorInvalidHandle;
}
hip::Event* e = reinterpret_cast<hip::Event*>(event);
auto hip_stream = e->GetCaptureStream();
hip::Stream* s = reinterpret_cast<hip::Stream*>(hip_stream);
if ((hip_stream != nullptr && hip_stream != hipStreamLegacy) &&
(s->GetCaptureStatus() == hipStreamCaptureStatusActive)) {
s->SetCaptureStatus(hipStreamCaptureStatusInvalidated);
HIP_RETURN(hipErrorCapturedEvent);
auto* e = reinterpret_cast<hip::Event*>(event);
const auto hip_stream = e->GetCaptureStream();
// Check for active capture
if (hip_stream != nullptr && hip_stream != hipStreamLegacy) {
auto* s = reinterpret_cast<hip::Stream*>(hip_stream);
if (s->GetCaptureStatus() == hipStreamCaptureStatusActive) {
s->SetCaptureStatus(hipStreamCaptureStatusInvalidated);
HIP_RETURN(hipErrorCapturedEvent);
}
}
if (hip::Stream::StreamCaptureOngoing(e->GetCaptureStream())) {
if (hip::Stream::StreamCaptureOngoing(hip_stream)) {
HIP_RETURN(hipErrorStreamCaptureUnsupported);
}
return e->query();
}
// ================================================================================================
hipError_t hipEventQuery(hipEvent_t event) {
HIP_INIT_API(hipEventQuery, event);
HIP_RETURN(ihipEventQuery(event));
+59 -49
Visa fil
@@ -32,55 +32,58 @@
namespace hip {
class StreamCallback {
protected:
void* userData_;
void* userData_; //!< User data passed to callback function
public:
StreamCallback(void* userData) : userData_(userData) {}
explicit StreamCallback(void* userData) : userData_(userData) {}
virtual void CL_CALLBACK callback() = 0;
virtual ~StreamCallback() {};
virtual ~StreamCallback() = default;
};
class StreamAddCallback : public StreamCallback {
hipStreamCallback_t callBack_;
hipStream_t stream_;
hipStreamCallback_t callBack_; //!< Stream callback function pointer
hipStream_t stream_; //!< Stream associated with the callback
public:
StreamAddCallback(hipStream_t stream, hipStreamCallback_t callback, void* userData)
: StreamCallback(userData) {
stream_ = stream;
callBack_ = callback;
}
: StreamCallback(userData), stream_(stream), callBack_(callback) {}
void CL_CALLBACK callback() {
void CL_CALLBACK callback() override {
hipError_t status = hipSuccess;
callBack_(stream_, status, userData_);
}
};
class LaunchHostFuncCallback : public StreamCallback {
hipHostFn_t callBack_;
hipHostFn_t callBack_; //!< Host function callback pointer
public:
LaunchHostFuncCallback(hipHostFn_t callback, void* userData) : StreamCallback(userData) {
callBack_ = callback;
}
LaunchHostFuncCallback(hipHostFn_t callback, void* userData)
: StreamCallback(userData), callBack_(callback) {}
void CL_CALLBACK callback() { callBack_(userData_); }
void CL_CALLBACK callback() override { callBack_(userData_); }
};
void CL_CALLBACK ihipStreamCallback(cl_event event, cl_int command_exec_status, void* user_data);
#define IPC_SIGNALS_PER_EVENT 32
// Optimized IPC event shared memory structure
// Note: All atomics use relaxed memory ordering where safe for performance
typedef struct ihipIpcEventShmem_s {
// Reference counting for shared memory lifecycle
std::atomic<int> owners;
// Metadata: only written once during initialization, relaxed ordering safe
std::atomic<int> owners_device_id;
std::atomic<int> owners_process_id;
// Ring buffer indices: requires acquire-release ordering for synchronization
std::atomic<int> read_index;
std::atomic<int> write_index;
uint32_t signal[IPC_SIGNALS_PER_EVENT];
// Signal array: GPU-accessible memory for event signaling
// Using uint32_t for GPU compatibility
alignas(64) uint32_t signal[IPC_SIGNALS_PER_EVENT];
} ihipIpcEventShmem_t;
class EventMarker : public amd::Marker {
@@ -97,24 +100,25 @@ class EventMarker : public amd::Marker {
};
class Event {
/// capture stream where event is recorded
/// Capture stream where event is recorded
hipStream_t captureStream_ = nullptr;
/// Previous captured nodes before event record
std::vector<hip::GraphNode*> nodesPrevToRecorded_;
protected:
bool CheckHwEvent() {
amd::SyncPolicy policy =
(flags_ == hipEventBlockingSync) ? amd::SyncPolicy::Blocking : amd::SyncPolicy::Auto;
const amd::SyncPolicy policy =
(flags_ == hipEventBlockingSync) ? amd::SyncPolicy::Blocking : amd::SyncPolicy::Auto;
return g_devices[deviceId()]->devices()[0]->IsHwEventReady(*event_, false, policy);
}
public:
constexpr static bool kBatchFlush = true; //!< Flushes CPU command batch in direct dispatch mode
// Flushes CPU command batch in direct dispatch mode
static constexpr bool kBatchFlush = true;
Event(uint32_t flags)
: flags_(flags), lock_(true) /* hipEvent_t lock*/, event_(nullptr), stream_(nullptr) {
device_id_ = hip::getCurrentDevice()->deviceId(); // Created in current device ctx
explicit Event(uint32_t flags)
: flags_(flags), lock_(true), event_(nullptr) {
device_id_ = hip::getCurrentDevice()->deviceId();
}
virtual ~Event() {
@@ -122,7 +126,6 @@ class Event {
event_->release();
}
}
uint32_t flags_; //!< flags associated with the event
virtual hipError_t query();
virtual hipError_t synchronize();
@@ -136,6 +139,8 @@ class Event {
virtual hipError_t enqueueRecordCommand(hip::Stream* stream, amd::Command* command);
hipError_t addMarker(hip::Stream* stream, amd::Command* command, bool batch_flush = true);
uint32_t flags() const { return flags_; }
void BindCommand(amd::Command& command) {
amd::ScopedLock lock(lock_);
if (event_ != nullptr) {
@@ -146,17 +151,20 @@ class Event {
}
amd::Monitor& lock() { return lock_; }
const int deviceId() const { return device_id_; }
int deviceId() const { return device_id_; }
void setDeviceId(int id) { device_id_ = id; }
amd::Event* event() { return event_; }
/// Get capture stream where event is recorded
hipStream_t GetCaptureStream() const { return captureStream_; }
/// Set capture stream where event is recorded
void SetCaptureStream(hipStream_t stream) { captureStream_ = stream; }
/// Returns previous captured nodes before event record
std::vector<hip::GraphNode*> GetNodesPrevToRecorded() const { return nodesPrevToRecorded_; }
const std::vector<hip::GraphNode*>& GetNodesPrevToRecorded() const {
return nodesPrevToRecorded_;
}
/// Set last captured graph node before event record
void SetNodesPrevToRecorded(std::vector<hip::GraphNode*>& graphNode) {
void SetNodesPrevToRecorded(const std::vector<hip::GraphNode*>& graphNode) {
nodesPrevToRecorded_ = graphNode;
}
virtual hipError_t GetHandle(ihipIpcEventHandle_t* handle) {
@@ -170,35 +178,38 @@ class Event {
virtual int64_t time(bool getStartTs) const;
protected:
amd::Monitor lock_;
hip::Stream* stream_;
amd::Event* event_;
int device_id_;
uint32_t flags_; //!< Flags associated with the event
amd::Monitor lock_; //!< Mutex for thread-safe access to event state
amd::Event* event_; //!< Underlying ROCclr event object for GPU synchronization
int device_id_; //!< Device ID where this event was created
};
class EventDD : public Event {
public:
EventDD(unsigned int flags) : Event(flags) {}
virtual ~EventDD() {}
explicit EventDD(uint32_t flags) : Event(flags) {}
~EventDD() override = default;
virtual bool awaitEventCompletion();
virtual bool ready();
virtual int64_t time(bool getStartTs) const;
bool awaitEventCompletion() override;
bool ready() override;
int64_t time(bool getStartTs) const override;
};
class IPCEvent : public Event {
// IPC Events
/// IPC event metadata structure
struct ihipIpcEvent_t {
std::string ipc_name_;
int ipc_fd_;
ihipIpcEventShmem_t* ipc_shmem_;
ihipIpcEvent_t() : ipc_name_("dummy"), ipc_fd_(0), ipc_shmem_(nullptr) {}
void setipcname(const char* name) { ipc_name_ = std::string(name); }
std::string ipc_name_; //!< Name of the shared memory object for IPC
ihipIpcEventShmem_t* ipc_shmem_; //!< Pointer to mapped IPC shared memory structure
ihipIpcEvent_t() : ipc_shmem_(nullptr) {
ipc_name_.reserve(32); // Reserve space for typical IPC name "/hip_<pid>_<counter>"
}
void setipcname(const char* name) { ipc_name_ = name; }
};
ihipIpcEvent_t ipc_evt_;
public:
~IPCEvent() {
explicit IPCEvent(uint32_t flags = hipEventInterprocess) : Event(flags) {}
~IPCEvent() override {
if (ipc_evt_.ipc_shmem_) {
int owners = --ipc_evt_.ipc_shmem_->owners;
// Make sure event is synchronized
@@ -213,12 +224,11 @@ class IPCEvent : public Event {
}
#if !defined(_MSC_VER)
// Clean up the POSIX shared memory object
if (!ipc_evt_.ipc_name_.empty() && ipc_evt_.ipc_name_ != "dummy") {
if (!ipc_evt_.ipc_name_.empty()) {
shm_unlink(ipc_evt_.ipc_name_.c_str());
}
#endif
}
IPCEvent() : Event(hipEventInterprocess) {}
bool createIpcEventShmemIfNeeded();
hipError_t GetHandle(ihipIpcEventHandle_t* handle) override;
hipError_t OpenHandle(ihipIpcEventHandle_t* handle) override;
@@ -232,10 +242,10 @@ class IPCEvent : public Event {
hipError_t enqueueRecordCommand(hip::Stream* stream, amd::Command* command) override;
};
/// Callback data for IPC event stream wait operations
struct CallbackData {
int previous_read_index;
hip::ihipIpcEventShmem_t* shmem;
const int previous_read_index; //!< Snapshot of read index for synchronization
hip::ihipIpcEventShmem_t* const shmem; //!< IPC shared memory for event signaling
};
} // namespace hip
+72 -59
Visa fil
@@ -32,12 +32,14 @@ namespace hip {
hipError_t ihipEventCreateWithFlags(hipEvent_t* event, unsigned flags);
// ================================================================================================
bool IPCEvent::createIpcEventShmemIfNeeded() {
// Early return if shared memory already exists
if (ipc_evt_.ipc_shmem_) {
// ipc_shmem_ already created, no need to create it again
return true;
}
// Generate unique IPC name
#if !defined(_MSC_VER)
static std::atomic<int> counter{0};
ipc_evt_.ipc_name_ = "/hip_" + std::to_string(getpid()) + "_" + std::to_string(counter++);
@@ -48,34 +50,33 @@ bool IPCEvent::createIpcEventShmemIfNeeded() {
ipc_evt_.ipc_name_.replace(0, 5, "/hip_");
#endif
if (!amd::Os::MemoryMapFileTruncated(
ipc_evt_.ipc_name_.c_str(),
const_cast<const void**>(reinterpret_cast<void**>(&(ipc_evt_.ipc_shmem_))),
sizeof(hip::ihipIpcEventShmem_t))) {
// Create memory-mapped file for shared memory
auto** shmem_ptr = reinterpret_cast<void**>(&ipc_evt_.ipc_shmem_);
if (!amd::Os::MemoryMapFileTruncated(ipc_evt_.ipc_name_.c_str(),
const_cast<const void**>(shmem_ptr),
sizeof(hip::ihipIpcEventShmem_t))) {
return false;
}
ipc_evt_.ipc_shmem_->owners = 1;
ipc_evt_.ipc_shmem_->read_index = -1;
ipc_evt_.ipc_shmem_->write_index = 0;
for (uint32_t sig_idx = 0; sig_idx < IPC_SIGNALS_PER_EVENT; ++sig_idx) {
ipc_evt_.ipc_shmem_->signal[sig_idx] = 0;
}
// Initialize shared memory fields
auto* const shmem = ipc_evt_.ipc_shmem_;
shmem->owners = 1;
shmem->read_index = -1;
shmem->write_index = 0;
std::fill_n(shmem->signal, IPC_SIGNALS_PER_EVENT, 0);
// device sets 0 to this ptr when the ipc event is completed
hipError_t status =
ihipHostRegister(&ipc_evt_.ipc_shmem_->signal, sizeof(uint32_t) * IPC_SIGNALS_PER_EVENT, 0);
if (status != hipSuccess) {
return false;
}
return true;
// Register signal array with device
constexpr size_t kSignalArraySize = sizeof(uint32_t) * IPC_SIGNALS_PER_EVENT;
const auto status = ihipHostRegister(&shmem->signal, kSignalArraySize, 0);
return status == hipSuccess;
}
// ================================================================================================
hipError_t IPCEvent::query() {
if (ipc_evt_.ipc_shmem_) {
int prev_read_idx = ipc_evt_.ipc_shmem_->read_index;
int offset = (prev_read_idx % IPC_SIGNALS_PER_EVENT);
const int prev_read_idx = ipc_evt_.ipc_shmem_->read_index;
const int offset = prev_read_idx % IPC_SIGNALS_PER_EVENT;
if (ipc_evt_.ipc_shmem_->read_index < prev_read_idx + IPC_SIGNALS_PER_EVENT &&
ipc_evt_.ipc_shmem_->signal[offset] != 0) {
return hipErrorNotReady;
@@ -101,11 +102,12 @@ hipError_t IPCEvent::synchronize() {
// ================================================================================================
/// Enqueues a wait on `stream` for the most recently published signal slot of this IPC event.
///
/// @param stream Stream that receives the wait-value operation.
/// @param flags  Not used by this implementation.
/// @return hipSuccess when there is nothing to wait for (the shared memory is not mapped, or
///         no record has been published yet); otherwise the status of ihipStreamOperation().
hipError_t IPCEvent::streamWait(hip::Stream* stream, uint flags) {
  auto* const shmem = ipc_evt_.ipc_shmem_;
  // Without mapped shared memory there is no signal to wait on.
  if (shmem == nullptr) {
    return hipSuccess;
  }

  // read_index is initialized to -1 and only becomes non-negative once a record has been
  // published, so a negative value means the event was never recorded: the wait is a no-op.
  const int read_idx = shmem->read_index;
  if (read_idx < 0) {
    return hipSuccess;
  }

  // read_index grows with every record; wrap it into the signal array the same way the
  // record path computes its slot (write_index % IPC_SIGNALS_PER_EVENT).
  const int offset = read_idx % IPC_SIGNALS_PER_EVENT;
  return ihipStreamOperation(reinterpret_cast<hipStream_t>(stream),
                             ROCCLR_COMMAND_STREAM_WAIT_VALUE, &(shmem->signal[offset]), 0, 1, 1,
                             sizeof(uint32_t));
}
// ================================================================================================
@@ -117,37 +119,41 @@ hipError_t IPCEvent::recordCommand(amd::Command*& command, amd::HostQueue* strea
// ================================================================================================
hipError_t IPCEvent::enqueueRecordCommand(hip::Stream* stream, amd::Command* command) {
amd::Event& tEvent = command->event();
createIpcEventShmemIfNeeded();
int write_index = ipc_evt_.ipc_shmem_->write_index++;
int offset = write_index % IPC_SIGNALS_PER_EVENT;
while (ipc_evt_.ipc_shmem_->signal[offset] != 0) {
// Allocate signal slot for this event
auto* const shmem = ipc_evt_.ipc_shmem_;
const int write_index = shmem->write_index++;
const int offset = write_index % IPC_SIGNALS_PER_EVENT;
auto& signal = shmem->signal[offset];
// Wait for signal slot to become available
while (signal != 0) {
amd::Os::sleep(1);
}
// Lock signal.
ipc_evt_.ipc_shmem_->signal[offset] = 1;
ipc_evt_.ipc_shmem_->owners_device_id = deviceId();
// Lock signal and set device ID
signal = 1;
shmem->owners_device_id = deviceId();
command->enqueue();
// Set event_ in order to release marked command when event is destroyed
// Set event_ to release marked command when event is destroyed
if (event_ != nullptr) {
event_->release();
}
event_ = &command->event();
// device writes 0 to signal after the hipEventRecord command is completed
// the signal value is checked by WaitThenDecrementSignal cb
hipError_t status =
ihipStreamOperation(reinterpret_cast<hipStream_t>(stream), ROCCLR_COMMAND_STREAM_WRITE_VALUE,
&(ipc_evt_.ipc_shmem_->signal[offset]), 0, 0, 0, sizeof(uint32_t));
// Device writes 0 to signal after hipEventRecord command completes
const auto status = ihipStreamOperation(reinterpret_cast<hipStream_t>(stream),
ROCCLR_COMMAND_STREAM_WRITE_VALUE, &signal, 0, 0, 0,
sizeof(uint32_t));
if (status != hipSuccess) {
return status;
}
// Update read index to indicate new signal.
// Update read index to indicate new signal
int expected = write_index - 1;
while (!ipc_evt_.ipc_shmem_->read_index.compare_exchange_weak(expected, write_index)) {
while (!shmem->read_index.compare_exchange_weak(expected, write_index)) {
amd::Os::sleep(1);
}
@@ -169,23 +175,28 @@ hipError_t IPCEvent::GetHandle(ihipIpcEventHandle_t* handle) {
// ================================================================================================
/// Attaches this event to the shared memory named by an IPC event handle.
///
/// @param handle IPC handle carrying the shared-memory name (`shmem_name`).
/// @return hipErrorInvalidValue if the shared memory cannot be mapped,
///         hipErrorInvalidContext if the handle is opened by the process recorded as its
///         owner, otherwise the status of registering the signal array via ihipHostRegister().
hipError_t IPCEvent::OpenHandle(ihipIpcEventHandle_t* handle) {
  ipc_evt_.ipc_name_ = handle->shmem_name;

  // Map shared memory from IPC handle
  auto** shmem_ptr = reinterpret_cast<void**>(&ipc_evt_.ipc_shmem_);
  if (!amd::Os::MemoryMapFileTruncated(ipc_evt_.ipc_name_.c_str(),
                                       const_cast<const void**>(shmem_ptr),
                                       sizeof(ihipIpcEventShmem_t))) {
    return hipErrorInvalidValue;
  }
  auto* const shmem = ipc_evt_.ipc_shmem_;

  // Prevent opening in the same process
  const auto current_process_id = amd::Os::getProcessId();
  if (current_process_id == shmem->owners_process_id.load()) {
    return hipErrorInvalidContext;
  }

  // Count this process as one more owner of the shared event (exactly once per open).
  shmem->owners += 1;

  // Register signal array with device
  constexpr size_t kSignalArraySize = sizeof(uint32_t) * IPC_SIGNALS_PER_EVENT;
  return ihipHostRegister(&shmem->signal, kSignalArraySize, 0);
}
// ================================================================================================
@@ -195,31 +206,33 @@ hipError_t hipIpcGetEventHandle(hipIpcEventHandle_t* handle, hipEvent_t event) {
if (handle == nullptr || event == nullptr) {
HIP_RETURN(hipErrorInvalidValue);
}
hip::Event* e = reinterpret_cast<hip::Event*>(event);
auto e = reinterpret_cast<hip::Event*>(event);
HIP_RETURN(e->GetHandle(reinterpret_cast<ihipIpcEventHandle_t*>(handle)));
}
// ================================================================================================
/// Creates a local event object and attaches it to an IPC event handle.
///
/// @param event  Output: receives the newly created event on success. Must not be null.
///               On failure to open the handle the created event is destroyed and `*event`
///               is reset to nullptr so the caller is not left with a dangling handle.
/// @param handle IPC event handle to open.
/// @return hipErrorInvalidValue if `event` is null, the creation status if the event cannot
///         be created, otherwise the status of Event::OpenHandle().
hipError_t hipIpcOpenEventHandle(hipEvent_t* event, hipIpcEventHandle_t handle) {
  HIP_INIT_API(hipIpcOpenEventHandle, event, handle);

  if (event == nullptr) {
    HIP_RETURN(hipErrorInvalidValue);
  }

  // Create IPC event with timing disabled
  constexpr uint32_t kIpcEventFlags = hipEventDisableTiming | hipEventInterprocess;
  const hipError_t create_status = ihipEventCreateWithFlags(event, kIpcEventFlags);
  if (create_status != hipSuccess) {
    HIP_RETURN(create_status);
  }

  auto* const e = reinterpret_cast<hip::Event*>(*event);
  auto* const iHandle = reinterpret_cast<ihipIpcEventHandle_t*>(&handle);

  const hipError_t open_status = e->OpenHandle(iHandle);
  // Free the event in case of failure
  if (open_status != hipSuccess) {
    // NOTE(review): if ihipEventCreateWithFlags() registers the event in any global tracking
    // structure, confirm that deleting it here also unregisters it.
    delete e;
    *event = nullptr;
  }
  HIP_RETURN(open_status);
}
} // namespace hip
+1 -1
Visa fil
@@ -509,7 +509,7 @@ hipError_t ihipModuleLaunchKernel(hipFunction_t f, amd::LaunchParams& launch_par
if (stopEvent != nullptr) {
hip::Event* eStop = reinterpret_cast<hip::Event*>(stopEvent);
if (eStop->flags_ & hipEventDisableSystemFence) {
if (eStop->flags() & hipEventDisableSystemFence) {
command->setCommandEntryScope(amd::Device::kCacheStateIgnore);
} else {
command->setCommandEntryScope(amd::Device::kCacheStateSystem);