summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce
diff options
context:
space:
mode:
authorermolovd <[email protected]>2023-10-02 16:28:29 +0300
committerermolovd <[email protected]>2023-10-02 17:46:23 +0300
commit2dd58a8a79545037b218d32588215396fc850f9f (patch)
treea44470c4215792eca451ceb95f7230c1589a73d0 /yt/cpp/mapreduce
parent45d9242d89e0811f346018242fde0dcfd1786202 (diff)
YT-19578: Move user_job_statistics to yt/cpp/mapreduce/library
Diffstat (limited to 'yt/cpp/mapreduce')
-rw-r--r--yt/cpp/mapreduce/library/CMakeLists.txt1
-rw-r--r--yt/cpp/mapreduce/library/user_job_statistics/CMakeLists.darwin-x86_64.txt18
-rw-r--r--yt/cpp/mapreduce/library/user_job_statistics/CMakeLists.linux-aarch64.txt19
-rw-r--r--yt/cpp/mapreduce/library/user_job_statistics/CMakeLists.linux-x86_64.txt19
-rw-r--r--yt/cpp/mapreduce/library/user_job_statistics/CMakeLists.txt17
-rw-r--r--yt/cpp/mapreduce/library/user_job_statistics/CMakeLists.windows-x86_64.txt18
-rw-r--r--yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.cpp133
-rw-r--r--yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.h58
-rw-r--r--yt/cpp/mapreduce/library/user_job_statistics/ya.make11
9 files changed, 294 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/library/CMakeLists.txt b/yt/cpp/mapreduce/library/CMakeLists.txt
index bc51bb2b478..104ec2b6535 100644
--- a/yt/cpp/mapreduce/library/CMakeLists.txt
+++ b/yt/cpp/mapreduce/library/CMakeLists.txt
@@ -7,3 +7,4 @@
add_subdirectory(table_schema)
+add_subdirectory(user_job_statistics)
diff --git a/yt/cpp/mapreduce/library/user_job_statistics/CMakeLists.darwin-x86_64.txt b/yt/cpp/mapreduce/library/user_job_statistics/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..ac13f5f6df0
--- /dev/null
+++ b/yt/cpp/mapreduce/library/user_job_statistics/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,18 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(mapreduce-library-user_job_statistics)
+target_link_libraries(mapreduce-library-user_job_statistics PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-mapreduce-common
+)
+target_sources(mapreduce-library-user_job_statistics PRIVATE
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.cpp
+)
diff --git a/yt/cpp/mapreduce/library/user_job_statistics/CMakeLists.linux-aarch64.txt b/yt/cpp/mapreduce/library/user_job_statistics/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..2c62e9c60c2
--- /dev/null
+++ b/yt/cpp/mapreduce/library/user_job_statistics/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,19 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(mapreduce-library-user_job_statistics)
+target_link_libraries(mapreduce-library-user_job_statistics PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-mapreduce-common
+)
+target_sources(mapreduce-library-user_job_statistics PRIVATE
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.cpp
+)
diff --git a/yt/cpp/mapreduce/library/user_job_statistics/CMakeLists.linux-x86_64.txt b/yt/cpp/mapreduce/library/user_job_statistics/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..2c62e9c60c2
--- /dev/null
+++ b/yt/cpp/mapreduce/library/user_job_statistics/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,19 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(mapreduce-library-user_job_statistics)
+target_link_libraries(mapreduce-library-user_job_statistics PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-mapreduce-common
+)
+target_sources(mapreduce-library-user_job_statistics PRIVATE
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.cpp
+)
diff --git a/yt/cpp/mapreduce/library/user_job_statistics/CMakeLists.txt b/yt/cpp/mapreduce/library/user_job_statistics/CMakeLists.txt
new file mode 100644
index 00000000000..f8b31df0c11
--- /dev/null
+++ b/yt/cpp/mapreduce/library/user_job_statistics/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/yt/cpp/mapreduce/library/user_job_statistics/CMakeLists.windows-x86_64.txt b/yt/cpp/mapreduce/library/user_job_statistics/CMakeLists.windows-x86_64.txt
new file mode 100644
index 00000000000..ac13f5f6df0
--- /dev/null
+++ b/yt/cpp/mapreduce/library/user_job_statistics/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,18 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(mapreduce-library-user_job_statistics)
+target_link_libraries(mapreduce-library-user_job_statistics PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-mapreduce-common
+)
+target_sources(mapreduce-library-user_job_statistics PRIVATE
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.cpp
+)
diff --git a/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.cpp b/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.cpp
new file mode 100644
index 00000000000..56ab88ee9da
--- /dev/null
+++ b/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.cpp
@@ -0,0 +1,133 @@
+#include "user_job_statistics.h"
+#include <yt/cpp/mapreduce/common/helpers.h>
+#include <util/stream/null.h>
+#include <util/string/builder.h>
+#include <util/system/mutex.h>
+#include <util/system/env.h>
+
+using namespace NYtTools;
+
+static TMutex GlobalStatsWritingMutex;
+
+#if defined(_unix_)
+const FHANDLE TUserJobStatsProxy::JobStatisticsHandle = 5;
+#elif defined(_win_)
+const FHANDLE TUserJobStatsProxy::JobStatisticsHandle = nullptr;
+#endif
+
+static IOutputStream* CorrectHandle(const FHANDLE h) {
+#if defined(_unix_)
+ if (fcntl(h, F_GETFD) == -1) {
+ return &Cerr;
+ }
+ return nullptr;
+#elif defined(_win_)
+ return &Cerr;
+#endif
+}
+
+static TString PrintNodeSimple(const NYT::TNode& n) {
+ return NYT::NodeToYsonString(n, NYson::EYsonFormat::Text);
+}
+
+void TUserJobStatsProxy::Init(IOutputStream * usingStream) {
+ if (usingStream == nullptr) {
+ usingStream = CorrectHandle(JobStatisticsHandle);
+ }
+
+ if (usingStream == nullptr && GetEnv("YT_JOB_ID").empty()) {
+ usingStream = &Cerr;
+ }
+
+
+ if (usingStream == nullptr) {
+ TFileHandle fixedDesrc(JobStatisticsHandle);
+ FetchedOut = MakeHolder<TFixedBufferFileOutput>(TFile(fixedDesrc.Duplicate()));
+ UsingStream = FetchedOut.Get();
+ fixedDesrc.Release();
+ } else {
+ UsingStream = usingStream;
+ }
+}
+
+void TUserJobStatsProxy::InitChecked(IOutputStream* def) {
+ IOutputStream* usingStream = CorrectHandle(JobStatisticsHandle);
+
+ if (usingStream == nullptr && !GetEnv("YT_JOB_ID").empty()) {
+ TFileHandle fixedDesrc(JobStatisticsHandle);
+ FetchedOut = MakeHolder<TFixedBufferFileOutput>(TFile(fixedDesrc.Duplicate()));
+ UsingStream = FetchedOut.Get();
+ fixedDesrc.Release();
+ } else {
+ UsingStream = def;
+ }
+}
+
+void TUserJobStatsProxy::InitIfNotInited(IOutputStream * usingStream) {
+ if (UsingStream == nullptr) {
+ Init(usingStream);
+ }
+}
+
+void TUserJobStatsProxy::CommitStats() {
+ if (Stats.empty()) {
+ return;
+ }
+
+ auto res = NYT::TNode::CreateMap();
+ for (auto& p : Stats) {
+ res[p.first] = p.second;
+ }
+ for (auto& p : TimeStats) {
+ res[p.first] = p.second.MilliSeconds();
+ }
+ with_lock(GlobalStatsWritingMutex) {
+ *UsingStream << PrintNodeSimple(res) << ";" << Endl;
+ }
+ Stats.clear();
+}
+
+
+TTimeStatHolder TUserJobStatsProxy::TimerStart(TString name, bool commitOnFinish) {
+ return THolder(new TTimeStat(this, name, commitOnFinish));
+}
+
+void TUserJobStatsProxy::WriteStat(TString name, i64 val) {
+ auto res = NYT::TNode {} (name, val);
+ with_lock(GlobalStatsWritingMutex) {
+ *UsingStream << PrintNodeSimple(res) << ";" << Endl;
+ }
+}
+
+void TUserJobStatsProxy::WriteStatNoFlush(TString name, i64 val) {
+ auto res = NYT::TNode {} (name, val);
+ with_lock(GlobalStatsWritingMutex) {
+ *UsingStream << (TStringBuilder{} << PrintNodeSimple(res) << ";\n");
+ }
+}
+
+TTimeStat::TTimeStat(TUserJobStatsProxy* parent, TString name, bool commit)
+ : Parent(parent)
+ , Name(name)
+ , Commit(commit) {}
+
+TTimeStat::~TTimeStat() {
+ Finish();
+}
+
+void TTimeStat::Cancel() {
+ Parent = nullptr;
+}
+
+void TTimeStat::Finish() {
+ if (!Parent) {
+ return;
+ }
+
+ if (Commit) {
+ Parent->WriteStatNoFlush(Name, Timer.Get().MilliSeconds());
+ } else {
+ Parent->TimeStats[Name] += Timer.Get();
+ }
+ Cancel();
+}
diff --git a/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.h b/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.h
new file mode 100644
index 00000000000..6939d20417b
--- /dev/null
+++ b/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.h
@@ -0,0 +1,58 @@
+#pragma once
+
+#include <util/stream/file.h>
+#include <util/generic/hash.h>
+#include <util/datetime/cputimer.h>
+
+namespace NYtTools {
+ class TTimeStat;
+ using TTimeStatHolder = THolder<TTimeStat>;
+
+ class TUserJobStatsProxy {
+ public:
+ static const FHANDLE JobStatisticsHandle;
+ private:
+ THolder<IOutputStream> FetchedOut;
+ IOutputStream* UsingStream = &Cerr;
+ public:
+ // TODO: add inheritance
+ THashMap<TString, i64> Stats;//will be dumped in CommitStats or desctructor
+ THashMap<TString, TDuration> TimeStats;//will be dumped in CommitStats or desctructor
+
+ TUserJobStatsProxy() { Init(nullptr); }
+ ~TUserJobStatsProxy() {
+ CommitStats();
+ }
+ TUserJobStatsProxy (IOutputStream* usingStream) {Init(usingStream);}
+
+ void Init(IOutputStream* usingStream);
+ void InitChecked(IOutputStream* ifNotInJob);
+ void InitIfNotInited(IOutputStream* usingStream);
+ IOutputStream* GetStream() const { return UsingStream; }
+ void CommitStats();
+ void WriteStat(TString name, i64 val); //immidiatly wirtes stat
+ void WriteStatNoFlush(TString name, i64 val); //immidiatly wirtes stat but do not flush it
+
+ //@param name name of statistic to be written in millisecs from creation to destruction
+ //@param commitOnFinish if false: will update state/write on job finish; if true: write stat in destructor
+ TTimeStatHolder TimerStart(TString name, bool commitOnFinish = false);
+ };
+
+ class TTimeStat {
+ TUserJobStatsProxy* Parent;
+ TString Name;
+ bool Commit;
+
+ TTimeStat(TUserJobStatsProxy* parent, TString name, bool commit);
+ friend class TUserJobStatsProxy;
+
+ TSimpleTimer Timer;
+ public:
+ ~TTimeStat();
+ TDuration Get() const {
+ return Timer.Get();
+ }
+ void Cancel();
+ void Finish();
+ };
+}
diff --git a/yt/cpp/mapreduce/library/user_job_statistics/ya.make b/yt/cpp/mapreduce/library/user_job_statistics/ya.make
new file mode 100644
index 00000000000..7179660b317
--- /dev/null
+++ b/yt/cpp/mapreduce/library/user_job_statistics/ya.make
@@ -0,0 +1,11 @@
+LIBRARY()
+
+SRCS(
+ user_job_statistics.cpp
+)
+
+PEERDIR(
+ yt/cpp/mapreduce/common
+)
+
+END()