diff options
author | Aleksandr Dmitriev <monster@ydb.tech> | 2024-11-11 12:43:22 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-11-11 12:43:22 +0300 |
commit | bd967a0df42e2e91116a6fc58d29021094f3b6e8 (patch) | |
tree | 4a60d61105b4816c92b1641c344b42989c400512 | |
parent | f6b4b02c6c3337e72bdaea1d917dfaec3b937c6d (diff) | |
download | ydb-bd967a0df42e2e91116a6fc58d29021094f3b6e8.tar.gz |
Do not use basic statistics if they are not fully gathered in schemeshard (#11291)
-rw-r--r-- | ydb/core/protos/statistics.proto | 2 | ||||
-rw-r--r-- | ydb/core/statistics/aggregator/tx_schemeshard_stats.cpp | 66 | ||||
-rw-r--r-- | ydb/core/statistics/service/ut/ut_basic_statistics.cpp | 83 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_impl.cpp | 50 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_info_types.cpp | 9 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_info_types.h | 6 |
6 files changed, 194 insertions, 22 deletions
diff --git a/ydb/core/protos/statistics.proto b/ydb/core/protos/statistics.proto index 999b3d00dea..fb7f2f0e426 100644 --- a/ydb/core/protos/statistics.proto +++ b/ydb/core/protos/statistics.proto @@ -13,10 +13,12 @@ message TPathEntry { optional uint64 RowCount = 2; optional uint64 BytesSize = 3; optional bool IsColumnTable = 4; + optional bool AreStatsFull = 5; } message TSchemeShardStats { repeated TPathEntry Entries = 1; + optional bool AreAllStatsFull = 2; } // SS -> SA diff --git a/ydb/core/statistics/aggregator/tx_schemeshard_stats.cpp b/ydb/core/statistics/aggregator/tx_schemeshard_stats.cpp index 61efd9a2c74..51c59e451e3 100644 --- a/ydb/core/statistics/aggregator/tx_schemeshard_stats.cpp +++ b/ydb/core/statistics/aggregator/tx_schemeshard_stats.cpp @@ -21,18 +21,72 @@ struct TStatisticsAggregator::TTxSchemeShardStats : public TTxBase { << ", stats size# " << stats.size()); NIceDb::TNiceDb db(txc.DB); - db.Table<Schema::BaseStatistics>().Key(schemeShardId).Update( - NIceDb::TUpdate<Schema::BaseStatistics::Stats>(stats)); - Self->BaseStatistics[schemeShardId] = stats; + NKikimrStat::TSchemeShardStats statRecord; + Y_PROTOBUF_SUPPRESS_NODISCARD statRecord.ParseFromString(stats); + + // if statistics is sent from schemeshard for the first time or + // AreAllStatsFull field is not set (schemeshard is working on previous code version) or + // statistics is full for all tables + // then persist incoming statistics without changes + if (!Self->BaseStatistics.contains(schemeShardId) || + !statRecord.HasAreAllStatsFull() || statRecord.GetAreAllStatsFull()) + { + db.Table<Schema::BaseStatistics>().Key(schemeShardId).Update( + NIceDb::TUpdate<Schema::BaseStatistics::Stats>(stats)); + Self->BaseStatistics[schemeShardId] = stats; + + } else { + NKikimrStat::TSchemeShardStats oldStatRecord; + const auto& oldStats = Self->BaseStatistics[schemeShardId]; + Y_PROTOBUF_SUPPRESS_NODISCARD oldStatRecord.ParseFromString(oldStats); + + struct TOldStats { + ui64 RowCount = 0; + 
ui64 BytesSize = 0; + }; + THashMap<TPathId, TOldStats> oldStatsMap; + + for (const auto& entry : oldStatRecord.GetEntries()) { + auto& oldEntry = oldStatsMap[PathIdFromPathId(entry.GetPathId())]; + oldEntry.RowCount = entry.GetRowCount(); + oldEntry.BytesSize = entry.GetBytesSize(); + } + + NKikimrStat::TSchemeShardStats newStatRecord; + for (const auto& entry : statRecord.GetEntries()) { + auto* newEntry = newStatRecord.AddEntries(); + *newEntry->MutablePathId() = entry.GetPathId(); + newEntry->SetIsColumnTable(entry.GetIsColumnTable()); + newEntry->SetAreStatsFull(entry.GetAreStatsFull()); + + if (entry.GetAreStatsFull()) { + newEntry->SetRowCount(entry.GetRowCount()); + newEntry->SetBytesSize(entry.GetBytesSize()); + } else { + auto oldIter = oldStatsMap.find(PathIdFromPathId(entry.GetPathId())); + if (oldIter != oldStatsMap.end()) { + newEntry->SetRowCount(oldIter->second.RowCount); + newEntry->SetBytesSize(oldIter->second.BytesSize); + } else { + newEntry->SetRowCount(0); + newEntry->SetBytesSize(0); + } + } + } + + TString newStats; + Y_PROTOBUF_SUPPRESS_NODISCARD newStatRecord.SerializeToString(&newStats); + + db.Table<Schema::BaseStatistics>().Key(schemeShardId).Update( + NIceDb::TUpdate<Schema::BaseStatistics::Stats>(newStats)); + Self->BaseStatistics[schemeShardId] = newStats; + } if (!Self->EnableColumnStatistics) { return true; } - NKikimrStat::TSchemeShardStats statRecord; - Y_PROTOBUF_SUPPRESS_NODISCARD statRecord.ParseFromString(stats); - auto& oldPathIds = Self->ScheduleTraversalsBySchemeShard[schemeShardId]; std::unordered_set<TPathId> newPathIds; diff --git a/ydb/core/statistics/service/ut/ut_basic_statistics.cpp b/ydb/core/statistics/service/ut/ut_basic_statistics.cpp index 310623fcaf7..834eff33454 100644 --- a/ydb/core/statistics/service/ut/ut_basic_statistics.cpp +++ b/ydb/core/statistics/service/ut/ut_basic_statistics.cpp @@ -1,15 +1,11 @@ #include <ydb/core/statistics/ut_common/ut_common.h> #include 
<ydb/library/actors/testlib/test_runtime.h> +#include <ydb/core/testlib/actors/block_events.h> #include <ydb/core/statistics/events.h> #include <ydb/core/statistics/service/service.h> - -#include <ydb/public/sdk/cpp/client/ydb_result/result.h> -#include <ydb/public/sdk/cpp/client/ydb_table/table.h> -#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h> - -#include <thread> +#include <ydb/core/tx/datashard/datashard.h> namespace NKikimr { namespace NStat { @@ -75,6 +71,29 @@ void ValidateRowCount(TTestActorRuntime& runtime, ui32 nodeIndex, TPathId pathId } } +ui64 GetRowCount(TTestActorRuntime& runtime, ui32 nodeIndex, TPathId pathId) { + auto statServiceId = NStat::MakeStatServiceID(runtime.GetNodeId(nodeIndex)); + NStat::TRequest req; + req.PathId = pathId; + + auto evGet = std::make_unique<TEvStatistics::TEvGetStatistics>(); + evGet->StatType = NStat::EStatType::SIMPLE; + evGet->StatRequests.push_back(req); + + auto sender = runtime.AllocateEdgeActor(nodeIndex); + runtime.Send(statServiceId, sender, evGet.release(), nodeIndex, true); + auto evResult = runtime.GrabEdgeEventRethrow<TEvStatistics::TEvGetStatisticsResult>(sender); + + UNIT_ASSERT(evResult); + UNIT_ASSERT(evResult->Get()); + UNIT_ASSERT(evResult->Get()->StatResponses.size() == 1); + + auto rsp = evResult->Get()->StatResponses[0]; + auto stat = rsp.Simple; + + return stat.RowCount; +} + } // namespace Y_UNIT_TEST_SUITE(BasicStatistics) { @@ -183,6 +202,58 @@ Y_UNIT_TEST_SUITE(BasicStatistics) { ValidateRowCount(runtime, 1, pathId2, 6); } + void TestNotFullStatistics(TTestEnv& env, size_t expectedRowCount) { + auto& runtime = *env.GetServer().GetRuntime(); + + auto pathId = ResolvePathId(runtime, "/Root/Database/Table"); + + TBlockEvents<TEvDataShard::TEvPeriodicTableStats> block(runtime); + runtime.WaitFor("TEvPeriodicTableStats", [&]{ return block.size() >= 3; }); + block.Unblock(3); + + bool firstStatsToSA = false; + auto statsObserver1 = 
runtime.AddObserver<TEvStatistics::TEvSchemeShardStats>([&](auto&){ + firstStatsToSA = true; + }); + runtime.WaitFor("TEvSchemeShardStats 1", [&]{ return firstStatsToSA; }); + + UNIT_ASSERT(GetRowCount(runtime, 1, pathId) == 0); + + block.Unblock(); + block.Stop(); + + bool secondStatsToSA = false; + auto statsObserver2 = runtime.AddObserver<TEvStatistics::TEvSchemeShardStats>([&](auto&){ + secondStatsToSA = true; + }); + runtime.WaitFor("TEvSchemeShardStats 2", [&]{ return secondStatsToSA; }); + + bool propagate = false; + auto propagateObserver = runtime.AddObserver<TEvStatistics::TEvPropagateStatistics>([&](auto&){ + propagate = true; + }); + runtime.WaitFor("TEvPropagateStatistics", [&]{ return propagate; }); + + UNIT_ASSERT(GetRowCount(runtime, 1, pathId) == expectedRowCount); + } + + Y_UNIT_TEST(NotFullStatisticsDatashard) { + TTestEnv env(1, 1); + + CreateDatabase(env, "Database"); + CreateUniformTable(env, "Database", "Table"); + + TestNotFullStatistics(env, 4); + } + + Y_UNIT_TEST(NotFullStatisticsColumnshard) { + TTestEnv env(1, 1); + + CreateDatabase(env, "Database"); + CreateColumnStoreTable(env, "Database", "Table", 4); + + TestNotFullStatistics(env, 1000); + } } } // NSysView diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 6660c9cff25..e43eb9933f9 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -7405,7 +7405,7 @@ void TSchemeShard::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& StatisticsAggregatorId = TTabletId(entry.DomainInfo->Params.GetStatisticsAggregator()); LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, "Handle TEvTxProxySchemeCache::TEvNavigateKeySetResult, StatisticsAggregatorId=" << StatisticsAggregatorId - << ", at schemeshard: " << TabletID()); + << ", at schemeshard: " << TabletID()); ConnectToSA(); } } @@ -7414,13 +7414,16 @@ void 
TSchemeShard::Handle(TEvPrivate::TEvSendBaseStatsToSA::TPtr&, const TActorC TDuration delta = SendBaseStatsToSA(); LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, "Schedule next SendBaseStatsToSA in " << delta - << ", at schemeshard: " << TabletID()); + << ", at schemeshard: " << TabletID()); ctx.Schedule(delta, new TEvPrivate::TEvSendBaseStatsToSA()); } void TSchemeShard::InitializeStatistics(const TActorContext& ctx) { ResolveSA(); - ctx.Schedule(TDuration::Seconds(30), new TEvPrivate::TEvSendBaseStatsToSA()); + // since columnshard statistics is now sent once in a minute, + // we expect that in most cases we will gather full stats + // before sending them to StatisticsAggregator + ctx.Schedule(TDuration::Seconds(120), new TEvPrivate::TEvSendBaseStatsToSA()); } void TSchemeShard::ResolveSA() { @@ -7494,30 +7497,56 @@ TDuration TSchemeShard::SendBaseStatsToSA() { } int count = 0; + bool areAllStatsFull = true; NKikimrStat::TSchemeShardStats record; for (const auto& [pathId, tableInfo] : Tables) { - const auto& aggregated = tableInfo->GetStats().Aggregated; + const auto& stats = tableInfo->GetStats(); + const auto& aggregated = stats.Aggregated; + bool areStatsFull = stats.AreStatsFull(); + auto* entry = record.AddEntries(); auto* entryPathId = entry->MutablePathId(); entryPathId->SetOwnerId(pathId.OwnerId); entryPathId->SetLocalId(pathId.LocalPathId); - entry->SetRowCount(aggregated.RowCount); - entry->SetBytesSize(aggregated.DataSize); + entry->SetRowCount(areStatsFull ? aggregated.RowCount : 0); + entry->SetBytesSize(areStatsFull ? 
aggregated.DataSize : 0); entry->SetIsColumnTable(false); + entry->SetAreStatsFull(areStatsFull); + areAllStatsFull = areAllStatsFull && areStatsFull; + ++count; } + auto columnTablesPathIds = ColumnTables.GetAllPathIds(); for (const auto& pathId : columnTablesPathIds) { const auto& tableInfo = ColumnTables.GetVerified(pathId); - const auto& aggregated = tableInfo->Stats.Aggregated; + const auto& stats = tableInfo->GetStats(); + const TTableAggregatedStats* aggregatedStats = nullptr; + + // stats are stored differently for standalone and non-standalone column tables + if (tableInfo->IsStandalone()) { + aggregatedStats = &stats; + } else { + auto it = stats.TableStats.find(pathId); + if (it == stats.TableStats.end()) { + continue; + } + aggregatedStats = &it->second; + } + const auto& aggregated = aggregatedStats->Aggregated; + bool areStatsFull = aggregatedStats->AreStatsFull(); + auto* entry = record.AddEntries(); auto* entryPathId = entry->MutablePathId(); entryPathId->SetOwnerId(pathId.OwnerId); entryPathId->SetLocalId(pathId.LocalPathId); - entry->SetRowCount(aggregated.RowCount); - entry->SetBytesSize(aggregated.DataSize); + entry->SetRowCount(areStatsFull ? aggregated.RowCount : 0); + entry->SetBytesSize(areStatsFull ? 
aggregated.DataSize : 0); entry->SetIsColumnTable(true); + entry->SetAreStatsFull(areStatsFull); + areAllStatsFull = areAllStatsFull && areStatsFull; + ++count; } @@ -7528,8 +7557,9 @@ TDuration TSchemeShard::SendBaseStatsToSA() { return TDuration::Seconds(30); } + record.SetAreAllStatsFull(areAllStatsFull); + TString stats; - stats.clear(); Y_PROTOBUF_SUPPRESS_NODISCARD record.SerializeToString(&stats); auto event = std::make_unique<NStat::TEvStatistics::TEvSchemeShardStats>(); diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp index 3436988902f..c3304ce6e4c 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp @@ -1636,6 +1636,8 @@ void TTableInfo::SetPartitioning(TVector<TTableShardInfo>&& newPartitioning) { TPartitionStats newAggregatedStats; newAggregatedStats.PartCount = newPartitioning.size(); ui64 cpuTotal = 0; + THashSet<TShardIdx> newUpdatedStats; + for (const auto& np : newPartitioning) { auto idx = np.ShardIdx; auto& newStats(newPartitionStats[idx]); @@ -1658,6 +1660,10 @@ void TTableInfo::SetPartitioning(TVector<TTableShardInfo>&& newPartitioning) { newAggregatedStats.WriteThroughput += newStats.WriteThroughput; newAggregatedStats.ReadIops += newStats.ReadIops; newAggregatedStats.WriteIops += newStats.WriteIops; + + if (Stats.PartitionStats.contains(idx) && Stats.UpdatedStats.contains(idx)) { + newUpdatedStats.insert(idx); + } } newAggregatedStats.SetCurrentRawCpuUsage(cpuTotal, AppData()->TimeProvider->Now()); newAggregatedStats.LastAccessTime = Stats.Aggregated.LastAccessTime; @@ -1684,6 +1690,7 @@ void TTableInfo::SetPartitioning(TVector<TTableShardInfo>&& newPartitioning) { Stats.PartitionStats.swap(newPartitionStats); Stats.Aggregated = newAggregatedStats; + Stats.UpdatedStats.swap(newUpdatedStats); Partitions.swap(newPartitioning); PreserializedTablePartitions.clear(); PreserializedTablePartitionsNoKeys.clear(); @@ 
-1790,6 +1797,8 @@ void TTableAggregatedStats::UpdateShardStats(TShardIdx datashardIdx, const TPart Aggregated.TxCompleteLag = Max(Aggregated.TxCompleteLag, ps.second.TxCompleteLag); } } + + UpdatedStats.insert(datashardIdx); } void TAggregatedStats::UpdateTableStats(TShardIdx shardIdx, const TPathId& pathId, const TPartitionStats& newStats) { diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 12ee6c637d3..d3fdc89e67a 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -328,6 +328,12 @@ struct TTableAggregatedStats { THashMap<TShardIdx, TPartitionStats> PartitionStats; size_t PartitionStatsUpdated = 0; + THashSet<TShardIdx> UpdatedStats; + + bool AreStatsFull() const { + return Aggregated.PartCount && UpdatedStats.size() == Aggregated.PartCount; + } + void UpdateShardStats(TShardIdx datashardIdx, const TPartitionStats& newStats); }; |