SWDEV-380703 - sync all streams individually

Avoid syncing blocking streams through the default stream,
since that introduces extra command dependencies and
does not allow memory to be destroyed after the last submission.

Change-Id: I618e9bd2091c4cf9157125612d8c4759030c5a80


[ROCm/clr commit: 1e88d2c52f]
This commit is contained in:
German
2023-03-22 19:14:15 -04:00
committed by Maneesh Gupta
parent 235eb22491
commit 56dedad98b
7 changed files with 49 additions and 71 deletions
+3 -3
View File
@@ -26,8 +26,8 @@
namespace hip {
// ================================================================================================
hip::Stream* Device::NullStream(bool skip_alloc) {
if (null_stream_ == nullptr && !skip_alloc) {
hip::Stream* Device::NullStream() {
if (null_stream_ == nullptr) {
null_stream_ = new Stream(this, Stream::Priority::Normal, 0, true);
}
@@ -138,7 +138,7 @@ Device::~Device() {
}
if (null_stream_!= nullptr) {
null_stream_->release();
hip::Stream::Destroy(null_stream_);
}
}
@@ -513,23 +513,9 @@ hipError_t hipDeviceSetSharedMemConfig ( hipSharedMemConfig config ) {
HIP_RETURN(hipErrorNotSupported);
}
hipError_t hipDeviceSynchronize ( void ) {
hipError_t hipDeviceSynchronize() {
HIP_INIT_API(hipDeviceSynchronize);
hip::Stream* stream = hip::getNullStream();
if (!stream) {
HIP_RETURN(hipErrorOutOfMemory);
}
if (hip::Stream::StreamCaptureOngoing(reinterpret_cast<hipStream_t>(stream)) == true) {
HIP_RETURN(hipErrorStreamCaptureUnsupported);
}
stream->finish();
hip::Stream::syncNonBlockingStreams(hip::getCurrentDevice()->deviceId());
hip::Stream::SyncAllStreams(hip::getCurrentDevice()->deviceId());
HIP_RETURN(hipSuccess);
}
@@ -465,7 +465,7 @@ hipError_t hipGraphExec::CreateStreams(uint32_t num_streams) {
hip::Stream::Priority::Normal, hipStreamNonBlocking);
if (stream == nullptr || !stream->Create()) {
if (stream != nullptr) {
stream->release();
hip::Stream::Destroy(stream);
}
ClPrint(amd::LOG_ERROR, amd::LOG_CODE, "[hipGraph] Failed to create parallel stream!\n");
return hipErrorOutOfMemory;
@@ -578,7 +578,7 @@ struct hipGraphExec {
// terminated after it complete execution
for (auto stream : parallel_streams_) {
if (stream != nullptr) {
stream->release();
hip::Stream::Destroy(stream);
}
}
for (auto it = clonedNodes_.begin(); it != clonedNodes_.end(); it++) delete it->second;
+6 -6
View File
@@ -259,7 +259,7 @@ namespace hip {
std::unordered_set<hipEvent_t> captureEvents_;
unsigned long long captureID_;
static inline CommandQueue::Priority convertToQueuePriority(Priority p){
static inline CommandQueue::Priority convertToQueuePriority(Priority p) {
return p == Priority::High ? amd::CommandQueue::Priority::High : p == Priority::Low ?
amd::CommandQueue::Priority::Low : amd::CommandQueue::Priority::Normal;
}
@@ -271,7 +271,6 @@ namespace hip {
/// Creates the hip stream object, including AMD host queue
bool Create();
virtual bool terminate() override;
/// Get device ID associated with the current stream;
int DeviceId() const;
/// Get HIP device associated with the stream
@@ -289,8 +288,8 @@ namespace hip {
/// Returns the CU mask for the current stream
const std::vector<uint32_t> GetCUMask() const { return cuMask_; }
/// Sync all non-blocking streams
static void syncNonBlockingStreams(int deviceId);
/// Sync all streams
static void SyncAllStreams(int deviceId);
/// Check whether any blocking stream running
static bool StreamCaptureBlocking();
@@ -298,6 +297,8 @@ namespace hip {
/// Destroy all streams on a given device
static void destroyAllStreams(int deviceId);
static void Destroy(hip::Stream* stream);
/// Check Stream Capture status to make sure it is done
static bool StreamCaptureOngoing(hipStream_t hStream);
@@ -450,10 +451,9 @@ namespace hip {
void setFlags(unsigned int flags) { flags_ = flags; }
void Reset();
hip::Stream* NullStream(bool skip_alloc = false);
hip::Stream* NullStream();
Stream* GetNullStream();
bool GetActiveStatus() {
amd::ScopedLock lock(lock_);
if (isActive_) return true;
+13 -32
View File
@@ -75,14 +75,8 @@ hipError_t ihipFree(void *ptr) {
if (memory_object != nullptr) {
// Wait on the device, associated with the current memory object during allocation
auto device_id = memory_object->getUserData().deviceId;
auto dev = g_devices[device_id];
// Skip stream allocation, since if it wasn't allocated until free, then the device wasn't used
constexpr bool SkipStreamAlloc = true;
hip::Stream* stream = dev->NullStream(SkipStreamAlloc);
if (stream != nullptr) {
stream->finish();
}
hip::Stream::syncNonBlockingStreams(device_id);
hip::Stream::SyncAllStreams(device_id);
// Find out if memory belongs to any memory pool
if (!g_devices[device_id]->FreeMemory(memory_object, nullptr)) {
// External mem is not svm.
@@ -710,14 +704,11 @@ hipError_t ihipArrayDestroy(hipArray* array) {
return hipErrorInvalidValue;
}
for (auto& dev : g_devices) {
hip::Stream* stream = dev->NullStream(true);
if (stream != nullptr) {
stream->finish();
}
}
auto image = as_amd(memObj);
// Wait on the device, associated with the current memory object during allocation
hip::Stream::SyncAllStreams(image->getUserData().deviceId);
image->release();
as_amd(memObj)->release();
delete array;
return hipSuccess;
}
@@ -984,7 +975,8 @@ amd::Image* ihipImageCreate(const cl_channel_order channelOrder,
delete image;
return nullptr;
}
// Save the ID of the device the image was created on
image->getUserData().deviceId = hip::getCurrentDevice()->deviceId();
return image;
}
@@ -1204,12 +1196,7 @@ hipError_t ihipHostUnregister(void* hostPtr) {
if (mem != nullptr) {
// Wait on the device, associated with the current memory object during allocation
auto device_id = mem->getUserData().deviceId;
hip::Stream* stream = g_devices[device_id]->NullStream(true);
if (stream != nullptr) {
stream->finish();
}
hip::Stream::SyncAllStreams(mem->getUserData().deviceId);
amd::MemObjMap::RemoveMemObj(hostPtr);
for (const auto& device: g_devices) {
@@ -3992,7 +3979,6 @@ hipError_t ihipMipmapArrayCreate(hipMipmappedArray_t* mipmapped_array_pptr,
}
hipError_t ihipMipmappedArrayDestroy(hipMipmappedArray_t mipmapped_array_ptr) {
if (mipmapped_array_ptr == nullptr) {
return hipErrorInvalidValue;
}
@@ -4002,17 +3988,12 @@ hipError_t ihipMipmappedArrayDestroy(hipMipmappedArray_t mipmapped_array_ptr) {
return hipErrorInvalidValue;
}
for (auto& dev : g_devices) {
hip::Stream* stream = dev->NullStream(true);
if (stream != nullptr) {
stream->finish();
}
}
as_amd(mem_obj)->release();
auto image = as_amd(mem_obj);
// Wait on the device, associated with the current memory object during allocation
hip::Stream::SyncAllStreams(image->getUserData().deviceId);
image->release();
delete mipmapped_array_ptr;
return hipSuccess;
}
+23 -12
View File
@@ -75,12 +75,12 @@ bool Stream::Create() {
}
// ================================================================================================
bool Stream::terminate() {
void Stream::Destroy(hip::Stream* stream) {
{
amd::ScopedLock lock(streamSetLock);
streamSet.erase(this);
streamSet.erase(stream);
}
return HostQueue::terminate();
stream->release();
}
// ================================================================================================
@@ -107,6 +107,7 @@ int Stream::DeviceId() const {
return device_->deviceId();
}
// ================================================================================================
int Stream::DeviceId(const hipStream_t hStream) {
// Copying locally into non-const variable just to get const away
hipStream_t inputStream = hStream;
@@ -120,17 +121,27 @@ int Stream::DeviceId(const hipStream_t hStream) {
return deviceId;
}
void Stream::syncNonBlockingStreams(int deviceId) {
amd::ScopedLock lock(streamSetLock);
for (auto& it : streamSet) {
if (it->Flags() & hipStreamNonBlocking) {
// ================================================================================================
void Stream::SyncAllStreams(int deviceId) {
// Make a local copy to avoid stalls for GPU finish with multiple threads
std::vector<hip::Stream*> streams;
streams.reserve(streamSet.size());
{
amd::ScopedLock lock(streamSetLock);
for (auto it : streamSet) {
if (it->DeviceId() == deviceId) {
it->finish();
streams.push_back(it);
it->retain();
}
}
}
for (auto it : streams) {
it->finish();
it->release();
}
}
// ================================================================================================
bool Stream::StreamCaptureBlocking() {
amd::ScopedLock lock(streamSetLock);
for (auto& it : streamSet) {
@@ -152,7 +163,7 @@ void Stream::destroyAllStreams(int deviceId) {
}
}
for (auto& it : toBeDeleted) {
it->release();
hip::Stream::Destroy(it);
}
}
@@ -271,7 +282,7 @@ static hipError_t ihipStreamCreate(hipStream_t* stream,
return hipErrorOutOfMemory;
}
else if (!hStream->Create()) {
hStream->release();
hip::Stream::Destroy(hStream);
return hipErrorOutOfMemory;
}
@@ -292,7 +303,7 @@ stream_per_thread::stream_per_thread() {
stream_per_thread::~stream_per_thread() {
for (auto &stream:m_streams) {
if (stream != nullptr && hip::isValid(stream)) {
reinterpret_cast<hip::Stream*>(stream)->release();
hip::Stream::Destroy(reinterpret_cast<hip::Stream*>(stream));
stream = nullptr;
}
}
@@ -482,7 +493,7 @@ hipError_t hipStreamDestroy(hipStream_t stream) {
if (l_it != hip::tls.capture_streams_.end()) {
hip::tls.capture_streams_.erase(l_it);
}
s->release();
hip::Stream::Destroy(s);
HIP_RETURN(hipSuccess);
}