aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksandr Dmitriev <monster@ydb.tech>2024-11-11 12:43:22 +0300
committerGitHub <noreply@github.com>2024-11-11 12:43:22 +0300
commitbd967a0df42e2e91116a6fc58d29021094f3b6e8 (patch)
tree4a60d61105b4816c92b1641c344b42989c400512
parentf6b4b02c6c3337e72bdaea1d917dfaec3b937c6d (diff)
downloadydb-bd967a0df42e2e91116a6fc58d29021094f3b6e8.tar.gz
do not use basic statistics if it is not fully gathered in schemeshard (#11291)
-rw-r--r--ydb/core/protos/statistics.proto2
-rw-r--r--ydb/core/statistics/aggregator/tx_schemeshard_stats.cpp66
-rw-r--r--ydb/core/statistics/service/ut/ut_basic_statistics.cpp83
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.cpp50
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_info_types.cpp9
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_info_types.h6
6 files changed, 194 insertions, 22 deletions
diff --git a/ydb/core/protos/statistics.proto b/ydb/core/protos/statistics.proto
index 999b3d00dea..fb7f2f0e426 100644
--- a/ydb/core/protos/statistics.proto
+++ b/ydb/core/protos/statistics.proto
@@ -13,10 +13,12 @@ message TPathEntry {
optional uint64 RowCount = 2;
optional uint64 BytesSize = 3;
optional bool IsColumnTable = 4;
+ optional bool AreStatsFull = 5;
}
message TSchemeShardStats {
repeated TPathEntry Entries = 1;
+ optional bool AreAllStatsFull = 2;
}
// SS -> SA
diff --git a/ydb/core/statistics/aggregator/tx_schemeshard_stats.cpp b/ydb/core/statistics/aggregator/tx_schemeshard_stats.cpp
index 61efd9a2c74..51c59e451e3 100644
--- a/ydb/core/statistics/aggregator/tx_schemeshard_stats.cpp
+++ b/ydb/core/statistics/aggregator/tx_schemeshard_stats.cpp
@@ -21,18 +21,72 @@ struct TStatisticsAggregator::TTxSchemeShardStats : public TTxBase {
<< ", stats size# " << stats.size());
NIceDb::TNiceDb db(txc.DB);
- db.Table<Schema::BaseStatistics>().Key(schemeShardId).Update(
- NIceDb::TUpdate<Schema::BaseStatistics::Stats>(stats));
- Self->BaseStatistics[schemeShardId] = stats;
+ NKikimrStat::TSchemeShardStats statRecord;
+ Y_PROTOBUF_SUPPRESS_NODISCARD statRecord.ParseFromString(stats);
+
+ // if statistics is sent from schemeshard for the first time or
+ // AreAllStatsFull field is not set (schemeshard is working on previous code version) or
+ // statistics is full for all tables
+ // then persist incoming statistics without changes
+ if (!Self->BaseStatistics.contains(schemeShardId) ||
+ !statRecord.HasAreAllStatsFull() || statRecord.GetAreAllStatsFull())
+ {
+ db.Table<Schema::BaseStatistics>().Key(schemeShardId).Update(
+ NIceDb::TUpdate<Schema::BaseStatistics::Stats>(stats));
+ Self->BaseStatistics[schemeShardId] = stats;
+
+ } else {
+ NKikimrStat::TSchemeShardStats oldStatRecord;
+ const auto& oldStats = Self->BaseStatistics[schemeShardId];
+ Y_PROTOBUF_SUPPRESS_NODISCARD oldStatRecord.ParseFromString(oldStats);
+
+ struct TOldStats {
+ ui64 RowCount = 0;
+ ui64 BytesSize = 0;
+ };
+ THashMap<TPathId, TOldStats> oldStatsMap;
+
+ for (const auto& entry : oldStatRecord.GetEntries()) {
+ auto& oldEntry = oldStatsMap[PathIdFromPathId(entry.GetPathId())];
+ oldEntry.RowCount = entry.GetRowCount();
+ oldEntry.BytesSize = entry.GetBytesSize();
+ }
+
+ NKikimrStat::TSchemeShardStats newStatRecord;
+ for (const auto& entry : statRecord.GetEntries()) {
+ auto* newEntry = newStatRecord.AddEntries();
+ *newEntry->MutablePathId() = entry.GetPathId();
+ newEntry->SetIsColumnTable(entry.GetIsColumnTable());
+ newEntry->SetAreStatsFull(entry.GetAreStatsFull());
+
+ if (entry.GetAreStatsFull()) {
+ newEntry->SetRowCount(entry.GetRowCount());
+ newEntry->SetBytesSize(entry.GetBytesSize());
+ } else {
+ auto oldIter = oldStatsMap.find(PathIdFromPathId(entry.GetPathId()));
+ if (oldIter != oldStatsMap.end()) {
+ newEntry->SetRowCount(oldIter->second.RowCount);
+ newEntry->SetBytesSize(oldIter->second.BytesSize);
+ } else {
+ newEntry->SetRowCount(0);
+ newEntry->SetBytesSize(0);
+ }
+ }
+ }
+
+ TString newStats;
+ Y_PROTOBUF_SUPPRESS_NODISCARD newStatRecord.SerializeToString(&newStats);
+
+ db.Table<Schema::BaseStatistics>().Key(schemeShardId).Update(
+ NIceDb::TUpdate<Schema::BaseStatistics::Stats>(newStats));
+ Self->BaseStatistics[schemeShardId] = newStats;
+ }
if (!Self->EnableColumnStatistics) {
return true;
}
- NKikimrStat::TSchemeShardStats statRecord;
- Y_PROTOBUF_SUPPRESS_NODISCARD statRecord.ParseFromString(stats);
-
auto& oldPathIds = Self->ScheduleTraversalsBySchemeShard[schemeShardId];
std::unordered_set<TPathId> newPathIds;
diff --git a/ydb/core/statistics/service/ut/ut_basic_statistics.cpp b/ydb/core/statistics/service/ut/ut_basic_statistics.cpp
index 310623fcaf7..834eff33454 100644
--- a/ydb/core/statistics/service/ut/ut_basic_statistics.cpp
+++ b/ydb/core/statistics/service/ut/ut_basic_statistics.cpp
@@ -1,15 +1,11 @@
#include <ydb/core/statistics/ut_common/ut_common.h>
#include <ydb/library/actors/testlib/test_runtime.h>
+#include <ydb/core/testlib/actors/block_events.h>
#include <ydb/core/statistics/events.h>
#include <ydb/core/statistics/service/service.h>
-
-#include <ydb/public/sdk/cpp/client/ydb_result/result.h>
-#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
-#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h>
-
-#include <thread>
+#include <ydb/core/tx/datashard/datashard.h>
namespace NKikimr {
namespace NStat {
@@ -75,6 +71,29 @@ void ValidateRowCount(TTestActorRuntime& runtime, ui32 nodeIndex, TPathId pathId
}
}
+ui64 GetRowCount(TTestActorRuntime& runtime, ui32 nodeIndex, TPathId pathId) {
+ auto statServiceId = NStat::MakeStatServiceID(runtime.GetNodeId(nodeIndex));
+ NStat::TRequest req;
+ req.PathId = pathId;
+
+ auto evGet = std::make_unique<TEvStatistics::TEvGetStatistics>();
+ evGet->StatType = NStat::EStatType::SIMPLE;
+ evGet->StatRequests.push_back(req);
+
+ auto sender = runtime.AllocateEdgeActor(nodeIndex);
+ runtime.Send(statServiceId, sender, evGet.release(), nodeIndex, true);
+ auto evResult = runtime.GrabEdgeEventRethrow<TEvStatistics::TEvGetStatisticsResult>(sender);
+
+ UNIT_ASSERT(evResult);
+ UNIT_ASSERT(evResult->Get());
+ UNIT_ASSERT(evResult->Get()->StatResponses.size() == 1);
+
+ auto rsp = evResult->Get()->StatResponses[0];
+ auto stat = rsp.Simple;
+
+ return stat.RowCount;
+}
+
} // namespace
Y_UNIT_TEST_SUITE(BasicStatistics) {
@@ -183,6 +202,58 @@ Y_UNIT_TEST_SUITE(BasicStatistics) {
ValidateRowCount(runtime, 1, pathId2, 6);
}
+ void TestNotFullStatistics(TTestEnv& env, size_t expectedRowCount) {
+ auto& runtime = *env.GetServer().GetRuntime();
+
+ auto pathId = ResolvePathId(runtime, "/Root/Database/Table");
+
+ TBlockEvents<TEvDataShard::TEvPeriodicTableStats> block(runtime);
+ runtime.WaitFor("TEvPeriodicTableStats", [&]{ return block.size() >= 3; });
+ block.Unblock(3);
+
+ bool firstStatsToSA = false;
+ auto statsObserver1 = runtime.AddObserver<TEvStatistics::TEvSchemeShardStats>([&](auto&){
+ firstStatsToSA = true;
+ });
+ runtime.WaitFor("TEvSchemeShardStats 1", [&]{ return firstStatsToSA; });
+
+ UNIT_ASSERT(GetRowCount(runtime, 1, pathId) == 0);
+
+ block.Unblock();
+ block.Stop();
+
+ bool secondStatsToSA = false;
+ auto statsObserver2 = runtime.AddObserver<TEvStatistics::TEvSchemeShardStats>([&](auto&){
+ secondStatsToSA = true;
+ });
+ runtime.WaitFor("TEvSchemeShardStats 2", [&]{ return secondStatsToSA; });
+
+ bool propagate = false;
+ auto propagateObserver = runtime.AddObserver<TEvStatistics::TEvPropagateStatistics>([&](auto&){
+ propagate = true;
+ });
+ runtime.WaitFor("TEvPropagateStatistics", [&]{ return propagate; });
+
+ UNIT_ASSERT(GetRowCount(runtime, 1, pathId) == expectedRowCount);
+ }
+
+ Y_UNIT_TEST(NotFullStatisticsDatashard) {
+ TTestEnv env(1, 1);
+
+ CreateDatabase(env, "Database");
+ CreateUniformTable(env, "Database", "Table");
+
+ TestNotFullStatistics(env, 4);
+ }
+
+ Y_UNIT_TEST(NotFullStatisticsColumnshard) {
+ TTestEnv env(1, 1);
+
+ CreateDatabase(env, "Database");
+ CreateColumnStoreTable(env, "Database", "Table", 4);
+
+ TestNotFullStatistics(env, 1000);
+ }
}
} // NSysView
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index 6660c9cff25..e43eb9933f9 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -7405,7 +7405,7 @@ void TSchemeShard::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr&
StatisticsAggregatorId = TTabletId(entry.DomainInfo->Params.GetStatisticsAggregator());
LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
"Handle TEvTxProxySchemeCache::TEvNavigateKeySetResult, StatisticsAggregatorId=" << StatisticsAggregatorId
- << ", at schemeshard: " << TabletID());
+ << ", at schemeshard: " << TabletID());
ConnectToSA();
}
}
@@ -7414,13 +7414,16 @@ void TSchemeShard::Handle(TEvPrivate::TEvSendBaseStatsToSA::TPtr&, const TActorC
TDuration delta = SendBaseStatsToSA();
LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
"Schedule next SendBaseStatsToSA in " << delta
- << ", at schemeshard: " << TabletID());
+ << ", at schemeshard: " << TabletID());
ctx.Schedule(delta, new TEvPrivate::TEvSendBaseStatsToSA());
}
void TSchemeShard::InitializeStatistics(const TActorContext& ctx) {
ResolveSA();
- ctx.Schedule(TDuration::Seconds(30), new TEvPrivate::TEvSendBaseStatsToSA());
+ // since columnshard statistics is now sent once in a minute,
+ // we expect that in most cases we will gather full stats
+ // before sending them to StatisticsAggregator
+ ctx.Schedule(TDuration::Seconds(120), new TEvPrivate::TEvSendBaseStatsToSA());
}
void TSchemeShard::ResolveSA() {
@@ -7494,30 +7497,56 @@ TDuration TSchemeShard::SendBaseStatsToSA() {
}
int count = 0;
+ bool areAllStatsFull = true;
NKikimrStat::TSchemeShardStats record;
for (const auto& [pathId, tableInfo] : Tables) {
- const auto& aggregated = tableInfo->GetStats().Aggregated;
+ const auto& stats = tableInfo->GetStats();
+ const auto& aggregated = stats.Aggregated;
+ bool areStatsFull = stats.AreStatsFull();
+
auto* entry = record.AddEntries();
auto* entryPathId = entry->MutablePathId();
entryPathId->SetOwnerId(pathId.OwnerId);
entryPathId->SetLocalId(pathId.LocalPathId);
- entry->SetRowCount(aggregated.RowCount);
- entry->SetBytesSize(aggregated.DataSize);
+ entry->SetRowCount(areStatsFull ? aggregated.RowCount : 0);
+ entry->SetBytesSize(areStatsFull ? aggregated.DataSize : 0);
entry->SetIsColumnTable(false);
+ entry->SetAreStatsFull(areStatsFull);
+ areAllStatsFull = areAllStatsFull && areStatsFull;
+
++count;
}
+
auto columnTablesPathIds = ColumnTables.GetAllPathIds();
for (const auto& pathId : columnTablesPathIds) {
const auto& tableInfo = ColumnTables.GetVerified(pathId);
- const auto& aggregated = tableInfo->Stats.Aggregated;
+ const auto& stats = tableInfo->GetStats();
+ const TTableAggregatedStats* aggregatedStats = nullptr;
+
+ // stats are stored differently for standalone and non-standalone column tables
+ if (tableInfo->IsStandalone()) {
+ aggregatedStats = &stats;
+ } else {
+ auto it = stats.TableStats.find(pathId);
+ if (it == stats.TableStats.end()) {
+ continue;
+ }
+ aggregatedStats = &it->second;
+ }
+ const auto& aggregated = aggregatedStats->Aggregated;
+ bool areStatsFull = aggregatedStats->AreStatsFull();
+
auto* entry = record.AddEntries();
auto* entryPathId = entry->MutablePathId();
entryPathId->SetOwnerId(pathId.OwnerId);
entryPathId->SetLocalId(pathId.LocalPathId);
- entry->SetRowCount(aggregated.RowCount);
- entry->SetBytesSize(aggregated.DataSize);
+ entry->SetRowCount(areStatsFull ? aggregated.RowCount : 0);
+ entry->SetBytesSize(areStatsFull ? aggregated.DataSize : 0);
entry->SetIsColumnTable(true);
+ entry->SetAreStatsFull(areStatsFull);
+ areAllStatsFull = areAllStatsFull && areStatsFull;
+
++count;
}
@@ -7528,8 +7557,9 @@ TDuration TSchemeShard::SendBaseStatsToSA() {
return TDuration::Seconds(30);
}
+ record.SetAreAllStatsFull(areAllStatsFull);
+
TString stats;
- stats.clear();
Y_PROTOBUF_SUPPRESS_NODISCARD record.SerializeToString(&stats);
auto event = std::make_unique<NStat::TEvStatistics::TEvSchemeShardStats>();
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
index 3436988902f..c3304ce6e4c 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
@@ -1636,6 +1636,8 @@ void TTableInfo::SetPartitioning(TVector<TTableShardInfo>&& newPartitioning) {
TPartitionStats newAggregatedStats;
newAggregatedStats.PartCount = newPartitioning.size();
ui64 cpuTotal = 0;
+ THashSet<TShardIdx> newUpdatedStats;
+
for (const auto& np : newPartitioning) {
auto idx = np.ShardIdx;
auto& newStats(newPartitionStats[idx]);
@@ -1658,6 +1660,10 @@ void TTableInfo::SetPartitioning(TVector<TTableShardInfo>&& newPartitioning) {
newAggregatedStats.WriteThroughput += newStats.WriteThroughput;
newAggregatedStats.ReadIops += newStats.ReadIops;
newAggregatedStats.WriteIops += newStats.WriteIops;
+
+ if (Stats.PartitionStats.contains(idx) && Stats.UpdatedStats.contains(idx)) {
+ newUpdatedStats.insert(idx);
+ }
}
newAggregatedStats.SetCurrentRawCpuUsage(cpuTotal, AppData()->TimeProvider->Now());
newAggregatedStats.LastAccessTime = Stats.Aggregated.LastAccessTime;
@@ -1684,6 +1690,7 @@ void TTableInfo::SetPartitioning(TVector<TTableShardInfo>&& newPartitioning) {
Stats.PartitionStats.swap(newPartitionStats);
Stats.Aggregated = newAggregatedStats;
+ Stats.UpdatedStats.swap(newUpdatedStats);
Partitions.swap(newPartitioning);
PreserializedTablePartitions.clear();
PreserializedTablePartitionsNoKeys.clear();
@@ -1790,6 +1797,8 @@ void TTableAggregatedStats::UpdateShardStats(TShardIdx datashardIdx, const TPart
Aggregated.TxCompleteLag = Max(Aggregated.TxCompleteLag, ps.second.TxCompleteLag);
}
}
+
+ UpdatedStats.insert(datashardIdx);
}
void TAggregatedStats::UpdateTableStats(TShardIdx shardIdx, const TPathId& pathId, const TPartitionStats& newStats) {
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h
index 12ee6c637d3..d3fdc89e67a 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.h
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h
@@ -328,6 +328,12 @@ struct TTableAggregatedStats {
THashMap<TShardIdx, TPartitionStats> PartitionStats;
size_t PartitionStatsUpdated = 0;
+ THashSet<TShardIdx> UpdatedStats;
+
+ bool AreStatsFull() const {
+ return Aggregated.PartCount && UpdatedStats.size() == Aggregated.PartCount;
+ }
+
void UpdateShardStats(TShardIdx datashardIdx, const TPartitionStats& newStats);
};