diff options
author | Олег <150132506+iddqdex@users.noreply.github.com> | 2025-03-11 22:38:37 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-03-11 22:38:37 +0300 |
commit | 76f835326e78bde9852d70dba2970958a7c70833 (patch) | |
tree | 6d7834ce232670cdd945ccba4f67c9dfe613543b | |
parent | fd69f43d3ff7014f3a8b13bcad0a020d74adc591 (diff) | |
download | ydb-76f835326e78bde9852d70dba2970958a7c70833.tar.gz |
Revert "Column Shard Bytes Time Series (#15423)" (#15603)
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp | 36 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_stats.cpp | 100 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_stats.h | 38 |
3 files changed, 26 insertions, 148 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index eccc23a806..5081e57b79 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -80,9 +80,7 @@ void TKqpScanComputeActor::AcquireRateQuota() { } void TKqpScanComputeActor::FillExtraStats(NDqProto::TDqComputeActorStats* dst, bool last) { - Y_UNUSED(last); - - if (ScanData && dst->TasksSize() > 0) { + if (last && ScanData && dst->TasksSize() > 0) { YQL_ENSURE(dst->TasksSize() == 1); auto* taskStats = dst->MutableTasks(0); @@ -98,25 +96,23 @@ void TKqpScanComputeActor::FillExtraStats(NDqProto::TDqComputeActorStats* dst, b tableStats->SetTablePath(ScanData->TablePath); if (auto* stats = ScanData->BasicStats.get()) { - if (RuntimeSettings.StatsMode >= NYql::NDqProto::DQ_STATS_MODE_FULL) { - ingressStats.SetRows(stats->Rows); - ingressStats.SetBytes(stats->Bytes); - ingressStats.SetFirstMessageMs(stats->FirstMessageMs); - ingressStats.SetLastMessageMs(stats->LastMessageMs); - - for (auto& [shardId, stat] : stats->ExternalStats) { - auto& externalStat = *sourceStats.AddExternalPartitions(); - externalStat.SetPartitionId(ToString(shardId)); - externalStat.SetExternalRows(stat.ExternalRows); - externalStat.SetExternalBytes(stat.ExternalBytes); - externalStat.SetFirstMessageMs(stat.FirstMessageMs); - externalStat.SetLastMessageMs(stat.LastMessageMs); - } - - taskStats->SetIngressRows(taskStats->GetIngressRows() + stats->Rows); - taskStats->SetIngressBytes(taskStats->GetIngressBytes() + stats->Bytes); + ingressStats.SetRows(stats->Rows); + ingressStats.SetBytes(stats->Bytes); + ingressStats.SetFirstMessageMs(stats->FirstMessageMs); + ingressStats.SetLastMessageMs(stats->LastMessageMs); + + for (auto& [shardId, stat] : stats->ExternalStats) { + auto& externalStat = *sourceStats.AddExternalPartitions(); + externalStat.SetPartitionId(ToString(shardId)); + externalStat.SetExternalRows(stat.ExternalRows); + externalStat.SetExternalBytes(stat.ExternalBytes); + externalStat.SetFirstMessageMs(stat.FirstMessageMs); + externalStat.SetLastMessageMs(stat.LastMessageMs); } + taskStats->SetIngressRows(taskStats->GetIngressRows() + stats->Rows); + taskStats->SetIngressBytes(taskStats->GetIngressBytes() + stats->Bytes); + tableStats->SetReadRows(stats->Rows); tableStats->SetReadBytes(stats->Bytes); tableStats->SetAffectedPartitions(stats->AffectedShards); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp index 4af9d7db47..3cf3dfbf60 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp @@ -12,12 +12,8 @@ ui64 NonZeroMin(ui64 a, ui64 b) { return (b == 0) ? a : ((a == 0 || a > b) ? b : a); } -void TTimeSeriesStats::ExportAggStats(NYql::NDqProto::TDqStatsAggr& stats) { - NKikimr::NKqp::ExportAggStats(Values, stats); -} - void TTimeSeriesStats::ExportAggStats(ui64 baseTimeMs, NYql::NDqProto::TDqStatsAggr& stats) { - ExportAggStats(stats); + NKikimr::NKqp::ExportAggStats(Values, stats); ExportHistory(baseTimeMs, stats); } @@ -32,20 +28,16 @@ void TTimeSeriesStats::ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqStatsAg } } -void TTimeSeriesStats::Resize(ui32 count) { - Values.resize(count); +void TTimeSeriesStats::Resize(ui32 taskCount) { + Values.resize(taskCount); } -void TTimeSeriesStats::SetNonZero(ui32 index, ui64 value) { +void TTimeSeriesStats::SetNonZero(ui32 taskIndex, ui64 value) { if (value) { Sum += value; - Sum -= Values[index]; - Values[index] = value; + Sum -= Values[taskIndex]; + Values[taskIndex] = value; } - AppendHistory(); -} - -void TTimeSeriesStats::AppendHistory() { if (HistorySampleCount) { auto nowMs = Now().MilliSeconds(); @@ -105,62 +97,6 @@ void TTimeSeriesStats::Pack() { } } -void TPartitionedStats::ResizeByTasks(ui32 taskCount) { - for (auto& p : Parts) { - p.resize(taskCount); - } -} - -void TPartitionedStats::ResizeByParts(ui32 partCount, ui32 taskCount) { - auto oldPartCount = Parts.size(); - Parts.resize(partCount); - for(auto i = oldPartCount; i < partCount; i++) { - Parts[i].resize(taskCount); - } - Resize(partCount); -} - -void TPartitionedStats::SetNonZero(ui32 taskIndex, ui32 partIndex, ui64 value, bool recordTimeSeries) { - auto& part = Parts[partIndex]; - auto delta = value - part[taskIndex]; - part[taskIndex] = value; - Values[partIndex] += delta; - Sum += delta; - if (recordTimeSeries) { - AppendHistory(); - } -} - -void TTimeMultiSeriesStats::SetNonZero(TPartitionedStats& stats, ui32 taskIndex, const TString& key, ui64 value, bool recordTimeSeries) { - auto [it, inserted] = Indices.try_emplace(key); - if (inserted) { - it->second = Indices.size() - 1; - if (PartCount < Indices.size()) { - PartCount += 4; - } - } - if (stats.Parts.size() < PartCount) { - stats.ResizeByParts(PartCount, TaskCount); - } - stats.SetNonZero(taskIndex, it->second, value, recordTimeSeries); -} - -void TExternalStats::Resize(ui32 taskCount) { - ExternalRows.ResizeByTasks(taskCount); - ExternalBytes.ResizeByTasks(taskCount); - TaskCount = taskCount; -} - -void TExternalStats::SetHistorySampleCount(ui32 historySampleCount) { - ExternalBytes.HistorySampleCount = historySampleCount; -} - -void TExternalStats::ExportHistory(ui64 baseTimeMs, NDqProto::TDqExternalAggrStats& stats) { - if (stats.HasExternalBytes()) { - ExternalBytes.ExportHistory(baseTimeMs, *stats.MutableExternalBytes()); - } -} - void TAsyncStats::Resize(ui32 taskCount) { Bytes.Resize(taskCount); DecompressedBytes.resize(taskCount); @@ -191,7 +127,6 @@ void TAsyncStats::ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqAsyncStatsAg } void TAsyncBufferStats::Resize(ui32 taskCount) { - External.Resize(taskCount); Ingress.Resize(taskCount); Push.Resize(taskCount); Pop.Resize(taskCount); @@ -199,7 +134,6 @@ void TAsyncBufferStats::Resize(ui32 taskCount) { } void TAsyncBufferStats::SetHistorySampleCount(ui32 historySampleCount) { - External.SetHistorySampleCount(historySampleCount); Ingress.SetHistorySampleCount(historySampleCount); Push.SetHistorySampleCount(historySampleCount); Pop.SetHistorySampleCount(historySampleCount); @@ -207,9 +141,6 @@ void TAsyncBufferStats::SetHistorySampleCount(ui32 historySampleCount) { } void TAsyncBufferStats::ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqAsyncBufferStatsAggr& stats) { - if (stats.HasExternal()) { - External.ExportHistory(baseTimeMs, *stats.MutableExternal()); - } if (stats.HasIngress()) { Ingress.ExportHistory(baseTimeMs, *stats.MutableIngress()); } @@ -472,17 +403,6 @@ ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS baseTimeMs = NonZeroMin(baseTimeMs, UpdateAsyncStats(index, asyncBufferStats.Ingress, sourceStat.GetIngress())); baseTimeMs = NonZeroMin(baseTimeMs, UpdateAsyncStats(index, asyncBufferStats.Push, sourceStat.GetPush())); baseTimeMs = NonZeroMin(baseTimeMs, UpdateAsyncStats(index, asyncBufferStats.Pop, sourceStat.GetPop())); - for (auto& partitionStat : sourceStat.GetExternalPartitions()) { - auto key = partitionStat.GetPartitionId(); - asyncBufferStats.External.SetNonZero(asyncBufferStats.External.ExternalRows, - index, key, partitionStat.GetExternalRows(), false); - asyncBufferStats.External.SetNonZero(asyncBufferStats.External.ExternalBytes, - index, key, partitionStat.GetExternalBytes(), true); - asyncBufferStats.External.SetNonZero(asyncBufferStats.External.FirstMessageMs, - index, key, partitionStat.GetFirstMessageMs(), false); - asyncBufferStats.External.SetNonZero(asyncBufferStats.External.LastMessageMs, - index, key, partitionStat.GetLastMessageMs(), false); - } } } @@ -1154,8 +1074,6 @@ void TQueryExecutionStats::UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TD BaseTimeMs = NonZeroMin(BaseTimeMs, it->second.UpdateStats(taskStats, state, stats.GetMaxMemoryUsage(), stats.GetDurationUs())); } -// SIMD-friendly aggregations are below. Compiler is able to vectorize sum/count, but needs help with min/max - void ExportAggStats(std::vector<ui64>& data, NYql::NDqProto::TDqStatsMinMax& stats) { Y_DEBUG_ABORT_UNLESS((data.size() & 3) == 0); @@ -1297,12 +1215,6 @@ void TQueryExecutionStats::ExportAggAsyncStats(TAsyncStats& data, NYql::NDqProto } void TQueryExecutionStats::ExportAggAsyncBufferStats(TAsyncBufferStats& data, NYql::NDqProto::TDqAsyncBufferStatsAggr& stats) { - auto& external = *stats.MutableExternal(); - data.External.ExternalRows.ExportAggStats(*external.MutableExternalRows()); - data.External.ExternalBytes.ExportAggStats(BaseTimeMs, *external.MutableExternalBytes()); - ExportOffsetAggStats(data.External.FirstMessageMs.Values, *external.MutableFirstMessageMs(), BaseTimeMs); - ExportOffsetAggStats(data.External.LastMessageMs.Values, *external.MutableLastMessageMs(), BaseTimeMs); - external.SetPartitionCount(data.External.Indices.size()); ExportAggAsyncStats(data.Ingress, *stats.MutableIngress()); ExportAggAsyncStats(data.Push, *stats.MutablePush()); ExportAggAsyncStats(data.Pop, *stats.MutablePop()); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.h b/ydb/core/kqp/executer_actor/kqp_executer_stats.h index 9c8b7b7cf2..72f3df581b 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_stats.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.h @@ -23,39 +23,10 @@ struct TTimeSeriesStats { std::vector<std::pair<ui64, ui64>> History; void ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqStatsAggr& stats); - void ExportAggStats(NYql::NDqProto::TDqStatsAggr& stats); void ExportAggStats(ui64 baseTimeMs, NYql::NDqProto::TDqStatsAggr& stats); - void Resize(ui32 count); - void SetNonZero(ui32 index, ui64 value); - void Pack(); - void AppendHistory(); -}; - -struct TPartitionedStats : public TTimeSeriesStats { - std::vector<std::vector<ui64>> Parts; - - void ResizeByTasks(ui32 taskCount); - void ResizeByParts(ui32 partCount, ui32 taskCount); - void SetNonZero(ui32 taskIndex, ui32 partIndex, ui64 value, bool recordTimeSeries); -}; - -struct TTimeMultiSeriesStats { - std::unordered_map<TString, ui32> Indices; - ui32 TaskCount = 0; - ui32 PartCount = 0; - - void SetNonZero(TPartitionedStats& stats, ui32 taskIndex, const TString& key, ui64 value, bool recordTimeSeries); -}; - -struct TExternalStats : public TTimeMultiSeriesStats { - TPartitionedStats ExternalRows; - TPartitionedStats ExternalBytes; - TPartitionedStats FirstMessageMs; - TPartitionedStats LastMessageMs; - void Resize(ui32 taskCount); - void SetHistorySampleCount(ui32 historySampleCount); - void ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqExternalAggrStats& stats); + void SetNonZero(ui32 taskIndex, ui64 value); + void Pack(); }; struct TMetricInfo { @@ -109,7 +80,6 @@ struct TAsyncBufferStats { Resize(taskCount); } - TExternalStats External; TAsyncStats Ingress; TAsyncStats Push; TAsyncStats Pop; @@ -209,8 +179,8 @@ struct TStageExecutionStats { std::map<TString, TTableStats> Tables; std::map<TString, TAsyncBufferStats> Ingress; std::map<TString, TAsyncBufferStats> Egress; - std::unordered_map<ui32, TAsyncBufferStats> Input; - std::unordered_map<ui32, TAsyncBufferStats> Output; + std::map<ui32, TAsyncBufferStats> Input; + std::map<ui32, TAsyncBufferStats> Output; std::map<TString, TOperatorStats> Joins; std::map<TString, TOperatorStats> Filters; |