diff options
author | hor911 <hor911@ydb.tech> | 2023-01-31 00:08:46 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2023-01-31 00:08:46 +0300 |
commit | f2503a071d7eef6f5247897c9d081db479bf4d54 (patch) | |
tree | 287adaaacd7f043a18e52c607ee36cc21d70b107 | |
parent | 6e0c0bbc2c14b56205ce2293a951c7edd06d7587 (diff) | |
download | ydb-f2503a071d7eef6f5247897c9d081db479bf4d54.tar.gz |
In progress stat
-rw-r--r-- | ydb/core/yq/libs/actors/run_actor.cpp | 56 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/actors/task_controller_impl.h | 18 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/api/protos/dqs.proto | 1 |
3 files changed, 63 insertions, 12 deletions
diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp index daa370d952d..f2ec084fe55 100644 --- a/ydb/core/yq/libs/actors/run_actor.cpp +++ b/ydb/core/yq/libs/actors/run_actor.cpp @@ -265,6 +265,7 @@ struct TEvaluationGraphInfo { NActors::TActorId ControlId; NActors::TActorId ResultId; NThreading::TPromise<NYql::IDqGateway::TResult> Result; + ui64 Index; }; class TRunActor : public NActors::TActorBootstrapped<TRunActor> { @@ -812,9 +813,32 @@ private: } void Handle(TEvDqStats::TPtr& ev) { - if (ev->Get()->Record.issues_size()) { + auto& proto = ev->Get()->Record; + + TString GraphKey; + auto it = EvalInfos.find(ev->Sender); + if (it != EvalInfos.end()) { + GraphKey = "Precompute=" + ToString(it->second.Index); + } else { + if (ev->Sender != ExecuterId) { + LOG_E("TEvDqStats received from UNKNOWN Actor (TDqExecuter?) " << ev->Sender); + return; + } + GraphKey = "Graph=" + ToString(DqGraphIndex); + } + + if (proto.issues_size() || proto.metric_size()) { Fq::Private::PingTaskRequest request; - *request.mutable_transient_issues() = ev->Get()->Record.issues(); + if (proto.issues_size()) { + *request.mutable_transient_issues() = proto.issues(); + } + if (proto.metric_size()) { + TString statistics; + if (SaveAndPackStatistics(GraphKey, proto.metric(), statistics)) { + QueryStateUpdateRequest.set_statistics(statistics); + request.set_statistics(statistics); + } + } Send(Pinger, new TEvents::TEvForwardPingRequest(request), 0); } } @@ -913,7 +937,7 @@ private: Send(Pinger, new TEvents::TEvForwardPingRequest(request), 0, SetLoadFromCheckpointModeCookie); } - TString BuildNormalizedStatistics(const NDqProto::TQueryResponse& response) { + TString BuildNormalizedStatistics(const ::google::protobuf::RepeatedPtrField<::NYql::NDqProto::TMetric> metrics) { struct TStatisticsNode { std::map<TString, TStatisticsNode> Children; @@ -948,7 +972,7 @@ private: TStringStream out; TStatisticsNode statistics; - for (const auto& metric : response.GetMetric()) { + for (const auto& metric : metrics) { auto longName = metric.GetName(); TString prefix; TString name; @@ -984,10 +1008,10 @@ private: return out.Str(); } - void SaveStatistics(const TString& graphKey, const NYql::NDqProto::TQueryResponse& result) { + bool SaveAndPackStatistics(const TString& graphKey, const ::google::protobuf::RepeatedPtrField<::NYql::NDqProto::TMetric> metrics, TString& statistics) { // Yson routines are very strict, so it's better to try-catch them try { - Statistics.emplace_back(graphKey, BuildNormalizedStatistics(result)); + Statistics[graphKey] = BuildNormalizedStatistics(metrics); TStringStream out; NYson::TYsonWriter writer(&out); writer.OnBeginMap(); @@ -996,9 +1020,11 @@ private: writer.OnRaw(p.second); } writer.OnEndMap(); - QueryStateUpdateRequest.set_statistics(NJson2Yson::ConvertYson2Json(out.Str())); + statistics = NJson2Yson::ConvertYson2Json(out.Str()); + return true; } catch (NYson::TYsonException& ex) { LOG_E(ex.what()); + return false; } } @@ -1030,8 +1056,10 @@ private: QueryStateUpdateRequest.mutable_result_id()->set_value(Params.ResultId); - SaveStatistics("Graph=" + ToString(DqGraphIndex), result); - + TString statistics; + if (SaveAndPackStatistics("Graph=" + ToString(DqGraphIndex), result.metric(), statistics)) { + QueryStateUpdateRequest.set_statistics(statistics); + } KillExecuter(); } @@ -1043,7 +1071,7 @@ private: auto& result = ev->Get()->Record; - LOG_D("Query evaluation " << DqEvalIndex << " response. Issues count: " << result.IssuesSize() + LOG_D("Query evaluation " << it->second.Index << " response. Issues count: " << result.IssuesSize() << ". Rows count: " << result.GetRowsCount()); queryResult.Data = result.yson(); @@ -1061,7 +1089,10 @@ private: queryResult.SetSuccess(); } - SaveStatistics("Precompute=" + ToString(DqEvalIndex++), result); + TString statistics; + if (SaveAndPackStatistics("Precompute=" + ToString(DqGraphIndex), result.metric(), statistics)) { + QueryStateUpdateRequest.set_statistics(statistics); + } queryResult.AddIssues(issues); queryResult.Truncated = result.GetTruncated(); @@ -1252,6 +1283,7 @@ private: TEvaluationGraphInfo info; + info.Index = DqEvalIndex++; info.ExecuterId = Register(NYql::NDq::MakeDqExecuter(MakeNodesManagerId(), SelfId(), Params.QueryId, "", dqConfiguration, QueryCounters.Counters, TInstant::Now(), false)); if (dqGraphParams.GetResultType()) { @@ -1937,7 +1969,7 @@ private: // Consumers creation TVector<NYql::NPq::NProto::TDqPqTopicSource> TopicsForConsumersCreation; TVector<std::shared_ptr<NYdb::ICredentialsProviderFactory>> CredentialsForConsumersCreation; - TVector<std::pair<TString, TString>> Statistics; + TMap<TString, TString> Statistics; NActors::TActorId ReadRulesCreatorId; // Rate limiter resource creation diff --git a/ydb/library/yql/providers/dq/actors/task_controller_impl.h b/ydb/library/yql/providers/dq/actors/task_controller_impl.h index 3d59b9c26d8..2015e4c7186 100644 --- a/ydb/library/yql/providers/dq/actors/task_controller_impl.h +++ b/ydb/library/yql/providers/dq/actors/task_controller_impl.h @@ -127,6 +127,20 @@ private: Send(ExecuterId, req.Release()); } + void SendNonFinalStat() { + auto ev = MakeHolder<TEvDqStats>(); + FinalStat().CopyCounters(ev->Record); + Send(ExecuterId, ev.Release()); + } + + void TrySendNonFinalStat() { + auto now = Now(); + if (now - LastStatReport > PingPeriod) { + SendNonFinalStat(); + LastStatReport = now; + } + } + public: void OnComputeActorState(NDq::TEvDqCompute::TEvState::TPtr& ev) { TActorId computeActor = ev->Sender; @@ -143,12 +157,14 @@ public: if (state.HasStats() && TryAddStatsFromExtra(state.GetStats())) { if (ServiceCounters.Counters && !AggrPeriod) { ExportStats(TaskStat, taskId); + TrySendNonFinalStat(); } } else if (state.HasStats() && state.GetStats().GetTasks().size()) { YQL_CLOG(TRACE, ProviderDq) << " " << SelfId() << " AddStats " << taskId; AddStats(state.GetStats()); if (ServiceCounters.Counters && !AggrPeriod) { ExportStats(TaskStat, taskId); + TrySendNonFinalStat(); } } @@ -212,6 +228,7 @@ public: if (ServiceCounters.Counters) { ExportStats(AggregateQueryStatsByStage(TaskStat, Stages), 0); } + SendNonFinalStat(); Schedule(AggrPeriod, new TEvents::TEvWakeup(AGGR_TIMER_TAG)); } break; @@ -633,6 +650,7 @@ private: TDuration AggrPeriod = TDuration::Zero(); NYql::NDq::GroupedIssues Issues; ui64 PingCookie = 0; + TInstant LastStatReport; }; } /* namespace NYql */ diff --git a/ydb/library/yql/providers/dq/api/protos/dqs.proto b/ydb/library/yql/providers/dq/api/protos/dqs.proto index 783b1b5b624..4279bf529a1 100644 --- a/ydb/library/yql/providers/dq/api/protos/dqs.proto +++ b/ydb/library/yql/providers/dq/api/protos/dqs.proto @@ -190,6 +190,7 @@ message TDqFailure { message TDqStats { repeated Ydb.Issue.IssueMessage Issues = 1; + repeated TMetric Metric = 2; }; message TGraphRequest { |