diff options
author | Maxim Kovalev <maxkovalev@ydb.tech> | 2023-12-21 15:13:28 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-21 15:13:28 +0300 |
commit | 3facfc056204106d2436a8178dd992e7adbbf315 (patch) | |
tree | b58d00a45424c08d4967a3b58a369a8dec9f5a25 | |
parent | b97c74bcf1cf0f259b62d40cc3698eaae1fc530a (diff) | |
download | ydb-3facfc056204106d2436a8178dd992e7adbbf315.tar.gz |
YQL-17250: Add operation information into hybrid statistics (#597)
* YQL-17250: Add operation information into hybrid statistics
* Fix review comments
6 files changed, 67 insertions, 24 deletions
diff --git a/ydb/library/yql/providers/common/provider/yql_provider.cpp b/ydb/library/yql/providers/common/provider/yql_provider.cpp index 00a7ea0662..634a2a7066 100644 --- a/ydb/library/yql/providers/common/provider/yql_provider.cpp +++ b/ydb/library/yql/providers/common/provider/yql_provider.cpp @@ -1264,14 +1264,16 @@ void WriteStatistics(NYson::TYsonWriter& writer, const TOperationStatistics& sta writer.OnEndMap(); } -void WriteStatistics(NYson::TYsonWriter& writer, bool totalOnly, const THashMap<ui32, TOperationStatistics>& statistics) { +void WriteStatistics(NYson::TYsonWriter& writer, bool totalOnly, const THashMap<ui32, TOperationStatistics>& statistics, bool addExternalMap) { if (statistics.empty()) { return; } THashMap<TString, std::tuple<i64, i64, i64, TMaybe<i64>>> total; // sum, count, max, min - writer.OnBeginMap(); + if (addExternalMap) { + writer.OnBeginMap(); + } for (const auto& opStatistics : statistics) { for (auto& el : opStatistics.second.Entries) { @@ -1331,8 +1333,9 @@ void WriteStatistics(NYson::TYsonWriter& writer, bool totalOnly, const THashMap< writer.OnEndMap(); } writer.OnEndMap(); // total - - writer.OnEndMap(); + if (addExternalMap) { + writer.OnEndMap(); + } } bool ValidateCompressionForInput(std::string_view format, std::string_view compression, TExprContext& ctx) { diff --git a/ydb/library/yql/providers/common/provider/yql_provider.h b/ydb/library/yql/providers/common/provider/yql_provider.h index e5f8d8e034..c4d8fa771b 100644 --- a/ydb/library/yql/providers/common/provider/yql_provider.h +++ b/ydb/library/yql/providers/common/provider/yql_provider.h @@ -179,7 +179,7 @@ void WriteStreams(NYson::TYsonWriter& writer, TStringBuf name, const NNodes::TCo double GetDataReplicationFactor(const TExprNode& lambda, TExprContext& ctx); -void WriteStatistics(NYson::TYsonWriter& writer, bool totalOnly, const THashMap<ui32, TOperationStatistics>& statistics); +void WriteStatistics(NYson::TYsonWriter& writer, bool totalOnly, const THashMap<ui32, TOperationStatistics>& statistics, bool addExternalMap = true); void WriteStatistics(NYson::TYsonWriter& writer, const TOperationStatistics& statistics); bool ValidateCompressionForInput(std::string_view format, std::string_view compression, TExprContext& ctx); diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_datasink.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_datasink.cpp index b76aeaba4d..ac8b7b9d44 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_datasink.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_datasink.cpp @@ -160,7 +160,16 @@ public: return false; } - NCommon::WriteStatistics(writer, totalOnly, State_->Statistics); + writer.OnBeginMap(); + NCommon::WriteStatistics(writer, totalOnly, State_->Statistics, false); + writer.OnKeyedItem("Hybrid"); + writer.OnBeginMap(); + for (const auto& [opName, stats] : State_->HybridStatistics) { + writer.OnKeyedItem(opName); + NCommon::WriteStatistics(writer, totalOnly, {{0, stats}}); + } + writer.OnEndMap(); + writer.OnEndMap(); return true; } diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp index 617a719770..74e69aed08 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp @@ -57,21 +57,23 @@ bool NeedFallback(const TIssues& issues) { TIssue WrapIssuesOnHybridFallback(TPosition pos, const TIssues& issues) { TIssue result(pos, "Hybrid execution fallback on YT"); - result.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_WARNING); + result.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_INFO); - const std::function<void(TIssue& issue)> toWarning = [&](TIssue& issue) { - if (issue.Severity == TSeverityIds::S_ERROR || issue.Severity == TSeverityIds::S_FATAL) { - issue.Severity = TSeverityIds::S_WARNING; + const std::function<void(TIssue& issue)> toInfo = [&](TIssue& issue) { + if (issue.Severity == TSeverityIds::S_ERROR + || issue.Severity == TSeverityIds::S_FATAL + || issue.Severity == TSeverityIds::S_WARNING) { + issue.Severity = TSeverityIds::S_INFO; } for (const auto& subissue : issue.GetSubIssues()) { - toWarning(*subissue); + toInfo(*subissue); } }; for (const auto& issue : issues) { - TIssuePtr warning(new TIssue(issue)); - toWarning(*warning); - result.AddSubIssue(std::move(warning)); + TIssuePtr info(new TIssue(issue)); + toInfo(*info); + result.AddSubIssue(std::move(info)); } return result; @@ -125,6 +127,11 @@ private: static TExprNode::TPtr FinalizeOutputOp(const TYtState::TPtr& state, const TString& operationHash, const IYtGateway::TRunResult& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx, bool markFinished) { + if (markFinished && !TYtDqProcessWrite::Match(input.Get())) { + with_lock(state->StatisticsMutex) { + state->HybridStatistics[input->Content()].Entries.emplace_back(TString{"YtExecution"}, 0, 0, 0, 0, 1); + } + } auto outSection = TYtOutputOpBase(input).Output(); YQL_ENSURE(outSection.Size() == res.OutTableStats.size(), "Invalid output table count in IYtGateway::TRunResult"); TExprNode::TListType newOutTables; @@ -286,14 +293,21 @@ private: State_->Statistics[Max<ui32>()].Entries.emplace_back(TString{name}, 0, 0, 0, 0, 1); } }; + auto hybridStatWriter = [this](TStringBuf statName, TStringBuf opName) { + with_lock(State_->StatisticsMutex) { + State_->HybridStatistics[opName].Entries.emplace_back(TString{statName}, 0, 0, 0, 0, 1); + } + }; switch (input->Head().GetState()) { case TExprNode::EState::ExecutionComplete: statWriter("HybridExecution"); + hybridStatWriter("Execution", input->TailPtr()->Content()); output = input->HeadPtr(); break; case TExprNode::EState::Error: { statWriter("HybridFallback"); + hybridStatWriter("Fallback", input->TailPtr()->Content()); if (State_->Configuration->HybridDqExecutionFallback.Get().GetOrElse(true)) { output = input->TailPtr(); } else { diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp index 5ca774525f..7f79548d85 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp @@ -275,10 +275,12 @@ private: if (const auto stat = CanReadHybrid(sort.Input().Item(0))) { if (stat->front() <= sizeLimit && stat->back() <= chunksLimit) { YQL_CLOG(INFO, ProviderYt) << "Sort on DQ with equivalent input size " << stat->front() << " and " << stat->back() << " chunks."; - PushStat("Hybrid_Sort_try"); + PushStat("HybridTry"); + PushHybridStat("Try", node.Raw()->Content()); return MakeYtSortByDq(sort, ctx); } - PushStat("Hybrid_Sort_over_limits"); + PushStat("HybridSkipOverLimits"); + PushHybridStat("SkipOverLimits", node.Raw()->Content()); } } } @@ -294,10 +296,12 @@ private: if (const auto stat = CanReadHybrid(merge.Input().Item(0))) { if (stat->front() <= sizeLimit && stat->back() <= chunksLimit) { YQL_CLOG(INFO, ProviderYt) << "Merge on DQ with equivalent input size " << stat->front() << " and " << stat->back() << " chunks."; - PushStat("Hybrid_Merge_try"); + PushStat("HybridTry"); + PushHybridStat("Try", node.Raw()->Content()); return MakeYtSortByDq(merge, ctx); } - PushStat("Hybrid_Merge_over_limits"); + PushStat("HybridSkipOverLimits"); + PushHybridStat("SkipOverLimits", node.Raw()->Content()); } } } @@ -347,7 +351,8 @@ private: if (stat->front() <= sizeLimit && stat->back() <= chunksLimit) { if (CanExecuteInHybrid(map.Mapper().Ptr(), chunksLimit, sizeLimit)) { YQL_CLOG(INFO, ProviderYt) << "Map on DQ with equivalent input size " << stat->front() << " and " << stat->back() << " chunks."; - PushStat("Hybrid_Map_try"); + PushStat("HybridTry"); + PushHybridStat("Try", node.Raw()->Content()); TSyncMap syncList; const auto& paths = map.Input().Item(0).Paths(); for (auto i = 0U; i < paths.Size(); ++i) { @@ -430,7 +435,8 @@ private: .Done(); } } - PushStat("Hybrid_Map_over_limits"); + PushStat("HybridOverLimits"); + PushHybridStat("SkipOverLimits", node.Raw()->Content()); } } @@ -626,12 +632,14 @@ private: if (CanExecuteInHybrid(reduce.Reducer().Ptr(), chunksLimit, sizeLimit)) { if (ETypeAnnotationKind::Struct == GetSeqItemType(*reduce.Reducer().Args().Arg(0).Ref().GetTypeAnn()).GetKind()) { YQL_CLOG(INFO, ProviderYt) << "Reduce on DQ with equivalent input size " << stat->front() << " and " << stat->back() << " chunks."; - PushStat("Hybrid_Reduce_try"); + PushStat("HybridTry"); + PushHybridStat("Try", node.Raw()->Content()); return MakeYtReduceByDq(reduce, ctx); } } } - PushStat("Hybrid_Reduce_over_limits"); + PushStat("HybridSkipOverLimits"); + PushHybridStat("SkipOverLimits", node.Raw()->Content()); } } @@ -647,12 +655,14 @@ private: if (CanExecuteInHybrid(mapReduce.Reducer().Ptr(), chunksLimit, sizeLimit) && CanExecuteInHybrid(mapReduce.Mapper().Ptr(), chunksLimit, sizeLimit)) { if (ETypeAnnotationKind::Struct == GetSeqItemType(*mapReduce.Reducer().Args().Arg(0).Ref().GetTypeAnn()).GetKind()) { YQL_CLOG(INFO, ProviderYt) << "MapReduce on DQ with equivalent input size " << stat->front() << " and " << stat->back() << " chunks."; - PushStat("Hybrid_MapReduce_try"); + PushHybridStat("Try", node.Raw()->Content()); + PushStat("HybridTry"); return MakeYtReduceByDq(mapReduce, ctx); } } } - PushStat("Hybrid_MapReduce_over_limits"); + PushHybridStat("SkipOverLimits", node.Raw()->Content()); + PushStat("HybridOverLimits"); } } @@ -665,6 +675,12 @@ private: } }; + void PushHybridStat(TStringBuf statName, TStringBuf opName) const { + with_lock(State_->StatisticsMutex) { + State_->HybridStatistics[opName].Entries.emplace_back(TString{statName}, 0, 0, 0, 0, 1); + } + }; + const TYtState::TPtr State_; const THolder<IGraphTransformer> Finalizer_; }; diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_provider.h b/ydb/library/yql/providers/yt/provider/yql_yt_provider.h index 90ad1d58fa..06d9d8f747 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_provider.h +++ b/ydb/library/yql/providers/yt/provider/yql_yt_provider.h @@ -96,6 +96,7 @@ struct TYtState : public TThrRefBase { THashMap<std::pair<TString, TString>, TString> AnonymousLabels; // cluster + label -> name std::unordered_map<ui64, TString> NodeHash; // unique id -> hash THashMap<ui32, TOperationStatistics> Statistics; // public id -> stat + THashMap<TString, TOperationStatistics> HybridStatistics; // operation name -> stat TMutex StatisticsMutex; THashSet<std::pair<TString, TString>> Checkpoints; // Set of checkpoint tables THolder<IDqIntegration> DqIntegration_; |