Offload sampling data (#190)

- update timemory submodule
  - support for load/save of ring_buffers
  - new output keys, e.g. `%nid%`
  - sampling allocator offloading data
- writing sampling data to temporary file
- new advanced config option `OMNITRACE_USE_TEMPORARY_FILES`
- new advanced config option `OMNITRACE_TMPDIR`
- SIGINT signal (i.e. `Ctrl+C`) triggers backtrace + finalization
  - this behavior is common to other profilers

* update output.md docs

* Update omnitrace-avail output keys handling

* update writing metadata

* str format in perfetto_counter_track

* Fix fail regex for mpi-example

* config updates

- OMNITRACE_USE_TEMPORARY_FILES
- OMNITRACE_TMPDIR
- Enable finalization with SIGINT
- code supporting creation of temp files

* sampling offloading to temporary file

* Disable creation of empty temporary files when off

[ROCm/rocprofiler-systems commit: b23b581563]
Cette révision appartient à :
Jonathan R. Madsen
2022-10-31 22:23:10 -05:00
révisé par GitHub
Parent c87e69e522
révision 7b9a527b7e
9 fichiers modifiés avec 270 ajouts et 25 suppressions
Sous-module projects/rocprofiler-systems/external/timemory mis-à-jour : 95df33c9c4...d5e3987802
+54 -14
Voir le fichier
@@ -272,25 +272,65 @@ main(int argc, char** argv)
{
auto _keys = tim::settings::output_keys(
tim::settings::shared_instance()->get_tag());
std::pair<size_t, size_t> _w = { 0, 0 };
std::tuple<size_t, size_t, size_t> _w = { 0, 0, 0 };
for(const auto& itr : _keys)
{
if(!is_selected(itr.first)) continue;
if(_show && !is_selected(itr.second)) continue;
_w.first = std::max(_w.first, itr.first.length());
_w.second = std::max(_w.second, itr.second.length());
if(!is_selected(itr.key)) continue;
if(_show && !is_selected(itr.value)) continue;
std::get<0>(_w) = std::max(std::get<0>(_w), itr.key.length());
std::get<1>(_w) = std::max(std::get<1>(_w), itr.value.length());
std::get<2>(_w) = std::max(std::get<2>(_w), itr.description.length());
}
std::stringstream _msg{};
_msg << "Output Keys:\n" << std::left;
for(const auto& itr : _keys)
_msg << std::left;
if(markdown)
{
if(!is_selected(itr.first)) continue;
if(_show && !is_selected(itr.second)) continue;
if(_show)
_msg << " " << std::setw(_w.first) << itr.first
<< " :: " << std::setw(_w.second) << itr.second << "\n";
else
_msg << " " << std::setw(_w.first) << itr.first << "\n";
_msg << "| " << std::setw(std::get<0>(_w) + 2) << "String";
if(_show) _msg << " | " << std::setw(std::get<1>(_w)) << "Value";
_msg << " | " << std::setw(std::get<2>(_w)) << "Encoding"
<< " |\n";
auto _dashes = [](int64_t _n) {
std::stringstream _dss{};
_dss.fill('-');
_dss << std::setw(_n + 2) << "";
return _dss.str();
};
_msg << "|" << _dashes(std::get<0>(_w) + 2);
if(_show) _msg << "|" << _dashes(std::get<1>(_w));
_msg << "|" << _dashes(std::get<2>(_w)) << "|\n";
for(const auto& itr : _keys)
{
if(!is_selected(itr.key)) continue;
if(_show && !is_selected(itr.value)) continue;
_msg << "| " << std::setw(std::get<0>(_w) + 2)
<< TIMEMORY_JOIN("", "`", itr.key, "`");
if(_show)
_msg << " | " << std::setw(std::get<1>(_w)) << itr.value;
_msg << " | " << std::setw(std::get<2>(_w)) << itr.description
<< " |\n";
}
}
else
{
_msg << "Output Keys:\n" << std::left;
for(const auto& itr : _keys)
{
if(!is_selected(itr.key)) continue;
if(_show && !is_selected(itr.value)) continue;
if(_show)
_msg << " " << std::setw(std::get<0>(_w)) << itr.key
<< " :: " << std::setw(std::get<1>(_w)) << itr.value
<< " :: " << std::setw(std::get<2>(_w))
<< itr.description << "\n";
else
_msg << " " << std::setw(std::get<0>(_w)) << itr.key
<< " :: " << std::setw(std::get<2>(_w))
<< itr.description << "\n";
}
}
std::cout << _msg.str();
}
+9 -5
Voir le fichier
@@ -261,21 +261,25 @@ set `OMNITRACE_OUTPUT_PREFIX="%argt%-"` and let omnitrace cleanly organize the o
| String | Encoding |
|-----------------|--------------------------------------------------------------------------------------------------------------------|
| `%arg<N>%` | Command line argument at position `<N>` (zero indexed), e.g. `%arg0%` for first argument. |
| `%arg<N>_hash%` | MD5 sum of `%arg<N>%` |
| `%argv%` | Entire command-line condensed into a single string |
| `%argv_hash%` | MD5 sum of `%argv%` |
| `%argt%` | Similar to `%argv%` except basename of first command line argument |
| `%argt_hash%` | MD5 sum of `%argt%` |
| `%args%` | All command line arguments condensed into a single string |
| `%args_hash%` | MD5 sum of `%args%` |
| `%tag%` | Basename of first command line argument |
| `%arg<N>%` | Command line argument at position `<N>` (zero indexed), e.g. `%arg0%` for first argument. |
| `%argv_hash%` | MD5 sum of `%argv%` |
| `%argt_hash%` | MD5 sum of `%argt%` |
| `%args_hash%` | MD5 sum of `%args%` |
| `%tag_hash%` | MD5 sum of `%tag%` |
| `%arg<N>_hash%` | MD5 sum of `%arg<N>%` |
| `%pid%` | Process identifier (i.e. `getpid()`) |
| `%ppid%` | Parent process identifier (i.e. `getppid()`) |
| `%pgid%` | Process group identifier (i.e. `getpgid(getpid())`) |
| `%psid%` | Process session identifier (i.e. `getsid(getpid())`) |
| `%psize%` | Number of sibling processes (from reading `/proc/<PPID>/task/<PPID>/children`) |
| `%job%` | Value of `SLURM_JOB_ID` environment variable if exists, else `0` |
| `%rank%` | Value of `SLURM_PROCID` environment variable if exists, else `MPI_Comm_rank` (or `0` non-mpi) |
| `%size%` | `MPI_Comm_size` or `1` if non-mpi |
| `%nid%` | `%rank%` if possible, otherwise `%pid%` |
| `%launch_time%` | Launch date and time (uses `OMNITRACE_TIME_FORMAT`) |
| `%env{NAME}%` | Value of environment variable `NAME` (i.e. `getenv(NAME)`) |
| `%cfg{NAME}%` | Value of configuration variable `NAME` (e.g. `%cfg{OMNITRACE_SAMPLING_FREQ}%` would resolve to sampling frequency) |
+3 -2
Voir le fichier
@@ -991,9 +991,10 @@ omnitrace_finalize_hidden(void)
OMNITRACE_VERBOSE_F(1, "Finalizing timemory...\n");
tim::timemory_finalize(_timemory_manager.get());
auto _cfg = settings::compose_filename_config{};
_cfg.use_suffix = true;
_timemory_manager->write_metadata(settings::get_global_output_prefix(),
"omnitrace",
settings::default_process_suffix());
"omnitrace", _cfg);
}
if(_perfetto_output_error)
+86
Voir le fichier
@@ -56,6 +56,7 @@
#include <sstream>
#include <string>
#include <unistd.h>
#include <utility>
namespace omnitrace
{
@@ -597,6 +598,16 @@ configure_settings(bool _init)
std::string{ "perfetto-trace.proto" }, "perfetto", "io",
"filename", "advanced");
OMNITRACE_CONFIG_SETTING(bool, "OMNITRACE_USE_TEMPORARY_FILES",
"Write data to temporary files to minimize the memory usage "
"of omnitrace, e.g. call-stack samples will be periodically "
"written to a file and re-loaded during finalization",
true, "io", "data", "advanced");
OMNITRACE_CONFIG_SETTING(
std::string, "OMNITRACE_TMPDIR", "Base directory for temporary files",
get_env<std::string>("TMPDIR", "/tmp"), "io", "data", "advanced");
// set the defaults
_config->get_flamegraph_output() = false;
_config->get_ctest_notes() = false;
@@ -986,6 +997,7 @@ configure_signal_handler()
if(_config->get_enable_signal_handler())
{
tim::signals::disable_signal_detection();
signal_settings::enable(sys_signal::Interrupt);
signal_settings::set_exit_action(omnitrace_exit_action);
signal_settings::check_environment();
auto default_signals = signal_settings::get_default();
@@ -1964,6 +1976,80 @@ get_debug_pid()
_vlist.count(dmp::rank()) > 0;
return _v;
}
bool
get_use_tmp_files()
{
static auto _v = get_config()->find("OMNITRACE_USE_TEMPORARY_FILES");
return static_cast<tim::tsettings<bool>&>(*_v->second).get();
}
std::string
get_tmpdir()
{
static auto _v = get_config()->find("OMNITRACE_TMPDIR");
return static_cast<tim::tsettings<std::string>&>(*_v->second).get();
}
/// Stores the path of the temporary file; the stream is not opened here.
tmp_file::tmp_file(std::string _fname)
: filename{ std::move(_fname) }
{}
/// Closes the stream (if it is open) when the object is destroyed.
tmp_file::~tmp_file()
{
    close();
}
/// Opens `stream` on `filename` using the requested open mode.
///
/// When `filename` does not exist yet, it is first opened through an output
/// stream so that the file is created before `stream` is opened on it.
void
tmp_file::open(std::ios::openmode _mode)
{
    OMNITRACE_VERBOSE_F(2, "Opening temporary file '%s'...\n", filename.c_str());

    const bool _missing = !filepath::exists(filename);
    if(_missing)
    {
        // open in out mode to create the file; the helper stream is
        // discarded at the end of this scope
        std::ofstream _creator{};
        filepath::open(_creator, filename);
    }

    stream.open(filename, _mode);
}
/// Closes `stream`; does nothing when the stream is not open.
void
tmp_file::close()
{
    if(!stream.is_open()) return;
    stream.close();
}
/// Returns the cached temporary file for the given basename and extension,
/// creating and opening it on the first request.
///
/// The filename is composed by `settings::compose_output_filename` with
/// `get_tmpdir()` as the explicit path, a subdirectory built from the output
/// path and `%ppid%`, and a `%pid%` suffix.
///
/// \param _basename  base name of the temporary file
/// \param _ext       file extension
/// \return a null pointer when `get_use_tmp_files()` is false; otherwise the
///         shared handle for the composed filename (the same handle is returned
///         for repeated requests that resolve to the same filename).
///
/// Raises an error via `OMNITRACE_THROW` when the composed filename is empty or
/// is not an absolute path.
std::shared_ptr<tmp_file>
get_tmp_file(std::string _basename, std::string _ext)
{
    if(!get_use_tmp_files()) return std::shared_ptr<tmp_file>{};

    static auto _existing_files =
        std::unordered_map<std::string, std::shared_ptr<tmp_file>>{};
    static std::mutex _mutex{};
    // guards _existing_files for the remainder of the function
    std::unique_lock<std::mutex> _lk{ _mutex };

    auto _cfg          = settings::compose_filename_config{};
    _cfg.use_suffix    = true;
    _cfg.suffix        = "%pid%";
    _cfg.explicit_path = get_tmpdir();
    _cfg.subdirectory  = JOIN('/', settings::output_path(), "%ppid%", "");

    // hand over copies: _basename and _ext must remain intact because they are
    // reported in the error message below
    auto _fname = settings::compose_output_filename(std::string{ _basename },
                                                    std::string{ _ext }, _cfg);

    if(_fname.empty() || _fname.front() != '/')
    {
        OMNITRACE_THROW("Error! temporary file '%s' (based on '%s.%s') is either empty "
                        "or is not an absolute path",
                        _fname.c_str(), _basename.c_str(), _ext.c_str());
    }

    auto itr = _existing_files.find(_fname);
    if(itr != _existing_files.end()) return itr->second;

    auto _v = std::make_shared<tmp_file>(_fname);
    _v->open();
    // the cache keeps its own reference; the caller receives the same handle
    _existing_files.emplace(_fname, _v);
    return _v;
}
} // namespace config
State&
+24
Voir le fichier
@@ -31,6 +31,7 @@
#include <timemory/backends/threading.hpp>
#include <timemory/macros/language.hpp>
#include <fstream>
#include <string>
#include <string_view>
#include <unordered_set>
@@ -330,6 +331,29 @@ get_trace_thread_join();
std::string
get_rocm_events();
/// Value of the `OMNITRACE_USE_TEMPORARY_FILES` configuration setting.
bool
get_use_tmp_files();
/// Value of the `OMNITRACE_TMPDIR` configuration setting.
std::string
get_tmpdir();
/// Pairs a file path with the stream used to read and write it.
struct tmp_file
{
/// Takes the path of the file.
tmp_file(std::string);
~tmp_file();
/// Opens `stream` on `filename`; by default in binary read/write mode.
void open(std::ios::openmode = std::ios::binary | std::ios::in | std::ios::out);
/// Closes `stream`.
void close();
/// True when the stream is open and no error/eof state flag is set.
operator bool() const { return stream.is_open() && stream.good(); }
// path of the file
std::string filename = {};
// stream operating on `filename`
std::fstream stream = {};
};
/// Returns a shared handle to a temporary file composed from the given basename
/// and extension (the extension defaults to "dat").
std::shared_ptr<tmp_file>
get_tmp_file(std::string _basename, std::string _ext = "dat");
} // namespace config
//
+3 -2
Voir le fichier
@@ -151,8 +151,9 @@ struct perfetto_counter_track
_css << " " << std::hex << std::setw(12) << std::left << eitr;
OMNITRACE_THROW("perfetto_counter_track emplace method for '%s' (%p) "
"invalidated C-string '%s' (%p).\n%8s: %s\n%8s: %s\n",
_v.c_str(), _name->c_str(), std::get<0>(itr).c_str(),
std::get<0>(itr).c_str(), "previous",
_v.c_str(), (void*) _name->c_str(),
std::get<0>(itr).c_str(),
(void*) std::get<0>(itr).c_str(), "previous",
_pss.str().c_str(), "current", _css.str().c_str());
}
}
+87
Voir le fichier
@@ -286,6 +286,69 @@ start_duration_thread()
}
}
/// Returns a reference to the handle of the "sampling" temporary file.
/// The handle is requested from config::get_tmp_file on the first call only.
auto&
get_offload_file()
{
    static auto _offload_file = config::get_tmp_file("sampling");
    return _offload_file;
}
/// Returns the mutex used to serialize access to the offload file.
/// Every call yields the same mutex instance.
std::mutex&
get_offload_mutex()
{
    static std::mutex _offload_mutex{};
    return _offload_mutex;
}
// bundle type declared by the sampler
using sampler_bundle_t = typename sampler_t::bundle_type;
// ring buffer holding sampler bundles
using sampler_buffer_t = tim::data_storage::ring_buffer<sampler_bundle_t>;
void
offload_buffer(int64_t _seq, sampler_buffer_t&& _buf)
{
auto _lk = std::unique_lock<std::mutex>{ get_offload_mutex() };
auto& _file = get_offload_file();
if(!_file) return;
OMNITRACE_VERBOSE_F(3, "Saving sampling buffer for thread %li...\n", _seq);
auto& _fs = _file->stream;
_fs.write(reinterpret_cast<char*>(&_seq), sizeof(_seq));
auto _data = std::move(_buf);
_data.save(_fs);
_data.destroy();
_buf.destroy();
}
/// Reads back every buffer previously written by `offload_buffer`.
///
/// The offload stream is closed and re-opened in binary read mode, then
/// records (an `int64_t` identifier followed by a serialized ring buffer) are
/// read until the stream can no longer deliver data. The offload file handle is
/// reset before returning.
///
/// \return map from record identifier to the buffers loaded for it; empty when
///         temporary files are disabled or the offload file handle is null.
auto
load_offload_buffer()
{
    auto _data = std::map<int64_t, std::vector<sampler_buffer_t>>{};
    if(!get_use_tmp_files()) return _data;

    auto  _lk   = std::unique_lock<std::mutex>{ get_offload_mutex() };
    auto& _file = get_offload_file();
    if(!_file) return _data;

    auto& _fs = _file->stream;
    _fs.close();
    _file->open(std::ios::binary | std::ios::in);

    if(!_fs.is_open())
    {
        OMNITRACE_VERBOSE_F(0, "Warning! Unable to re-open temporary file '%s'...\n",
                            _file->filename.c_str());
    }

    // Loop on good() rather than !eof(): when the stream is in a fail/bad state
    // without eofbit (e.g. the re-open failed), reads are no-ops that never set
    // eofbit, so testing only eof() would spin forever.
    while(_fs.good())
    {
        int64_t _seq = 0;
        _fs.read(reinterpret_cast<char*>(&_seq), sizeof(_seq));
        // end of data (or any read error): no further record is available
        if(!_fs) break;

        sampler_buffer_t _buffer{};
        _buffer.load(_fs);
        OMNITRACE_VERBOSE_F(2, "Loading %zu samples for thread %li...\n", _buffer.count(),
                            _seq);
        _data[_seq].emplace_back(std::move(_buffer));
    }

    _file.reset();
    return _data;
}
std::set<int>
configure(bool _setup, int64_t _tid)
{
@@ -362,6 +425,12 @@ configure(bool _setup, int64_t _tid)
threading::get_sys_tid() });
}
if(get_use_tmp_files())
{
auto _file = get_offload_file();
if(_file && *_file) _sampler->set_offload(&offload_buffer);
}
static_assert(tim::trait::buffer_size<sampling::sampler_t>::value > 0,
"Error! Zero buffer size");
@@ -542,6 +611,14 @@ post_process()
size_t _total_data = 0;
size_t _total_threads = 0;
for(size_t i = 0; i < max_supported_threads; ++i)
{
auto& _sampler = get_sampler(i);
if(_sampler) _sampler->set_offload(nullptr);
}
auto _loaded_data = load_offload_buffer();
for(size_t i = 0; i < max_supported_threads; ++i)
{
auto& _sampler = get_sampler(i);
@@ -574,6 +651,16 @@ post_process()
_sampler->stop();
auto& _raw_data = _sampler->get_data();
for(auto litr : _loaded_data[i])
{
while(!litr.is_empty())
{
auto _v = sampler_bundle_t{};
litr.read(&_v);
_raw_data.emplace_back(std::move(_v));
}
litr.destroy();
}
OMNITRACE_VERBOSE(2 || get_debug_sampling(),
"Sampler data for thread %lu has %zu initial entries...\n", i,
+3 -1
Voir le fichier
@@ -811,7 +811,9 @@ if(OMNITRACE_USE_MPI OR OMNITRACE_USE_MPI_HEADERS)
ENVIRONMENT "${_base_environment}"
REWRITE_RUN_PASS_REGEX
"(/[A-Za-z-]+/perfetto-trace-0.proto).*(/[A-Za-z-]+/wall_clock-0.txt')"
REWRITE_RUN_FAIL_REGEX "-[0-9][0-9]+.(json|txt|proto)")
REWRITE_RUN_FAIL_REGEX
"(perfetto-trace|trip_count|sampling_percent|sampling_cpu_clock|sampling_wall_clock|wall_clock)-[0-9][0-9]+.(json|txt|proto)"
)
omnitrace_add_test(
SKIP_RUNTIME SKIP_SAMPLING