Clean-up the list of blocking streams

- Insert the stream into the list on the host queue creation,
instead of stream creation

Change-Id: Ib25053019f7df97e5bc786922a6587b9514852d3
Bu işleme şunda yer alıyor:
German Andryeyev
2020-04-29 02:11:37 -04:00
işlemeyi yapan: Christophe Paquot
ebeveyn 89b9befe42
işleme bc65ca64fc
3 değiştirilmiş dosya ile 63 ekleme ve 47 silme
+1 -5
Dosyayı Görüntüle
@@ -217,18 +217,14 @@ hipError_t hipEventRecord(hipEvent_t event, hipStream_t stream) {
HIP_RETURN(hipErrorInvalidHandle);
}
hip::Event* e = reinterpret_cast<hip::Event*>(event);
hip::Stream* s = reinterpret_cast<hip::Stream*>(stream);
amd::HostQueue* queue = hip::getQueue(stream);
amd::Command* command = queue->getLastQueuedCommand(true);
if (command == nullptr) {
command = new amd::Marker(*queue, false);
command->enqueue();
}
hip::Event* e = reinterpret_cast<hip::Event*>(event);
e->addMarker(queue, command);
HIP_RETURN(hipSuccess);
+3 -3
Dosyayı Görüntüle
@@ -90,10 +90,10 @@ namespace hip {
public:
Stream(Device* dev, amd::CommandQueue::Priority p, unsigned int f = 0, bool null_stream = false);
bool create();
bool Create();
amd::HostQueue* asHostQueue();
void destroy();
void finish() const;
void Destroy();
void Finish() const;
/// Get device ID associated with the current stream;
int DeviceId() const;
/// Returns if stream is null stream
+59 -39
Dosyayı Görüntüle
@@ -42,51 +42,71 @@ class StreamCallback {
namespace hip {
// ================================================================================================
Stream::Stream(hip::Device* dev, amd::CommandQueue::Priority p,
unsigned int f, bool null_stream)
: queue_(nullptr), lock_("Stream Callback lock"), device_(dev),
priority_(p), flags_(f), null_(null_stream) {}
bool Stream::create() {
// ================================================================================================
bool Stream::Create() {
cl_command_queue_properties properties = CL_QUEUE_PROFILING_ENABLE;
queue_ = new amd::HostQueue(*device_->asContext(), *device_->devices()[0], properties,
amd::CommandQueue::RealTimeDisabled, priority_);
assert(queue_ != nullptr);
return queue_->create();
}
amd::HostQueue* Stream::asHostQueue() {
if (queue_ == nullptr) {
if (!create()) {
return nullptr;
} else if (Null()) {
// Make sure the null stream is inserted into the list of default/blocking streams
// Create a host queue
bool result = (queue_ != nullptr) ? queue_->create() : false;
// Insert just created stream into the list of the blocking queues
if (result) {
if (!(flags_ & hipStreamNonBlocking)) {
amd::ScopedLock lock(streamSetLock);
streamSet.insert(this);
}
} else {
Destroy();
}
return result;
}
// ================================================================================================
amd::HostQueue* Stream::asHostQueue() {
// Access to the stream object is lock protected, because possible allocation
amd::ScopedLock l(Lock());
if (queue_ == nullptr) {
// Create the host queue for the first time
if (!Create()) {
return nullptr;
}
}
return queue_;
}
void Stream::destroy() {
// ================================================================================================
void Stream::Destroy() {
if (queue_ != nullptr) {
queue_->release();
queue_ = nullptr;
amd::ScopedLock lock(streamSetLock);
streamSet.erase(this);
}
delete this;
}
void Stream::finish() const {
// ================================================================================================
void Stream::Finish() const {
if (queue_ != nullptr) {
queue_->finish();
}
}
// ================================================================================================
int Stream::DeviceId() const {
return device_->deviceId();
}
};
// ================================================================================================
void iHipWaitActiveStreams(amd::HostQueue* blocking_queue, bool wait_null_stream) {
amd::Command::EventWaitList eventWaitList;
{
@@ -95,12 +115,12 @@ void iHipWaitActiveStreams(amd::HostQueue* blocking_queue, bool wait_null_stream
for (const auto& stream : streamSet) {
amd::HostQueue* active_queue = stream->asHostQueue();
// If it's the current device
if ((active_queue != nullptr) && (&active_queue->device() == &blocking_queue->device()) &&
if ((&active_queue->device() == &blocking_queue->device()) &&
// and it's not the current stream
(active_queue != blocking_queue) &&
// check for a wait on the null stream
(stream->Null() == wait_null_stream)) {
// Get the last valid so command
// Get the last valid command
amd::Command* command = active_queue->getLastQueuedCommand(true);
if ((command != nullptr) &&
// Check the current active status
@@ -126,6 +146,7 @@ void iHipWaitActiveStreams(amd::HostQueue* blocking_queue, bool wait_null_stream
}
}
// ================================================================================================
void CL_CALLBACK ihipStreamCallback(cl_event event, cl_int command_exec_status, void* user_data) {
hipError_t status = hipSuccess;
StreamCallback* cbo = reinterpret_cast<StreamCallback*>(user_data);
@@ -137,18 +158,15 @@ void CL_CALLBACK ihipStreamCallback(cl_event event, cl_int command_exec_status,
delete cbo;
}
static hipError_t ihipStreamCreate(hipStream_t *stream, unsigned int flags, amd::CommandQueue::Priority priority) {
// ================================================================================================
static hipError_t ihipStreamCreate(hipStream_t* stream,
unsigned int flags, amd::CommandQueue::Priority priority) {
hip::Stream* hStream = new hip::Stream(hip::getCurrentDevice(), priority, flags);
if (hStream == nullptr) {
return hipErrorOutOfMemory;
}
if (!(flags & hipStreamNonBlocking)) {
amd::ScopedLock lock(streamSetLock);
streamSet.insert(hStream);
}
*stream = reinterpret_cast<hipStream_t>(hStream);
ClPrint(amd::LOG_INFO, amd::LOG_API, "ihipStreamCreate: %zx", hStream);
@@ -156,18 +174,21 @@ static hipError_t ihipStreamCreate(hipStream_t *stream, unsigned int flags, amd:
return hipSuccess;
}
// ================================================================================================
hipError_t hipStreamCreateWithFlags(hipStream_t *stream, unsigned int flags) {
HIP_INIT_API(hipStreamCreateWithFlags, stream, flags);
HIP_RETURN(ihipStreamCreate(stream, flags, amd::CommandQueue::Priority::Normal));
}
// ================================================================================================
hipError_t hipStreamCreate(hipStream_t *stream) {
HIP_INIT_API(hipStreamCreate, stream);
HIP_RETURN(ihipStreamCreate(stream, hipStreamDefault, amd::CommandQueue::Priority::Normal));
}
// ================================================================================================
hipError_t hipStreamCreateWithPriority(hipStream_t* stream, unsigned int flags, int priority) {
HIP_INIT_API(hipStreamCreateWithPriority, stream, flags, priority);
@@ -180,6 +201,7 @@ hipError_t hipStreamCreateWithPriority(hipStream_t* stream, unsigned int flags,
return HIP_RETURN(ihipStreamCreate(stream, flags, static_cast<amd::CommandQueue::Priority>(priority)));
}
// ================================================================================================
hipError_t hipDeviceGetStreamPriorityRange(int* leastPriority, int* greatestPriority) {
HIP_INIT_API(hipDeviceGetStreamPriorityRange, leastPriority, greatestPriority);
@@ -193,13 +215,12 @@ hipError_t hipDeviceGetStreamPriorityRange(int* leastPriority, int* greatestPrio
return HIP_RETURN(hipSuccess);
}
// ================================================================================================
hipError_t hipStreamGetFlags(hipStream_t stream, unsigned int* flags) {
HIP_INIT_API(hipStreamGetFlags, stream, flags);
hip::Stream* hStream = reinterpret_cast<hip::Stream*>(stream);
if (flags != nullptr && hStream != nullptr) {
*flags = hStream->Flags();
if ((flags != nullptr) && (stream != nullptr)) {
*flags = reinterpret_cast<hip::Stream*>(stream)->Flags();
} else {
HIP_RETURN(hipErrorInvalidValue);
}
@@ -207,15 +228,17 @@ hipError_t hipStreamGetFlags(hipStream_t stream, unsigned int* flags) {
HIP_RETURN(hipSuccess);
}
// ================================================================================================
hipError_t hipStreamSynchronize(hipStream_t stream) {
HIP_INIT_API(hipStreamSynchronize, stream);
amd::HostQueue* hostQueue = hip::getQueue(stream);
hostQueue->finish();
// Wait for the current host queue
hip::getQueue(stream)->finish();
HIP_RETURN(hipSuccess);
}
// ================================================================================================
hipError_t hipStreamDestroy(hipStream_t stream) {
HIP_INIT_API(hipStreamDestroy, stream);
@@ -223,32 +246,27 @@ hipError_t hipStreamDestroy(hipStream_t stream) {
HIP_RETURN(hipErrorInvalidHandle);
}
amd::ScopedLock lock(streamSetLock);
hip::Stream* hStream = reinterpret_cast<hip::Stream*>(stream);
hStream->destroy();
streamSet.erase(hStream);
delete hStream;
reinterpret_cast<hip::Stream*>(stream)->Destroy();
HIP_RETURN(hipSuccess);
}
// ================================================================================================
hipError_t hipStreamWaitEvent(hipStream_t stream, hipEvent_t event, unsigned int flags) {
HIP_INIT_API(hipStreamWaitEvent, stream, event, flags);
amd::HostQueue* queue = hip::getQueue(stream);
if (event == nullptr) {
HIP_RETURN(hipErrorInvalidHandle);
}
amd::HostQueue* queue = hip::getQueue(stream);
hip::Event* e = reinterpret_cast<hip::Event*>(event);
return HIP_RETURN(e->streamWait(queue, flags));
HIP_RETURN(e->streamWait(queue, flags));
}
// ================================================================================================
hipError_t hipStreamQuery(hipStream_t stream) {
HIP_INIT_API(hipStreamQuery, stream);
@@ -256,6 +274,7 @@ hipError_t hipStreamQuery(hipStream_t stream) {
amd::Command* command = hostQueue->getLastQueuedCommand(true);
if (command == nullptr) {
// Nothing was submitted to the queue
HIP_RETURN(hipSuccess);
}
@@ -268,11 +287,12 @@ hipError_t hipStreamQuery(hipStream_t stream) {
HIP_RETURN(status);
}
// ================================================================================================
hipError_t hipStreamAddCallback(hipStream_t stream, hipStreamCallback_t callback, void* userData,
unsigned int flags) {
HIP_INIT_API(hipStreamAddCallback, stream, callback, userData, flags);
amd::HostQueue* hostQueue = reinterpret_cast<hip::Stream*>(stream)->asHostQueue();
amd::HostQueue* hostQueue = hip::getQueue(stream);
amd::Command* command = hostQueue->getLastQueuedCommand(true);
if (command == nullptr) {
amd::Command::EventWaitList eventWaitList;