aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoruzhas <uzhas@ydb.tech>2024-03-10 19:22:54 +0300
committeruzhas <uzhas@ydb.tech>2024-03-10 19:34:23 +0300
commit1ba5eab3ab1bbaa8e19a239d8a577f12746d77f2 (patch)
tree647018a595966279fe75970d3e89c2f80291cef2
parent13a34e8a2fe1c3498a9a3e1d56202bb29eb5d17b (diff)
downloadydb-1ba5eab3ab1bbaa8e19a239d8a577f12746d77f2.tar.gz
add job_profile.cpp
8571dcd93f2c4adc97e84f5a92373149b0d6e0f9
-rw-r--r--yt/cpp/mapreduce/client/job_profiler.cpp141
1 files changed, 141 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/client/job_profiler.cpp b/yt/cpp/mapreduce/client/job_profiler.cpp
new file mode 100644
index 0000000000..330674f87f
--- /dev/null
+++ b/yt/cpp/mapreduce/client/job_profiler.cpp
@@ -0,0 +1,141 @@
+#include "job_profiler.h"
+
+#include <yt/yt/library/ytprof/cpu_profiler.h>
+#include <yt/yt/library/ytprof/external_pprof.h>
+#include <yt/yt/library/ytprof/heap_profiler.h>
+#include <yt/yt/library/ytprof/profile.h>
+#include <yt/yt/library/ytprof/symbolize.h>
+
+#include <yt/cpp/mapreduce/interface/logging/logger.h>
+#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
+
+#include <library/cpp/yson/node/node_io.h>
+
+#include <contrib/libs/tcmalloc/tcmalloc/malloc_extension.h>
+
+#include <util/system/env.h>
+#include <util/system/file.h>
+#include <util/system/shellcommand.h>
+
+namespace NYT {
+
+using namespace NYTProf;
+
+////////////////////////////////////////////////////////////////////////////////
+
+static void RunSubprocess(const std::vector<TString>& cmd)
+{
+ auto command = cmd[0];
+ auto args = TList<TString>(cmd.begin() + 1, cmd.end());
+
+ TShellCommand(command, args)
+ .Run()
+ .Wait();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TJobProfiler
+ : public IJobProfiler
+{
+public:
+ TJobProfiler()
+ {
+ try {
+ InitializeProfiler();
+ } catch (const std::exception& ex) {
+ YT_LOG_ERROR("Failed to initialize job profiler: %v",
+ ex.what());
+ }
+ }
+
+ void Start() override
+ {
+ if (CpuProfiler_) {
+ CpuProfiler_->Start();
+ }
+ }
+
+ void Stop() override
+ {
+ if (CpuProfiler_) {
+ CpuProfiler_->Stop();
+
+ auto profile = CpuProfiler_->ReadProfile();
+ SymbolizeAndWriteProfile(&profile);
+ }
+
+ if (MemoryProfilingToken_) {
+ auto profile = ConvertAllocationProfile(std::move(*MemoryProfilingToken_).Stop());
+ SymbolizeAndWriteProfile(&profile);
+ }
+
+ if (ProfilePeakMemoryUsage_) {
+ auto profile = ReadHeapProfile(tcmalloc::ProfileType::kPeakHeap);
+ SymbolizeAndWriteProfile(&profile);
+ }
+ }
+
+private:
+ std::unique_ptr<TCpuProfiler> CpuProfiler_;
+
+ std::optional<tcmalloc::MallocExtension::AllocationProfilingToken> MemoryProfilingToken_;
+
+ bool ProfilePeakMemoryUsage_ = false;
+
+ bool RunExternalSymbolizer_ = false;
+
+ void InitializeProfiler()
+ {
+ auto profilerSpecYson = GetEnv("YT_JOB_PROFILER_SPEC");
+ if (!profilerSpecYson) {
+ return;
+ }
+
+ auto profilerSpec = NodeFromYsonString(profilerSpecYson);
+ if (profilerSpec["type"] == "cpu") {
+ auto samplingFrequency = profilerSpec["sampling_frequency"].AsInt64();
+ CpuProfiler_ = std::make_unique<TCpuProfiler>(TCpuProfilerOptions{
+ .SamplingFrequency = static_cast<int>(samplingFrequency),
+ });
+ } else if (profilerSpec["type"] == "memory") {
+ MemoryProfilingToken_ = tcmalloc::MallocExtension::StartAllocationProfiling();
+ } else if (profilerSpec["type"] == "peak_memory") {
+ ProfilePeakMemoryUsage_ = true;
+ }
+
+ if (profilerSpec["run_external_symbolizer"] == true) {
+ RunExternalSymbolizer_ = true;
+ }
+ }
+
+ void SymbolizeAndWriteProfile(NYTProf::NProto::Profile* profile)
+ {
+ Symbolize(profile, /*filesOnly*/ true);
+ AddBuildInfo(profile, TBuildInfo::GetDefault());
+
+ if (RunExternalSymbolizer_) {
+ SymbolizeByExternalPProf(profile, TSymbolizationOptions{
+ .RunTool = RunSubprocess,
+ });
+ }
+
+ auto serializedProfile = SerializeProfile(*profile);
+
+ constexpr int ProfileFileDescriptor = 8;
+ TFile profileFile(ProfileFileDescriptor);
+ profileFile.Write(serializedProfile.data(), serializedProfile.size());
+ profileFile.FlushData();
+ }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+std::unique_ptr<IJobProfiler> CreateJobProfiler()
+{
+ return std::make_unique<TJobProfiler>();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT