diff options
| author | max42 <[email protected]> | 2023-06-30 03:37:03 +0300 |
|---|---|---|
| committer | max42 <[email protected]> | 2023-06-30 03:37:03 +0300 |
| commit | fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a (patch) | |
| tree | b8cbc1deb00309c7f1a7ab6df520a76cf0b5c6d7 /yt/cpp/mapreduce/interface/job_statistics.cpp | |
| parent | 7bf166b1a7ed0af927f230022b245af618e998c1 (diff) | |
YT-19324: move YT provider to ydb/library/yql
This commit is formed by the following script: https://paste.yandex-team.ru/6f92e4b8-efc5-4d34-948b-15ee2accd7e7/text.
This commit has zero effect on all projects that depend on YQL.
The summary of changes:
- `yql/providers/yt -> ydb/library/yql/providers/yt `- the whole implementation of YT provider is moved into YDB code base for further export as a part of YT YQL plugin shared library;
- `yql/providers/stat/{expr_nodes,uploader} -> ydb/library/yql/providers/stat/{expr_nodes,uploader}` - a small interface without implementation and the description of stat expr nodes;
- `yql/core/extract_predicate/ut -> ydb/library/yql/core/extract_predicate/ut`;
- `yql/core/{ut,ut_common} -> ydb/library/yql/core/{ut,ut_common}`;
- `yql/core` is gone;
- `yql/library/url_preprocessing -> ydb/library/yql/core/url_preprocessing`.
**NB**: all new targets inside `ydb/` are under `IF (NOT CMAKE_EXPORT)` clause which disables them from open-source cmake generation and ya make build. They will be enabled in the subsequent commits.
Diffstat (limited to 'yt/cpp/mapreduce/interface/job_statistics.cpp')
| -rw-r--r-- | yt/cpp/mapreduce/interface/job_statistics.cpp | 361 |
1 files changed, 361 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/interface/job_statistics.cpp b/yt/cpp/mapreduce/interface/job_statistics.cpp new file mode 100644 index 00000000000..bd9791672d4 --- /dev/null +++ b/yt/cpp/mapreduce/interface/job_statistics.cpp @@ -0,0 +1,361 @@ +#include "job_statistics.h" + +#include "operation.h" + +#include <library/cpp/yson/node/node.h> +#include <library/cpp/yson/node/serialize.h> + +#include <library/cpp/yson/writer.h> + +#include <util/datetime/base.h> +#include <util/generic/hash_set.h> +#include <util/generic/ptr.h> +#include <util/stream/file.h> +#include <util/string/cast.h> +#include <util/string/subst.h> +#include <util/system/file.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////// + +template <> +i64 ConvertJobStatisticsEntry(i64 value) +{ + return value; +} + +template <> +TDuration ConvertJobStatisticsEntry(i64 value) +{ + return TDuration::MilliSeconds(value); +} + +//////////////////////////////////////////////////////////////////// + +static TTaskName JobTypeToTaskName(EJobType jobType) +{ + switch (jobType) { + case EJobType::PartitionMap: + return ETaskName::PartitionMap0; + case EJobType::Partition: + return ETaskName::Partition0; + default: + return ToString(jobType); + } +} + +static TTaskName FixTaskName(TString taskName) +{ + if (taskName == "partition") { + return ETaskName::Partition0; + } else if (taskName == "partition_map") { + return ETaskName::PartitionMap0; + } + return taskName; +} + +//////////////////////////////////////////////////////////////////// + +class TJobStatistics::TData + : public TThrRefBase +{ +public: + using TTaskName2Data = THashMap<TString, TJobStatistics::TDataEntry>; + using TState2TaskName2Data = THashMap<EJobState, TTaskName2Data>; + using TName2State2TaskName2Data = THashMap<TString, TState2TaskName2Data>; + +public: + TName2State2TaskName2Data Name2State2TaskName2Data; + +public: + TData() = default; + + TData(const TNode& statisticsNode) + { + ParseNode(statisticsNode, TString(), &Name2State2TaskName2Data); + } + + static void Aggregate(TJobStatistics::TDataEntry* result, const TJobStatistics::TDataEntry& other) + { + result->Max = Max(result->Max, other.Max); + result->Min = Min(result->Min, other.Min); + result->Sum += other.Sum; + result->Count += other.Count; + } + + static void ParseNode(const TNode& node, TState2TaskName2Data* output) + { + auto getInt = [] (const TNode& theNode, TStringBuf key) { + const auto& nodeAsMap = theNode.AsMap(); + auto it = nodeAsMap.find(key); + if (it == nodeAsMap.end()) { + ythrow yexception() << "Key '" << key << "' is not found"; + } + const auto& valueNode = it->second; + if (!valueNode.IsInt64()) { + ythrow yexception() << "Key '" << key << "' is not of int64 type"; + } + return valueNode.AsInt64(); + }; + + for (const auto& [stateStr, taskName2DataNode] : node.AsMap()) { + EJobState state; + if (!TryFromString(stateStr, state)) { + continue; + } + for (const auto& [taskName, dataNode] : taskName2DataNode.AsMap()) { + auto fixedTaskName = FixTaskName(taskName); + auto& data = (*output)[state][fixedTaskName.Get()]; + data.Max = getInt(dataNode, "max"); + data.Min = getInt(dataNode, "min"); + data.Sum = getInt(dataNode, "sum"); + data.Count = getInt(dataNode, "count"); + } + } + } + + static void ParseNode(const TNode& node, const TString& curPath, TName2State2TaskName2Data* output) + { + Y_VERIFY(node.IsMap()); + + for (const auto& [key, value] : node.AsMap()) { + if (key == "$"sv) { + ParseNode(value, &(*output)[curPath]); + } else { + TString childPath = curPath; + if (!childPath.empty()) { + childPath.push_back('/'); + } + if (key.find_first_of('/') != key.npos) { + TString keyCopy(key); + SubstGlobal(keyCopy, "/", "\\/"); + childPath += keyCopy; + } else { + childPath += key; + } + ParseNode(value, childPath, output); + } + } + } +}; + +//////////////////////////////////////////////////////////////////// + +struct TJobStatistics::TFilter + : public TThrRefBase +{ + TVector<TTaskName> TaskNameFilter; + TVector<EJobState> JobStateFilter = {EJobState::Completed}; +}; + +//////////////////////////////////////////////////////////////////// + +const TString TJobStatistics::CustomStatisticsNamePrefix_ = "custom/"; + +TJobStatistics::TJobStatistics() + : Data_(::MakeIntrusive<TData>()) + , Filter_(::MakeIntrusive<TFilter>()) +{ } + + +TJobStatistics::TJobStatistics(const NYT::TNode& statisticsNode) + : Data_(::MakeIntrusive<TData>(statisticsNode)) + , Filter_(::MakeIntrusive<TFilter>()) +{ } + +TJobStatistics::TJobStatistics(::TIntrusivePtr<TData> data, ::TIntrusivePtr<TFilter> filter) + : Data_(data) + , Filter_(::MakeIntrusive<TFilter>(*filter)) +{ } + +TJobStatistics::TJobStatistics(const TJobStatistics& jobStatistics) = default; +TJobStatistics::TJobStatistics(TJobStatistics&&) = default; + +TJobStatistics& TJobStatistics::operator=(const TJobStatistics& jobStatistics) = default; +TJobStatistics& TJobStatistics::operator=(TJobStatistics&& jobStatistics) = default; + +TJobStatistics::~TJobStatistics() = default; + +TJobStatistics TJobStatistics::TaskName(TVector<TTaskName> taskNames) const +{ + auto newFilter = ::MakeIntrusive<TFilter>(*Filter_); + newFilter->TaskNameFilter = std::move(taskNames); + return TJobStatistics(Data_, std::move(newFilter)); +} + +TJobStatistics TJobStatistics::JobState(TVector<EJobState> jobStates) const +{ + auto newFilter = ::MakeIntrusive<TFilter>(*Filter_); + newFilter->JobStateFilter = std::move(jobStates); + return TJobStatistics(Data_, std::move(newFilter)); +} + +TJobStatistics TJobStatistics::JobType(TVector<EJobType> jobTypes) const +{ + TVector<TTaskName> taskNames; + for (auto jobType : jobTypes) { + taskNames.push_back(JobTypeToTaskName(jobType)); + } + return TaskName(std::move(taskNames)); +} + +bool TJobStatistics::HasStatistics(TStringBuf name) const +{ + return Data_->Name2State2TaskName2Data.contains(name); +} + +TJobStatisticsEntry<i64> TJobStatistics::GetStatistics(TStringBuf name) const +{ + return GetStatisticsAs<i64>(name); +} + +TVector<TString> TJobStatistics::GetStatisticsNames() const +{ + TVector<TString> result; + result.reserve(Data_->Name2State2TaskName2Data.size()); + for (const auto& entry : Data_->Name2State2TaskName2Data) { + result.push_back(entry.first); + } + return result; +} + +bool TJobStatistics::HasCustomStatistics(TStringBuf name) const +{ + return HasStatistics(CustomStatisticsNamePrefix_ + name); +} + +TJobStatisticsEntry<i64> TJobStatistics::GetCustomStatistics(TStringBuf name) const +{ + return GetCustomStatisticsAs<i64>(name); +} + +TVector<TString> TJobStatistics::GetCustomStatisticsNames() const +{ + TVector<TString> result; + for (const auto& entry : Data_->Name2State2TaskName2Data) { + if (entry.first.StartsWith(CustomStatisticsNamePrefix_)) { + result.push_back(entry.first.substr(CustomStatisticsNamePrefix_.size())); + } + } + return result; +} + +TMaybe<TJobStatistics::TDataEntry> TJobStatistics::GetStatisticsImpl(TStringBuf name) const +{ + auto name2State2TaskName2DataIt = Data_->Name2State2TaskName2Data.find(name); + Y_ENSURE( + name2State2TaskName2DataIt != Data_->Name2State2TaskName2Data.end(), + "Statistics '" << name << "' are missing"); + const auto& state2TaskName2Data = name2State2TaskName2DataIt->second; + + TMaybe<TDataEntry> result; + auto aggregate = [&] (const TDataEntry& data) { + if (result) { + TData::Aggregate(&result.GetRef(), data); + } else { + result = data; + } + }; + + auto aggregateTaskName2Data = [&] (const TData::TTaskName2Data& taskName2Data) { + if (Filter_->TaskNameFilter.empty()) { + for (const auto& [taskName, data] : taskName2Data) { + aggregate(data); + } + } else { + for (const auto& taskName : Filter_->TaskNameFilter) { + auto it = taskName2Data.find(taskName.Get()); + if (it == taskName2Data.end()) { + continue; + } + const auto& data = it->second; + aggregate(data); + } + } + }; + + if (Filter_->JobStateFilter.empty()) { + for (const auto& [state, taskName2Data] : state2TaskName2Data) { + aggregateTaskName2Data(taskName2Data); + } + } else { + for (auto state : Filter_->JobStateFilter) { + auto it = state2TaskName2Data.find(state); + if (it == state2TaskName2Data.end()) { + continue; + } + const auto& taskName2Data = it->second; + aggregateTaskName2Data(taskName2Data); + } + } + + return result; +} + +//////////////////////////////////////////////////////////////////// + +namespace { + +constexpr int USER_STATISTICS_FILE_DESCRIPTOR = 5; +constexpr char PATH_DELIMITER = '/'; +constexpr char ESCAPE = '\\'; + +IOutputStream* GetStatisticsStream() +{ + static TFile file = Duplicate(USER_STATISTICS_FILE_DESCRIPTOR); + static TFileOutput stream(file); + return &stream; +} + +template <typename T> +void WriteCustomStatisticsAny(TStringBuf path, const T& value) +{ + ::NYson::TYsonWriter writer(GetStatisticsStream(), NYson::EYsonFormat::Binary, ::NYson::EYsonType::ListFragment); + int depth = 0; + size_t begin = 0; + size_t end = 0; + TVector<TString> items; + while (end <= path.size()) { + if (end + 1 < path.size() && path[end] == ESCAPE && path[end + 1] == PATH_DELIMITER) { + end += 2; + continue; + } + if (end == path.size() || path[end] == PATH_DELIMITER) { + writer.OnBeginMap(); + items.emplace_back(path.data() + begin, end - begin); + SubstGlobal(items.back(), "\\/", "/"); + writer.OnKeyedItem(TStringBuf(items.back())); + ++depth; + begin = end + 1; + } + ++end; + } + Serialize(value, &writer); + while (depth > 0) { + writer.OnEndMap(); + --depth; + } +} + +} + +//////////////////////////////////////////////////////////////////// + +void WriteCustomStatistics(const TNode& statistics) +{ + ::NYson::TYsonWriter writer(GetStatisticsStream(), NYson::EYsonFormat::Binary, ::NYson::EYsonType::ListFragment); + Serialize(statistics, &writer); +} + +void WriteCustomStatistics(TStringBuf path, i64 value) +{ + WriteCustomStatisticsAny(path, value); +} + +void FlushCustomStatisticsStream() { + GetStatisticsStream()->Flush(); +} +//////////////////////////////////////////////////////////////////// + +} // namespace NYT |
