SWDEV-514686 - Fixed hipEventSynchronize/hipStreamWaitEvent for IPC events

Resolved an issue where hipEventSynchronize and hipStreamWaitEvent APIs
did not function correctly for events created with the hipEventInterprocess flag.
The bug caused the event to be incorrectly marked as "recorded,"
leading to these APIs failing to wait for the event as expected.

Change-Id: Ic9fdfaab2393beb93d6e0b83661545e902a63499
Tento commit je obsažen v:
Ioannis Assiouras
2025-02-11 13:50:05 +00:00
rodič cf6aabb823
revize 1cdfbfd270
7 změnil soubory, kde provedl 44 přidání a 64 odebrání
+5 -11
Zobrazit soubor
@@ -138,12 +138,7 @@ hipError_t Event::elapsedTime(Event& eStop, float& ms) {
// Hence for now make sure CPU status is updated by calling awaitCompletion();
awaitEventCompletion();
eStop.awaitEventCompletion();
if (unrecorded_ && eStop.isUnRecorded()) {
// Both the events are not recorded, just need the end and start of stop event
ms = static_cast<float>(eStop.time(false) - eStop.time(true)) / 1000000.f;
} else {
ms = static_cast<float>(eStop.time(false) - time(false)) / 1000000.f;
}
ms = static_cast<float>(eStop.time(false) - time(false)) / 1000000.f;
}
return hipSuccess;
}
@@ -235,7 +230,7 @@ hipError_t Event::recordCommand(amd::Command*& command, amd::HostQueue* stream,
}
// ================================================================================================
hipError_t Event::enqueueRecordCommand(hipStream_t stream, amd::Command* command, bool record) {
hipError_t Event::enqueueRecordCommand(hipStream_t stream, amd::Command* command) {
command->enqueue();
if (event_ == &command->event()) {
return hipSuccess;
@@ -244,14 +239,13 @@ hipError_t Event::enqueueRecordCommand(hipStream_t stream, amd::Command* command
event_->release();
}
event_ = &command->event();
unrecorded_ = !record;
return hipSuccess;
}
// ================================================================================================
hipError_t Event::addMarker(hipStream_t stream, amd::Command* command,
bool record, bool batch_flush) {
bool batch_flush) {
// Skip wait as we should not be resolving stream in this sub
constexpr bool kSkipWait = true;
hip::Stream* hip_stream = hip::getStream(stream, kSkipWait);
@@ -261,7 +255,7 @@ hipError_t Event::addMarker(hipStream_t stream, amd::Command* command,
if (status != hipSuccess) {
return hipSuccess;
}
status = enqueueRecordCommand(stream, command, record);
status = enqueueRecordCommand(stream, command);
return status;
}
@@ -417,7 +411,7 @@ hipError_t hipEventRecord_common(hipEvent_t event, hipStream_t stream) {
if (g_devices[e->deviceId()]->devices()[0] != &hip_stream->device()) {
return hipErrorInvalidHandle;
}
status = e->addMarker(stream, nullptr, true, !hip::Event::kBatchFlush);
status = e->addMarker(stream, nullptr, !hip::Event::kBatchFlush);
}
return status;
}
+5 -11
Zobrazit soubor
@@ -105,7 +105,7 @@ class Event {
constexpr static bool kBatchFlush = true; //!< Flushes CPU command batch in direct dispatch mode
Event(uint32_t flags) : flags_(flags), lock_(true) /* hipEvent_t lock*/,
event_(nullptr), unrecorded_(false), stream_(nullptr) {
event_(nullptr), stream_(nullptr) {
device_id_ = hip::getCurrentDevice()->deviceId(); // Created in current device ctx
}
@@ -126,21 +126,19 @@ class Event {
virtual hipError_t recordCommand(amd::Command*& command, amd::HostQueue* stream,
uint32_t flags = 0, bool batch_flush = true);
virtual hipError_t enqueueRecordCommand(hipStream_t stream, amd::Command* command, bool record);
virtual hipError_t enqueueRecordCommand(hipStream_t stream, amd::Command* command);
hipError_t addMarker(hipStream_t stream, amd::Command* command,
bool record, bool batch_flush = true);
bool batch_flush = true);
void BindCommand(amd::Command& command, bool record) {
void BindCommand(amd::Command& command) {
amd::ScopedLock lock(lock_);
if (event_ != nullptr) {
event_->release();
}
event_ = &command.event();
unrecorded_ = !record;
command.retain();
}
bool isUnRecorded() const { return unrecorded_; }
amd::Monitor& lock() { return lock_; }
const int deviceId() const { return device_id_; }
void setDeviceId(int id) { device_id_ = id; }
@@ -170,10 +168,6 @@ class Event {
hip::Stream* stream_;
amd::Event* event_;
int device_id_;
//! Flag to indicate hipEventRecord has not been called. This is needed for
//! hip*ModuleLaunchKernel API which takes start and stop events so no
//! hipEventRecord is called. Cleanup needed once those APIs are deprecated.
bool unrecorded_;
};
class EventDD : public Event {
@@ -222,7 +216,7 @@ class IPCEvent : public Event {
hipError_t recordCommand(amd::Command*& command, amd::HostQueue* queue,
uint32_t flags = 0, bool batch_flush = true) override;
hipError_t enqueueRecordCommand(hipStream_t stream, amd::Command* command, bool record);
hipError_t enqueueRecordCommand(hipStream_t stream, amd::Command* command);
};
+29 -37
Zobrazit soubor
@@ -142,49 +142,41 @@ hipError_t IPCEvent::streamWait(hipStream_t stream, uint flags) {
// ================================================================================================
hipError_t IPCEvent::recordCommand(amd::Command*& command, amd::HostQueue* stream,
uint32_t flags, bool batch_flush) {
bool unrecorded = isUnRecorded();
if (unrecorded) {
command = new amd::Marker(*stream, kMarkerDisableFlush);
} else {
return Event::recordCommand(command, stream, batch_flush);
}
command = new amd::Marker(*stream, kMarkerDisableFlush);
return hipSuccess;
}
// ================================================================================================
hipError_t IPCEvent::enqueueRecordCommand(hipStream_t stream, amd::Command* command, bool record) {
bool unrecorded = isUnRecorded();
if (unrecorded) {
amd::Event& tEvent = command->event();
createIpcEventShmemIfNeeded();
int write_index = ipc_evt_.ipc_shmem_->write_index++;
int offset = write_index % IPC_SIGNALS_PER_EVENT;
while (ipc_evt_.ipc_shmem_->signal[offset] != 0) {
amd::Os::sleep(1);
}
// Lock signal.
ipc_evt_.ipc_shmem_->signal[offset] = 1;
ipc_evt_.ipc_shmem_->owners_device_id = deviceId();
command->enqueue();
hipError_t IPCEvent::enqueueRecordCommand(hipStream_t stream, amd::Command* command) {
// device writes 0 to signal after the hipEventRecord command is completed
// the signal value is checked by WaitThenDecrementSignal cb
hipError_t status = ihipStreamOperation(stream, ROCCLR_COMMAND_STREAM_WRITE_VALUE,
&(ipc_evt_.ipc_shmem_->signal[offset]),
0,
0, 0, sizeof(uint32_t));
if (status != hipSuccess) {
return status;
}
// Update read index to indicate new signal.
int expected = write_index - 1;
while (!ipc_evt_.ipc_shmem_->read_index.compare_exchange_weak(expected, write_index)) {
amd::Os::sleep(1);
}
} else {
return Event::enqueueRecordCommand(stream, command, record);
amd::Event& tEvent = command->event();
createIpcEventShmemIfNeeded();
int write_index = ipc_evt_.ipc_shmem_->write_index++;
int offset = write_index % IPC_SIGNALS_PER_EVENT;
while (ipc_evt_.ipc_shmem_->signal[offset] != 0) {
amd::Os::sleep(1);
}
// Lock signal.
ipc_evt_.ipc_shmem_->signal[offset] = 1;
ipc_evt_.ipc_shmem_->owners_device_id = deviceId();
command->enqueue();
// device writes 0 to signal after the hipEventRecord command is completed
// the signal value is checked by WaitThenDecrementSignal cb
hipError_t status = ihipStreamOperation(stream, ROCCLR_COMMAND_STREAM_WRITE_VALUE,
&(ipc_evt_.ipc_shmem_->signal[offset]),
0,
0, 0, sizeof(uint32_t));
if (status != hipSuccess) {
return status;
}
// Update read index to indicate new signal.
int expected = write_index - 1;
while (!ipc_evt_.ipc_shmem_->read_index.compare_exchange_weak(expected, write_index)) {
amd::Os::sleep(1);
}
return hipSuccess;
}
+1 -1
Zobrazit soubor
@@ -2121,7 +2121,7 @@ class GraphEventRecordNode : public GraphNode {
hip::Event* e = reinterpret_cast<hip::Event*>(event_);
// command release during enqueueRecordCommand
hipError_t status = e->enqueueRecordCommand(
reinterpret_cast<hipStream_t>(stream), commands_[0], true);
reinterpret_cast<hipStream_t>(stream), commands_[0]);
if (status != hipSuccess) {
ClPrint(amd::LOG_ERROR, amd::LOG_CODE,
"[hipGraph] Enqueue event record command failed for node %p - status %d", this,
+1 -1
Zobrazit soubor
@@ -206,7 +206,7 @@ hipError_t hipFreeAsync(void* dev_ptr, hipStream_t stream) {
event = new hip::Event(0);
if (event != nullptr) {
if (hipSuccess !=
event->addMarker(reinterpret_cast<hipStream_t>(hip_stream), nullptr, true)) {
event->addMarker(reinterpret_cast<hipStream_t>(hip_stream), nullptr)) {
delete event;
event = nullptr;
} else {
+1 -1
Zobrazit soubor
@@ -284,7 +284,7 @@ bool MemoryPool::FreeMemory(amd::Memory* memory, Stream* stream, Event* event) {
// Add a marker to the stream to trace availability of this memory
Event* e = new hip::Event(0);
if (e != nullptr) {
if (hipSuccess == e->addMarker(reinterpret_cast<hipStream_t>(stream), nullptr, true)) {
if (hipSuccess == e->addMarker(reinterpret_cast<hipStream_t>(stream), nullptr)) {
ts.SetEvent(e);
// Make sure runtime sends a notification
auto result = e->ready();
+2 -2
Zobrazit soubor
@@ -461,7 +461,7 @@ hipError_t ihipModuleLaunchKernel(hipFunction_t f, uint32_t globalWorkSizeX,
if (startEvent != nullptr) {
hip::Event* eStart = reinterpret_cast<hip::Event*>(startEvent);
status = eStart->addMarker(hStream, nullptr, false);
status = eStart->addMarker(hStream, nullptr);
if (status != hipSuccess) {
return status;
}
@@ -476,7 +476,7 @@ hipError_t ihipModuleLaunchKernel(hipFunction_t f, uint32_t globalWorkSizeX,
}
// Enqueue Dispatch and bind the stop event
command->enqueue();
eStop->BindCommand(*command, false);
eStop->BindCommand(*command);
} else {
command->enqueue();
}