diff --git a/include/hip/amd_detail/hip_runtime_api.h b/include/hip/amd_detail/hip_runtime_api.h index 3bdc20a12a..e179a667c1 100644 --- a/include/hip/amd_detail/hip_runtime_api.h +++ b/include/hip/amd_detail/hip_runtime_api.h @@ -98,9 +98,9 @@ typedef struct hipIpcMemHandle_st { char reserved[HIP_IPC_HANDLE_SIZE]; } hipIpcMemHandle_t; -// TODO: IPC event handle currently unsupported -struct ihipIpcEventHandle_t; -typedef struct ihipIpcEventHandle_t* hipIpcEventHandle_t; +typedef struct hipIpcEventHandle_st { + char reserved[HIP_IPC_HANDLE_SIZE]; +} hipIpcEventHandle_t; typedef struct ihipModule_t* hipModule_t; diff --git a/rocclr/CMakeLists.txt b/rocclr/CMakeLists.txt index 471c9dd4b6..b5e7ed66af 100755 --- a/rocclr/CMakeLists.txt +++ b/rocclr/CMakeLists.txt @@ -217,7 +217,6 @@ if(__HIP_ENABLE_PCH) endif() target_link_libraries(amdhip64 PRIVATE amdrocclr_static Threads::Threads dl hsa-runtime64::hsa-runtime64) - add_custom_command(TARGET amdhip64 POST_BUILD COMMAND ${CMAKE_COMMAND} -E copy ${PROJECT_BINARY_DIR}/.hipInfo ${PROJECT_BINARY_DIR}/lib/.hipInfo) add_custom_command(TARGET amdhip64 POST_BUILD COMMAND cp -rf diff --git a/rocclr/hip_device_runtime.cpp b/rocclr/hip_device_runtime.cpp index fe27f0b54b..f469bcac60 100755 --- a/rocclr/hip_device_runtime.cpp +++ b/rocclr/hip_device_runtime.cpp @@ -502,22 +502,6 @@ hipError_t hipGetDeviceFlags ( unsigned int* flags ) { HIP_RETURN(hipSuccess); } -hipError_t hipIpcGetEventHandle ( hipIpcEventHandle_t* handle, hipEvent_t event ) { - HIP_INIT_API(hipIpcGetEventHandle, handle, event); - - assert(0 && "Unimplemented"); - - HIP_RETURN(hipErrorNotSupported); -} - -hipError_t hipIpcOpenEventHandle ( hipEvent_t* event, hipIpcEventHandle_t handle ) { - HIP_INIT_API(hipIpcOpenEventHandle, event, handle); - - assert(0 && "Unimplemented"); - - HIP_RETURN(hipErrorNotSupported); -} - hipError_t hipSetDevice ( int device ) { HIP_INIT_API(hipSetDevice, device); diff --git a/rocclr/hip_event.cpp b/rocclr/hip_event.cpp index 4d93f0740b..45879b52b8 100755 --- a/rocclr/hip_event.cpp +++ b/rocclr/hip_event.cpp @@ -21,6 +21,17 @@ #include #include "hip_event.hpp" +#include +#include +#include +#include + +void ipcEventCallback(hipStream_t stream, hipError_t status, void* user_data) +{ + std::atomic *signal = reinterpret_cast*>(user_data); + signal->store(0); + return; +} namespace hip { @@ -179,9 +190,13 @@ hipError_t ihipEventCreateWithFlags(hipEvent_t* event, unsigned flags) { if (event == nullptr) { return hipErrorInvalidValue; } - +#if !defined(_MSC_VER) + unsigned supportedFlags = hipEventDefault | hipEventBlockingSync | hipEventDisableTiming | + hipEventReleaseToDevice | hipEventReleaseToSystem | hipEventInterprocess; +#else unsigned supportedFlags = hipEventDefault | hipEventBlockingSync | hipEventDisableTiming | hipEventReleaseToDevice | hipEventReleaseToSystem; +#endif const unsigned releaseFlags = (hipEventReleaseToDevice | hipEventReleaseToSystem); const bool illegalFlags = @@ -190,7 +205,6 @@ hipError_t ihipEventCreateWithFlags(hipEvent_t* event, unsigned flags) { if (!illegalFlags) { hip::Event* e = new hip::Event(flags); - if (e == nullptr) { return hipErrorOutOfMemory; } @@ -208,19 +222,27 @@ hipError_t ihipEventQuery(hipEvent_t event) { } hip::Event* e = reinterpret_cast(event); - - return e->query(); + if ((e->flags & hipEventInterprocess) && (e->ipc_evt_.ipc_shmem_)) { + int prev_read_idx = e->ipc_evt_.ipc_shmem_->read_index; + if (prev_read_idx > 0) { + int offset = (prev_read_idx % IPC_SIGNALS_PER_EVENT); + while ((e->ipc_evt_.ipc_shmem_->read_index < prev_read_idx + IPC_SIGNALS_PER_EVENT) + && (e->ipc_evt_.ipc_shmem_->signal[offset] != 0)) { + } + } + return hipSuccess; + } else { + return e->query(); + } } hipError_t hipEventCreateWithFlags(hipEvent_t* event, unsigned flags) { HIP_INIT_API(hipEventCreateWithFlags, event, flags); - HIP_RETURN(ihipEventCreateWithFlags(event, flags), *event); } hipError_t hipEventCreate(hipEvent_t* event) { HIP_INIT_API(hipEventCreate, event); - HIP_RETURN(ihipEventCreateWithFlags(event, 0), *event); } @@ -231,8 +253,15 @@ hipError_t hipEventDestroy(hipEvent_t event) { HIP_RETURN(hipErrorInvalidHandle); } - delete reinterpret_cast(event); - + hip::Event* e = reinterpret_cast(event); + if ((e->flags & hipEventInterprocess) && (e->ipc_evt_.ipc_shmem_)) { + int owners = -- e->ipc_evt_.ipc_shmem_->owners; + if (!amd::Os::MemoryUnmapFile(e->ipc_evt_.ipc_shmem_,sizeof(hip::ihipIpcEventShmem_t))) { + HIP_RETURN(hipErrorInvalidHandle); + } + } else { + delete e; + } HIP_RETURN(hipSuccess); } @@ -254,6 +283,24 @@ hipError_t hipEventElapsedTime(float *ms, hipEvent_t start, hipEvent_t stop) { } // ================================================================================================ +bool createIpcEventShmemIfNeeded(hip::Event::ihipIpcEvent_t& ipc_evt) { + char name_template[] = "/tmp/eventXXXXXX"; + int temp_fd = mkstemp(name_template); + ipc_evt.ipc_name_ = name_template; + ipc_evt.ipc_name_.replace(0, 5, "/hip_"); + if (!amd::Os::MemoryMapFileTruncated(ipc_evt.ipc_name_.c_str(), + const_cast (reinterpret_cast(&(ipc_evt.ipc_shmem_))), sizeof(hip::ihipIpcEventShmem_t))) { + return false; + } + ipc_evt.ipc_shmem_->owners = 1; + ipc_evt.ipc_shmem_->read_index = -1; + ipc_evt.ipc_shmem_->write_index = 0; + for (uint32_t sig_idx = 0; sig_idx < IPC_SIGNALS_PER_EVENT; ++sig_idx) { + ipc_evt.ipc_shmem_->signal[sig_idx] = 0; + } + return true; +} + hipError_t hipEventRecord(hipEvent_t event, hipStream_t stream) { HIP_INIT_API(hipEventRecord, event, stream); @@ -263,8 +310,40 @@ hipError_t hipEventRecord(hipEvent_t event, hipStream_t stream) { hip::Event* e = reinterpret_cast(event); amd::HostQueue* queue = hip::getQueue(stream); - - e->addMarker(queue, nullptr, true); + bool isRecorded = e->isRecorded(); + if ((e->flags & hipEventInterprocess) && !isRecorded) { + amd::Command* command = queue->getLastQueuedCommand(true); + if (command == nullptr) { + command = new amd::Marker(*queue, kMarkerDisableFlush); + command->enqueue(); + } + e->addMarker(queue, command, true); + amd::Event& tEvent = command->event(); + createIpcEventShmemIfNeeded(e->ipc_evt_); + int write_index = e->ipc_evt_.ipc_shmem_->write_index++; + int offset = write_index % IPC_SIGNALS_PER_EVENT; + while (e->ipc_evt_.ipc_shmem_->signal[offset] != 0) { + amd::Os::sleep(1); + } + // Lock signal. + e->ipc_evt_.ipc_shmem_->signal[offset] = 1; + std::atomic *signal = &e->ipc_evt_.ipc_shmem_->signal[offset]; + StreamCallback* cbo = new StreamCallback(stream, + reinterpret_cast (ipcEventCallback), signal, command); + command->enqueue(); + if (!tEvent.setCallback(CL_COMPLETE, ihipStreamCallback,cbo)) { + command->release(); + return hipErrorInvalidHandle; + } + tEvent.notifyCmdQueue(); + // Update read index to indicate new signal. + int expected = write_index - 1; + while (!e->ipc_evt_.ipc_shmem_->read_index.compare_exchange_weak(expected, write_index)) { + amd::Os::sleep(1); + } + } else { + e->addMarker(queue, nullptr, true); + } HIP_RETURN(hipSuccess); } @@ -277,12 +356,72 @@ hipError_t hipEventSynchronize(hipEvent_t event) { } hip::Event* e = reinterpret_cast(event); - - HIP_RETURN(e->synchronize()); + if ((e->flags & hipEventInterprocess) && (e->ipc_evt_.ipc_shmem_)) { + int prev_read_idx = e->ipc_evt_.ipc_shmem_->read_index; + if (prev_read_idx > 0) { + int offset = (prev_read_idx % IPC_SIGNALS_PER_EVENT); + while ((e->ipc_evt_.ipc_shmem_->read_index < prev_read_idx + IPC_SIGNALS_PER_EVENT) + && (e->ipc_evt_.ipc_shmem_->signal[offset] != 0)) { + amd::Os::sleep(1); + } + } + HIP_RETURN(hipSuccess); + } else { + HIP_RETURN(e->synchronize()); + } } hipError_t hipEventQuery(hipEvent_t event) { HIP_INIT_API(hipEventQuery, event); - HIP_RETURN(ihipEventQuery(event)); } + +hipError_t hipIpcGetEventHandle(hipIpcEventHandle_t* handle, hipEvent_t event) { + HIP_INIT_API(hipIpcGetEventHandle, handle, event); +#if !defined(_MSC_VER) + if (handle == nullptr || event == nullptr) { + HIP_RETURN(hipErrorInvalidValue); + } + hip::Event* e = reinterpret_cast(event); + if (!(e->flags & hipEventInterprocess)) { + HIP_RETURN(hipErrorInvalidConfiguration); + } + if (!createIpcEventShmemIfNeeded(e->ipc_evt_)) { + HIP_RETURN(hipErrorInvalidConfiguration); + } + ihipIpcEventHandle_t* iHandle = reinterpret_cast(handle); + memset(iHandle->shmem_name, 0, HIP_IPC_HANDLE_SIZE); + e->ipc_evt_.ipc_name_.copy(iHandle->shmem_name, std::string::npos); + HIP_RETURN(hipSuccess); +#else + assert(0 && "Unimplemented"); + HIP_RETURN(hipErrorNotSupported); +#endif +} + +hipError_t hipIpcOpenEventHandle(hipEvent_t* event, hipIpcEventHandle_t handle) { + HIP_INIT_API(NONE, event, handle); +#if !defined(_MSC_VER) + hipError_t hip_err = hipSuccess; + if (event == nullptr) { + HIP_RETURN(hipErrorInvalidValue); + } + hip_err = ihipEventCreateWithFlags(event, hipEventDisableTiming | hipEventInterprocess); + if (hip_err != hipSuccess) { + HIP_RETURN(hip_err); + } + hip::Event* e = reinterpret_cast(*event); + ihipIpcEventHandle_t* iHandle = reinterpret_cast(&handle); + hip::Event::ihipIpcEvent_t& ipc_evt = e->ipc_evt_; + ipc_evt.ipc_name_ = iHandle->shmem_name; + if (!amd::Os::MemoryMapFileTruncated(ipc_evt.ipc_name_.c_str(), + (const void**) &(ipc_evt.ipc_shmem_), sizeof(hip::ihipIpcEventShmem_t))) { + HIP_RETURN(hipErrorInvalidValue); + } + ipc_evt.ipc_shmem_->owners += 1; + HIP_RETURN(hipSuccess); +#else + assert(0 && "Unimplemented"); + HIP_RETURN(hipErrorNotSupported); +#endif +} diff --git a/rocclr/hip_event.hpp b/rocclr/hip_event.hpp index c8b17cbe93..b26088fcc9 100644 --- a/rocclr/hip_event.hpp +++ b/rocclr/hip_event.hpp @@ -24,6 +24,23 @@ #include "hip_internal.hpp" #include "thread/monitor.hpp" +// Internal structure for stream callback handler +class StreamCallback { + public: + StreamCallback(hipStream_t stream, hipStreamCallback_t callback, void* userData, + amd::Command* command) + : stream_(stream), callBack_(callback), + userData_(userData), command_(command) { + }; + hipStream_t stream_; + hipStreamCallback_t callBack_; + void* userData_; + amd::Command* command_; +}; + +void CL_CALLBACK ihipStreamCallback(cl_event event, cl_int command_exec_status, void* user_data); + + namespace hip { class ProfileMarker: public amd::Marker { @@ -37,6 +54,14 @@ public: } }; +#define IPC_SIGNALS_PER_EVENT 32 +typedef struct ihipIpcEventShmem_s { + std::atomic owners; + std::atomic read_index; + std::atomic write_index; + std::atomic signal[IPC_SIGNALS_PER_EVENT]; +} ihipIpcEventShmem_t; + class Event { public: Event(unsigned int flags) : flags(flags), lock_("hipEvent_t", true), @@ -57,9 +82,21 @@ public: hipError_t streamWait(amd::HostQueue* queue, uint flags); void addMarker(amd::HostQueue* queue, amd::Command* command, bool record); - + bool isRecorded() { return recorded_; } amd::Monitor& lock() { return lock_; } + //IPC Events + struct ihipIpcEvent_t { + std::string ipc_name_; + int ipc_fd_; + ihipIpcEventShmem_t* ipc_shmem_; + ihipIpcEvent_t(): ipc_name_("dummy"), ipc_fd_(0), ipc_shmem_(nullptr) { + } + void setipcname(const char* name) { + ipc_name_ = std::string(name); + } + }; + ihipIpcEvent_t ipc_evt_; private: amd::Monitor lock_; amd::HostQueue* stream_; diff --git a/rocclr/hip_formatting.hpp b/rocclr/hip_formatting.hpp index 8c26249e03..83f1467a7b 100644 --- a/rocclr/hip_formatting.hpp +++ b/rocclr/hip_formatting.hpp @@ -840,4 +840,14 @@ inline std::ostream& operator<<(std::ostream& os, const hipExtent& s) { << s.depth << '}'; return os; -} \ No newline at end of file +} + +inline std::ostream& operator<<(std::ostream& os, const hipIpcEventHandle_t& s) { + //TODO fill in later + return os; +} + +inline std::ostream& operator<<(std::ostream& os, const hipIpcEventHandle_t* s) { + //TODO fill in later + return os; +} diff --git a/rocclr/hip_hcc.def.in b/rocclr/hip_hcc.def.in index f6c93581a4..86cd1a21f7 100755 --- a/rocclr/hip_hcc.def.in +++ b/rocclr/hip_hcc.def.in @@ -76,6 +76,8 @@ hipInit hipIpcCloseMemHandle hipIpcGetMemHandle hipIpcOpenMemHandle +hipIpcGetEventHandle +hipIpcOpenEventHandle hipMalloc hipMalloc3D hipMalloc3DArray diff --git a/rocclr/hip_hcc.map.in b/rocclr/hip_hcc.map.in index 82ddc72323..236bee0489 100755 --- a/rocclr/hip_hcc.map.in +++ b/rocclr/hip_hcc.map.in @@ -78,6 +78,8 @@ global: hipIpcCloseMemHandle; hipIpcGetMemHandle; hipIpcOpenMemHandle; + hipIpcGetEventHandle; + hipIpcOpenEventHandle; hipMalloc; hipMalloc3D; hipMalloc3DArray; diff --git a/rocclr/hip_internal.hpp b/rocclr/hip_internal.hpp index 857ae8e07f..6ea672666b 100755 --- a/rocclr/hip_internal.hpp +++ b/rocclr/hip_internal.hpp @@ -56,6 +56,15 @@ typedef struct ihipIpcMemHandle_st { char reserved[IHIP_IPC_MEM_RESERVED_SIZE]; } ihipIpcMemHandle_t; +#define IHIP_IPC_EVENT_HANDLE_SIZE 32 +#define IHIP_IPC_EVENT_RESERVED_SIZE LP64_SWITCH(28,24) +typedef struct ihipIpcEventHandle_st { + //hsa_amd_ipc_signal_t ipc_handle; ///< ipc signal handle on ROCr + //char ipc_handle[IHIP_IPC_EVENT_HANDLE_SIZE]; + //char reserved[IHIP_IPC_EVENT_RESERVED_SIZE]; + char shmem_name[IHIP_IPC_EVENT_HANDLE_SIZE]; +}ihipIpcEventHandle_t; + #ifdef _WIN32 inline int getpid() { return _getpid(); } #endif diff --git a/rocclr/hip_stream.cpp b/rocclr/hip_stream.cpp index 16a64dc2a9..604f3ea33d 100755 --- a/rocclr/hip_stream.cpp +++ b/rocclr/hip_stream.cpp @@ -28,21 +28,6 @@ extern api_callbacks_table_t callbacks_table; static amd::Monitor streamSetLock{"Guards global stream set"}; static std::unordered_set streamSet; - -// Internal structure for stream callback handler -class StreamCallback { - public: - StreamCallback(hipStream_t stream, hipStreamCallback_t callback, void* userData, - amd::Command* command) - : stream_(stream), callBack_(callback), - userData_(userData), command_(command) { - }; - hipStream_t stream_; - hipStreamCallback_t callBack_; - void* userData_; - amd::Command* command_; -}; - namespace hip { // ================================================================================================ @@ -306,6 +291,20 @@ hipError_t hipStreamDestroy(hipStream_t stream) { HIP_RETURN(hipSuccess); } +struct CallbackData { + int previous_read_index; + hip::ihipIpcEventShmem_t *shmem; +}; + +void WaitThenDecrementSignal(hipStream_t stream, hipError_t status, void* user_data){ + CallbackData *data = (CallbackData *)user_data; + int offset = data->previous_read_index % IPC_SIGNALS_PER_EVENT; + while (data->shmem->read_index < data->previous_read_index+IPC_SIGNALS_PER_EVENT + && data->shmem->signal[offset] != 0) { + } + delete data; +} + // ================================================================================================ hipError_t hipStreamWaitEvent(hipStream_t stream, hipEvent_t event, unsigned int flags) { HIP_INIT_API(hipStreamWaitEvent, stream, event, flags); @@ -317,8 +316,25 @@ hipError_t hipStreamWaitEvent(hipStream_t stream, hipEvent_t event, unsigned int amd::HostQueue* queue = hip::getQueue(stream); hip::Event* e = reinterpret_cast(event); - - HIP_RETURN(e->streamWait(queue, flags)); + if (e->flags & hipEventInterprocess) { + amd::Command* command = queue->getLastQueuedCommand(true); + if (command == nullptr) { + command = new amd::Marker(*queue, false); + command->enqueue(); + } + auto t{new CallbackData{e->ipc_evt_.ipc_shmem_->read_index, e->ipc_evt_.ipc_shmem_}}; + StreamCallback* cbo = new StreamCallback(stream, + reinterpret_cast (WaitThenDecrementSignal), t, command); + command->enqueue(); + if (!command->setCallback(CL_COMPLETE, ihipStreamCallback,cbo)) { + command->release(); + return hipErrorInvalidHandle; + } + command->awaitCompletion(); + HIP_RETURN(hipSuccess); + } else { + HIP_RETURN(e->streamWait(queue, flags)); + } } // ================================================================================================ @@ -482,4 +498,4 @@ hipError_t hipExtStreamGetCUMask(hipStream_t stream, uint32_t cuMaskSize, uint32 } } HIP_RETURN(hipSuccess); -} \ No newline at end of file +} diff --git a/tests/src/runtimeApi/event/hipEventIpc.cpp b/tests/src/runtimeApi/event/hipEventIpc.cpp index 5a0992dc20..f5aa92b719 100644 --- a/tests/src/runtimeApi/event/hipEventIpc.cpp +++ b/tests/src/runtimeApi/event/hipEventIpc.cpp @@ -24,7 +24,7 @@ THE SOFTWARE. // forces synchronization : set /* HIT_START - * BUILD: %t %s ../../test_common.cpp EXCLUDE_HIP_PLATFORM nvidia EXCLUDE_HIP_RUNTIME rocclr + * BUILD: %t %s ../../test_common.cpp EXCLUDE_HIP_PLATFORM nvidia * TEST: %t --iterations 10 * HIT_END */