about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author: Олег <150132506+iddqdex@users.noreply.github.com> 2025-03-11 22:38:37 +0300
committer: GitHub <noreply@github.com> 2025-03-11 22:38:37 +0300
commit: 76f835326e78bde9852d70dba2970958a7c70833 (patch)
tree: 6d7834ce232670cdd945ccba4f67c9dfe613543b
parent: fd69f43d3ff7014f3a8b13bcad0a020d74adc591 (diff)
download: ydb-76f835326e78bde9852d70dba2970958a7c70833.tar.gz
Revert "Column Shard Bytes Time Series (#15423)" (#15603)
-rw-r--r-- ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp 36
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_executer_stats.cpp 100
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_executer_stats.h 38
3 files changed, 26 insertions, 148 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index eccc23a806..5081e57b79 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -80,9 +80,7 @@ void TKqpScanComputeActor::AcquireRateQuota() {
}
void TKqpScanComputeActor::FillExtraStats(NDqProto::TDqComputeActorStats* dst, bool last) {
- Y_UNUSED(last);
-
- if (ScanData && dst->TasksSize() > 0) {
+ if (last && ScanData && dst->TasksSize() > 0) {
YQL_ENSURE(dst->TasksSize() == 1);
auto* taskStats = dst->MutableTasks(0);
@@ -98,25 +96,23 @@ void TKqpScanComputeActor::FillExtraStats(NDqProto::TDqComputeActorStats* dst, b
tableStats->SetTablePath(ScanData->TablePath);
if (auto* stats = ScanData->BasicStats.get()) {
- if (RuntimeSettings.StatsMode >= NYql::NDqProto::DQ_STATS_MODE_FULL) {
- ingressStats.SetRows(stats->Rows);
- ingressStats.SetBytes(stats->Bytes);
- ingressStats.SetFirstMessageMs(stats->FirstMessageMs);
- ingressStats.SetLastMessageMs(stats->LastMessageMs);
-
- for (auto& [shardId, stat] : stats->ExternalStats) {
- auto& externalStat = *sourceStats.AddExternalPartitions();
- externalStat.SetPartitionId(ToString(shardId));
- externalStat.SetExternalRows(stat.ExternalRows);
- externalStat.SetExternalBytes(stat.ExternalBytes);
- externalStat.SetFirstMessageMs(stat.FirstMessageMs);
- externalStat.SetLastMessageMs(stat.LastMessageMs);
- }
-
- taskStats->SetIngressRows(taskStats->GetIngressRows() + stats->Rows);
- taskStats->SetIngressBytes(taskStats->GetIngressBytes() + stats->Bytes);
+ ingressStats.SetRows(stats->Rows);
+ ingressStats.SetBytes(stats->Bytes);
+ ingressStats.SetFirstMessageMs(stats->FirstMessageMs);
+ ingressStats.SetLastMessageMs(stats->LastMessageMs);
+
+ for (auto& [shardId, stat] : stats->ExternalStats) {
+ auto& externalStat = *sourceStats.AddExternalPartitions();
+ externalStat.SetPartitionId(ToString(shardId));
+ externalStat.SetExternalRows(stat.ExternalRows);
+ externalStat.SetExternalBytes(stat.ExternalBytes);
+ externalStat.SetFirstMessageMs(stat.FirstMessageMs);
+ externalStat.SetLastMessageMs(stat.LastMessageMs);
}
+ taskStats->SetIngressRows(taskStats->GetIngressRows() + stats->Rows);
+ taskStats->SetIngressBytes(taskStats->GetIngressBytes() + stats->Bytes);
+
tableStats->SetReadRows(stats->Rows);
tableStats->SetReadBytes(stats->Bytes);
tableStats->SetAffectedPartitions(stats->AffectedShards);
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
index 4af9d7db47..3cf3dfbf60 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
@@ -12,12 +12,8 @@ ui64 NonZeroMin(ui64 a, ui64 b) {
return (b == 0) ? a : ((a == 0 || a > b) ? b : a);
}
-void TTimeSeriesStats::ExportAggStats(NYql::NDqProto::TDqStatsAggr& stats) {
- NKikimr::NKqp::ExportAggStats(Values, stats);
-}
-
void TTimeSeriesStats::ExportAggStats(ui64 baseTimeMs, NYql::NDqProto::TDqStatsAggr& stats) {
- ExportAggStats(stats);
+ NKikimr::NKqp::ExportAggStats(Values, stats);
ExportHistory(baseTimeMs, stats);
}
@@ -32,20 +28,16 @@ void TTimeSeriesStats::ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqStatsAg
}
}
-void TTimeSeriesStats::Resize(ui32 count) {
- Values.resize(count);
+void TTimeSeriesStats::Resize(ui32 taskCount) {
+ Values.resize(taskCount);
}
-void TTimeSeriesStats::SetNonZero(ui32 index, ui64 value) {
+void TTimeSeriesStats::SetNonZero(ui32 taskIndex, ui64 value) {
if (value) {
Sum += value;
- Sum -= Values[index];
- Values[index] = value;
+ Sum -= Values[taskIndex];
+ Values[taskIndex] = value;
}
- AppendHistory();
-}
-
-void TTimeSeriesStats::AppendHistory() {
if (HistorySampleCount) {
auto nowMs = Now().MilliSeconds();
@@ -105,62 +97,6 @@ void TTimeSeriesStats::Pack() {
}
}
-void TPartitionedStats::ResizeByTasks(ui32 taskCount) {
- for (auto& p : Parts) {
- p.resize(taskCount);
- }
-}
-
-void TPartitionedStats::ResizeByParts(ui32 partCount, ui32 taskCount) {
- auto oldPartCount = Parts.size();
- Parts.resize(partCount);
- for(auto i = oldPartCount; i < partCount; i++) {
- Parts[i].resize(taskCount);
- }
- Resize(partCount);
-}
-
-void TPartitionedStats::SetNonZero(ui32 taskIndex, ui32 partIndex, ui64 value, bool recordTimeSeries) {
- auto& part = Parts[partIndex];
- auto delta = value - part[taskIndex];
- part[taskIndex] = value;
- Values[partIndex] += delta;
- Sum += delta;
- if (recordTimeSeries) {
- AppendHistory();
- }
-}
-
-void TTimeMultiSeriesStats::SetNonZero(TPartitionedStats& stats, ui32 taskIndex, const TString& key, ui64 value, bool recordTimeSeries) {
- auto [it, inserted] = Indices.try_emplace(key);
- if (inserted) {
- it->second = Indices.size() - 1;
- if (PartCount < Indices.size()) {
- PartCount += 4;
- }
- }
- if (stats.Parts.size() < PartCount) {
- stats.ResizeByParts(PartCount, TaskCount);
- }
- stats.SetNonZero(taskIndex, it->second, value, recordTimeSeries);
-}
-
-void TExternalStats::Resize(ui32 taskCount) {
- ExternalRows.ResizeByTasks(taskCount);
- ExternalBytes.ResizeByTasks(taskCount);
- TaskCount = taskCount;
-}
-
-void TExternalStats::SetHistorySampleCount(ui32 historySampleCount) {
- ExternalBytes.HistorySampleCount = historySampleCount;
-}
-
-void TExternalStats::ExportHistory(ui64 baseTimeMs, NDqProto::TDqExternalAggrStats& stats) {
- if (stats.HasExternalBytes()) {
- ExternalBytes.ExportHistory(baseTimeMs, *stats.MutableExternalBytes());
- }
-}
-
void TAsyncStats::Resize(ui32 taskCount) {
Bytes.Resize(taskCount);
DecompressedBytes.resize(taskCount);
@@ -191,7 +127,6 @@ void TAsyncStats::ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqAsyncStatsAg
}
void TAsyncBufferStats::Resize(ui32 taskCount) {
- External.Resize(taskCount);
Ingress.Resize(taskCount);
Push.Resize(taskCount);
Pop.Resize(taskCount);
@@ -199,7 +134,6 @@ void TAsyncBufferStats::Resize(ui32 taskCount) {
}
void TAsyncBufferStats::SetHistorySampleCount(ui32 historySampleCount) {
- External.SetHistorySampleCount(historySampleCount);
Ingress.SetHistorySampleCount(historySampleCount);
Push.SetHistorySampleCount(historySampleCount);
Pop.SetHistorySampleCount(historySampleCount);
@@ -207,9 +141,6 @@ void TAsyncBufferStats::SetHistorySampleCount(ui32 historySampleCount) {
}
void TAsyncBufferStats::ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqAsyncBufferStatsAggr& stats) {
- if (stats.HasExternal()) {
- External.ExportHistory(baseTimeMs, *stats.MutableExternal());
- }
if (stats.HasIngress()) {
Ingress.ExportHistory(baseTimeMs, *stats.MutableIngress());
}
@@ -472,17 +403,6 @@ ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS
baseTimeMs = NonZeroMin(baseTimeMs, UpdateAsyncStats(index, asyncBufferStats.Ingress, sourceStat.GetIngress()));
baseTimeMs = NonZeroMin(baseTimeMs, UpdateAsyncStats(index, asyncBufferStats.Push, sourceStat.GetPush()));
baseTimeMs = NonZeroMin(baseTimeMs, UpdateAsyncStats(index, asyncBufferStats.Pop, sourceStat.GetPop()));
- for (auto& partitionStat : sourceStat.GetExternalPartitions()) {
- auto key = partitionStat.GetPartitionId();
- asyncBufferStats.External.SetNonZero(asyncBufferStats.External.ExternalRows,
- index, key, partitionStat.GetExternalRows(), false);
- asyncBufferStats.External.SetNonZero(asyncBufferStats.External.ExternalBytes,
- index, key, partitionStat.GetExternalBytes(), true);
- asyncBufferStats.External.SetNonZero(asyncBufferStats.External.FirstMessageMs,
- index, key, partitionStat.GetFirstMessageMs(), false);
- asyncBufferStats.External.SetNonZero(asyncBufferStats.External.LastMessageMs,
- index, key, partitionStat.GetLastMessageMs(), false);
- }
}
}
@@ -1154,8 +1074,6 @@ void TQueryExecutionStats::UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TD
BaseTimeMs = NonZeroMin(BaseTimeMs, it->second.UpdateStats(taskStats, state, stats.GetMaxMemoryUsage(), stats.GetDurationUs()));
}
-// SIMD-friendly aggregations are below. Compiler is able to vectorize sum/count, but needs help with min/max
-
void ExportAggStats(std::vector<ui64>& data, NYql::NDqProto::TDqStatsMinMax& stats) {
Y_DEBUG_ABORT_UNLESS((data.size() & 3) == 0);
@@ -1297,12 +1215,6 @@ void TQueryExecutionStats::ExportAggAsyncStats(TAsyncStats& data, NYql::NDqProto
}
void TQueryExecutionStats::ExportAggAsyncBufferStats(TAsyncBufferStats& data, NYql::NDqProto::TDqAsyncBufferStatsAggr& stats) {
- auto& external = *stats.MutableExternal();
- data.External.ExternalRows.ExportAggStats(*external.MutableExternalRows());
- data.External.ExternalBytes.ExportAggStats(BaseTimeMs, *external.MutableExternalBytes());
- ExportOffsetAggStats(data.External.FirstMessageMs.Values, *external.MutableFirstMessageMs(), BaseTimeMs);
- ExportOffsetAggStats(data.External.LastMessageMs.Values, *external.MutableLastMessageMs(), BaseTimeMs);
- external.SetPartitionCount(data.External.Indices.size());
ExportAggAsyncStats(data.Ingress, *stats.MutableIngress());
ExportAggAsyncStats(data.Push, *stats.MutablePush());
ExportAggAsyncStats(data.Pop, *stats.MutablePop());
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.h b/ydb/core/kqp/executer_actor/kqp_executer_stats.h
index 9c8b7b7cf2..72f3df581b 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_stats.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.h
@@ -23,39 +23,10 @@ struct TTimeSeriesStats {
std::vector<std::pair<ui64, ui64>> History;
void ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqStatsAggr& stats);
- void ExportAggStats(NYql::NDqProto::TDqStatsAggr& stats);
void ExportAggStats(ui64 baseTimeMs, NYql::NDqProto::TDqStatsAggr& stats);
- void Resize(ui32 count);
- void SetNonZero(ui32 index, ui64 value);
- void Pack();
- void AppendHistory();
-};
-
-struct TPartitionedStats : public TTimeSeriesStats {
- std::vector<std::vector<ui64>> Parts;
-
- void ResizeByTasks(ui32 taskCount);
- void ResizeByParts(ui32 partCount, ui32 taskCount);
- void SetNonZero(ui32 taskIndex, ui32 partIndex, ui64 value, bool recordTimeSeries);
-};
-
-struct TTimeMultiSeriesStats {
- std::unordered_map<TString, ui32> Indices;
- ui32 TaskCount = 0;
- ui32 PartCount = 0;
-
- void SetNonZero(TPartitionedStats& stats, ui32 taskIndex, const TString& key, ui64 value, bool recordTimeSeries);
-};
-
-struct TExternalStats : public TTimeMultiSeriesStats {
- TPartitionedStats ExternalRows;
- TPartitionedStats ExternalBytes;
- TPartitionedStats FirstMessageMs;
- TPartitionedStats LastMessageMs;
-
void Resize(ui32 taskCount);
- void SetHistorySampleCount(ui32 historySampleCount);
- void ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqExternalAggrStats& stats);
+ void SetNonZero(ui32 taskIndex, ui64 value);
+ void Pack();
};
struct TMetricInfo {
@@ -109,7 +80,6 @@ struct TAsyncBufferStats {
Resize(taskCount);
}
- TExternalStats External;
TAsyncStats Ingress;
TAsyncStats Push;
TAsyncStats Pop;
@@ -209,8 +179,8 @@ struct TStageExecutionStats {
std::map<TString, TTableStats> Tables;
std::map<TString, TAsyncBufferStats> Ingress;
std::map<TString, TAsyncBufferStats> Egress;
- std::unordered_map<ui32, TAsyncBufferStats> Input;
- std::unordered_map<ui32, TAsyncBufferStats> Output;
+ std::map<ui32, TAsyncBufferStats> Input;
+ std::map<ui32, TAsyncBufferStats> Output;
std::map<TString, TOperatorStats> Joins;
std::map<TString, TOperatorStats> Filters;