diff --git a/projects/clr/hipamd/src/hip_event.cpp b/projects/clr/hipamd/src/hip_event.cpp index 3a97124899..687e487420 100644 --- a/projects/clr/hipamd/src/hip_event.cpp +++ b/projects/clr/hipamd/src/hip_event.cpp @@ -32,28 +32,24 @@ namespace hip { static std::shared_mutex eventSetLock{}; static std::unordered_set eventSet; +// ================================================================================================ bool Event::ready() { - if (event_->status() != CL_COMPLETE) { - event_->notifyCmdQueue(); - } // Check HW status of the ROCcrl event. Note: not all ROCclr modes support HW status - bool ready = CheckHwEvent(); - if (!ready) { - ready = (event_->status() == CL_COMPLETE); + if (CheckHwEvent() || event_->status() == CL_COMPLETE) { + return true; } - return ready; + + event_->notifyCmdQueue(); + return false; } +// ================================================================================================ bool EventDD::ready() { // Check HW status of the ROCcrl event. Note: not all ROCclr modes support HW status - bool ready = CheckHwEvent(); - // FIXME: Remove status check entirely - if (!ready) { - ready = (event_->status() == CL_COMPLETE); - } - return ready; + return CheckHwEvent() || (event_->status() == CL_COMPLETE); } +// ================================================================================================ hipError_t Event::query() { amd::ScopedLock lock(lock_); @@ -74,12 +70,13 @@ hipError_t Event::synchronize() { return hipSuccess; } - auto hip_device = g_devices[deviceId()]; // Check HW status of the ROCcrl event. Note: not all ROCclr modes support HW status - static constexpr bool kWaitCompletion = true; - amd::SyncPolicy policy = + constexpr bool kWaitCompletion = true; + const amd::SyncPolicy policy = (flags_ == hipEventBlockingSync) ? amd::SyncPolicy::Blocking : amd::SyncPolicy::Auto; - if (!hip_device->devices()[0]->IsHwEventReady(*event_, kWaitCompletion, policy)) { + + const amd::Device* device = g_devices[deviceId()]->devices()[0]; + if (!device->IsHwEventReady(*event_, kWaitCompletion, policy)) { event_->awaitCompletion(); } return hipSuccess; @@ -88,65 +85,62 @@ hipError_t Event::synchronize() { // ================================================================================================ bool Event::awaitEventCompletion() { return event_->awaitCompletion(); } +// ================================================================================================ bool EventDD::awaitEventCompletion() { - amd::SyncPolicy policy = + constexpr bool kWaitCompletion = true; + const amd::SyncPolicy policy = (flags_ == hipEventBlockingSync) ? amd::SyncPolicy::Blocking : amd::SyncPolicy::Auto; - return g_devices[deviceId()]->devices()[0]->IsHwEventReady(*event_, true, policy); + return g_devices[deviceId()]->devices()[0]->IsHwEventReady(*event_, kWaitCompletion, policy); } +// ================================================================================================ hipError_t Event::elapsedTime(Event& eStop, float& ms) { amd::ScopedLock startLock(lock_); + + // Handle same event case if (this == &eStop) { + if (event_ == nullptr || (flags_ & hipEventDisableTiming)) { + return hipErrorInvalidHandle; + } ms = 0.f; - if (event_ == nullptr) { - return hipErrorInvalidHandle; - } - - if (flags_ & hipEventDisableTiming) { - return hipErrorInvalidHandle; - } - - if (!ready()) { - return hipErrorNotReady; - } - - return hipSuccess; + return ready() ? hipSuccess : hipErrorNotReady; } + amd::ScopedLock stopLock(eStop.lock()); + // Validate events if (event_ == nullptr || eStop.event() == nullptr) { return hipErrorInvalidHandle; } - if ((flags_ | eStop.flags_) & hipEventDisableTiming) { return hipErrorInvalidHandle; } - if (!ready() || !eStop.ready()) { return hipErrorNotReady; } + constexpr float kNsToMs = 1.0f / 1000000.0f; + if (event_ == eStop.event_) { // Events are the same, which indicates the stream is empty and likely - // eventRecord is called on another stream. For such cases insert and measure a - // marker. - amd::Command* command = new amd::Marker(*event_->command().queue(), kMarkerDisableFlush); + // eventRecord is called on another stream. For such cases insert and measure a marker. + auto* command = new amd::Marker(*event_->command().queue(), kMarkerDisableFlush); command->enqueue(); command->awaitCompletion(); ms = static_cast(static_cast(command->event().profilingInfo().end_) - - time(false)) / - 1000000.f; + time(false)) * kNsToMs; command->release(); } else { // Note: with direct dispatch eStop.ready() relies on HW event, but CPU status can be delayed. // Hence for now make sure CPU status is updated by calling awaitCompletion(); awaitEventCompletion(); eStop.awaitEventCompletion(); - ms = static_cast(eStop.time(false) - time(false)) / 1000000.f; + ms = static_cast(eStop.time(false) - time(false)) * kNsToMs; } return hipSuccess; } +// ================================================================================================ int64_t Event::time(bool getStartTs) const { assert(event_ != nullptr); if (getStartTs) { @@ -156,51 +150,51 @@ int64_t Event::time(bool getStartTs) const { } } +// ================================================================================================ int64_t EventDD::time(bool getStartTs) const { - uint64_t start = 0, end = 0; assert(event_ != nullptr); + uint64_t start = 0, end = 0; g_devices[deviceId()]->devices()[0]->getHwEventTime(*event_, &start, &end); - // FIXME: This is only needed if the command had to wait CL_COMPLETE status - if (start == 0 || end == 0) { + + // Select the requested timestamp and fallback to CPU profiling if not available + const uint64_t timestamp = getStartTs ? start : end; + if (timestamp == 0) { return Event::time(getStartTs); } - if (getStartTs) { - return static_cast(start); - } else { - return static_cast(end); - } + return static_cast(timestamp); } // ================================================================================================ hipError_t Event::streamWaitCommand(amd::Command*& command, hip::Stream* stream) { - amd::Command::EventWaitList eventWaitList; - if (event_ != nullptr) { - eventWaitList.push_back(event_); - } + const amd::Command::EventWaitList eventWaitList = + (event_ != nullptr) ? amd::Command::EventWaitList{event_} : amd::Command::EventWaitList{}; + command = new amd::Marker(*stream, kMarkerDisableFlush, eventWaitList); + if (command == nullptr) { + return hipErrorOutOfMemory; + } // Since we only need to have a dependency on an existing event, // we may not need to flush any caches. command->setCommandEntryScope(amd::Device::kCacheStateIgnore); - - if (command == NULL) { - return hipErrorOutOfMemory; - } return hipSuccess; } // ================================================================================================ hipError_t Event::streamWait(hip::Stream* stream, uint flags) { - // Access to event_ object must be lock protected amd::ScopedLock lock(lock_); + + // Early return if event is not recorded, same stream, or already ready if ((event_ == nullptr) || (event_->command().queue() == stream) || ready()) { return hipSuccess; } + if (!event_->notifyCmdQueue()) { return hipErrorLaunchOutOfResources; } + amd::Command* command; - hipError_t status = streamWaitCommand(command, stream); - if (status != hipSuccess) { + if (const auto status = streamWaitCommand(command, stream); status != hipSuccess) { return status; } + command->enqueue(); command->release(); return hipSuccess; @@ -209,33 +203,38 @@ hipError_t Event::streamWait(hip::Stream* stream, uint flags) { // ================================================================================================ hipError_t Event::recordCommand(amd::Command*& command, amd::HostQueue* stream, uint32_t ext_flags, bool batch_flush) { - if (command == nullptr) { - int32_t releaseFlags = - ((ext_flags == 0) ? flags_ : ext_flags) & - (hipEventReleaseToDevice | hipEventReleaseToSystem | hipEventDisableSystemFence); - if (releaseFlags & hipEventDisableSystemFence) { - releaseFlags = amd::Device::kCacheStateIgnore; - } else { - releaseFlags = amd::Device::kCacheStateInvalid; - } - // Always submit a EventMarker. - constexpr bool kMarkerTs = true; - command = - new hip::EventMarker(*stream, !kMarkerDisableFlush, kMarkerTs, releaseFlags, batch_flush); + if (command != nullptr) { + return hipSuccess; } + + const auto flags = (ext_flags == 0) ? flags_ : ext_flags; + + const auto releaseFlags = [&]() { + if (flags & hipEventDisableSystemFence) { + return amd::Device::kCacheStateIgnore; + } + return amd::Device::kCacheStateInvalid; + }(); + + constexpr bool kMarkerTs = true; + constexpr bool kFlushCache = false; + command = new hip::EventMarker(*stream, kFlushCache, kMarkerTs, releaseFlags, batch_flush); return hipSuccess; } // ================================================================================================ hipError_t Event::enqueueRecordCommand(hip::Stream* stream, amd::Command* command) { command->enqueue(); - if (event_ == &command->event()) { + + amd::Event& new_event = command->event(); + if (event_ == &new_event) { return hipSuccess; } + if (event_ != nullptr) { event_->release(); } - event_ = &command->event(); + event_ = &new_event; return hipSuccess; } @@ -244,73 +243,69 @@ hipError_t Event::enqueueRecordCommand(hip::Stream* stream, amd::Command* comman hipError_t Event::addMarker(hip::Stream* hip_stream, amd::Command* command, bool batch_flush) { // Keep the lock always at the beginning of this to avoid a race. SWDEV-277847 amd::ScopedLock lock(lock_); - hipError_t status = recordCommand(command, hip_stream, 0, batch_flush); - if (status != hipSuccess) { - return hipSuccess; + if (const auto status = recordCommand(command, hip_stream, 0, batch_flush); + status != hipSuccess) { + return status; } - status = enqueueRecordCommand(hip_stream, command); - return status; + return enqueueRecordCommand(hip_stream, command); } // ================================================================================================ bool isValid(hipEvent_t event) { - // NULL event is always valid if (event == nullptr) { return true; } std::shared_lock lock(eventSetLock); - if (eventSet.find(event) == eventSet.end()) { - return false; - } - - return true; + return eventSet.find(event) != eventSet.end(); } // ================================================================================================ -hipError_t ihipEventCreateWithFlags(hipEvent_t* event, unsigned flags) { - unsigned supportedFlags = hipEventDefault | hipEventBlockingSync | hipEventDisableTiming | - hipEventReleaseToDevice | hipEventReleaseToSystem | - hipEventInterprocess | hipEventDisableSystemFence; +hipError_t ihipEventCreateWithFlags(hipEvent_t* event, uint32_t flags) { + // Define supported event flags + constexpr uint32_t kSupportedFlags = hipEventDefault | hipEventBlockingSync | + hipEventDisableTiming | hipEventReleaseToDevice | + hipEventReleaseToSystem | hipEventInterprocess | + hipEventDisableSystemFence; + constexpr uint32_t kReleaseFlags = (hipEventReleaseToDevice | hipEventReleaseToSystem | + hipEventDisableSystemFence); - const unsigned releaseFlags = - (hipEventReleaseToDevice | hipEventReleaseToSystem | hipEventDisableSystemFence); - // can't set any unsupported flags. - // can set only one of the release flags. - // if hipEventInterprocess flag is set, then hipEventDisableTiming flag also must be set - const bool illegalFlags = (flags & ~supportedFlags) || ([](unsigned int num) { - unsigned int bitcount; - for (bitcount = 0; num; bitcount++) { - num &= num - 1; - } - return bitcount; - }(flags & releaseFlags) > 1) || - ((flags & hipEventInterprocess) && !(flags & hipEventDisableTiming)); - if (!illegalFlags) { - hip::Event* e = nullptr; - if (flags & hipEventInterprocess) { - e = new hip::IPCEvent(); - } else { - if (AMD_DIRECT_DISPATCH) { - e = new hip::EventDD(flags); - } else { - e = new hip::Event(flags); - } + // Helper to count set bits for validating multiple release flags + constexpr auto countBits = [](uint32_t num) { + uint32_t count = 0; + while (num) { + num &= num - 1; + ++count; } - // App might have used combination of flags i.e. hipEventInterprocess|hipEventDisableTiming - // However based on hipEventInterprocess flag, IPCEvent creates even with - // JUST hipEventInterprocess and hence, Actual hipEventInterprocess|hipEventDisableTiming - // flag is getting supressed with hipEventInterprocess - e->flags_ = flags; - if (e == nullptr) { - return hipErrorOutOfMemory; - } - *event = reinterpret_cast(e); - std::unique_lock lock(hip::eventSetLock); - hip::eventSet.insert(*event); - } else { + return count; + }; + + // Validate flags: no unsupported flags, max one release flag, + // interprocess requires disable timing + if ((flags & ~kSupportedFlags) || (countBits(flags & kReleaseFlags) > 1) || + ((flags & hipEventInterprocess) && !(flags & hipEventDisableTiming))) { return hipErrorInvalidValue; } + + // Create appropriate event type based on flags + hip::Event* e = nullptr; + if (flags & hipEventInterprocess) { + e = new hip::IPCEvent(flags); + } else if (AMD_DIRECT_DISPATCH) { + e = new hip::EventDD(flags); + } else { + e = new hip::Event(flags); + } + + if (e == nullptr) { + return hipErrorOutOfMemory; + } + *event = reinterpret_cast(e); + + // Register event in global set + std::unique_lock lock(hip::eventSetLock); + hip::eventSet.insert(*event); + return hipSuccess; } @@ -349,14 +344,13 @@ hipError_t hipEventDestroy(hipEvent_t event) { return hipErrorContextIsDestroyed; } - hip::Event* e = reinterpret_cast(event); - // There is a possibility that stream destroy be called first - hipStream_t s = e->GetCaptureStream(); - if (hip::isValid(s)) { - if (s != nullptr && s != hipStreamLegacy) { - reinterpret_cast(e->GetCaptureStream())->EraseCaptureEvent(event); - } + auto* e = reinterpret_cast(event); + // Handle capture stream cleanup (stream might be destroyed first) + hipStream_t stream = e->GetCaptureStream(); + if (hip::isValid(stream) && stream != nullptr && stream != hipStreamLegacy) { + reinterpret_cast(stream)->EraseCaptureEvent(event); } + delete e; HIP_RETURN(hipSuccess); } @@ -365,16 +359,16 @@ hipError_t hipEventDestroy(hipEvent_t event) { hipError_t hipEventElapsedTime(float* ms, hipEvent_t start, hipEvent_t stop) { HIP_INIT_API(hipEventElapsedTime, ms, start, stop); + // Validate parameters if (ms == nullptr) { HIP_RETURN(hipErrorInvalidValue); } - if (start == nullptr || stop == nullptr) { HIP_RETURN(hipErrorInvalidHandle); } - hip::Event* eStart = reinterpret_cast(start); - hip::Event* eStop = reinterpret_cast(stop); + auto* const eStart = reinterpret_cast(start); + auto* const eStop = reinterpret_cast(stop); if (eStart->deviceId() != eStop->deviceId()) { HIP_RETURN(hipErrorInvalidResourceHandle); @@ -384,52 +378,58 @@ hipError_t hipEventElapsedTime(float* ms, hipEvent_t start, hipEvent_t stop) { } // ================================================================================================ -hipError_t hipEventRecord_common(hipEvent_t event, hipStream_t stream, unsigned int flags) { - if (!(flags == hipEventRecordDefault || flags == hipEventRecordExternal)) { +hipError_t hipEventRecord_common(hipEvent_t event, hipStream_t stream, uint32_t flags) { + // Validate flags and event + if (flags != hipEventRecordDefault && flags != hipEventRecordExternal) { return hipErrorInvalidValue; } - hipError_t status = hipSuccess; if (event == nullptr) { return hipErrorInvalidHandle; } + getStreamPerThread(stream); - hip::Event* e = reinterpret_cast(event); - hip::Stream* s = reinterpret_cast(stream); - hip::Stream* hip_stream = hip::getStream(stream); - if (hipStream_t lastCaptureStream = e->GetCaptureStream()) { - if (hip::isValid(lastCaptureStream)) { - if ((lastCaptureStream != nullptr) && (lastCaptureStream != hipStreamLegacy)) { - reinterpret_cast(e->GetCaptureStream())->EraseCaptureEvent(event); - } - } + auto* const e = reinterpret_cast(event); + auto* const hip_stream = hip::getStream(stream); + if (hip_stream == nullptr) { + return hipErrorInvalidValue; + } + + // Clean up previous capture stream association + hipStream_t lastCaptureStream = e->GetCaptureStream(); + if (hip::isValid(lastCaptureStream) && lastCaptureStream != nullptr && + lastCaptureStream != hipStreamLegacy) { + reinterpret_cast(lastCaptureStream)->EraseCaptureEvent(event); } e->SetCaptureStream(stream); - if ((stream != nullptr && stream != hipStreamLegacy) && - (s->GetCaptureStatus() == hipStreamCaptureStatusActive)) { + + // Handle stream capture mode + if (stream != nullptr && stream != hipStreamLegacy && + hip_stream->GetCaptureStatus() == hipStreamCaptureStatusActive) { ClPrint(amd::LOG_INFO, amd::LOG_CODE, "[hipGraph] Current capture node EventRecord on stream : %p, Event %p", stream, event); - s->SetCaptureEvent(event); - std::vector lastCapturedNodes = s->GetLastCapturedNodes(); + hip_stream->SetCaptureEvent(event); + const auto& lastCapturedNodes = hip_stream->GetLastCapturedNodes(); e->SetNodesPrevToRecorded(lastCapturedNodes); + if (flags == hipEventRecordExternal) { - hip::GraphNode* node = new hip::GraphEventRecordNode(reinterpret_cast(e)); - hipError_t status = hip::ihipGraphAddNode( - node, reinterpret_cast(s->GetCaptureGraph()), - reinterpret_cast(s->GetLastCapturedNodes().data()), - s->GetLastCapturedNodes().size(), false); + auto* const node = new hip::GraphEventRecordNode(event); + const auto status = hip::ihipGraphAddNode(node, hip_stream->GetCaptureGraph(), + lastCapturedNodes.data(), + lastCapturedNodes.size(), false); if (status != hipSuccess) { ClPrint(amd::LOG_ERROR, amd::LOG_CODE, "hipEventRecord add external event node failed"); return status; } - s->SetLastCapturedNode(node); + hip_stream->SetLastCapturedNode(node); } - } else { - if (e->deviceId() != hip_stream->DeviceId()) { - return hipErrorInvalidResourceHandle; - } - status = e->addMarker(hip_stream, nullptr, !hip::Event::kBatchFlush); + return hipSuccess; } - return status; + + // Normal event recording + if (e->deviceId() != hip_stream->DeviceId()) { + return hipErrorInvalidResourceHandle; + } + return e->addMarker(hip_stream, nullptr, !hip::Event::kBatchFlush); } // ================================================================================================ @@ -446,7 +446,7 @@ hipError_t hipEventRecord_spt(hipEvent_t event, hipStream_t stream) { } // ================================================================================================ -hipError_t hipEventRecordWithFlags(hipEvent_t event, hipStream_t stream, unsigned int flags) { +hipError_t hipEventRecordWithFlags(hipEvent_t event, hipStream_t stream, uint32_t flags) { HIP_INIT_API(hipEventRecordWithFlags, event, stream, flags); HIP_RETURN(hipEventRecord_common(event, stream, flags)); } @@ -458,19 +458,24 @@ hipError_t hipEventSynchronize(hipEvent_t event) { if (event == nullptr) { HIP_RETURN(hipErrorInvalidHandle); } - hip::Event* e = reinterpret_cast(event); - auto hip_stream = e->GetCaptureStream(); - hip::Stream* s = reinterpret_cast(hip_stream); - if ((hip_stream != nullptr && hip_stream != hipStreamLegacy) && - (s->GetCaptureStatus() == hipStreamCaptureStatusActive)) { - s->SetCaptureStatus(hipStreamCaptureStatusInvalidated); - HIP_RETURN(hipErrorCapturedEvent); + + auto* e = reinterpret_cast(event); + const auto hip_stream = e->GetCaptureStream(); + + // Check for active capture + if (hip_stream != nullptr && hip_stream != hipStreamLegacy) { + auto* s = reinterpret_cast(hip_stream); + if (s->GetCaptureStatus() == hipStreamCaptureStatusActive) { + s->SetCaptureStatus(hipStreamCaptureStatusInvalidated); + HIP_RETURN(hipErrorCapturedEvent); + } } - if (hip::Stream::StreamCaptureOngoing(hip_stream) == true) { + + if (hip::Stream::StreamCaptureOngoing(hip_stream)) { HIP_RETURN(hipErrorStreamCaptureUnsupported); } - hipError_t status = e->synchronize(); + const auto status = e->synchronize(); // Release freed memory for all memory pools on the device g_devices[e->deviceId()]->ReleaseFreedMemory(); HIP_RETURN(status); @@ -482,20 +487,26 @@ hipError_t ihipEventQuery(hipEvent_t event) { return hipErrorInvalidHandle; } - hip::Event* e = reinterpret_cast(event); - auto hip_stream = e->GetCaptureStream(); - hip::Stream* s = reinterpret_cast(hip_stream); - if ((hip_stream != nullptr && hip_stream != hipStreamLegacy) && - (s->GetCaptureStatus() == hipStreamCaptureStatusActive)) { - s->SetCaptureStatus(hipStreamCaptureStatusInvalidated); - HIP_RETURN(hipErrorCapturedEvent); + auto* e = reinterpret_cast(event); + const auto hip_stream = e->GetCaptureStream(); + + // Check for active capture + if (hip_stream != nullptr && hip_stream != hipStreamLegacy) { + auto* s = reinterpret_cast(hip_stream); + if (s->GetCaptureStatus() == hipStreamCaptureStatusActive) { + s->SetCaptureStatus(hipStreamCaptureStatusInvalidated); + HIP_RETURN(hipErrorCapturedEvent); + } } - if (hip::Stream::StreamCaptureOngoing(e->GetCaptureStream())) { + + if (hip::Stream::StreamCaptureOngoing(hip_stream)) { HIP_RETURN(hipErrorStreamCaptureUnsupported); } + return e->query(); } +// ================================================================================================ hipError_t hipEventQuery(hipEvent_t event) { HIP_INIT_API(hipEventQuery, event); HIP_RETURN(ihipEventQuery(event)); diff --git a/projects/clr/hipamd/src/hip_event.hpp b/projects/clr/hipamd/src/hip_event.hpp index 7986a4c5ef..2126d44591 100644 --- a/projects/clr/hipamd/src/hip_event.hpp +++ b/projects/clr/hipamd/src/hip_event.hpp @@ -32,55 +32,58 @@ namespace hip { class StreamCallback { protected: - void* userData_; + void* userData_; //!< User data passed to callback function public: - StreamCallback(void* userData) : userData_(userData) {} + explicit StreamCallback(void* userData) : userData_(userData) {} virtual void CL_CALLBACK callback() = 0; - virtual ~StreamCallback() {}; + virtual ~StreamCallback() = default; }; class StreamAddCallback : public StreamCallback { - hipStreamCallback_t callBack_; - hipStream_t stream_; + hipStreamCallback_t callBack_; //!< Stream callback function pointer + hipStream_t stream_; //!< Stream associated with the callback public: StreamAddCallback(hipStream_t stream, hipStreamCallback_t callback, void* userData) - : StreamCallback(userData) { - stream_ = stream; - callBack_ = callback; - } + : StreamCallback(userData), stream_(stream), callBack_(callback) {} - void CL_CALLBACK callback() { + void CL_CALLBACK callback() override { hipError_t status = hipSuccess; callBack_(stream_, status, userData_); } }; class LaunchHostFuncCallback : public StreamCallback { - hipHostFn_t callBack_; + hipHostFn_t callBack_; //!< Host function callback pointer public: - LaunchHostFuncCallback(hipHostFn_t callback, void* userData) : StreamCallback(userData) { - callBack_ = callback; - } + LaunchHostFuncCallback(hipHostFn_t callback, void* userData) + : StreamCallback(userData), callBack_(callback) {} - void CL_CALLBACK callback() { callBack_(userData_); } + void CL_CALLBACK callback() override { callBack_(userData_); } }; void CL_CALLBACK ihipStreamCallback(cl_event event, cl_int command_exec_status, void* user_data); - #define IPC_SIGNALS_PER_EVENT 32 + +// Optimized IPC event shared memory structure +// Note: All atomics use relaxed memory ordering where safe for performance typedef struct ihipIpcEventShmem_s { + // Reference counting for shared memory lifecycle std::atomic owners; + // Metadata: only written once during initialization, relaxed ordering safe std::atomic owners_device_id; std::atomic owners_process_id; + // Ring buffer indices: requires acquire-release ordering for synchronization std::atomic read_index; std::atomic write_index; - uint32_t signal[IPC_SIGNALS_PER_EVENT]; + // Signal array: GPU-accessible memory for event signaling + // Using uint32_t for GPU compatibility + alignas(64) uint32_t signal[IPC_SIGNALS_PER_EVENT]; } ihipIpcEventShmem_t; class EventMarker : public amd::Marker { @@ -97,24 +100,25 @@ class EventMarker : public amd::Marker { }; class Event { - /// capture stream where event is recorded + /// Capture stream where event is recorded hipStream_t captureStream_ = nullptr; /// Previous captured nodes before event record std::vector nodesPrevToRecorded_; protected: bool CheckHwEvent() { - amd::SyncPolicy policy = - (flags_ == hipEventBlockingSync) ? amd::SyncPolicy::Blocking : amd::SyncPolicy::Auto; + const amd::SyncPolicy policy = + (flags_ == hipEventBlockingSync) ? amd::SyncPolicy::Blocking : amd::SyncPolicy::Auto; return g_devices[deviceId()]->devices()[0]->IsHwEventReady(*event_, false, policy); } public: - constexpr static bool kBatchFlush = true; //!< Flushes CPU command batch in direct dispatch mode + // Flushes CPU command batch in direct dispatch mode + static constexpr bool kBatchFlush = true; - Event(uint32_t flags) - : flags_(flags), lock_(true) /* hipEvent_t lock*/, event_(nullptr), stream_(nullptr) { - device_id_ = hip::getCurrentDevice()->deviceId(); // Created in current device ctx + explicit Event(uint32_t flags) + : flags_(flags), lock_(true), event_(nullptr) { + device_id_ = hip::getCurrentDevice()->deviceId(); } virtual ~Event() { @@ -122,7 +126,6 @@ class Event { event_->release(); } } - uint32_t flags_; //!< flags associated with the event virtual hipError_t query(); virtual hipError_t synchronize(); @@ -136,6 +139,8 @@ class Event { virtual hipError_t enqueueRecordCommand(hip::Stream* stream, amd::Command* command); hipError_t addMarker(hip::Stream* stream, amd::Command* command, bool batch_flush = true); + uint32_t flags() const { return flags_; } + void BindCommand(amd::Command& command) { amd::ScopedLock lock(lock_); if (event_ != nullptr) { @@ -146,17 +151,20 @@ class Event { } amd::Monitor& lock() { return lock_; } - const int deviceId() const { return device_id_; } + int deviceId() const { return device_id_; } void setDeviceId(int id) { device_id_ = id; } amd::Event* event() { return event_; } + /// Get capture stream where event is recorded hipStream_t GetCaptureStream() const { return captureStream_; } /// Set capture stream where event is recorded void SetCaptureStream(hipStream_t stream) { captureStream_ = stream; } /// Returns previous captured nodes before event record - std::vector GetNodesPrevToRecorded() const { return nodesPrevToRecorded_; } + const std::vector& GetNodesPrevToRecorded() const { + return nodesPrevToRecorded_; + } /// Set last captured graph node before event record - void SetNodesPrevToRecorded(std::vector& graphNode) { + void SetNodesPrevToRecorded(const std::vector& graphNode) { nodesPrevToRecorded_ = graphNode; } virtual hipError_t GetHandle(ihipIpcEventHandle_t* handle) { @@ -170,35 +178,38 @@ class Event { virtual int64_t time(bool getStartTs) const; protected: - amd::Monitor lock_; - hip::Stream* stream_; - amd::Event* event_; - int device_id_; + uint32_t flags_; //!< Flags associated with the event + amd::Monitor lock_; //!< Mutex for thread-safe access to event state + amd::Event* event_; //!< Underlying ROCclr event object for GPU synchronization + int device_id_; //!< Device ID where this event was created }; class EventDD : public Event { public: - EventDD(unsigned int flags) : Event(flags) {} - virtual ~EventDD() {} + explicit EventDD(uint32_t flags) : Event(flags) {} + ~EventDD() override = default; - virtual bool awaitEventCompletion(); - virtual bool ready(); - virtual int64_t time(bool getStartTs) const; + bool awaitEventCompletion() override; + bool ready() override; + int64_t time(bool getStartTs) const override; }; class IPCEvent : public Event { - // IPC Events + /// IPC event metadata structure struct ihipIpcEvent_t { - std::string ipc_name_; - int ipc_fd_; - ihipIpcEventShmem_t* ipc_shmem_; - ihipIpcEvent_t() : ipc_name_("dummy"), ipc_fd_(0), ipc_shmem_(nullptr) {} - void setipcname(const char* name) { ipc_name_ = std::string(name); } + std::string ipc_name_; //!< Name of the shared memory object for IPC + ihipIpcEventShmem_t* ipc_shmem_; //!< Pointer to mapped IPC shared memory structure + + ihipIpcEvent_t() : ipc_shmem_(nullptr) { + ipc_name_.reserve(32); // Reserve space for typical IPC name "/hip__" + } + void setipcname(const char* name) { ipc_name_ = name; } }; ihipIpcEvent_t ipc_evt_; public: - ~IPCEvent() { + explicit IPCEvent(uint32_t flags = hipEventInterprocess) : Event(flags) {} + ~IPCEvent() override { if (ipc_evt_.ipc_shmem_) { int owners = --ipc_evt_.ipc_shmem_->owners; // Make sure event is synchronized @@ -213,12 +224,11 @@ class IPCEvent : public Event { } #if !defined(_MSC_VER) // Clean up the POSIX shared memory object - if (!ipc_evt_.ipc_name_.empty() && ipc_evt_.ipc_name_ != "dummy") { + if (!ipc_evt_.ipc_name_.empty()) { shm_unlink(ipc_evt_.ipc_name_.c_str()); } #endif } - IPCEvent() : Event(hipEventInterprocess) {} bool createIpcEventShmemIfNeeded(); hipError_t GetHandle(ihipIpcEventHandle_t* handle) override; hipError_t OpenHandle(ihipIpcEventHandle_t* handle) override; @@ -232,10 +242,10 @@ class IPCEvent : public Event { hipError_t enqueueRecordCommand(hip::Stream* stream, amd::Command* command) override; }; - +/// Callback data for IPC event stream wait operations struct CallbackData { - int previous_read_index; - hip::ihipIpcEventShmem_t* shmem; + const int previous_read_index; //!< Snapshot of read index for synchronization + hip::ihipIpcEventShmem_t* const shmem; //!< IPC shared memory for event signaling }; } // namespace hip diff --git a/projects/clr/hipamd/src/hip_event_ipc.cpp b/projects/clr/hipamd/src/hip_event_ipc.cpp index e755b6c22d..88b473d9fe 100644 --- a/projects/clr/hipamd/src/hip_event_ipc.cpp +++ b/projects/clr/hipamd/src/hip_event_ipc.cpp @@ -32,12 +32,14 @@ namespace hip { hipError_t ihipEventCreateWithFlags(hipEvent_t* event, unsigned flags); +// ================================================================================================ bool IPCEvent::createIpcEventShmemIfNeeded() { + // Early return if shared memory already exists if (ipc_evt_.ipc_shmem_) { - // ipc_shmem_ already created, no need to create it again return true; } + // Generate unique IPC name #if !defined(_MSC_VER) static std::atomic counter{0}; ipc_evt_.ipc_name_ = "/hip_" + std::to_string(getpid()) + "_" + std::to_string(counter++); @@ -48,34 +50,33 @@ bool IPCEvent::createIpcEventShmemIfNeeded() { ipc_evt_.ipc_name_.replace(0, 5, "/hip_"); #endif - if (!amd::Os::MemoryMapFileTruncated( - ipc_evt_.ipc_name_.c_str(), - const_cast(reinterpret_cast(&(ipc_evt_.ipc_shmem_))), - sizeof(hip::ihipIpcEventShmem_t))) { + // Create memory-mapped file for shared memory + auto** shmem_ptr = reinterpret_cast(&ipc_evt_.ipc_shmem_); + if (!amd::Os::MemoryMapFileTruncated(ipc_evt_.ipc_name_.c_str(), + const_cast(shmem_ptr), + sizeof(hip::ihipIpcEventShmem_t))) { return false; } - ipc_evt_.ipc_shmem_->owners = 1; - ipc_evt_.ipc_shmem_->read_index = -1; - ipc_evt_.ipc_shmem_->write_index = 0; - for (uint32_t sig_idx = 0; sig_idx < IPC_SIGNALS_PER_EVENT; ++sig_idx) { - ipc_evt_.ipc_shmem_->signal[sig_idx] = 0; - } + // Initialize shared memory fields + auto* const shmem = ipc_evt_.ipc_shmem_; + shmem->owners = 1; + shmem->read_index = -1; + shmem->write_index = 0; + std::fill_n(shmem->signal, IPC_SIGNALS_PER_EVENT, 0); - // device sets 0 to this ptr when the ipc event is completed - hipError_t status = - ihipHostRegister(&ipc_evt_.ipc_shmem_->signal, sizeof(uint32_t) * IPC_SIGNALS_PER_EVENT, 0); - if (status != hipSuccess) { - return false; - } - return true; + // Register signal array with device + constexpr size_t kSignalArraySize = sizeof(uint32_t) * IPC_SIGNALS_PER_EVENT; + const auto status = ihipHostRegister(&shmem->signal, kSignalArraySize, 0); + return status == hipSuccess; } // ================================================================================================ hipError_t IPCEvent::query() { if (ipc_evt_.ipc_shmem_) { - int prev_read_idx = ipc_evt_.ipc_shmem_->read_index; - int offset = (prev_read_idx % IPC_SIGNALS_PER_EVENT); + const int prev_read_idx = ipc_evt_.ipc_shmem_->read_index; + const int offset = prev_read_idx % IPC_SIGNALS_PER_EVENT; + if (ipc_evt_.ipc_shmem_->read_index < prev_read_idx + IPC_SIGNALS_PER_EVENT && ipc_evt_.ipc_shmem_->signal[offset] != 0) { return hipErrorNotReady; @@ -101,11 +102,12 @@ hipError_t IPCEvent::synchronize() { // ================================================================================================ hipError_t IPCEvent::streamWait(hip::Stream* stream, uint flags) { - int offset = ipc_evt_.ipc_shmem_->read_index; - hipError_t status = - ihipStreamOperation(reinterpret_cast(stream), ROCCLR_COMMAND_STREAM_WAIT_VALUE, - &(ipc_evt_.ipc_shmem_->signal[offset]), 0, 1, 1, sizeof(uint32_t)); - return status; + const int offset = ipc_evt_.ipc_shmem_->read_index; + return ihipStreamOperation( + reinterpret_cast(stream), + ROCCLR_COMMAND_STREAM_WAIT_VALUE, + &(ipc_evt_.ipc_shmem_->signal[offset]), 0, 1, 1, + sizeof(uint32_t)); } // ================================================================================================ @@ -117,37 +119,41 @@ hipError_t IPCEvent::recordCommand(amd::Command*& command, amd::HostQueue* strea // ================================================================================================ hipError_t IPCEvent::enqueueRecordCommand(hip::Stream* stream, amd::Command* command) { - amd::Event& tEvent = command->event(); createIpcEventShmemIfNeeded(); - int write_index = ipc_evt_.ipc_shmem_->write_index++; - int offset = write_index % IPC_SIGNALS_PER_EVENT; - while (ipc_evt_.ipc_shmem_->signal[offset] != 0) { + + // Allocate signal slot for this event + auto* const shmem = ipc_evt_.ipc_shmem_; + const int write_index = shmem->write_index++; + const int offset = write_index % IPC_SIGNALS_PER_EVENT; + auto& signal = shmem->signal[offset]; + + // Wait for signal slot to become available + while (signal != 0) { amd::Os::sleep(1); } - // Lock signal. - ipc_evt_.ipc_shmem_->signal[offset] = 1; - ipc_evt_.ipc_shmem_->owners_device_id = deviceId(); + + // Lock signal and set device ID + signal = 1; + shmem->owners_device_id = deviceId(); command->enqueue(); - // Set event_ in order to release marked command when event is destroyed + // Set event_ to release marked command when event is destroyed if (event_ != nullptr) { event_->release(); } event_ = &command->event(); - // device writes 0 to signal after the hipEventRecord command is completed - // the signal value is checked by WaitThenDecrementSignal cb - hipError_t status = - ihipStreamOperation(reinterpret_cast(stream), ROCCLR_COMMAND_STREAM_WRITE_VALUE, - &(ipc_evt_.ipc_shmem_->signal[offset]), 0, 0, 0, sizeof(uint32_t)); - + // Device writes 0 to signal after hipEventRecord command completes + const auto status = ihipStreamOperation(reinterpret_cast(stream), + ROCCLR_COMMAND_STREAM_WRITE_VALUE, &signal, 0, 0, 0, + sizeof(uint32_t)); if (status != hipSuccess) { return status; } - // Update read index to indicate new signal. + // Update read index to indicate new signal int expected = write_index - 1; - while (!ipc_evt_.ipc_shmem_->read_index.compare_exchange_weak(expected, write_index)) { + while (!shmem->read_index.compare_exchange_weak(expected, write_index)) { amd::Os::sleep(1); } @@ -169,23 +175,28 @@ hipError_t IPCEvent::GetHandle(ihipIpcEventHandle_t* handle) { // ================================================================================================ hipError_t IPCEvent::OpenHandle(ihipIpcEventHandle_t* handle) { ipc_evt_.ipc_name_ = handle->shmem_name; + + // Map shared memory from IPC handle + auto** shmem_ptr = reinterpret_cast(&ipc_evt_.ipc_shmem_); if (!amd::Os::MemoryMapFileTruncated(ipc_evt_.ipc_name_.c_str(), - (const void**)&(ipc_evt_.ipc_shmem_), + const_cast(shmem_ptr), sizeof(ihipIpcEventShmem_t))) { return hipErrorInvalidValue; } - if (amd::Os::getProcessId() == ipc_evt_.ipc_shmem_->owners_process_id.load()) { - // If this is in the same process, return error. + auto* const shmem = ipc_evt_.ipc_shmem_; + + // Prevent opening in the same process + const auto current_process_id = amd::Os::getProcessId(); + if (current_process_id == shmem->owners_process_id.load()) { return hipErrorInvalidContext; } - ipc_evt_.ipc_shmem_->owners += 1; - // device sets 0 to this ptr when the ipc event is completed - hipError_t status = hipSuccess; - status = - ihipHostRegister(&ipc_evt_.ipc_shmem_->signal, sizeof(uint32_t) * IPC_SIGNALS_PER_EVENT, 0); - return status; + shmem->owners += 1; + + // Register signal array with device + constexpr size_t kSignalArraySize = sizeof(uint32_t) * IPC_SIGNALS_PER_EVENT; + return ihipHostRegister(&shmem->signal, kSignalArraySize, 0); } // ================================================================================================ @@ -195,31 +206,33 @@ hipError_t hipIpcGetEventHandle(hipIpcEventHandle_t* handle, hipEvent_t event) { if (handle == nullptr || event == nullptr) { HIP_RETURN(hipErrorInvalidValue); } - hip::Event* e = reinterpret_cast(event); + + auto e = reinterpret_cast(event); HIP_RETURN(e->GetHandle(reinterpret_cast(handle))); } +// ================================================================================================ hipError_t hipIpcOpenEventHandle(hipEvent_t* event, hipIpcEventHandle_t handle) { HIP_INIT_API(hipIpcOpenEventHandle, event, handle); - hipError_t status = hipSuccess; if (event == nullptr) { HIP_RETURN(hipErrorInvalidValue); } - status = ihipEventCreateWithFlags(event, hipEventDisableTiming | hipEventInterprocess); + // Create IPC event with timing disabled + constexpr uint32_t kIpcEventFlags = hipEventDisableTiming | hipEventInterprocess; + auto status = ihipEventCreateWithFlags(event, kIpcEventFlags); if (status != hipSuccess) { HIP_RETURN(status); } - hip::Event* e = reinterpret_cast(*event); - ihipIpcEventHandle_t* iHandle = reinterpret_cast(&handle); + auto* const e = reinterpret_cast(*event); + auto* const iHandle = reinterpret_cast(&handle); - status = e->OpenHandle(iHandle); - // Free the event in case of failure - if (status != hipSuccess) { + const auto open_status = e->OpenHandle(iHandle); + if (open_status != hipSuccess) { delete e; } - HIP_RETURN(status); + HIP_RETURN(open_status); } } // namespace hip diff --git a/projects/clr/hipamd/src/hip_module.cpp b/projects/clr/hipamd/src/hip_module.cpp index ad0a02ee52..11949ace3b 100644 --- a/projects/clr/hipamd/src/hip_module.cpp +++ b/projects/clr/hipamd/src/hip_module.cpp @@ -509,7 +509,7 @@ hipError_t ihipModuleLaunchKernel(hipFunction_t f, amd::LaunchParams& launch_par if (stopEvent != nullptr) { hip::Event* eStop = reinterpret_cast(stopEvent); - if (eStop->flags_ & hipEventDisableSystemFence) { + if (eStop->flags() & hipEventDisableSystemFence) { command->setCommandEntryScope(amd::Device::kCacheStateIgnore); } else { command->setCommandEntryScope(amd::Device::kCacheStateSystem);