SWDEV-476512: Further rocsys fixes

Change-Id: I9ae8534a2b6b23f76514920a6c4d39025bc882bc


[ROCm/rocprofiler commit: c4adefccde]
This commit is contained in:
Giovanni LB
2024-08-02 03:20:34 -03:00
کامیت شده توسط Giovanni Baraldi
والد 9915421ec0
کامیت 6defbd0f2e
6فایلهای تغییر یافته به همراه99 افزوده شده و 73 حذف شده
@@ -134,10 +134,12 @@ std::string get_kernel_name(rocprofiler_record_profiler_t& profiler_record) {
class perfetto_plugin_t {
public:
perfetto_plugin_t() {
perfetto_plugin_t(int filename_ext) {
const char* output_dir = getenv("OUTPUT_PATH");
const char* temp_file_name = getenv("OUT_FILE_NAME");
std::string output_file_name = temp_file_name ? std::string(temp_file_name) + "_" : "";
if (filename_ext > 0)
output_file_name += "_run"+std::to_string(filename_ext)+"_";
if (output_dir == nullptr) output_dir = "./";
@@ -193,12 +195,16 @@ class perfetto_plugin_t {
is_valid_ = true;
}
void delete_perfetto_plugin() {
if (is_valid_ && tracing_session_ && internal_buffer_finished.load(std::memory_order_acquire)) {
bool delete_perfetto_plugin() {
if (is_valid_ && tracing_session_)
{
tracing_session_->StopBlocking();
is_valid_ = false;
// close(file_descriptor_);
close(file_descriptor_);
tracing_session_.reset();
return true;
}
return false;
}
const char* GetDomainName(rocprofiler_tracer_activity_domain_t domain) {
@@ -579,7 +585,6 @@ class perfetto_plugin_t {
}
rocprofiler_next_record(begin, &begin, session_id, buffer_id);
}
internal_buffer_finished.exchange(true, std::memory_order_acq_rel);
return 0;
}
@@ -592,8 +597,6 @@ class perfetto_plugin_t {
bool is_valid_{false};
size_t roctx_track_entries_{0};
std::atomic<bool> internal_buffer_finished{false};
// Correlate stream id(s) with correlation id(s) to identify the stream id of every HIP activity
std::unordered_map<uint64_t, uint64_t> stream_ids_;
@@ -644,10 +647,15 @@ int rocprofiler_plugin_initialize(uint32_t rocprofiler_major_version,
rocprofiler_minor_version > ROCPROFILER_VERSION_MINOR)
return -1;
std::lock_guard<std::mutex> lock(writing_lock);
if (perfetto_plugin != nullptr) return -1;
//if (perfetto_plugin != nullptr && perfetto_plugin->IsValid()) return -1;
std::lock_guard<std::mutex> lock(writing_lock);
if (perfetto_plugin != nullptr)
perfetto_plugin->delete_perfetto_plugin();
static int perfetto_init_count = 0;
perfetto_plugin = new perfetto_plugin_t(perfetto_init_count++);
perfetto_plugin = new perfetto_plugin_t();
if (perfetto_plugin->IsValid())
return 0;
@@ -660,9 +668,11 @@ void rocprofiler_plugin_finalize()
{
std::lock_guard<std::mutex> lock(writing_lock);
if (!perfetto_plugin) return;
perfetto_plugin->delete_perfetto_plugin();
// delete perfetto_plugin;
//perfetto_plugin = nullptr;
if (perfetto_plugin->delete_perfetto_plugin())
{
delete perfetto_plugin;
perfetto_plugin = nullptr;
}
}
ROCPROFILER_EXPORT int rocprofiler_plugin_write_buffer_records(
@@ -538,10 +538,7 @@ void Queue::ResetSessionID(rocprofiler_session_id_t id)
{
std::unique_lock<std::shared_mutex> session_id_lock(session_id_mutex);
session_id = id;
if (session_id.handle != 0)
session = rocprofiler::ROCProfiler_Singleton::GetInstance().GetSession(session_id);
else
session = nullptr;
session = nullptr;
}
bool Queue::CheckNeededProfileConfigs()
@@ -618,7 +615,8 @@ void Queue::WriteInterceptor(const void* packets, uint64_t pkt_count, uint64_t u
std::shared_lock<std::shared_mutex> session_id_lock(session_id_mutex);
if (session_id.handle == 0 || session_id.handle != rocprofiler::ROCProfiler_Singleton::GetInstance().GetCurrentSessionId().handle)
if (session == nullptr || session_id.handle == 0 ||
session_id.handle != rocprofiler::ROCProfiler_Singleton::GetInstance().GetCurrentSessionId().handle)
{
session_id_lock.unlock();
CheckNeededProfileConfigs();
@@ -57,7 +57,7 @@ void AttTracer::AddPendingSignals(
kernel_object,
original_completion_signal,
new_completion_signal,
session_id_,
session_id,
buffer_id,
profile,
kernel_properties,
@@ -125,8 +125,6 @@ void Profiler::AddPendingSignals(
uint64_t correlation_id)
{
std::lock_guard<std::mutex> lock(sessions_pending_signals_lock_);
if (bIsSessionDestroying.load())
return;
if (sessions_pending_signals_.find(writer_id) == sessions_pending_signals_.end())
sessions_pending_signals_.emplace(writer_id, std::vector<pending_signal_ptr_t>{});
@@ -134,7 +132,7 @@ void Profiler::AddPendingSignals(
sessions_pending_signals_.at(writer_id).emplace_back(
new pending_signal_t{
kernel_object, original_completion_signal, new_completion_signal,
session_id_, buffer_id, session_data_count, std::move(profile),
session_id, buffer_id, session_data_count, std::move(profile),
kernel_properties, thread_id, queue_index, correlation_id
}
);
@@ -143,12 +141,10 @@ void Profiler::AddPendingSignals(
std::vector<pending_signal_ptr_t> Profiler::MovePendingSignals(uint32_t writer_id)
{
std::lock_guard<std::mutex> lock(sessions_pending_signals_lock_);
auto it = sessions_pending_signals_.find(writer_id);
if (it == sessions_pending_signals_.end())
{
rocprofiler::warning("writer_id is not found in the pending_signals");
return {};
}
auto move_pending = std::move(it->second);
sessions_pending_signals_.erase(writer_id);
@@ -160,8 +156,8 @@ std::vector<pending_signal_ptr_t> Profiler::MovePendingSignals(uint32_t writer_i
void Profiler::WaitForPendingAndDestroy()
{
bIsSessionDestroying.store(true);
std::unique_lock<std::mutex> lk(sessions_pending_signals_lock_);
bIsSessionDestroying.store(true);
if (sessions_pending_signals_.size() == 0)
return;
@@ -241,5 +241,6 @@ int main(int argc, char* argv[]) {
report("ROCSYS:: Error: Not possible to reach here, please report(invalid sys_type)!\n", 1);
}
}
msync(shmd, sizeof(shmd->command), MS_SYNC | MS_INVALIDATE);
return 1;
}
@@ -75,7 +75,7 @@ namespace fs = rocprofiler::common::filesystem;
#define CHECK_ROCPROFILER(call) \
do { \
if ((call) != ROCPROFILER_STATUS_SUCCESS) \
rocprofiler::fatal("Error: ROCProfiler API Call Error!"); \
{std::cerr << "ERROR IN " << __LINE__ << std::endl; abort(); } \
} while (false)
TRACE_BUFFER_INSTANTIATE();
namespace {
@@ -84,8 +84,6 @@ struct shmd_t {
int command;
};
static const char* roc_sys_session_id;
static int shm_fd_sn = -1;
struct shmd_t* shmd;
uint64_t flush_interval, trace_time_length, trace_delay, trace_interval;
@@ -122,10 +120,15 @@ class rocprofiler_plugin_t {
rocprofiler_plugin_finalize_fn = reinterpret_cast<decltype(rocprofiler_plugin_finalize)*>(
dlsym(plugin_handle_, "rocprofiler_plugin_finalize"));
if (!rocprofiler_plugin_finalize_fn) return;
if (auto* initialize = reinterpret_cast<decltype(rocprofiler_plugin_initialize)*>(
dlsym(plugin_handle_, "rocprofiler_plugin_initialize"));
initialize != nullptr)
valid_ = initialize(ROCPROFILER_VERSION_MAJOR, ROCPROFILER_VERSION_MINOR, data.userdata) == 0;
initialize_fn = reinterpret_cast<decltype(initialize_fn)>(dlsym(plugin_handle_, "rocprofiler_plugin_initialize"));
initialize(data.userdata);
}
void initialize(void* userdata)
{
if (initialize_fn)
valid_ |= initialize_fn(ROCPROFILER_VERSION_MAJOR, ROCPROFILER_VERSION_MINOR, userdata) == 0;
}
~rocprofiler_plugin_t() {
@@ -154,6 +157,7 @@ class rocprofiler_plugin_t {
bool valid_{false};
void* plugin_handle_;
std::string plugin_name_;
decltype(rocprofiler_plugin_initialize)* initialize_fn = nullptr;
decltype(rocprofiler_plugin_write_buffer_records)* rocprofiler_plugin_write_buffer_records_;
decltype(rocprofiler_plugin_write_record)* rocprofiler_plugin_write_record_;
@@ -360,32 +364,31 @@ void finish()
{
std::lock_guard<std::mutex> lock(finish_lock);
if (!rocprof_started.load(std::memory_order_seq_cst)) return;
if (!rocprof_started.load()) return;
if (trace_period_thread_control.load(std::memory_order_acquire)) {
trace_period_thread_control.exchange(false, std::memory_order_release);
trace_period_thread_control.store(false);
if (trace_period_thread.joinable())
trace_period_thread.join();
}
if (flush_thread_control.load(std::memory_order_relaxed)) {
flush_thread_control.exchange(false, std::memory_order_release);
flush_thread_control.store(false);
if (flush_thread.joinable())
flush_thread.join();
}
for ([[maybe_unused]] rocprofiler_buffer_id_t buffer_id : buffer_ids) {
for ([[maybe_unused]] rocprofiler_buffer_id_t buffer_id : buffer_ids)
CHECK_ROCPROFILER(rocprofiler_flush_data(session_id, buffer_id));
}
if (roc_sys_handler.load(std::memory_order_acquire)) {
roc_sys_handler.exchange(false, std::memory_order_release);
roc_sys_handler.store(false);
if (wait_for_start_shm.joinable())
wait_for_start_shm.join();
shm_unlink(std::to_string(*roc_sys_session_id).c_str());
}
if (session_created.load(std::memory_order_acquire)) {
session_created.exchange(false, std::memory_order_release);
bool expect = true;
if (session_created.compare_exchange_strong(expect, false)) {
rocprofiler::TraceBufferBase::FlushAll();
CHECK_ROCPROFILER(rocprofiler_terminate_session(session_id));
}
if (plugin->is_valid()) plugin->rocprofiler_plugin_finalize_fn();
rocprof_started.exchange(false, std::memory_order_seq_cst);
rocprof_started.exchange(false);
}
static bool env_var_search(std::string& s) {
@@ -505,28 +508,45 @@ void sync_api_trace_callback(rocprofiler_record_tracer_t tracer_record,
}
}
void wait_for_rocsys() {
while (roc_sys_handler.load(std::memory_order_acquire)) {
shm_fd_sn = shm_open(roc_sys_session_id, O_RDONLY, 0666);
void wait_for_rocsys()
{
using namespace std::chrono_literals;
int64_t run_increment_counter = -1;
while (roc_sys_handler.load())
{
run_increment_counter++;
int shm_fd_sn = shm_open(roc_sys_session_id, O_RDONLY, 0666);
std::this_thread::sleep_for(10ms);
if (shm_fd_sn < 0) {
continue;
}
shmd = reinterpret_cast<struct shmd_t*>(mmap(0, 1024, PROT_READ, MAP_SHARED, shm_fd_sn, 0));
shmd_t* shmd = reinterpret_cast<struct shmd_t*>(mmap(0, 1024, PROT_READ, MAP_SHARED, shm_fd_sn, 0));
msync(shmd, sizeof(shmd->command), MS_SYNC | MS_INVALIDATE);
bool flag{false};
if (shmd && (sizeof(shmd->command) == sizeof(int))) {
if (shmd && (sizeof(shmd->command) == sizeof(int)) && run_increment_counter > 0)
{
switch (shmd->command) {
// Start
case 4: {
printf("ROCSYS:: Starting Tools Session...\n");
CHECK_ROCPROFILER(rocprofiler_start_session(session_id));
session_created.exchange(true, std::memory_order_release);
bool expect = false;
if (session_created.compare_exchange_strong(expect, true))
{
plugin->initialize(nullptr);
CHECK_ROCPROFILER(rocprofiler_start_session(session_id));
}
break;
}
// Stop
case 5: {
printf("ROCSYS:: Stopping Tools Session...\n");
if (session_created.load(std::memory_order_acquire)) {
session_created.exchange(false, std::memory_order_release);
bool expect = true;
if (session_created.compare_exchange_strong(expect, false))
{
CHECK_ROCPROFILER(rocprofiler_terminate_session(session_id));
for ([[maybe_unused]] rocprofiler_buffer_id_t buffer_id : buffer_ids) {
CHECK_ROCPROFILER(rocprofiler_flush_data(session_id, buffer_id));
@@ -537,16 +557,18 @@ void wait_for_rocsys() {
}
// Exit
case 6: {
printf("ROCSYS:: Exiting Tools Session...Application might still be finishng up..\n");
if (session_created.load(std::memory_order_acquire)) {
session_created.exchange(false, std::memory_order_release);
printf("ROCSYS:: Exiting Tools Session...\n");
bool expect = true;
if (session_created.compare_exchange_strong(expect, false))
{
CHECK_ROCPROFILER(rocprofiler_terminate_session(session_id));
for ([[maybe_unused]] rocprofiler_buffer_id_t buffer_id : buffer_ids) {
CHECK_ROCPROFILER(rocprofiler_flush_data(session_id, buffer_id));
}
rocprofiler::TraceBufferBase::FlushAll();
}
roc_sys_handler.exchange(false, std::memory_order_release);
roc_sys_handler.store(false);
flag = true;
}
}
@@ -574,7 +596,7 @@ static int info_callback(const rocprofiler_counter_info_t info, const char* gpu_
// Sleeps thread for amount of time without hanging
void sleep_while_condition(int64_t time_length, std::atomic<bool>& condition) {
int64_t time_slept = 0;
while (time_slept < time_length && condition.load(std::memory_order_relaxed)) {
while (time_slept < time_length && condition.load()) {
int64_t sleep_amount = std::min(SLEEP_CYCLE_LENGTH, time_length - time_slept);
time_slept += sleep_amount;
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_amount));
@@ -582,7 +604,7 @@ void sleep_while_condition(int64_t time_length, std::atomic<bool>& condition) {
}
void flush_interval_func() {
while (flush_thread_control.load(std::memory_order_acquire)) {
while (flush_thread_control.load()) {
sleep_while_condition(flush_interval, flush_thread_control);
for ([[maybe_unused]] rocprofiler_buffer_id_t buffer_id : buffer_ids) {
CHECK_ROCPROFILER(rocprofiler_flush_data(session_id, buffer_id));
@@ -593,20 +615,19 @@ void flush_interval_func() {
void trace_period_func() {
using namespace std::chrono;
std::atomic_thread_fence(std::memory_order_acquire);
sleep_while_condition(trace_delay, trace_period_thread_control);
int64_t num_sleeps = 0;
auto start_time = system_clock::now();
while (trace_period_thread_control.load(std::memory_order_relaxed)) {
while (trace_period_thread_control.load()) {
num_sleeps += 1;
CHECK_ROCPROFILER(rocprofiler_start_session(session_id));
session_created.exchange(true, std::memory_order_release);
session_created.exchange(true);
sleep_while_condition(trace_time_length, trace_period_thread_control);
session_created.exchange(false, std::memory_order_release);
session_created.exchange(false);
rocprofiler::TraceBufferBase::FlushAll();
CHECK_ROCPROFILER(rocprofiler_terminate_session(session_id));
@@ -636,7 +657,7 @@ ROCPROFILER_EXPORT bool OnLoad(void* table, uint64_t runtime_version, uint64_t f
warning("the ROCProfiler API version is not compatible with this tool");
return true;
}
rocprof_started.exchange(true, std::memory_order_seq_cst);
rocprof_started.exchange(true);
std::atexit(finish);
@@ -644,7 +665,7 @@ ROCPROFILER_EXPORT bool OnLoad(void* table, uint64_t runtime_version, uint64_t f
if (roc_sys_session_id != nullptr) {
printf("ROCSYS Session Created!\n");
wait_for_start_shm = std::thread{wait_for_rocsys};
roc_sys_handler.exchange(true, std::memory_order_release);
roc_sys_handler.exchange(true);
}
CHECK_ROCPROFILER(rocprofiler_initialize());
@@ -850,17 +871,17 @@ ROCPROFILER_EXPORT bool OnLoad(void* table, uint64_t runtime_version, uint64_t f
// Flush buffers every given interval
if (flush_interval > 0) {
flush_thread_control.exchange(true, std::memory_order_release);
flush_thread_control.exchange(true);
flush_thread = std::thread{flush_interval_func};
}
// Let session run for a given period of time
if (trace_time_length > 0) {
trace_period_thread_control.exchange(true, std::memory_order_release);
trace_period_thread_control.exchange(true);
trace_period_thread = std::thread{trace_period_func};
} else if (roc_sys_session_id == nullptr) {
CHECK_ROCPROFILER(rocprofiler_start_session(session_id));
session_created.exchange(true, std::memory_order_release);
session_created.exchange(true);
}
return true;
}