aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorwhcrc <whcrc@ydb.tech>2022-11-03 17:57:35 +0300
committerwhcrc <whcrc@ydb.tech>2022-11-03 17:57:35 +0300
commit9af9d54be754d0ad4e4de11f5c5aeccd3274c92b (patch)
tree94bef3b683c0d7232b6ffc172115fac6d6734976
parentc30a953bfdd09f1ef4ef16997dab04c664a182b5 (diff)
downloadydb-9af9d54be754d0ad4e4de11f5c5aeccd3274c92b.tar.gz
, stats aggregation in async CA, fixes [ProcessInit, MakeDqTaskRunner] counters
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp24
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h3
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h18
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp1
-rw-r--r--ydb/library/yql/dq/actors/protos/dq_stats.proto7
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.h20
-rw-r--r--ydb/library/yql/providers/dq/actors/compute_actor.cpp3
-rw-r--r--ydb/library/yql/providers/dq/actors/task_controller.cpp29
-rw-r--r--ydb/library/yql/providers/dq/runtime/task_command_executor.cpp6
-rw-r--r--ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp18
-rw-r--r--ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h1
11 files changed, 82 insertions, 48 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
index 4ae2f85225e..2c2dda745ed 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
@@ -39,7 +39,8 @@ public:
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory,
const ::NMonitoring::TDynamicCounterPtr& taskCounters,
- const TActorId& quoterServiceActorId)
+ const TActorId& quoterServiceActorId,
+ bool ownCounters)
: TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ false, false, taskCounters)
, TaskRunnerActorFactory(taskRunnerActorFactory)
, ReadyToCheckpointFlag(false)
@@ -47,6 +48,10 @@ public:
, QuoterServiceActorId(quoterServiceActorId)
{
InitExtraMonCounters(taskCounters);
+ if (ownCounters) {
+ CA_LOG_D("TDqAsyncComputeActor, make stat");
+ Stat = MakeHolder<NYql::TCounters>();
+ }
}
void DoBootstrap() {
@@ -429,7 +434,9 @@ private:
const auto& taskParams = ev->Get()->TaskParams;
const auto& typeEnv = ev->Get()->TypeEnv;
const auto& holderFactory = ev->Get()->HolderFactory;
-
+ if (Stat) {
+ Stat->AddCounters2(ev->Get()->Sensors);
+ }
TypeEnv = const_cast<NKikimr::NMiniKQL::TTypeEnvironment*>(&typeEnv);
FillIoMaps(holderFactory, typeEnv, secureParams, taskParams);
@@ -457,6 +464,9 @@ private:
}
void OnRunFinished(NTaskRunnerActor::TEvTaskRunFinished::TPtr& ev, const NActors::TActorContext& ) {
+ if (Stat) {
+ Stat->AddCounters2(ev->Get()->Sensors);
+ }
ContinueRunInflight = false;
TrySendAsyncChannelsData(); // send from previous cycle
@@ -533,10 +543,9 @@ private:
}
void OnPopFinished(NTaskRunnerActor::TEvChannelPopFinished::TPtr& ev, const NActors::TActorContext&) {
- if (ev->Get()->Stats) {
- TaskRunnerStats = std::move(ev->Get()->Stats);
+ if (Stat) {
+ Stat->AddCounters2(ev->Get()->Sensors);
}
- CA_LOG_T("OnPopFinished, stats: " << *TaskRunnerStats.Get());
auto it = OutputChannelsMap.find(ev->Get()->ChannelId);
Y_VERIFY(it != OutputChannelsMap.end());
TOutputChannelInfo& outputChannel = it->second;
@@ -886,10 +895,11 @@ IActor* CreateDqAsyncComputeActor(const TActorId& executerId, const TTxId& txId,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory,
::NMonitoring::TDynamicCounterPtr taskCounters,
- const TActorId& quoterServiceActorId)
+ const TActorId& quoterServiceActorId,
+ bool ownCounters)
{
return new TDqAsyncComputeActor(executerId, txId, std::move(task), std::move(asyncIoFactory),
- functionRegistry, settings, memoryLimits, taskRunnerActorFactory, taskCounters, quoterServiceActorId);
+ functionRegistry, settings, memoryLimits, taskRunnerActorFactory, taskCounters, quoterServiceActorId, ownCounters);
}
} // namespace NDq
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h
index 16de5b51b13..2cc261b8657 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h
@@ -23,7 +23,8 @@ NActors::IActor* CreateDqAsyncComputeActor(const NActors::TActorId& executerId,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory,
::NMonitoring::TDynamicCounterPtr taskCounters = nullptr,
- const NActors::TActorId& quoterServiceActorId = {});
+ const NActors::TActorId& quoterServiceActorId = {},
+ bool ownCounters = false);
} // namespace NDq
} // namespace NYql
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index eb2887d1604..7e583a8ed87 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -13,6 +13,7 @@
#include <ydb/core/base/wilson.h>
#include <ydb/core/protos/services.pb.h>
+#include <ydb/library/yql/providers/dq/counters/counters.h>
#include <ydb/library/yql/dq/actors/protos/dq_events.pb.h>
#include <ydb/library/yql/dq/common/dq_common.h>
#include <ydb/library/yql/dq/proto/dq_tasks.pb.h>
@@ -1853,7 +1854,21 @@ public:
dst->SetMkqlExtraMemoryRequests(GetProfileStats()->MkqlExtraMemoryRequests);
}
- if (auto* taskStats = GetTaskRunnerStats()) {
+ if (Stat) { // for task_runner_actor
+ Y_VERIFY(!dst->HasExtra());
+ NDqProto::TExtraStats extraStats;
+ for (const auto& [name, entry]: Stat->Get()) {
+ NDqProto::TDqStatsAggr metric;
+ metric.SetSum(entry.Sum);
+ metric.SetMax(entry.Max);
+ metric.SetMin(entry.Min);
+ //metric.SetAvg(entry.Avg);
+ metric.SetCnt(entry.Count);
+ (*extraStats.MutableStats())[name] = metric;
+ }
+ dst->MutableExtra()->PackFrom(extraStats);
+ Stat->Clear();
+ } else if (auto* taskStats = GetTaskRunnerStats()) { // for task_runner_actor_local
auto* protoTask = dst->AddTasks();
FillTaskRunnerStats(Task.GetId(), Task.GetStageId(), *taskStats, protoTask, (bool) GetProfileStats());
@@ -2011,6 +2026,7 @@ protected:
bool MonCountersProvided = false;
::NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryUsage;
::NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryLimit;
+ THolder<NYql::TCounters> Stat;
};
} // namespace NYql
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp
index f6bcd7a92f8..4de25a1f9a0 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp
@@ -12,7 +12,6 @@ void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TTaskRunnerStatsBase&
protoTask->SetStageId(stageId);
protoTask->SetCpuTimeUs(taskStats.ComputeCpuTime.MicroSeconds() + taskStats.BuildCpuTime.MicroSeconds());
protoTask->SetFinishTimeMs(taskStats.FinishTs.MilliSeconds());
- protoTask->SetProcessInit(taskStats.ProcessInit.MilliSeconds());
// Cerr << (TStringBuilder() << "FillTaskRunnerStats: " << taskStats.ComputeCpuTime << ", " << taskStats.BuildCpuTime << Endl);
if (Y_UNLIKELY(withProfileStats)) {
diff --git a/ydb/library/yql/dq/actors/protos/dq_stats.proto b/ydb/library/yql/dq/actors/protos/dq_stats.proto
index 280bc90a935..1ddaaabfbac 100644
--- a/ydb/library/yql/dq/actors/protos/dq_stats.proto
+++ b/ydb/library/yql/dq/actors/protos/dq_stats.proto
@@ -107,6 +107,8 @@ message TDqMkqlStat {
}
message TDqTaskStats {
+ reserved 154;
+
// basic stats
uint64 TaskId = 1;
uint32 StageId = 2;
@@ -140,7 +142,6 @@ message TDqTaskStats {
repeated TDqInputChannelStats InputChannels = 151;
repeated TDqAsyncOutputBufferStats Sinks = 152;
repeated TDqOutputChannelStats OutputChannels = 153;
- uint64 ProcessInit = 154;
google.protobuf.Any Extra = 200;
}
@@ -166,6 +167,10 @@ message TDqStatsAggr {
uint64 Cnt = 4;
}
+message TExtraStats {
+ map<string, TDqStatsAggr> Stats = 1;
+}
+
message TDqStatsMinMax {
uint64 Min = 1;
uint64 Max = 2;
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
index 24b9ad5c47c..433f46edb08 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
@@ -75,7 +75,6 @@ struct TTaskRunnerStatsBase {
TDuration ComputeCpuTime;
TRunStatusTimeMetrics RunStatusTimeMetrics; // ComputeCpuTime + RunStatusTimeMetrics == 100% time
- TDuration ProcessInit;
// profile stats
TDuration WaitTime; // wall time of waiting for input, scans & output
@@ -184,24 +183,13 @@ class TDqTaskRunnerStatsView {
public:
TDqTaskRunnerStatsView() : IsDefined(false) {}
- TDqTaskRunnerStatsView(TDqTaskRunnerStatsInplace&& stats) // used in TTaskRunnerActor, cause it constructs stats on-the-fly, and cannot own it due to threaded implementation
- : StatsInplace(std::move(stats))
- , StatsPtr(nullptr)
- , IsInplace(true)
- , IsDefined(true) {
- }
-
TDqTaskRunnerStatsView(const TDqTaskRunnerStats* stats) // used in TLocalTaskRunnerActor, cause it holds this stats, and does not modify it asyncronously from TDqAsyncComputeActor
- : StatsInplace()
- , StatsPtr(stats)
- , IsInplace(false)
+ : StatsPtr(stats)
, IsDefined(true) {
}
TDqTaskRunnerStatsView(const TDqTaskRunnerStats* stats, THashMap<ui32, const TDqAsyncOutputBufferStats*>&& sinkStats)
- : StatsInplace()
- , StatsPtr(stats)
- , IsInplace(false)
+ : StatsPtr(stats)
, IsDefined(true)
, SinkStats(std::move(sinkStats)) {
}
@@ -210,7 +198,7 @@ public:
if (!IsDefined) {
return nullptr;
}
- return IsInplace ? static_cast<const TTaskRunnerStatsBase*>(&StatsInplace) : StatsPtr;
+ return StatsPtr;
}
operator bool() const {
@@ -222,9 +210,7 @@ public:
}
private:
- TDqTaskRunnerStatsInplace StatsInplace;
const TDqTaskRunnerStats* StatsPtr;
- bool IsInplace;
bool IsDefined;
THashMap<ui32, const TDqAsyncOutputBufferStats*> SinkStats;
};
diff --git a/ydb/library/yql/providers/dq/actors/compute_actor.cpp b/ydb/library/yql/providers/dq/actors/compute_actor.cpp
index 64faded43b2..0508c9a964c 100644
--- a/ydb/library/yql/providers/dq/actors/compute_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/compute_actor.cpp
@@ -83,7 +83,8 @@ IActor* CreateComputeActor(
memoryLimits,
taskRunnerActorFactory,
taskCounters,
- options.QuoterServiceActorId);
+ options.QuoterServiceActorId,
+ options.ComputeActorOwnsCounters);
}
}
diff --git a/ydb/library/yql/providers/dq/actors/task_controller.cpp b/ydb/library/yql/providers/dq/actors/task_controller.cpp
index 7d3d874e69c..fc431d99cf2 100644
--- a/ydb/library/yql/providers/dq/actors/task_controller.cpp
+++ b/ydb/library/yql/providers/dq/actors/task_controller.cpp
@@ -130,7 +130,11 @@ private:
<< " PingCookie: " << ev->Cookie
<< " StatusCode: " << NYql::NDqProto::StatusIds_StatusCode_Name(state.GetStatusCode());
- if (state.HasStats() && state.GetStats().GetTasks().size()) {
+ if (state.HasStats() && TryAddStatsFromExtra(state.GetStats())) {
+ if (ServiceCounters.Counters && !AggrPeriod) {
+ ExportStats(TaskStat, taskId);
+ }
+ } else if (state.HasStats() && state.GetStats().GetTasks().size()) {
YQL_CLOG(TRACE, ProviderDq) << " " << SelfId() << " AddStats " << taskId;
AddStats(state.GetStats());
if (ServiceCounters.Counters && !AggrPeriod) {
@@ -138,6 +142,7 @@ private:
}
}
+
TIssues localIssues;
// TODO: don't convert issues to string
NYql::IssuesFromMessage(state.GetIssues(), localIssues);
@@ -278,6 +283,24 @@ private:
}
}
+ bool TryAddStatsFromExtra(const NDqProto::TDqComputeActorStats& x) {
+ NDqProto::TExtraStats extraStats;
+ if (x.HasExtra() && x.GetExtra().UnpackTo(&extraStats)) {
+ YQL_CLOG(TRACE, ProviderDq) << " " << SelfId() << " AddStats from extra";
+ for (const auto& [name, m] : extraStats.GetStats()) {
+ NYql::TCounters::TEntry value;
+ value.Sum = m.GetSum();
+ value.Max = m.GetMax();
+ value.Min = m.GetMin();
+ //value.Avg = m.GetAvg();
+ value.Count = m.GetCnt();
+ TaskStat.AddCounter(name, value);
+ }
+ return true;
+ }
+ return false;
+ }
+
void AddStats(const NDqProto::TDqComputeActorStats& x) {
YQL_ENSURE(x.GetTasks().size() == 1);
auto& s = x.GetTasks(0);
@@ -303,10 +326,6 @@ private:
ADD_COUNTER(InputBytes)
ADD_COUNTER(OutputRows)
ADD_COUNTER(OutputBytes)
- if (stats.GetProcessInit()) {
- const auto val = static_cast<i64>(stats.GetProcessInit());
- TaskStat.AddCounter(TaskStat.GetCounterName("TaskRunner", labels, "ProcessInit"), NYql::TCounters::TEntry{val, val, val, val, val});
- }
// profile stats
ADD_COUNTER(BuildCpuTimeUs)
diff --git a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
index e7b9a2c6cb9..9d5c1bf3633 100644
--- a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
+++ b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
@@ -129,6 +129,12 @@ public:
*Runner->GetStats(),
CurrentStats,
Runner->GetTaskId());
+ for (const auto& [inputId, _]: CurrentInputChannelsStats) {
+ UpdateInputChannelStats(inputId);
+ }
+ for (const auto& [outputId, _]: CurrentOutputChannelsStats) {
+ UpdateOutputChannelStats(outputId);
+ }
}
void UpdateOutputChannelStats(ui64 channelId)
diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
index b8f8741e8eb..7317ee4abcb 100644
--- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
+++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
@@ -293,7 +293,7 @@ private:
auto cookie = ev->Cookie;
auto wasFinished = ev->Get()->WasFinished;
auto toPop = ev->Get()->Size;
- Invoker->Invoke([cookie,selfId,channelId=ev->Get()->ChannelId, actorSystem, replyTo, wasFinished, toPop, taskRunner=TaskRunner, settings=Settings, stageId=StageId, processInit=this->ProcessInit]() {
+ Invoker->Invoke([cookie,selfId,channelId=ev->Get()->ChannelId, actorSystem, replyTo, wasFinished, toPop, taskRunner=TaskRunner, settings=Settings, stageId=StageId]() {
try {
// auto guard = taskRunner->BindAllocator(); // only for local mode
auto channel = taskRunner->GetOutputChannel(channelId);
@@ -311,11 +311,10 @@ private:
}
TVector<NDqProto::TData> chunks;
- NDqProto::TPopResponse lastPop;
NDqProto::TPopResponse response;
for (;maxChunks && remain > 0 && !isFinished && hasData; maxChunks--, remain -= dataSize) {
NDqProto::TData data;
- lastPop = std::move(channel->Pop(data, remain));
+ const auto lastPop = std::move(channel->Pop(data, remain));
for (auto& metric : lastPop.GetMetric()) {
*response.AddMetric() = metric;
@@ -332,12 +331,6 @@ private:
}
}
- TDqTaskRunnerStatsInplace stats;
- NDqProto::TGetStatsResponse pbStats;
- lastPop.GetStats().UnpackTo(&pbStats);
- stats.FromProto(pbStats.GetStats());
- stats.ProcessInit = processInit;
-
actorSystem->Send(
new IEventHandle(
replyTo,
@@ -349,8 +342,7 @@ private:
Nothing(),
isFinished,
changed,
- GetSensors(response),
- TDqTaskRunnerStatsView(std::move(stats))),
+ GetSensors(response)),
/*flags=*/0,
cookie));
} catch (...) {
@@ -470,7 +462,7 @@ private:
Settings->FreezeDefaults();
StageId = taskMeta.GetStageId();
}
- Invoker->Invoke([taskRunner=TaskRunner, replyTo, selfId, cookie, actorSystem, settings=Settings, stageId=StageId, startTime, clusterName = ClusterName,this_=this](){
+ Invoker->Invoke([taskRunner=TaskRunner, replyTo, selfId, cookie, actorSystem, settings=Settings, stageId=StageId, startTime, clusterName = ClusterName](){
try {
//auto guard = taskRunner->BindAllocator(); // only for local mode
auto result = taskRunner->Prepare();
@@ -481,7 +473,6 @@ private:
"ProcessInit");
i64 val = (TInstant::Now()-startTime).MilliSeconds();
sensors.push_back({sensorName, val, val, val, val, 1});
- this_->ProcessInit = TDuration::MilliSeconds(val);
auto event = MakeHolder<TEvTaskRunnerCreateFinished>(
taskRunner->GetSecureParams(),
@@ -581,7 +572,6 @@ private:
ui64 StageId;
TWorkerRuntimeData* RuntimeData;
TString ClusterName;
- TDuration ProcessInit = TDuration::Zero();
};
class TTaskRunnerActorFactory: public ITaskRunnerActorFactory {
diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h
index f49e2c01ac5..7aacd11b261 100644
--- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h
+++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h
@@ -38,6 +38,7 @@ namespace NYql::NDqs {
bool CanUseComputeActor = true;
NActors::TActorId QuoterServiceActorId;
+ bool ComputeActorOwnsCounters = false;
};
NActors::IActor* CreateLocalWorkerManager(const TLocalWorkerManagerOptions& options);