diff options
| author | ermolovd <[email protected]> | 2023-10-02 16:28:29 +0300 |
|---|---|---|
| committer | ermolovd <[email protected]> | 2023-10-02 17:46:23 +0300 |
| commit | 2dd58a8a79545037b218d32588215396fc850f9f (patch) | |
| tree | a44470c4215792eca451ceb95f7230c1589a73d0 /yt/cpp/mapreduce | |
| parent | 45d9242d89e0811f346018242fde0dcfd1786202 (diff) | |
YT-19578: Move user_job_statistics to yt/cpp/mapreduce/library
Diffstat (limited to 'yt/cpp/mapreduce')
9 files changed, 294 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/library/CMakeLists.txt b/yt/cpp/mapreduce/library/CMakeLists.txt index bc51bb2b478..104ec2b6535 100644 --- a/yt/cpp/mapreduce/library/CMakeLists.txt +++ b/yt/cpp/mapreduce/library/CMakeLists.txt @@ -7,3 +7,4 @@ add_subdirectory(table_schema) +add_subdirectory(user_job_statistics) diff --git a/yt/cpp/mapreduce/library/user_job_statistics/CMakeLists.darwin-x86_64.txt b/yt/cpp/mapreduce/library/user_job_statistics/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..ac13f5f6df0 --- /dev/null +++ b/yt/cpp/mapreduce/library/user_job_statistics/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,18 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(mapreduce-library-user_job_statistics) +target_link_libraries(mapreduce-library-user_job_statistics PUBLIC + contrib-libs-cxxsupp + yutil + cpp-mapreduce-common +) +target_sources(mapreduce-library-user_job_statistics PRIVATE + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.cpp +) diff --git a/yt/cpp/mapreduce/library/user_job_statistics/CMakeLists.linux-aarch64.txt b/yt/cpp/mapreduce/library/user_job_statistics/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..2c62e9c60c2 --- /dev/null +++ b/yt/cpp/mapreduce/library/user_job_statistics/CMakeLists.linux-aarch64.txt @@ -0,0 +1,19 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). 
These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(mapreduce-library-user_job_statistics) +target_link_libraries(mapreduce-library-user_job_statistics PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-mapreduce-common +) +target_sources(mapreduce-library-user_job_statistics PRIVATE + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.cpp +) diff --git a/yt/cpp/mapreduce/library/user_job_statistics/CMakeLists.linux-x86_64.txt b/yt/cpp/mapreduce/library/user_job_statistics/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..2c62e9c60c2 --- /dev/null +++ b/yt/cpp/mapreduce/library/user_job_statistics/CMakeLists.linux-x86_64.txt @@ -0,0 +1,19 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(mapreduce-library-user_job_statistics) +target_link_libraries(mapreduce-library-user_job_statistics PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-mapreduce-common +) +target_sources(mapreduce-library-user_job_statistics PRIVATE + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.cpp +) diff --git a/yt/cpp/mapreduce/library/user_job_statistics/CMakeLists.txt b/yt/cpp/mapreduce/library/user_job_statistics/CMakeLists.txt new file mode 100644 index 00000000000..f8b31df0c11 --- /dev/null +++ b/yt/cpp/mapreduce/library/user_job_statistics/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/yt/cpp/mapreduce/library/user_job_statistics/CMakeLists.windows-x86_64.txt b/yt/cpp/mapreduce/library/user_job_statistics/CMakeLists.windows-x86_64.txt new file mode 100644 index 00000000000..ac13f5f6df0 --- /dev/null +++ b/yt/cpp/mapreduce/library/user_job_statistics/CMakeLists.windows-x86_64.txt @@ -0,0 +1,18 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(mapreduce-library-user_job_statistics) +target_link_libraries(mapreduce-library-user_job_statistics PUBLIC + contrib-libs-cxxsupp + yutil + cpp-mapreduce-common +) +target_sources(mapreduce-library-user_job_statistics PRIVATE + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.cpp +)
diff --git a/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.cpp b/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.cpp
new file mode 100644
index 00000000000..56ab88ee9da
--- /dev/null
+++ b/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.cpp
@@ -0,0 +1,155 @@
+#include "user_job_statistics.h"
+#include <yt/cpp/mapreduce/common/helpers.h>
+#include <util/stream/null.h>
+#include <util/string/builder.h>
+#include <util/system/mutex.h>
+#include <util/system/env.h>
+
+using namespace NYtTools;
+
+// Guards every write to the statistics stream made through this file.
+static TMutex GlobalStatsWritingMutex;
+
+#if defined(_unix_)
+const FHANDLE TUserJobStatsProxy::JobStatisticsHandle = 5;
+#elif defined(_win_)
+const FHANDLE TUserJobStatsProxy::JobStatisticsHandle = nullptr;
+#endif
+
+// Decides whether handle `h` can be written to directly.
+// Returns nullptr when it can, and &Cerr as the replacement stream when it
+// cannot (failed fcntl check on unix, always on Windows).
+static IOutputStream* CorrectHandle(const FHANDLE h) {
+#if defined(_unix_)
+    if (fcntl(h, F_GETFD) == -1) {
+        return &Cerr;
+    }
+    return nullptr;
+#elif defined(_win_)
+    return &Cerr;
+#endif
+}
+
+// Serializes a node as text-format YSON.
+static TString PrintNodeSimple(const NYT::TNode& n) {
+    return NYT::NodeToYsonString(n, NYson::EYsonFormat::Text);
+}
+
+// Chooses the output stream. A non-null `usingStream` is used as is. Otherwise
+// the statistics handle is used when it is valid and YT_JOB_ID is set, and
+// Cerr is used in every other case.
+void TUserJobStatsProxy::Init(IOutputStream * usingStream) {
+    if (usingStream == nullptr) {
+        usingStream = CorrectHandle(JobStatisticsHandle);
+    }
+
+    if (usingStream == nullptr && GetEnv("YT_JOB_ID").empty()) {
+        usingStream = &Cerr;
+    }
+
+    if (usingStream == nullptr) {
+        // Write through a duplicate of the handle; Release() presumably detaches
+        // the wrapper so that the original handle is not closed with it.
+        TFileHandle fixedDesrc(JobStatisticsHandle);
+        FetchedOut = MakeHolder<TFixedBufferFileOutput>(TFile(fixedDesrc.Duplicate()));
+        UsingStream = FetchedOut.Get();
+        fixedDesrc.Release();
+    } else {
+        UsingStream = usingStream;
+    }
+}
+
+// Uses the statistics handle when it is valid and YT_JOB_ID is set;
+// otherwise uses `def` (which is stored as is, even when it is null).
+void TUserJobStatsProxy::InitChecked(IOutputStream* def) {
+    IOutputStream* usingStream = CorrectHandle(JobStatisticsHandle);
+
+    if (usingStream == nullptr && !GetEnv("YT_JOB_ID").empty()) {
+        TFileHandle fixedDesrc(JobStatisticsHandle);
+        FetchedOut = MakeHolder<TFixedBufferFileOutput>(TFile(fixedDesrc.Duplicate()));
+        UsingStream = FetchedOut.Get();
+        fixedDesrc.Release();
+    } else {
+        UsingStream = def;
+    }
+}
+
+// Runs Init(usingStream) only when no stream is currently set.
+void TUserJobStatsProxy::InitIfNotInited(IOutputStream * usingStream) {
+    if (UsingStream == nullptr) {
+        Init(usingStream);
+    }
+}
+
+// Writes all accumulated Stats and TimeStats (as milliseconds) in one YSON map
+// and then clears both accumulators.
+void TUserJobStatsProxy::CommitStats() {
+    // Timers alone must be flushed as well, so return only when both maps are empty.
+    if (Stats.empty() && TimeStats.empty()) {
+        return;
+    }
+
+    auto res = NYT::TNode::CreateMap();
+    for (const auto& [name, value] : Stats) {
+        res[name] = value;
+    }
+    for (const auto& [name, elapsed] : TimeStats) {
+        res[name] = elapsed.MilliSeconds();
+    }
+    with_lock(GlobalStatsWritingMutex) {
+        *UsingStream << PrintNodeSimple(res) << ";" << Endl;
+    }
+    // Forget what was written so that a later commit does not report it again.
+    Stats.clear();
+    TimeStats.clear();
+}
+
+// Starts a timer whose elapsed time is recorded under `name` when it finishes.
+TTimeStatHolder TUserJobStatsProxy::TimerStart(TString name, bool commitOnFinish) {
+    return THolder(new TTimeStat(this, name, commitOnFinish));
+}
+
+// Writes a single statistic right away, ending the record with Endl.
+void TUserJobStatsProxy::WriteStat(TString name, i64 val) {
+    auto res = NYT::TNode {} (name, val);
+    with_lock(GlobalStatsWritingMutex) {
+        *UsingStream << PrintNodeSimple(res) << ";" << Endl;
+    }
+}
+
+// Writes a single statistic right away, ending the record with "\n" instead of Endl.
+void TUserJobStatsProxy::WriteStatNoFlush(TString name, i64 val) {
+    auto res = NYT::TNode {} (name, val);
+    with_lock(GlobalStatsWritingMutex) {
+        *UsingStream << (TStringBuilder{} << PrintNodeSimple(res) << ";\n");
+    }
+}
+
+TTimeStat::TTimeStat(TUserJobStatsProxy* parent, TString name, bool commit)
+    : Parent(parent)
+    , Name(name)
+    , Commit(commit) {}
+
+TTimeStat::~TTimeStat() {
+    Finish();
+}
+
+// Detaches the timer from its proxy; a later Finish() then does nothing.
+void TTimeStat::Cancel() {
+    Parent = nullptr;
+}
+
+// Reports the elapsed time once: written immediately when Commit is set,
+// otherwise added to the proxy's TimeStats. Further calls are no-ops.
+void TTimeStat::Finish() {
+    if (!Parent) {
+        return;
+    }
+
+    if (Commit) {
+        Parent->WriteStatNoFlush(Name, Timer.Get().MilliSeconds());
+    } else {
+        Parent->TimeStats[Name] += Timer.Get();
+    }
+    Cancel();
+}
diff --git a/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.h b/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.h
new file mode 100644
index 00000000000..6939d20417b
--- /dev/null
+++ b/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.h
@@ -0,0 +1,62 @@
+#pragma once
+
+#include <util/stream/file.h>
+#include <util/generic/hash.h>
+#include <util/datetime/cputimer.h>
+
+namespace NYtTools {
+    class TTimeStat;
+    using TTimeStatHolder = THolder<TTimeStat>;
+
+    // Accumulates user job statistics and writes them as YSON records to the
+    // stream selected by Init/InitChecked.
+    class TUserJobStatsProxy {
+    public:
+        static const FHANDLE JobStatisticsHandle;
+    private:
+        THolder<IOutputStream> FetchedOut;
+        IOutputStream* UsingStream = &Cerr;
+    public:
+        // TODO: add inheritance
+        THashMap<TString, i64> Stats; // will be dumped in CommitStats or destructor
+        THashMap<TString, TDuration> TimeStats; // will be dumped in CommitStats or destructor
+
+        TUserJobStatsProxy() { Init(nullptr); }
+        ~TUserJobStatsProxy() {
+            CommitStats();
+        }
+        TUserJobStatsProxy (IOutputStream* usingStream) {Init(usingStream);}
+
+        void Init(IOutputStream* usingStream);
+        void InitChecked(IOutputStream* ifNotInJob);
+        void InitIfNotInited(IOutputStream* usingStream);
+        IOutputStream* GetStream() const { return UsingStream; }
+        void CommitStats(); // writes Stats and TimeStats, then clears both
+        void WriteStat(TString name, i64 val); // immediately writes stat
+        void WriteStatNoFlush(TString name, i64 val); // immediately writes stat but does not flush it
+
+        // @param name name of statistic to be written in millisecs from creation to destruction
+        // @param commitOnFinish if false: will update state/write on job finish; if true: write stat in destructor
+        TTimeStatHolder TimerStart(TString name, bool commitOnFinish = false);
+    };
+
+    // Timer created by TUserJobStatsProxy::TimerStart; reports its elapsed time
+    // to the parent proxy on Finish() or destruction unless cancelled.
+    class TTimeStat {
+        TUserJobStatsProxy* Parent;
+        TString Name;
+        bool Commit;
+
+        TTimeStat(TUserJobStatsProxy* parent, TString name, bool commit);
+        friend class TUserJobStatsProxy;
+
+        TSimpleTimer Timer;
+    public:
+        ~TTimeStat();
+        TDuration Get() const {
+            return Timer.Get();
+        }
+        void Cancel();
+        void Finish();
+    };
+}
diff --git
a/yt/cpp/mapreduce/library/user_job_statistics/ya.make b/yt/cpp/mapreduce/library/user_job_statistics/ya.make new file mode 100644 index 00000000000..7179660b317 --- /dev/null +++ b/yt/cpp/mapreduce/library/user_job_statistics/ya.make @@ -0,0 +1,11 @@ +LIBRARY() + +SRCS( + user_job_statistics.cpp +) + +PEERDIR( + yt/cpp/mapreduce/common +) + +END() |
