SWDEV-430437,SWDEV-434702 - Split the streamset per device

Change-Id: If1bcca45825c9899462bb95ed6f637f5af806cc8
Bu işleme şunda yer alıyor:
Ioannis Assiouras
2024-02-06 00:34:35 +00:00
ebeveyn 1239309c90
işleme be2bdabb76
6 değiştirilmiş dosya ile 175 ekleme ve 144 silme
+1 -1
Dosyayı Görüntüle
@@ -95,7 +95,7 @@ hip::Stream* getStream(hipStream_t stream, bool wait) {
hip::Stream* hip_stream = reinterpret_cast<hip::Stream*>(stream);
if (wait && !(hip_stream->Flags() & hipStreamNonBlocking)) {
constexpr bool WaitNullStreamOnly = true;
iHipWaitActiveStreams(hip_stream, WaitNullStreamOnly);
hip_stream->GetDevice()->WaitActiveStreams(hip_stream, WaitNullStreamOnly);
}
return hip_stream;
}
+141 -2
Dosyayı Görüntüle
@@ -43,7 +43,7 @@ hip::Stream* Device::NullStream(bool wait) {
}
if (wait == true) {
// Wait for all active streams before executing commands on the default
iHipWaitActiveStreams(null_stream_);
WaitActiveStreams(null_stream_);
}
return null_stream_;
}
@@ -149,11 +149,150 @@ void Device::Reset() {
mem_pools_.clear();
}
flags_ = hipDeviceScheduleSpin;
hip::Stream::destroyAllStreams(deviceId_);
destroyAllStreams();
amd::MemObjMap::Purge(devices()[0]);
Create();
}
// ================================================================================================
void Device::WaitActiveStreams(hip::Stream* blocking_stream, bool wait_null_stream) {
amd::Command::EventWaitList eventWaitList(0);
bool submitMarker = 0;
auto waitForStream = [&submitMarker,
&eventWaitList](hip::Stream* stream) {
if (amd::Command *command = stream->getLastQueuedCommand(true)) {
amd::Event &event = command->event();
// Check HW status of the ROCcrl event.
// Note: not all ROCclr modes support HW status
bool ready = stream->device().IsHwEventReady(event);
if (!ready) {
ready = (command->status() == CL_COMPLETE);
}
submitMarker |= stream->vdev()->isFenceDirty();
// Check the current active status
if (!ready) {
command->notifyCmdQueue();
eventWaitList.push_back(command);
} else {
command->release();
}
}
};
if (wait_null_stream) {
if (null_stream_) {
waitForStream(null_stream_);
}
} else {
amd::ScopedLock lock(streamSetLock);
for (const auto& active_stream : streamSet) {
// If it's the current device
if (// Make sure it's a default stream
((active_stream->Flags() & hipStreamNonBlocking) == 0) &&
// and it's not the current stream
(active_stream != blocking_stream)) {
// Get the last valid command
waitForStream(active_stream);
}
}
}
// Check if we have to wait anything
if (eventWaitList.size() > 0 || submitMarker) {
amd::Command* command = new amd::Marker(*blocking_stream, kMarkerDisableFlush, eventWaitList);
if (command != nullptr) {
command->enqueue();
command->release();
}
}
// Release all active commands. It's safe after the marker was enqueued
for (const auto& it : eventWaitList) {
it->release();
}
}
// ================================================================================================
void Device::AddStream(Stream* stream) {
amd::ScopedLock lock(streamSetLock);
streamSet.insert(stream);
}
// ================================================================================================
void Device::RemoveStream(Stream* stream){
amd::ScopedLock lock(streamSetLock);
streamSet.erase(stream);
}
// ================================================================================================
bool Device::StreamExists(Stream* stream){
amd::ScopedLock lock(streamSetLock);
if (streamSet.find(stream) != streamSet.end()) {
return true;
}
return false;
}
// ================================================================================================
void Device::destroyAllStreams() {
std::vector<Stream*> toBeDeleted;
{
amd::ScopedLock lock(streamSetLock);
for (auto& it : streamSet) {
if (it->Null() == false ) {
toBeDeleted.push_back(it);
}
}
}
for (auto& it : toBeDeleted) {
hip::Stream::Destroy(it);
}
}
// ================================================================================================
void Device::SyncAllStreams( bool cpu_wait) {
// Make a local copy to avoid stalls for GPU finish with multiple threads
std::vector<hip::Stream*> streams;
streams.reserve(streamSet.size());
{
amd::ScopedLock lock(streamSetLock);
for (auto it : streamSet) {
streams.push_back(it);
it->retain();
}
}
for (auto it : streams) {
it->finish(cpu_wait);
it->release();
}
// Release freed memory for all memory pools on the device
ReleaseFreedMemory();
}
// ================================================================================================
bool Device::StreamCaptureBlocking() {
amd::ScopedLock lock(streamSetLock);
for (auto& it : streamSet) {
if (it->GetCaptureStatus() == hipStreamCaptureStatusActive && it->Flags() != hipStreamNonBlocking) {
return true;
}
}
return false;
}
// ================================================================================================
bool Device::existsActiveStreamForDevice() {
amd::ScopedLock lock(streamSetLock);
for (const auto& active_stream : streamSet) {
if (active_stream->GetQueueStatus()) {
return true;
}
}
return false;
}
// ================================================================================================
Device::~Device() {
if (default_mem_pool_ != nullptr) {
+1 -1
Dosyayı Görüntüle
@@ -610,7 +610,7 @@ hipError_t hipDeviceSetSharedMemConfig(hipSharedMemConfig config) {
hipError_t hipDeviceSynchronize() {
HIP_INIT_API(hipDeviceSynchronize);
constexpr bool kDoWaitForCpu = true;
hip::Stream::SyncAllStreams(hip::getCurrentDevice()->deviceId(), kDoWaitForCpu);
hip::getCurrentDevice()->SyncAllStreams(kDoWaitForCpu);
HIP_RETURN(hipSuccess);
}
+19 -12
Dosyayı Görüntüle
@@ -306,15 +306,9 @@ public:
/// Returns the CU mask for the current stream
const std::vector<uint32_t> GetCUMask() const { return cuMask_; }
/// Sync all streams
static void SyncAllStreams(int deviceId, bool cpu_wait = true);
/// Check whether any blocking stream running
static bool StreamCaptureBlocking();
/// Destroy all streams on a given device
static void destroyAllStreams(int deviceId);
static void Destroy(hip::Stream* stream);
/// Check Stream Capture status to make sure it is done
@@ -416,7 +410,6 @@ public:
parallelCaptureStreams_.erase(it);
}
}
static bool existsActiveStreamForDevice(hip::Device* device);
/// The stream should be destroyed via release() rather than delete
private:
@@ -426,6 +419,8 @@ public:
/// HIP Device class
class Device {
amd::Monitor lock_{"Device lock", true};
amd::Monitor streamSetLock{"Guards device stream set"};
std::unordered_set<hip::Stream*> streamSet;
/// ROCclr context
amd::Context* context_;
/// Device's ID
@@ -499,7 +494,7 @@ public:
amd::ScopedLock lock(lock_);
/// Either stream is active or device is active
if (isActive_) return true;
if (Stream::existsActiveStreamForDevice(this)) {
if (existsActiveStreamForDevice()) {
isActive_ = true;
return true;
}
@@ -540,6 +535,22 @@ public:
/// Returns true if memory pool is valid on this device
bool IsMemoryPoolValid(MemoryPool* pool);
void AddStream(Stream* stream);
void RemoveStream(Stream* stream);
bool StreamExists(Stream* stream);
void destroyAllStreams();
void SyncAllStreams( bool cpu_wait = true);
bool StreamCaptureBlocking();
bool existsActiveStreamForDevice();
/// Wait all active streams on the blocking queue. The method enqueues a wait command and
/// doesn't stall the current thread
void WaitActiveStreams(hip::Stream* blocking_stream, bool wait_null_stream = false);
};
/// Thread Local Storage Variables Aggregator Class
@@ -589,10 +600,6 @@ public:
extern void WaitThenDecrementSignal(hipStream_t stream, hipError_t status, void* user_data);
/// Wait all active streams on the blocking queue. The method enqueues a wait command and
/// doesn't stall the current thread
extern void iHipWaitActiveStreams(hip::Stream* blocking_stream, bool wait_null_stream = false);
extern std::vector<hip::Device*> g_devices;
extern hipError_t ihipDeviceGetCount(int* count);
extern int ihipGetDevice();
+4 -4
Dosyayı Görüntüle
@@ -76,7 +76,7 @@ hipError_t ihipFree(void *ptr) {
if (memory_object != nullptr) {
// Wait on the device, associated with the current memory object during allocation
auto device_id = memory_object->getUserData().deviceId;
hip::Stream::SyncAllStreams(device_id);
g_devices[device_id]->SyncAllStreams();
// Find out if memory belongs to any memory pool
if (!g_devices[device_id]->FreeMemory(memory_object, nullptr)) {
@@ -743,7 +743,7 @@ hipError_t ihipArrayDestroy(hipArray_t array) {
auto image = as_amd(memObj);
// Wait on the device, associated with the current memory object during allocation
hip::Stream::SyncAllStreams(image->getUserData().deviceId);
g_devices[image->getUserData().deviceId]->SyncAllStreams();
image->release();
delete array;
@@ -1252,7 +1252,7 @@ hipError_t ihipHostUnregister(void* hostPtr) {
if (mem != nullptr) {
// Wait on the device, associated with the current memory object during allocation
hip::Stream::SyncAllStreams(mem->getUserData().deviceId);
g_devices[mem->getUserData().deviceId]->SyncAllStreams();
amd::MemObjMap::RemoveMemObj(hostPtr);
for (const auto& device: g_devices) {
@@ -4304,7 +4304,7 @@ hipError_t ihipMipmappedArrayDestroy(hipMipmappedArray_t mipmapped_array_ptr) {
auto image = as_amd(mem_obj);
// Wait on the device, associated with the current memory object during allocation
hip::Stream::SyncAllStreams(image->getUserData().deviceId);
g_devices[image->getUserData().deviceId]->SyncAllStreams();
image->release();
delete mipmapped_array_ptr;
+9 -124
Dosyayı Görüntüle
@@ -25,8 +25,6 @@
#include "hip_prof_api.h"
namespace hip {
static amd::Monitor streamSetLock{"Guards global stream set"};
static std::unordered_set<hip::Stream*> streamSet;
// ================================================================================================
Stream::Stream(hip::Device* dev, Priority p, unsigned int f, bool null_stream,
@@ -43,8 +41,7 @@ Stream::Stream(hip::Device* dev, Priority p, unsigned int f, bool null_stream,
originStream_(false),
captureID_(0)
{
amd::ScopedLock lock(streamSetLock);
streamSet.insert(this);
device_->AddStream(this);
}
// ================================================================================================
@@ -76,10 +73,7 @@ bool Stream::Create() {
// ================================================================================================
void Stream::Destroy(hip::Stream* stream) {
{
amd::ScopedLock lock(streamSetLock);
streamSet.erase(stream);
}
stream->device_->RemoveStream(stream);
stream->release();
}
@@ -95,11 +89,12 @@ bool isValid(hipStream_t& stream) {
}
hip::Stream* s = reinterpret_cast<hip::Stream*>(stream);
amd::ScopedLock lock(streamSetLock);
if (streamSet.find(s) == streamSet.end()) {
return false;
for (auto& device : g_devices) {
if (device->StreamExists(s)) {
return true;
}
}
return true;
return false;
}
// ================================================================================================
@@ -122,53 +117,17 @@ int Stream::DeviceId(const hipStream_t hStream) {
}
// ================================================================================================
void Stream::SyncAllStreams(int deviceId, bool cpu_wait) {
// Make a local copy to avoid stalls for GPU finish with multiple threads
std::vector<hip::Stream*> streams;
streams.reserve(streamSet.size());
{
amd::ScopedLock lock(streamSetLock);
for (auto it : streamSet) {
if (it->DeviceId() == deviceId) {
streams.push_back(it);
it->retain();
}
}
}
for (auto it : streams) {
it->finish(cpu_wait);
it->release();
}
// Release freed memory for all memory pools on the device
g_devices[deviceId]->ReleaseFreedMemory();
}
// ================================================================================================
bool Stream::StreamCaptureBlocking() {
amd::ScopedLock lock(streamSetLock);
for (auto& it : streamSet) {
if (it->GetCaptureStatus() == hipStreamCaptureStatusActive && it->Flags() != hipStreamNonBlocking) {
for (auto& device : g_devices) {
if (device->StreamCaptureBlocking()) {
return true;
}
}
return false;
}
void Stream::destroyAllStreams(int deviceId) {
std::vector<Stream*> toBeDeleted;
{
amd::ScopedLock lock(streamSetLock);
for (auto& it : streamSet) {
if (it->Null() == false && it->DeviceId() == deviceId) {
toBeDeleted.push_back(it);
}
}
}
for (auto& it : toBeDeleted) {
hip::Stream::Destroy(it);
}
}
bool Stream::StreamCaptureOngoing(hipStream_t hStream) {
hip::Stream* s = reinterpret_cast<hip::Stream*>(hStream);
// Allow capture to be less restrictive one one changes the stream capture interaction
@@ -188,80 +147,6 @@ bool Stream::StreamCaptureOngoing(hipStream_t hStream) {
}
}
bool Stream::existsActiveStreamForDevice(hip::Device* device) {
amd::ScopedLock lock(streamSetLock);
for (const auto& active_stream : streamSet) {
if ((active_stream->GetDevice() == device) &&
active_stream->GetQueueStatus()) {
return true;
}
}
return false;
}
// ================================================================================================
void iHipWaitActiveStreams(hip::Stream* blocking_stream, bool wait_null_stream) {
amd::Command::EventWaitList eventWaitList(0);
bool submitMarker = 0;
auto waitForStream = [&submitMarker,
&eventWaitList](hip::Stream* stream) {
if (amd::Command *command = stream->getLastQueuedCommand(true)) {
amd::Event &event = command->event();
// Check HW status of the ROCcrl event.
// Note: not all ROCclr modes support HW status
bool ready = stream->device().IsHwEventReady(event);
if (!ready) {
ready = (command->status() == CL_COMPLETE);
}
submitMarker |= stream->vdev()->isFenceDirty();
// Check the current active status
if (!ready) {
command->notifyCmdQueue();
eventWaitList.push_back(command);
} else {
command->release();
}
}
};
if (wait_null_stream) {
if (hip::Stream* null_stream = blocking_stream->GetDevice()->GetNullStream()) {
waitForStream(null_stream);
}
} else {
amd::ScopedLock lock(streamSetLock);
for (const auto& active_stream : streamSet) {
// If it's the current device
if ((&active_stream->device() == &blocking_stream->device()) &&
// Make sure it's a default stream
((active_stream->Flags() & hipStreamNonBlocking) == 0) &&
// and it's not the current stream
(active_stream != blocking_stream)) {
// Get the last valid command
waitForStream(active_stream);
}
}
}
// Check if we have to wait anything
if (eventWaitList.size() > 0 || submitMarker) {
amd::Command* command = new amd::Marker(*blocking_stream, kMarkerDisableFlush, eventWaitList);
if (command != nullptr) {
command->enqueue();
command->release();
}
}
// Release all active commands. It's safe after the marker was enqueued
for (const auto& it : eventWaitList) {
it->release();
}
}
// ================================================================================================
void CL_CALLBACK ihipStreamCallback(cl_event event, cl_int command_exec_status, void* user_data) {
StreamCallback* cbo = reinterpret_cast<StreamCallback*>(user_data);