summary | refs | log | tree | commit | diff | stats
path: root/yt/cpp/mapreduce/interface/job_statistics.cpp
diff options
context:
space:
mode:
author max42 <[email protected]> 2023-06-30 11:13:34 +0300
committer max42 <[email protected]> 2023-06-30 11:13:34 +0300
commit 3e1899838408bbad47622007aa382bc8a2b01f87 (patch)
tree 0f21c1e6add187ddb6c3ccc048a7d640ce03fb87 /yt/cpp/mapreduce/interface/job_statistics.cpp
parent 5463eb3f5e72a86f858a3d27c886470a724ede34 (diff)
Revert "YT-19324: move YT provider to ydb/library/yql"
This reverts commit ca272f12fdd0e8d5c3e957fc87939148f1caaf72, reversing changes made to 49f8acfc8b0b5c0071b804423bcf53fda26c7c12.
Diffstat (limited to 'yt/cpp/mapreduce/interface/job_statistics.cpp')
-rw-r--r-- yt/cpp/mapreduce/interface/job_statistics.cpp 361
1 files changed, 0 insertions, 361 deletions
diff --git a/yt/cpp/mapreduce/interface/job_statistics.cpp b/yt/cpp/mapreduce/interface/job_statistics.cpp
deleted file mode 100644
index bd9791672d4..00000000000
--- a/yt/cpp/mapreduce/interface/job_statistics.cpp
+++ /dev/null
@@ -1,361 +0,0 @@
-#include "job_statistics.h"
-
-#include "operation.h"
-
-#include <library/cpp/yson/node/node.h>
-#include <library/cpp/yson/node/serialize.h>
-
-#include <library/cpp/yson/writer.h>
-
-#include <util/datetime/base.h>
-#include <util/generic/hash_set.h>
-#include <util/generic/ptr.h>
-#include <util/stream/file.h>
-#include <util/string/cast.h>
-#include <util/string/subst.h>
-#include <util/system/file.h>
-
-namespace NYT {
-
-////////////////////////////////////////////////////////////////////
-
-template <>
-i64 ConvertJobStatisticsEntry(i64 value)
-{
- return value;
-}
-
-template <>
-TDuration ConvertJobStatisticsEntry(i64 value)
-{
- return TDuration::MilliSeconds(value);
-}
-
-////////////////////////////////////////////////////////////////////
-
-static TTaskName JobTypeToTaskName(EJobType jobType)
-{
- switch (jobType) {
- case EJobType::PartitionMap:
- return ETaskName::PartitionMap0;
- case EJobType::Partition:
- return ETaskName::Partition0;
- default:
- return ToString(jobType);
- }
-}
-
-static TTaskName FixTaskName(TString taskName)
-{
- if (taskName == "partition") {
- return ETaskName::Partition0;
- } else if (taskName == "partition_map") {
- return ETaskName::PartitionMap0;
- }
- return taskName;
-}
-
-////////////////////////////////////////////////////////////////////
-
-class TJobStatistics::TData
- : public TThrRefBase
-{
-public:
- using TTaskName2Data = THashMap<TString, TJobStatistics::TDataEntry>;
- using TState2TaskName2Data = THashMap<EJobState, TTaskName2Data>;
- using TName2State2TaskName2Data = THashMap<TString, TState2TaskName2Data>;
-
-public:
- TName2State2TaskName2Data Name2State2TaskName2Data;
-
-public:
- TData() = default;
-
- TData(const TNode& statisticsNode)
- {
- ParseNode(statisticsNode, TString(), &Name2State2TaskName2Data);
- }
-
- static void Aggregate(TJobStatistics::TDataEntry* result, const TJobStatistics::TDataEntry& other)
- {
- result->Max = Max(result->Max, other.Max);
- result->Min = Min(result->Min, other.Min);
- result->Sum += other.Sum;
- result->Count += other.Count;
- }
-
- static void ParseNode(const TNode& node, TState2TaskName2Data* output)
- {
- auto getInt = [] (const TNode& theNode, TStringBuf key) {
- const auto& nodeAsMap = theNode.AsMap();
- auto it = nodeAsMap.find(key);
- if (it == nodeAsMap.end()) {
- ythrow yexception() << "Key '" << key << "' is not found";
- }
- const auto& valueNode = it->second;
- if (!valueNode.IsInt64()) {
- ythrow yexception() << "Key '" << key << "' is not of int64 type";
- }
- return valueNode.AsInt64();
- };
-
- for (const auto& [stateStr, taskName2DataNode] : node.AsMap()) {
- EJobState state;
- if (!TryFromString(stateStr, state)) {
- continue;
- }
- for (const auto& [taskName, dataNode] : taskName2DataNode.AsMap()) {
- auto fixedTaskName = FixTaskName(taskName);
- auto& data = (*output)[state][fixedTaskName.Get()];
- data.Max = getInt(dataNode, "max");
- data.Min = getInt(dataNode, "min");
- data.Sum = getInt(dataNode, "sum");
- data.Count = getInt(dataNode, "count");
- }
- }
- }
-
- static void ParseNode(const TNode& node, const TString& curPath, TName2State2TaskName2Data* output)
- {
- Y_VERIFY(node.IsMap());
-
- for (const auto& [key, value] : node.AsMap()) {
- if (key == "$"sv) {
- ParseNode(value, &(*output)[curPath]);
- } else {
- TString childPath = curPath;
- if (!childPath.empty()) {
- childPath.push_back('/');
- }
- if (key.find_first_of('/') != key.npos) {
- TString keyCopy(key);
- SubstGlobal(keyCopy, "/", "\\/");
- childPath += keyCopy;
- } else {
- childPath += key;
- }
- ParseNode(value, childPath, output);
- }
- }
- }
-};
-
-////////////////////////////////////////////////////////////////////
-
-struct TJobStatistics::TFilter
- : public TThrRefBase
-{
- TVector<TTaskName> TaskNameFilter;
- TVector<EJobState> JobStateFilter = {EJobState::Completed};
-};
-
-////////////////////////////////////////////////////////////////////
-
-const TString TJobStatistics::CustomStatisticsNamePrefix_ = "custom/";
-
-TJobStatistics::TJobStatistics()
- : Data_(::MakeIntrusive<TData>())
- , Filter_(::MakeIntrusive<TFilter>())
-{ }
-
-
-TJobStatistics::TJobStatistics(const NYT::TNode& statisticsNode)
- : Data_(::MakeIntrusive<TData>(statisticsNode))
- , Filter_(::MakeIntrusive<TFilter>())
-{ }
-
-TJobStatistics::TJobStatistics(::TIntrusivePtr<TData> data, ::TIntrusivePtr<TFilter> filter)
- : Data_(data)
- , Filter_(::MakeIntrusive<TFilter>(*filter))
-{ }
-
-TJobStatistics::TJobStatistics(const TJobStatistics& jobStatistics) = default;
-TJobStatistics::TJobStatistics(TJobStatistics&&) = default;
-
-TJobStatistics& TJobStatistics::operator=(const TJobStatistics& jobStatistics) = default;
-TJobStatistics& TJobStatistics::operator=(TJobStatistics&& jobStatistics) = default;
-
-TJobStatistics::~TJobStatistics() = default;
-
-TJobStatistics TJobStatistics::TaskName(TVector<TTaskName> taskNames) const
-{
- auto newFilter = ::MakeIntrusive<TFilter>(*Filter_);
- newFilter->TaskNameFilter = std::move(taskNames);
- return TJobStatistics(Data_, std::move(newFilter));
-}
-
-TJobStatistics TJobStatistics::JobState(TVector<EJobState> jobStates) const
-{
- auto newFilter = ::MakeIntrusive<TFilter>(*Filter_);
- newFilter->JobStateFilter = std::move(jobStates);
- return TJobStatistics(Data_, std::move(newFilter));
-}
-
-TJobStatistics TJobStatistics::JobType(TVector<EJobType> jobTypes) const
-{
- TVector<TTaskName> taskNames;
- for (auto jobType : jobTypes) {
- taskNames.push_back(JobTypeToTaskName(jobType));
- }
- return TaskName(std::move(taskNames));
-}
-
-bool TJobStatistics::HasStatistics(TStringBuf name) const
-{
- return Data_->Name2State2TaskName2Data.contains(name);
-}
-
-TJobStatisticsEntry<i64> TJobStatistics::GetStatistics(TStringBuf name) const
-{
- return GetStatisticsAs<i64>(name);
-}
-
-TVector<TString> TJobStatistics::GetStatisticsNames() const
-{
- TVector<TString> result;
- result.reserve(Data_->Name2State2TaskName2Data.size());
- for (const auto& entry : Data_->Name2State2TaskName2Data) {
- result.push_back(entry.first);
- }
- return result;
-}
-
-bool TJobStatistics::HasCustomStatistics(TStringBuf name) const
-{
- return HasStatistics(CustomStatisticsNamePrefix_ + name);
-}
-
-TJobStatisticsEntry<i64> TJobStatistics::GetCustomStatistics(TStringBuf name) const
-{
- return GetCustomStatisticsAs<i64>(name);
-}
-
-TVector<TString> TJobStatistics::GetCustomStatisticsNames() const
-{
- TVector<TString> result;
- for (const auto& entry : Data_->Name2State2TaskName2Data) {
- if (entry.first.StartsWith(CustomStatisticsNamePrefix_)) {
- result.push_back(entry.first.substr(CustomStatisticsNamePrefix_.size()));
- }
- }
- return result;
-}
-
-TMaybe<TJobStatistics::TDataEntry> TJobStatistics::GetStatisticsImpl(TStringBuf name) const
-{
- auto name2State2TaskName2DataIt = Data_->Name2State2TaskName2Data.find(name);
- Y_ENSURE(
- name2State2TaskName2DataIt != Data_->Name2State2TaskName2Data.end(),
- "Statistics '" << name << "' are missing");
- const auto& state2TaskName2Data = name2State2TaskName2DataIt->second;
-
- TMaybe<TDataEntry> result;
- auto aggregate = [&] (const TDataEntry& data) {
- if (result) {
- TData::Aggregate(&result.GetRef(), data);
- } else {
- result = data;
- }
- };
-
- auto aggregateTaskName2Data = [&] (const TData::TTaskName2Data& taskName2Data) {
- if (Filter_->TaskNameFilter.empty()) {
- for (const auto& [taskName, data] : taskName2Data) {
- aggregate(data);
- }
- } else {
- for (const auto& taskName : Filter_->TaskNameFilter) {
- auto it = taskName2Data.find(taskName.Get());
- if (it == taskName2Data.end()) {
- continue;
- }
- const auto& data = it->second;
- aggregate(data);
- }
- }
- };
-
- if (Filter_->JobStateFilter.empty()) {
- for (const auto& [state, taskName2Data] : state2TaskName2Data) {
- aggregateTaskName2Data(taskName2Data);
- }
- } else {
- for (auto state : Filter_->JobStateFilter) {
- auto it = state2TaskName2Data.find(state);
- if (it == state2TaskName2Data.end()) {
- continue;
- }
- const auto& taskName2Data = it->second;
- aggregateTaskName2Data(taskName2Data);
- }
- }
-
- return result;
-}
-
-////////////////////////////////////////////////////////////////////
-
-namespace {
-
-constexpr int USER_STATISTICS_FILE_DESCRIPTOR = 5;
-constexpr char PATH_DELIMITER = '/';
-constexpr char ESCAPE = '\\';
-
-IOutputStream* GetStatisticsStream()
-{
- static TFile file = Duplicate(USER_STATISTICS_FILE_DESCRIPTOR);
- static TFileOutput stream(file);
- return &stream;
-}
-
-template <typename T>
-void WriteCustomStatisticsAny(TStringBuf path, const T& value)
-{
- ::NYson::TYsonWriter writer(GetStatisticsStream(), NYson::EYsonFormat::Binary, ::NYson::EYsonType::ListFragment);
- int depth = 0;
- size_t begin = 0;
- size_t end = 0;
- TVector<TString> items;
- while (end <= path.size()) {
- if (end + 1 < path.size() && path[end] == ESCAPE && path[end + 1] == PATH_DELIMITER) {
- end += 2;
- continue;
- }
- if (end == path.size() || path[end] == PATH_DELIMITER) {
- writer.OnBeginMap();
- items.emplace_back(path.data() + begin, end - begin);
- SubstGlobal(items.back(), "\\/", "/");
- writer.OnKeyedItem(TStringBuf(items.back()));
- ++depth;
- begin = end + 1;
- }
- ++end;
- }
- Serialize(value, &writer);
- while (depth > 0) {
- writer.OnEndMap();
- --depth;
- }
-}
-
-}
-
-////////////////////////////////////////////////////////////////////
-
-void WriteCustomStatistics(const TNode& statistics)
-{
- ::NYson::TYsonWriter writer(GetStatisticsStream(), NYson::EYsonFormat::Binary, ::NYson::EYsonType::ListFragment);
- Serialize(statistics, &writer);
-}
-
-void WriteCustomStatistics(TStringBuf path, i64 value)
-{
- WriteCustomStatisticsAny(path, value);
-}
-
-void FlushCustomStatisticsStream() {
- GetStatisticsStream()->Flush();
-}
-////////////////////////////////////////////////////////////////////
-
-} // namespace NYT