aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMaxim Kovalev <maxkovalev@ydb.tech>2023-12-21 15:13:28 +0300
committerGitHub <noreply@github.com>2023-12-21 15:13:28 +0300
commit3facfc056204106d2436a8178dd992e7adbbf315 (patch)
treeb58d00a45424c08d4967a3b58a369a8dec9f5a25
parentb97c74bcf1cf0f259b62d40cc3698eaae1fc530a (diff)
downloadydb-3facfc056204106d2436a8178dd992e7adbbf315.tar.gz
YQL-17250: Add operation information into hybrid statistics (#597)
* YQL-17250: Add operation information into hybrid statistics * Fix review comments
-rw-r--r--ydb/library/yql/providers/common/provider/yql_provider.cpp11
-rw-r--r--ydb/library/yql/providers/common/provider/yql_provider.h2
-rw-r--r--ydb/library/yql/providers/yt/provider/yql_yt_datasink.cpp11
-rw-r--r--ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp30
-rw-r--r--ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp36
-rw-r--r--ydb/library/yql/providers/yt/provider/yql_yt_provider.h1
6 files changed, 67 insertions, 24 deletions
diff --git a/ydb/library/yql/providers/common/provider/yql_provider.cpp b/ydb/library/yql/providers/common/provider/yql_provider.cpp
index 00a7ea0662..634a2a7066 100644
--- a/ydb/library/yql/providers/common/provider/yql_provider.cpp
+++ b/ydb/library/yql/providers/common/provider/yql_provider.cpp
@@ -1264,14 +1264,16 @@ void WriteStatistics(NYson::TYsonWriter& writer, const TOperationStatistics& sta
writer.OnEndMap();
}
-void WriteStatistics(NYson::TYsonWriter& writer, bool totalOnly, const THashMap<ui32, TOperationStatistics>& statistics) {
+void WriteStatistics(NYson::TYsonWriter& writer, bool totalOnly, const THashMap<ui32, TOperationStatistics>& statistics, bool addExternalMap) {
if (statistics.empty()) {
return;
}
THashMap<TString, std::tuple<i64, i64, i64, TMaybe<i64>>> total; // sum, count, max, min
- writer.OnBeginMap();
+ if (addExternalMap) {
+ writer.OnBeginMap();
+ }
for (const auto& opStatistics : statistics) {
for (auto& el : opStatistics.second.Entries) {
@@ -1331,8 +1333,9 @@ void WriteStatistics(NYson::TYsonWriter& writer, bool totalOnly, const THashMap<
writer.OnEndMap();
}
writer.OnEndMap(); // total
-
- writer.OnEndMap();
+ if (addExternalMap) {
+ writer.OnEndMap();
+ }
}
bool ValidateCompressionForInput(std::string_view format, std::string_view compression, TExprContext& ctx) {
diff --git a/ydb/library/yql/providers/common/provider/yql_provider.h b/ydb/library/yql/providers/common/provider/yql_provider.h
index e5f8d8e034..c4d8fa771b 100644
--- a/ydb/library/yql/providers/common/provider/yql_provider.h
+++ b/ydb/library/yql/providers/common/provider/yql_provider.h
@@ -179,7 +179,7 @@ void WriteStreams(NYson::TYsonWriter& writer, TStringBuf name, const NNodes::TCo
double GetDataReplicationFactor(const TExprNode& lambda, TExprContext& ctx);
-void WriteStatistics(NYson::TYsonWriter& writer, bool totalOnly, const THashMap<ui32, TOperationStatistics>& statistics);
+void WriteStatistics(NYson::TYsonWriter& writer, bool totalOnly, const THashMap<ui32, TOperationStatistics>& statistics, bool addExternalMap = true);
void WriteStatistics(NYson::TYsonWriter& writer, const TOperationStatistics& statistics);
bool ValidateCompressionForInput(std::string_view format, std::string_view compression, TExprContext& ctx);
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_datasink.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_datasink.cpp
index b76aeaba4d..ac8b7b9d44 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_datasink.cpp
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_datasink.cpp
@@ -160,7 +160,16 @@ public:
return false;
}
- NCommon::WriteStatistics(writer, totalOnly, State_->Statistics);
+ writer.OnBeginMap();
+ NCommon::WriteStatistics(writer, totalOnly, State_->Statistics, false);
+ writer.OnKeyedItem("Hybrid");
+ writer.OnBeginMap();
+ for (const auto& [opName, stats] : State_->HybridStatistics) {
+ writer.OnKeyedItem(opName);
+ NCommon::WriteStatistics(writer, totalOnly, {{0, stats}});
+ }
+ writer.OnEndMap();
+ writer.OnEndMap();
return true;
}
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp
index 617a719770..74e69aed08 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp
@@ -57,21 +57,23 @@ bool NeedFallback(const TIssues& issues) {
TIssue WrapIssuesOnHybridFallback(TPosition pos, const TIssues& issues) {
TIssue result(pos, "Hybrid execution fallback on YT");
- result.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_WARNING);
+ result.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_INFO);
- const std::function<void(TIssue& issue)> toWarning = [&](TIssue& issue) {
- if (issue.Severity == TSeverityIds::S_ERROR || issue.Severity == TSeverityIds::S_FATAL) {
- issue.Severity = TSeverityIds::S_WARNING;
+ const std::function<void(TIssue& issue)> toInfo = [&](TIssue& issue) {
+ if (issue.Severity == TSeverityIds::S_ERROR
+ || issue.Severity == TSeverityIds::S_FATAL
+ || issue.Severity == TSeverityIds::S_WARNING) {
+ issue.Severity = TSeverityIds::S_INFO;
}
for (const auto& subissue : issue.GetSubIssues()) {
- toWarning(*subissue);
+ toInfo(*subissue);
}
};
for (const auto& issue : issues) {
- TIssuePtr warning(new TIssue(issue));
- toWarning(*warning);
- result.AddSubIssue(std::move(warning));
+ TIssuePtr info(new TIssue(issue));
+ toInfo(*info);
+ result.AddSubIssue(std::move(info));
}
return result;
@@ -125,6 +127,11 @@ private:
static TExprNode::TPtr FinalizeOutputOp(const TYtState::TPtr& state, const TString& operationHash,
const IYtGateway::TRunResult& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx, bool markFinished)
{
+ if (markFinished && !TYtDqProcessWrite::Match(input.Get())) {
+ with_lock(state->StatisticsMutex) {
+ state->HybridStatistics[input->Content()].Entries.emplace_back(TString{"YtExecution"}, 0, 0, 0, 0, 1);
+ }
+ }
auto outSection = TYtOutputOpBase(input).Output();
YQL_ENSURE(outSection.Size() == res.OutTableStats.size(), "Invalid output table count in IYtGateway::TRunResult");
TExprNode::TListType newOutTables;
@@ -286,14 +293,21 @@ private:
State_->Statistics[Max<ui32>()].Entries.emplace_back(TString{name}, 0, 0, 0, 0, 1);
}
};
+ auto hybridStatWriter = [this](TStringBuf statName, TStringBuf opName) {
+ with_lock(State_->StatisticsMutex) {
+ State_->HybridStatistics[opName].Entries.emplace_back(TString{statName}, 0, 0, 0, 0, 1);
+ }
+ };
switch (input->Head().GetState()) {
case TExprNode::EState::ExecutionComplete:
statWriter("HybridExecution");
+ hybridStatWriter("Execution", input->TailPtr()->Content());
output = input->HeadPtr();
break;
case TExprNode::EState::Error: {
statWriter("HybridFallback");
+ hybridStatWriter("Fallback", input->TailPtr()->Content());
if (State_->Configuration->HybridDqExecutionFallback.Get().GetOrElse(true)) {
output = input->TailPtr();
} else {
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp
index 5ca774525f..7f79548d85 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp
@@ -275,10 +275,12 @@ private:
if (const auto stat = CanReadHybrid(sort.Input().Item(0))) {
if (stat->front() <= sizeLimit && stat->back() <= chunksLimit) {
YQL_CLOG(INFO, ProviderYt) << "Sort on DQ with equivalent input size " << stat->front() << " and " << stat->back() << " chunks.";
- PushStat("Hybrid_Sort_try");
+ PushStat("HybridTry");
+ PushHybridStat("Try", node.Raw()->Content());
return MakeYtSortByDq(sort, ctx);
}
- PushStat("Hybrid_Sort_over_limits");
+ PushStat("HybridSkipOverLimits");
+ PushHybridStat("SkipOverLimits", node.Raw()->Content());
}
}
}
@@ -294,10 +296,12 @@ private:
if (const auto stat = CanReadHybrid(merge.Input().Item(0))) {
if (stat->front() <= sizeLimit && stat->back() <= chunksLimit) {
YQL_CLOG(INFO, ProviderYt) << "Merge on DQ with equivalent input size " << stat->front() << " and " << stat->back() << " chunks.";
- PushStat("Hybrid_Merge_try");
+ PushStat("HybridTry");
+ PushHybridStat("Try", node.Raw()->Content());
return MakeYtSortByDq(merge, ctx);
}
- PushStat("Hybrid_Merge_over_limits");
+ PushStat("HybridSkipOverLimits");
+ PushHybridStat("SkipOverLimits", node.Raw()->Content());
}
}
}
@@ -347,7 +351,8 @@ private:
if (stat->front() <= sizeLimit && stat->back() <= chunksLimit) {
if (CanExecuteInHybrid(map.Mapper().Ptr(), chunksLimit, sizeLimit)) {
YQL_CLOG(INFO, ProviderYt) << "Map on DQ with equivalent input size " << stat->front() << " and " << stat->back() << " chunks.";
- PushStat("Hybrid_Map_try");
+ PushStat("HybridTry");
+ PushHybridStat("Try", node.Raw()->Content());
TSyncMap syncList;
const auto& paths = map.Input().Item(0).Paths();
for (auto i = 0U; i < paths.Size(); ++i) {
@@ -430,7 +435,8 @@ private:
.Done();
}
}
- PushStat("Hybrid_Map_over_limits");
+ PushStat("HybridOverLimits");
+ PushHybridStat("SkipOverLimits", node.Raw()->Content());
}
}
@@ -626,12 +632,14 @@ private:
if (CanExecuteInHybrid(reduce.Reducer().Ptr(), chunksLimit, sizeLimit)) {
if (ETypeAnnotationKind::Struct == GetSeqItemType(*reduce.Reducer().Args().Arg(0).Ref().GetTypeAnn()).GetKind()) {
YQL_CLOG(INFO, ProviderYt) << "Reduce on DQ with equivalent input size " << stat->front() << " and " << stat->back() << " chunks.";
- PushStat("Hybrid_Reduce_try");
+ PushStat("HybridTry");
+ PushHybridStat("Try", node.Raw()->Content());
return MakeYtReduceByDq(reduce, ctx);
}
}
}
- PushStat("Hybrid_Reduce_over_limits");
+ PushStat("HybridSkipOverLimits");
+ PushHybridStat("SkipOverLimits", node.Raw()->Content());
}
}
@@ -647,12 +655,14 @@ private:
if (CanExecuteInHybrid(mapReduce.Reducer().Ptr(), chunksLimit, sizeLimit) && CanExecuteInHybrid(mapReduce.Mapper().Ptr(), chunksLimit, sizeLimit)) {
if (ETypeAnnotationKind::Struct == GetSeqItemType(*mapReduce.Reducer().Args().Arg(0).Ref().GetTypeAnn()).GetKind()) {
YQL_CLOG(INFO, ProviderYt) << "MapReduce on DQ with equivalent input size " << stat->front() << " and " << stat->back() << " chunks.";
- PushStat("Hybrid_MapReduce_try");
+ PushHybridStat("Try", node.Raw()->Content());
+ PushStat("HybridTry");
return MakeYtReduceByDq(mapReduce, ctx);
}
}
}
- PushStat("Hybrid_MapReduce_over_limits");
+ PushHybridStat("SkipOverLimits", node.Raw()->Content());
+ PushStat("HybridOverLimits");
}
}
@@ -665,6 +675,12 @@ private:
}
};
+ void PushHybridStat(TStringBuf statName, TStringBuf opName) const {
+ with_lock(State_->StatisticsMutex) {
+ State_->HybridStatistics[opName].Entries.emplace_back(TString{statName}, 0, 0, 0, 0, 1);
+ }
+ };
+
const TYtState::TPtr State_;
const THolder<IGraphTransformer> Finalizer_;
};
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_provider.h b/ydb/library/yql/providers/yt/provider/yql_yt_provider.h
index 90ad1d58fa..06d9d8f747 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_provider.h
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_provider.h
@@ -96,6 +96,7 @@ struct TYtState : public TThrRefBase {
THashMap<std::pair<TString, TString>, TString> AnonymousLabels; // cluster + label -> name
std::unordered_map<ui64, TString> NodeHash; // unique id -> hash
THashMap<ui32, TOperationStatistics> Statistics; // public id -> stat
+ THashMap<TString, TOperationStatistics> HybridStatistics; // operation name -> stat
TMutex StatisticsMutex;
THashSet<std::pair<TString, TString>> Checkpoints; // Set of checkpoint tables
THolder<IDqIntegration> DqIntegration_;