aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <hor911@ydb.tech>2023-01-31 00:08:46 +0300
committerhor911 <hor911@ydb.tech>2023-01-31 00:08:46 +0300
commitf2503a071d7eef6f5247897c9d081db479bf4d54 (patch)
tree287adaaacd7f043a18e52c607ee36cc21d70b107
parent6e0c0bbc2c14b56205ce2293a951c7edd06d7587 (diff)
downloadydb-f2503a071d7eef6f5247897c9d081db479bf4d54.tar.gz
In progress stat
-rw-r--r--ydb/core/yq/libs/actors/run_actor.cpp56
-rw-r--r--ydb/library/yql/providers/dq/actors/task_controller_impl.h18
-rw-r--r--ydb/library/yql/providers/dq/api/protos/dqs.proto1
3 files changed, 63 insertions, 12 deletions
diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp
index daa370d952d..f2ec084fe55 100644
--- a/ydb/core/yq/libs/actors/run_actor.cpp
+++ b/ydb/core/yq/libs/actors/run_actor.cpp
@@ -265,6 +265,7 @@ struct TEvaluationGraphInfo {
NActors::TActorId ControlId;
NActors::TActorId ResultId;
NThreading::TPromise<NYql::IDqGateway::TResult> Result;
+ ui64 Index;
};
class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
@@ -812,9 +813,32 @@ private:
}
void Handle(TEvDqStats::TPtr& ev) {
- if (ev->Get()->Record.issues_size()) {
+ auto& proto = ev->Get()->Record;
+
+ TString GraphKey;
+ auto it = EvalInfos.find(ev->Sender);
+ if (it != EvalInfos.end()) {
+ GraphKey = "Precompute=" + ToString(it->second.Index);
+ } else {
+ if (ev->Sender != ExecuterId) {
+ LOG_E("TEvDqStats received from UNKNOWN Actor (TDqExecuter?) " << ev->Sender);
+ return;
+ }
+ GraphKey = "Graph=" + ToString(DqGraphIndex);
+ }
+
+ if (proto.issues_size() || proto.metric_size()) {
Fq::Private::PingTaskRequest request;
- *request.mutable_transient_issues() = ev->Get()->Record.issues();
+ if (proto.issues_size()) {
+ *request.mutable_transient_issues() = proto.issues();
+ }
+ if (proto.metric_size()) {
+ TString statistics;
+ if (SaveAndPackStatistics(GraphKey, proto.metric(), statistics)) {
+ QueryStateUpdateRequest.set_statistics(statistics);
+ request.set_statistics(statistics);
+ }
+ }
Send(Pinger, new TEvents::TEvForwardPingRequest(request), 0);
}
}
@@ -913,7 +937,7 @@ private:
Send(Pinger, new TEvents::TEvForwardPingRequest(request), 0, SetLoadFromCheckpointModeCookie);
}
- TString BuildNormalizedStatistics(const NDqProto::TQueryResponse& response) {
+ TString BuildNormalizedStatistics(const ::google::protobuf::RepeatedPtrField<::NYql::NDqProto::TMetric> metrics) {
struct TStatisticsNode {
std::map<TString, TStatisticsNode> Children;
@@ -948,7 +972,7 @@ private:
TStringStream out;
TStatisticsNode statistics;
- for (const auto& metric : response.GetMetric()) {
+ for (const auto& metric : metrics) {
auto longName = metric.GetName();
TString prefix;
TString name;
@@ -984,10 +1008,10 @@ private:
return out.Str();
}
- void SaveStatistics(const TString& graphKey, const NYql::NDqProto::TQueryResponse& result) {
+ bool SaveAndPackStatistics(const TString& graphKey, const ::google::protobuf::RepeatedPtrField<::NYql::NDqProto::TMetric> metrics, TString& statistics) {
// Yson routines are very strict, so it's better to try-catch them
try {
- Statistics.emplace_back(graphKey, BuildNormalizedStatistics(result));
+ Statistics[graphKey] = BuildNormalizedStatistics(metrics);
TStringStream out;
NYson::TYsonWriter writer(&out);
writer.OnBeginMap();
@@ -996,9 +1020,11 @@ private:
writer.OnRaw(p.second);
}
writer.OnEndMap();
- QueryStateUpdateRequest.set_statistics(NJson2Yson::ConvertYson2Json(out.Str()));
+ statistics = NJson2Yson::ConvertYson2Json(out.Str());
+ return true;
} catch (NYson::TYsonException& ex) {
LOG_E(ex.what());
+ return false;
}
}
@@ -1030,8 +1056,10 @@ private:
QueryStateUpdateRequest.mutable_result_id()->set_value(Params.ResultId);
- SaveStatistics("Graph=" + ToString(DqGraphIndex), result);
-
+ TString statistics;
+ if (SaveAndPackStatistics("Graph=" + ToString(DqGraphIndex), result.metric(), statistics)) {
+ QueryStateUpdateRequest.set_statistics(statistics);
+ }
KillExecuter();
}
@@ -1043,7 +1071,7 @@ private:
auto& result = ev->Get()->Record;
- LOG_D("Query evaluation " << DqEvalIndex << " response. Issues count: " << result.IssuesSize()
+ LOG_D("Query evaluation " << it->second.Index << " response. Issues count: " << result.IssuesSize()
<< ". Rows count: " << result.GetRowsCount());
queryResult.Data = result.yson();
@@ -1061,7 +1089,10 @@ private:
queryResult.SetSuccess();
}
- SaveStatistics("Precompute=" + ToString(DqEvalIndex++), result);
+ TString statistics;
+ if (SaveAndPackStatistics("Precompute=" + ToString(DqGraphIndex), result.metric(), statistics)) {
+ QueryStateUpdateRequest.set_statistics(statistics);
+ }
queryResult.AddIssues(issues);
queryResult.Truncated = result.GetTruncated();
@@ -1252,6 +1283,7 @@ private:
TEvaluationGraphInfo info;
+ info.Index = DqEvalIndex++;
info.ExecuterId = Register(NYql::NDq::MakeDqExecuter(MakeNodesManagerId(), SelfId(), Params.QueryId, "", dqConfiguration, QueryCounters.Counters, TInstant::Now(), false));
if (dqGraphParams.GetResultType()) {
@@ -1937,7 +1969,7 @@ private:
// Consumers creation
TVector<NYql::NPq::NProto::TDqPqTopicSource> TopicsForConsumersCreation;
TVector<std::shared_ptr<NYdb::ICredentialsProviderFactory>> CredentialsForConsumersCreation;
- TVector<std::pair<TString, TString>> Statistics;
+ TMap<TString, TString> Statistics;
NActors::TActorId ReadRulesCreatorId;
// Rate limiter resource creation
diff --git a/ydb/library/yql/providers/dq/actors/task_controller_impl.h b/ydb/library/yql/providers/dq/actors/task_controller_impl.h
index 3d59b9c26d8..2015e4c7186 100644
--- a/ydb/library/yql/providers/dq/actors/task_controller_impl.h
+++ b/ydb/library/yql/providers/dq/actors/task_controller_impl.h
@@ -127,6 +127,20 @@ private:
Send(ExecuterId, req.Release());
}
+ void SendNonFinalStat() {
+ auto ev = MakeHolder<TEvDqStats>();
+ FinalStat().CopyCounters(ev->Record);
+ Send(ExecuterId, ev.Release());
+ }
+
+ void TrySendNonFinalStat() {
+ auto now = Now();
+ if (now - LastStatReport > PingPeriod) {
+ SendNonFinalStat();
+ LastStatReport = now;
+ }
+ }
+
public:
void OnComputeActorState(NDq::TEvDqCompute::TEvState::TPtr& ev) {
TActorId computeActor = ev->Sender;
@@ -143,12 +157,14 @@ public:
if (state.HasStats() && TryAddStatsFromExtra(state.GetStats())) {
if (ServiceCounters.Counters && !AggrPeriod) {
ExportStats(TaskStat, taskId);
+ TrySendNonFinalStat();
}
} else if (state.HasStats() && state.GetStats().GetTasks().size()) {
YQL_CLOG(TRACE, ProviderDq) << " " << SelfId() << " AddStats " << taskId;
AddStats(state.GetStats());
if (ServiceCounters.Counters && !AggrPeriod) {
ExportStats(TaskStat, taskId);
+ TrySendNonFinalStat();
}
}
@@ -212,6 +228,7 @@ public:
if (ServiceCounters.Counters) {
ExportStats(AggregateQueryStatsByStage(TaskStat, Stages), 0);
}
+ SendNonFinalStat();
Schedule(AggrPeriod, new TEvents::TEvWakeup(AGGR_TIMER_TAG));
}
break;
@@ -633,6 +650,7 @@ private:
TDuration AggrPeriod = TDuration::Zero();
NYql::NDq::GroupedIssues Issues;
ui64 PingCookie = 0;
+ TInstant LastStatReport;
};
} /* namespace NYql */
diff --git a/ydb/library/yql/providers/dq/api/protos/dqs.proto b/ydb/library/yql/providers/dq/api/protos/dqs.proto
index 783b1b5b624..4279bf529a1 100644
--- a/ydb/library/yql/providers/dq/api/protos/dqs.proto
+++ b/ydb/library/yql/providers/dq/api/protos/dqs.proto
@@ -190,6 +190,7 @@ message TDqFailure {
message TDqStats {
repeated Ydb.Issue.IssueMessage Issues = 1;
+ repeated TMetric Metric = 2;
};
message TGraphRequest {