diff options
| author | max42 <[email protected]> | 2023-06-30 11:13:34 +0300 |
|---|---|---|
| committer | max42 <[email protected]> | 2023-06-30 11:13:34 +0300 |
| commit | 3e1899838408bbad47622007aa382bc8a2b01f87 (patch) | |
| tree | 0f21c1e6add187ddb6c3ccc048a7d640ce03fb87 /yt/cpp/mapreduce/interface/job_statistics.cpp | |
| parent | 5463eb3f5e72a86f858a3d27c886470a724ede34 (diff) | |
Revert "YT-19324: move YT provider to ydb/library/yql"
This reverts commit ca272f12fdd0e8d5c3e957fc87939148f1caaf72, reversing
changes made to 49f8acfc8b0b5c0071b804423bcf53fda26c7c12.
Diffstat (limited to 'yt/cpp/mapreduce/interface/job_statistics.cpp')
| -rw-r--r-- | yt/cpp/mapreduce/interface/job_statistics.cpp | 361 |
1 files changed, 0 insertions, 361 deletions
diff --git a/yt/cpp/mapreduce/interface/job_statistics.cpp b/yt/cpp/mapreduce/interface/job_statistics.cpp deleted file mode 100644 index bd9791672d4..00000000000 --- a/yt/cpp/mapreduce/interface/job_statistics.cpp +++ /dev/null @@ -1,361 +0,0 @@ -#include "job_statistics.h" - -#include "operation.h" - -#include <library/cpp/yson/node/node.h> -#include <library/cpp/yson/node/serialize.h> - -#include <library/cpp/yson/writer.h> - -#include <util/datetime/base.h> -#include <util/generic/hash_set.h> -#include <util/generic/ptr.h> -#include <util/stream/file.h> -#include <util/string/cast.h> -#include <util/string/subst.h> -#include <util/system/file.h> - -namespace NYT { - -//////////////////////////////////////////////////////////////////// - -template <> -i64 ConvertJobStatisticsEntry(i64 value) -{ - return value; -} - -template <> -TDuration ConvertJobStatisticsEntry(i64 value) -{ - return TDuration::MilliSeconds(value); -} - -//////////////////////////////////////////////////////////////////// - -static TTaskName JobTypeToTaskName(EJobType jobType) -{ - switch (jobType) { - case EJobType::PartitionMap: - return ETaskName::PartitionMap0; - case EJobType::Partition: - return ETaskName::Partition0; - default: - return ToString(jobType); - } -} - -static TTaskName FixTaskName(TString taskName) -{ - if (taskName == "partition") { - return ETaskName::Partition0; - } else if (taskName == "partition_map") { - return ETaskName::PartitionMap0; - } - return taskName; -} - -//////////////////////////////////////////////////////////////////// - -class TJobStatistics::TData - : public TThrRefBase -{ -public: - using TTaskName2Data = THashMap<TString, TJobStatistics::TDataEntry>; - using TState2TaskName2Data = THashMap<EJobState, TTaskName2Data>; - using TName2State2TaskName2Data = THashMap<TString, TState2TaskName2Data>; - -public: - TName2State2TaskName2Data Name2State2TaskName2Data; - -public: - TData() = default; - - TData(const TNode& statisticsNode) - { - ParseNode(statisticsNode, TString(), &Name2State2TaskName2Data); - } - - static void Aggregate(TJobStatistics::TDataEntry* result, const TJobStatistics::TDataEntry& other) - { - result->Max = Max(result->Max, other.Max); - result->Min = Min(result->Min, other.Min); - result->Sum += other.Sum; - result->Count += other.Count; - } - - static void ParseNode(const TNode& node, TState2TaskName2Data* output) - { - auto getInt = [] (const TNode& theNode, TStringBuf key) { - const auto& nodeAsMap = theNode.AsMap(); - auto it = nodeAsMap.find(key); - if (it == nodeAsMap.end()) { - ythrow yexception() << "Key '" << key << "' is not found"; - } - const auto& valueNode = it->second; - if (!valueNode.IsInt64()) { - ythrow yexception() << "Key '" << key << "' is not of int64 type"; - } - return valueNode.AsInt64(); - }; - - for (const auto& [stateStr, taskName2DataNode] : node.AsMap()) { - EJobState state; - if (!TryFromString(stateStr, state)) { - continue; - } - for (const auto& [taskName, dataNode] : taskName2DataNode.AsMap()) { - auto fixedTaskName = FixTaskName(taskName); - auto& data = (*output)[state][fixedTaskName.Get()]; - data.Max = getInt(dataNode, "max"); - data.Min = getInt(dataNode, "min"); - data.Sum = getInt(dataNode, "sum"); - data.Count = getInt(dataNode, "count"); - } - } - } - - static void ParseNode(const TNode& node, const TString& curPath, TName2State2TaskName2Data* output) - { - Y_VERIFY(node.IsMap()); - - for (const auto& [key, value] : node.AsMap()) { - if (key == "$"sv) { - ParseNode(value, &(*output)[curPath]); - } else { - TString childPath = curPath; - if (!childPath.empty()) { - childPath.push_back('/'); - } - if (key.find_first_of('/') != key.npos) { - TString keyCopy(key); - SubstGlobal(keyCopy, "/", "\\/"); - childPath += keyCopy; - } else { - childPath += key; - } - ParseNode(value, childPath, output); - } - } - } -}; - -//////////////////////////////////////////////////////////////////// - -struct TJobStatistics::TFilter - : public TThrRefBase -{ - TVector<TTaskName> TaskNameFilter; - TVector<EJobState> JobStateFilter = {EJobState::Completed}; -}; - -//////////////////////////////////////////////////////////////////// - -const TString TJobStatistics::CustomStatisticsNamePrefix_ = "custom/"; - -TJobStatistics::TJobStatistics() - : Data_(::MakeIntrusive<TData>()) - , Filter_(::MakeIntrusive<TFilter>()) -{ } - - -TJobStatistics::TJobStatistics(const NYT::TNode& statisticsNode) - : Data_(::MakeIntrusive<TData>(statisticsNode)) - , Filter_(::MakeIntrusive<TFilter>()) -{ } - -TJobStatistics::TJobStatistics(::TIntrusivePtr<TData> data, ::TIntrusivePtr<TFilter> filter) - : Data_(data) - , Filter_(::MakeIntrusive<TFilter>(*filter)) -{ } - -TJobStatistics::TJobStatistics(const TJobStatistics& jobStatistics) = default; -TJobStatistics::TJobStatistics(TJobStatistics&&) = default; - -TJobStatistics& TJobStatistics::operator=(const TJobStatistics& jobStatistics) = default; -TJobStatistics& TJobStatistics::operator=(TJobStatistics&& jobStatistics) = default; - -TJobStatistics::~TJobStatistics() = default; - -TJobStatistics TJobStatistics::TaskName(TVector<TTaskName> taskNames) const -{ - auto newFilter = ::MakeIntrusive<TFilter>(*Filter_); - newFilter->TaskNameFilter = std::move(taskNames); - return TJobStatistics(Data_, std::move(newFilter)); -} - -TJobStatistics TJobStatistics::JobState(TVector<EJobState> jobStates) const -{ - auto newFilter = ::MakeIntrusive<TFilter>(*Filter_); - newFilter->JobStateFilter = std::move(jobStates); - return TJobStatistics(Data_, std::move(newFilter)); -} - -TJobStatistics TJobStatistics::JobType(TVector<EJobType> jobTypes) const -{ - TVector<TTaskName> taskNames; - for (auto jobType : jobTypes) { - taskNames.push_back(JobTypeToTaskName(jobType)); - } - return TaskName(std::move(taskNames)); -} - -bool TJobStatistics::HasStatistics(TStringBuf name) const -{ - return Data_->Name2State2TaskName2Data.contains(name); -} - -TJobStatisticsEntry<i64> TJobStatistics::GetStatistics(TStringBuf name) const -{ - return GetStatisticsAs<i64>(name); -} - -TVector<TString> TJobStatistics::GetStatisticsNames() const -{ - TVector<TString> result; - result.reserve(Data_->Name2State2TaskName2Data.size()); - for (const auto& entry : Data_->Name2State2TaskName2Data) { - result.push_back(entry.first); - } - return result; -} - -bool TJobStatistics::HasCustomStatistics(TStringBuf name) const -{ - return HasStatistics(CustomStatisticsNamePrefix_ + name); -} - -TJobStatisticsEntry<i64> TJobStatistics::GetCustomStatistics(TStringBuf name) const -{ - return GetCustomStatisticsAs<i64>(name); -} - -TVector<TString> TJobStatistics::GetCustomStatisticsNames() const -{ - TVector<TString> result; - for (const auto& entry : Data_->Name2State2TaskName2Data) { - if (entry.first.StartsWith(CustomStatisticsNamePrefix_)) { - result.push_back(entry.first.substr(CustomStatisticsNamePrefix_.size())); - } - } - return result; -} - -TMaybe<TJobStatistics::TDataEntry> TJobStatistics::GetStatisticsImpl(TStringBuf name) const -{ - auto name2State2TaskName2DataIt = Data_->Name2State2TaskName2Data.find(name); - Y_ENSURE( - name2State2TaskName2DataIt != Data_->Name2State2TaskName2Data.end(), - "Statistics '" << name << "' are missing"); - const auto& state2TaskName2Data = name2State2TaskName2DataIt->second; - - TMaybe<TDataEntry> result; - auto aggregate = [&] (const TDataEntry& data) { - if (result) { - TData::Aggregate(&result.GetRef(), data); - } else { - result = data; - } - }; - - auto aggregateTaskName2Data = [&] (const TData::TTaskName2Data& taskName2Data) { - if (Filter_->TaskNameFilter.empty()) { - for (const auto& [taskName, data] : taskName2Data) { - aggregate(data); - } - } else { - for (const auto& taskName : Filter_->TaskNameFilter) { - auto it = taskName2Data.find(taskName.Get()); - if (it == taskName2Data.end()) { - continue; - } - const auto& data = it->second; - aggregate(data); - } - } - }; - - if (Filter_->JobStateFilter.empty()) { - for (const auto& [state, taskName2Data] : state2TaskName2Data) { - aggregateTaskName2Data(taskName2Data); - } - } else { - for (auto state : Filter_->JobStateFilter) { - auto it = state2TaskName2Data.find(state); - if (it == state2TaskName2Data.end()) { - continue; - } - const auto& taskName2Data = it->second; - aggregateTaskName2Data(taskName2Data); - } - } - - return result; -} - -//////////////////////////////////////////////////////////////////// - -namespace { - -constexpr int USER_STATISTICS_FILE_DESCRIPTOR = 5; -constexpr char PATH_DELIMITER = '/'; -constexpr char ESCAPE = '\\'; - -IOutputStream* GetStatisticsStream() -{ - static TFile file = Duplicate(USER_STATISTICS_FILE_DESCRIPTOR); - static TFileOutput stream(file); - return &stream; -} - -template <typename T> -void WriteCustomStatisticsAny(TStringBuf path, const T& value) -{ - ::NYson::TYsonWriter writer(GetStatisticsStream(), NYson::EYsonFormat::Binary, ::NYson::EYsonType::ListFragment); - int depth = 0; - size_t begin = 0; - size_t end = 0; - TVector<TString> items; - while (end <= path.size()) { - if (end + 1 < path.size() && path[end] == ESCAPE && path[end + 1] == PATH_DELIMITER) { - end += 2; - continue; - } - if (end == path.size() || path[end] == PATH_DELIMITER) { - writer.OnBeginMap(); - items.emplace_back(path.data() + begin, end - begin); - SubstGlobal(items.back(), "\\/", "/"); - writer.OnKeyedItem(TStringBuf(items.back())); - ++depth; - begin = end + 1; - } - ++end; - } - Serialize(value, &writer); - while (depth > 0) { - writer.OnEndMap(); - --depth; - } -} - -} - -//////////////////////////////////////////////////////////////////// - -void WriteCustomStatistics(const TNode& statistics) -{ - ::NYson::TYsonWriter writer(GetStatisticsStream(), NYson::EYsonFormat::Binary, ::NYson::EYsonType::ListFragment); - Serialize(statistics, &writer); -} - -void WriteCustomStatistics(TStringBuf path, i64 value) -{ - WriteCustomStatisticsAny(path, value); -} - -void FlushCustomStatisticsStream() { - GetStatisticsStream()->Flush(); -} -//////////////////////////////////////////////////////////////////// - -} // namespace NYT |
