aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <hor911@ydb.tech>2022-10-04 22:46:37 +0300
committerhor911 <hor911@ydb.tech>2022-10-04 22:46:37 +0300
commitee9ef0e95c1ddf52aad66fee7a0f64b8d49b1300 (patch)
tree9f8c529d6efeb6c21cceaca473c71d216cc8c172
parent2c64e058a844f9cf03275337e962759d9dd52367 (diff)
downloadydb-ee9ef0e95c1ddf52aad66fee7a0f64b8d49b1300.tar.gz
Update statistics before finish
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp2
-rw-r--r--ydb/library/yql/dq/actors/task_runner/events.h1
-rw-r--r--ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp12
3 files changed, 15 insertions, 0 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
index 1113dd051fd..4ae2f85225e 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
@@ -796,6 +796,8 @@ private:
}
void AskContinueRun(std::unique_ptr<NTaskRunnerActor::TEvContinueRun> continueRunEvent) {
+ continueRunEvent->SinkIds = GetIds(SinksMap);
+
if (!UseCpuQuota()) {
Send(TaskRunnerActorId, continueRunEvent.release());
return;
diff --git a/ydb/library/yql/dq/actors/task_runner/events.h b/ydb/library/yql/dq/actors/task_runner/events.h
index 8fbb410b1ec..bea7d5becf2 100644
--- a/ydb/library/yql/dq/actors/task_runner/events.h
+++ b/ydb/library/yql/dq/actors/task_runner/events.h
@@ -334,6 +334,7 @@ struct TEvContinueRun
TMaybe<TWatermarkRequest> WatermarkRequest = Nothing();
TMaybe<TCheckpointRequest> CheckpointRequest = Nothing();
bool CheckpointOnly = false;
+ TVector<ui32> SinkIds;
};
struct TEvAsyncInputPushFinished
diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
index 8f235feabf7..123bc2783ae 100644
--- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
+++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
@@ -209,6 +209,18 @@ private:
MemoryQuota->TryShrinkMemory(guard.GetMutex());
}
+ {
+ auto st = MakeHolder<TEvStatistics>(std::move(ev->Get()->SinkIds));
+
+ TaskRunner->UpdateStats();
+ THashMap<ui32, const TDqAsyncOutputBufferStats*> sinkStats;
+ for (const auto sinkId : st->SinkIds) {
+ sinkStats[sinkId] = TaskRunner->GetSink(sinkId)->GetStats();
+ }
+ st->Stats = TDqTaskRunnerStatsView(TaskRunner->GetStats(), std::move(sinkStats));
+ Send(ev->Sender, st.Release());
+ }
+
Send(
ev->Sender,
new TEvTaskRunFinished(