diff options
author | tesseract <tesseract@yandex-team.com> | 2023-03-15 09:35:44 +0300 |
---|---|---|
committer | tesseract <tesseract@yandex-team.com> | 2023-03-15 09:35:44 +0300 |
commit | 6adce5f78300a9c9a5ec82efe9b0b6076052c479 (patch) | |
tree | c321b0688caa1a64b2b9f092c37f04a5c8d5165a | |
parent | 1b4e8b8c23ea33cfab0574d701b6b9226a3d8b39 (diff) | |
download | ydb-6adce5f78300a9c9a5ec82efe9b0b6076052c479.tar.gz |
Персистить статистику от Partitions в ReadBalancere-е
-rw-r--r-- | ydb/core/persqueue/events/internal.h | 9 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_impl.cpp | 14 | ||||
-rw-r--r-- | ydb/core/persqueue/read_balancer.cpp | 191 | ||||
-rw-r--r-- | ydb/core/persqueue/read_balancer.h | 70 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/common/pq_ut_common.cpp | 10 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/common/pq_ut_common.h | 2 | ||||
-rw-r--r-- | ydb/core/protos/pqconfig.proto | 1 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__init.cpp | 16 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_helpers/helpers.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_stats.cpp | 17 |
10 files changed, 245 insertions, 86 deletions
diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index 31134e29a8..56d77cc2d4 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -127,6 +127,7 @@ struct TEvPQ { EvTxRollback, EvPartitionConfigChanged, EvSubDomainStatus, + EvStatsWakeup, EvEnd }; @@ -785,6 +786,14 @@ struct TEvPQ { bool SubDomainOutOfSpace() const { return Record.GetSubDomainOutOfSpace(); } }; + + struct TEvStatsWakeup : public TEventLocal<TEvStatsWakeup, EvStatsWakeup> { + TEvStatsWakeup(ui64 round) + : Round(round) + {} + + ui64 Round; + }; }; } //NKikimr diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index b229b79cbf..a27bff1cec 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -375,11 +375,12 @@ public: return NKikimrServices::TActivity::PERSQUEUE_ANS_ACTOR; } - TBuilderProxy(const ui64 tabletId, const TActorId& sender, const ui32 count) + TBuilderProxy(const ui64 tabletId, const TActorId& sender, const ui32 count, const ui64 cookie) : TabletId(tabletId) , Sender(sender) , Waiting(count) , Result() + , Cookie(cookie) {} void Bootstrap(const TActorContext& ctx) @@ -405,7 +406,7 @@ private: for (const auto& p : Result) { resp.AddPartResult()->CopyFrom(p); } - ctx.Send(Sender, res.Release()); + ctx.Send(Sender, res.Release(), 0, Cookie); TThis::Die(ctx); } @@ -434,6 +435,7 @@ private: TActorId Sender; ui32 Waiting; TVector<typename T2::TPartResult> Result; + ui64 Cookie; }; @@ -441,17 +443,17 @@ TActorId CreateOffsetsProxyActor(const ui64 tabletId, const TActorId& sender, co { return ctx.Register(new TBuilderProxy<TEvPQ::TEvPartitionOffsetsResponse, NKikimrPQ::TOffsetsResponse, - TEvPersQueue::TEvOffsetsResponse>(tabletId, sender, count)); + TEvPersQueue::TEvOffsetsResponse>(tabletId, sender, count, 0)); } /******************************************************* StatusProxy *********************************************************/ -TActorId CreateStatusProxyActor(const ui64 tabletId, const TActorId& sender, const ui32 count, const TActorContext& ctx) +TActorId CreateStatusProxyActor(const ui64 tabletId, const TActorId& sender, const ui32 count, const ui64 cookie, const TActorContext& ctx) { return ctx.Register(new TBuilderProxy<TEvPQ::TEvPartitionStatusResponse, NKikimrPQ::TStatusResponse, - TEvPersQueue::TEvStatusResponse>(tabletId, sender, count)); + TEvPersQueue::TEvStatusResponse>(tabletId, sender, count, cookie)); } /******************************************************* MonitoringProxy *********************************************************/ @@ -1477,7 +1479,7 @@ void TPersQueue::Handle(TEvPersQueue::TEvStatus::TPtr& ev, const TActorContext& cnt += p.second.InitDone; } - TActorId ans = CreateStatusProxyActor(TabletID(), ev->Sender, cnt, ctx); + TActorId ans = CreateStatusProxyActor(TabletID(), ev->Sender, cnt, ev->Cookie, ctx); for (auto& p : Partitions) { if (!p.second.InitDone) continue; diff --git a/ydb/core/persqueue/read_balancer.cpp b/ydb/core/persqueue/read_balancer.cpp index 8bc0e419f3..e521e34232 100644 --- a/ydb/core/persqueue/read_balancer.cpp +++ b/ydb/core/persqueue/read_balancer.cpp @@ -68,6 +68,9 @@ bool TPersQueueReadBalancer::TTxInit::Execute(TTransactionContext& txc, const TA ui32 part = partsRowset.GetValue<Schema::Partitions::Partition>(); ui64 tabletId = partsRowset.GetValue<Schema::Partitions::TabletId>(); Self->PartitionsInfo[part] = {tabletId, EPartitionState::EPS_FREE, TActorId(), part + 1}; + Self->AggregatedStats.AggrStats(part, partsRowset.GetValue<Schema::Partitions::DataSize>(), + partsRowset.GetValue<Schema::Partitions::UsedReserveSize>()); + if (!partsRowset.Next()) return false; } @@ -176,6 +179,37 @@ void TPersQueueReadBalancer::TTxWrite::Complete(const TActorContext &ctx) { Self->InitDone(ctx); } +struct TPersQueueReadBalancer::TTxWritePartitionStats : public ITransaction { + TPersQueueReadBalancer * const Self; + + TTxWritePartitionStats(TPersQueueReadBalancer *self) + : Self(self) + {} + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + Self->TTxWritePartitionStatsScheduled = false; + + NIceDb::TNiceDb db(txc.DB); + for (auto& s : Self->AggregatedStats.Stats) { + auto partition = s.first; + auto& stats = s.second; + + auto it = Self->PartitionsInfo.find(partition); + if (it == Self->PartitionsInfo.end()) { + continue; + } + + db.Table<Schema::Partitions>().Key(partition).Update( + NIceDb::TUpdate<Schema::Partitions::DataSize>(stats.DataSize), + NIceDb::TUpdate<Schema::Partitions::UsedReserveSize>(stats.UsedReserveSize) + ); + } + + return true; + } + + void Complete(const TActorContext&) override {}; +}; bool TPersQueueReadBalancer::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TActorContext& ctx) { if (!ev) { @@ -188,6 +222,7 @@ bool TPersQueueReadBalancer::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr e } TString TPersQueueReadBalancer::GenerateStat() { + auto& metrics = AggregatedStats.Metrics; TStringStream str; HTML(str) { TAG(TH2) {str << "PersQueueReadBalancer Tablet";} @@ -197,13 +232,13 @@ TString TPersQueueReadBalancer::GenerateStat() { TAG(TH3) {str << "ActivePipes: " << PipesInfo.size();} if (Inited) { TAG(TH3) {str << "Active partitions: " << NumActiveParts;} - TAG(TH3) {str << "[Total/Max/Avg]WriteSpeedSec: " << TotalAvgSpeedSec << "/" << MaxAvgSpeedSec << "/" << TotalAvgSpeedSec / NumActiveParts;} - TAG(TH3) {str << "[Total/Max/Avg]WriteSpeedMin: " << TotalAvgSpeedMin << "/" << MaxAvgSpeedMin << "/" << TotalAvgSpeedMin / NumActiveParts;} - TAG(TH3) {str << "[Total/Max/Avg]WriteSpeedHour: " << TotalAvgSpeedHour << "/" << MaxAvgSpeedHour << "/" << TotalAvgSpeedHour / NumActiveParts;} - TAG(TH3) {str << "[Total/Max/Avg]WriteSpeedDay: " << TotalAvgSpeedDay << "/" << MaxAvgSpeedDay << "/" << TotalAvgSpeedDay / NumActiveParts;} - TAG(TH3) {str << "TotalDataSize: " << TotalDataSize;} + TAG(TH3) {str << "[Total/Max/Avg]WriteSpeedSec: " << metrics.TotalAvgWriteSpeedPerSec << "/" << metrics.MaxAvgWriteSpeedPerSec << "/" << metrics.TotalAvgWriteSpeedPerSec / NumActiveParts;} + TAG(TH3) {str << "[Total/Max/Avg]WriteSpeedMin: " << metrics.TotalAvgWriteSpeedPerMin << "/" << metrics.MaxAvgWriteSpeedPerMin << "/" << metrics.TotalAvgWriteSpeedPerMin / NumActiveParts;} + TAG(TH3) {str << "[Total/Max/Avg]WriteSpeedHour: " << metrics.TotalAvgWriteSpeedPerHour << "/" << metrics.MaxAvgWriteSpeedPerHour << "/" << metrics.TotalAvgWriteSpeedPerHour / NumActiveParts;} + TAG(TH3) {str << "[Total/Max/Avg]WriteSpeedDay: " << metrics.TotalAvgWriteSpeedPerDay << "/" << metrics.MaxAvgWriteSpeedPerDay << "/" << metrics.TotalAvgWriteSpeedPerDay / NumActiveParts;} + TAG(TH3) {str << "TotalDataSize: " << AggregatedStats.TotalDataSize;} TAG(TH3) {str << "ReserveSize: " << PartitionReserveSize();} - TAG(TH3) {str << "TotalUsedReserveSize: " << TotalUsedReserveSize;} + TAG(TH3) {str << "TotalUsedReserveSize: " << AggregatedStats.TotalUsedReserveSize;} } UL_CLASS("nav nav-tabs") { @@ -647,44 +682,92 @@ TActorId TPersQueueReadBalancer::GetPipeClient(const ui64 tabletId, const TActor void TPersQueueReadBalancer::RequestTabletIfNeeded(const ui64 tabletId, const TActorContext& ctx) { if ((tabletId == SchemeShardId && !WaitingForACL) || - (tabletId != SchemeShardId && !WaitingForStat.contains(tabletId))) + (tabletId != SchemeShardId && AggregatedStats.Cookies.contains(tabletId))) { return; + } TActorId pipeClient = GetPipeClient(tabletId, ctx); if (tabletId == SchemeShardId) { NTabletPipe::SendData(ctx, pipeClient, new NSchemeShard::TEvSchemeShard::TEvDescribeScheme(tabletId, PathId)); } else { - NTabletPipe::SendData(ctx, pipeClient, new TEvPersQueue::TEvStatus()); + ui64 cookie = ++AggregatedStats.NextCookie; + AggregatedStats.Cookies[tabletId] = cookie; + NTabletPipe::SendData(ctx, pipeClient, new TEvPersQueue::TEvStatus(), cookie); NTabletPipe::SendData(ctx, pipeClient, new TEvPQ::TEvSubDomainStatus(SubDomainOutOfSpace)); } } -void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx) -{ +void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx) { const auto& record = ev->Get()->Record; ui64 tabletId = record.GetTabletId(); - bool res = WaitingForStat.erase(tabletId); - if (!res) //ignore if already processed + ui64 cookie = ev->Cookie; + + if ((0 != cookie && cookie != AggregatedStats.Cookies[tabletId]) || (0 == cookie && !AggregatedStats.Cookies.contains(tabletId))) { return; + } + + AggregatedStats.Cookies.erase(tabletId); + for (const auto& partRes : record.GetPartResult()) { - TotalAvgSpeedSec += partRes.GetAvgWriteSpeedPerSec(); - MaxAvgSpeedSec = Max<ui64>(MaxAvgSpeedSec, partRes.GetAvgWriteSpeedPerSec()); - TotalAvgSpeedMin += partRes.GetAvgWriteSpeedPerMin(); - MaxAvgSpeedMin = Max<ui64>(MaxAvgSpeedMin, partRes.GetAvgWriteSpeedPerMin()); - TotalAvgSpeedHour += partRes.GetAvgWriteSpeedPerHour(); - MaxAvgSpeedHour = Max<ui64>(MaxAvgSpeedHour, partRes.GetAvgWriteSpeedPerHour()); - TotalAvgSpeedDay += partRes.GetAvgWriteSpeedPerDay(); - MaxAvgSpeedDay = Max<ui64>(MaxAvgSpeedDay, partRes.GetAvgWriteSpeedPerDay()); - - TotalDataSize += partRes.GetPartitionSize(); - TotalUsedReserveSize += partRes.GetUsedReserveSize(); - } - if (WaitingForStat.empty()) { + if (!PartitionsInfo.contains(partRes.GetPartition())) { + continue; + } + + AggregatedStats.AggrStats(partRes.GetPartition(), partRes.GetPartitionSize(), partRes.GetUsedReserveSize()); + AggregatedStats.AggrStats(partRes.GetAvgWriteSpeedPerSec(), partRes.GetAvgWriteSpeedPerMin(), + partRes.GetAvgWriteSpeedPerHour(), partRes.GetAvgWriteSpeedPerDay()); + } + if (AggregatedStats.Cookies.empty()) { CheckStat(ctx); } } +void TPersQueueReadBalancer::Handle(TEvPQ::TEvStatsWakeup::TPtr& ev, const TActorContext& ctx) { + if (AggregatedStats.Round != ev->Get()->Round) { + // old message + return; + } + + if (AggregatedStats.Cookies.empty()) { + return; + } + + CheckStat(ctx); +} + +void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvStatus::TPtr& ev, const TActorContext&) { + Send(ev.Get()->Sender, GetStatsEvent()); +} + +void TPersQueueReadBalancer::TAggregatedStats::AggrStats(ui32 partition, ui64 dataSize, ui64 usedReserveSize) { + Y_VERIFY(dataSize >= usedReserveSize); + + auto& oldValue = Stats[partition]; + + TPartitionStats newValue; + newValue.DataSize = dataSize; + newValue.UsedReserveSize = usedReserveSize; + + TotalDataSize += (newValue.DataSize - oldValue.DataSize); + TotalUsedReserveSize += (newValue.UsedReserveSize - oldValue.UsedReserveSize); + + Y_VERIFY(TotalDataSize >= TotalUsedReserveSize); + + oldValue = newValue; +} + +void TPersQueueReadBalancer::TAggregatedStats::AggrStats(ui64 avgWriteSpeedPerSec, ui64 avgWriteSpeedPerMin, ui64 avgWriteSpeedPerHour, ui64 avgWriteSpeedPerDay) { + NewMetrics.TotalAvgWriteSpeedPerSec += avgWriteSpeedPerSec; + NewMetrics.MaxAvgWriteSpeedPerSec = Max<ui64>(NewMetrics.MaxAvgWriteSpeedPerSec, avgWriteSpeedPerSec); + NewMetrics.TotalAvgWriteSpeedPerMin += avgWriteSpeedPerMin; + NewMetrics.MaxAvgWriteSpeedPerMin = Max<ui64>(NewMetrics.MaxAvgWriteSpeedPerMin, avgWriteSpeedPerMin); + NewMetrics.TotalAvgWriteSpeedPerHour += avgWriteSpeedPerHour; + NewMetrics.MaxAvgWriteSpeedPerHour = Max<ui64>(NewMetrics.MaxAvgWriteSpeedPerHour, avgWriteSpeedPerHour); + NewMetrics.TotalAvgWriteSpeedPerDay += avgWriteSpeedPerDay; + NewMetrics.MaxAvgWriteSpeedPerDay = Max<ui64>(NewMetrics.MaxAvgWriteSpeedPerDay, avgWriteSpeedPerDay); +} + void TPersQueueReadBalancer::AnswerWaitingRequests(const TActorContext& ctx) { TVector<TEvPersQueue::TEvCheckACL::TPtr> ww; ww.swap(WaitingACLRequests); @@ -697,7 +780,6 @@ void TPersQueueReadBalancer::AnswerWaitingRequests(const TActorContext& ctx) { for (auto& r : dr) { Handle(r, ctx); } - } void TPersQueueReadBalancer::Handle(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult::TPtr& ev, const TActorContext& ctx) { @@ -724,42 +806,59 @@ void TPersQueueReadBalancer::CheckStat(const TActorContext& ctx) { //TODO: Deside about changing number of partitions and send request to SchemeShard //TODO: make AlterTopic request via TX_PROXY - TEvPersQueue::TEvPeriodicTopicStats* ev = new TEvPersQueue::TEvPeriodicTopicStats(); - auto& rec = ev->Record; - rec.SetPathId(PathId); - rec.SetGeneration(Generation); - rec.SetRound(++StatsReportRound); - rec.SetDataSize(TotalDataSize); - rec.SetUsedReserveSize(TotalUsedReserveSize); + if (!TTxWritePartitionStatsScheduled) { + TTxWritePartitionStatsScheduled = true; + Execute(new TTxWritePartitionStats(this)); + } - Y_VERIFY(TotalDataSize >= TotalUsedReserveSize); + AggregatedStats.Metrics = AggregatedStats.NewMetrics; + TEvPersQueue::TEvPeriodicTopicStats* ev = GetStatsEvent() ; LOG_DEBUG(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, TStringBuilder() << "Send TEvPeriodicTopicStats PathId: " << PathId << " Generation: " << Generation << " StatsReportRound: " << StatsReportRound - << " DataSize: " << TotalDataSize - << " UsedReserveSize: " << TotalUsedReserveSize); + << " DataSize: " << AggregatedStats.TotalDataSize + << " UsedReserveSize: " << AggregatedStats.TotalUsedReserveSize); NTabletPipe::SendData(ctx, GetPipeClient(SchemeShardId, ctx), ev); } +TEvPersQueue::TEvPeriodicTopicStats* TPersQueueReadBalancer::GetStatsEvent() { + TEvPersQueue::TEvPeriodicTopicStats* ev = new TEvPersQueue::TEvPeriodicTopicStats(); + auto& rec = ev->Record; + rec.SetPathId(PathId); + rec.SetGeneration(Generation); + rec.SetRound(++StatsReportRound); + rec.SetDataSize(AggregatedStats.TotalDataSize); + rec.SetUsedReserveSize(AggregatedStats.TotalUsedReserveSize); + + return ev; +} + void TPersQueueReadBalancer::GetStat(const TActorContext& ctx) { - if (!WaitingForStat.empty()) //if there is request infly - return; - TotalAvgSpeedSec = MaxAvgSpeedSec = 0; - TotalAvgSpeedMin = MaxAvgSpeedMin = 0; - TotalAvgSpeedHour = MaxAvgSpeedHour = 0; - TotalAvgSpeedDay = MaxAvgSpeedDay = 0; - TotalDataSize = 0; - TotalUsedReserveSize = 0; + if (!AggregatedStats.Cookies.empty()) { + AggregatedStats.Cookies.clear(); + CheckStat(ctx); + } + + TPartitionMetrics newMetrics; + AggregatedStats.NewMetrics = newMetrics; + for (auto& p : PartitionsInfo) { const ui64& tabletId = p.second.TabletId; - bool res = WaitingForStat.insert(tabletId).second; - if (!res) //already asked stat + if (AggregatedStats.Cookies.contains(tabletId)) { //already asked stat continue; + } RequestTabletIfNeeded(tabletId, ctx); } + + // TEvStatsWakeup must processed before next TEvWakeup, which send next status request to TPersQueue + const auto& config = AppData(ctx)->PQConfig; + ui64 delayMs = std::min(config.GetBalancerStatsWakeupIntervalSec() * 1000, config.GetBalancerWakeupIntervalSec() * 500); + if (0 < delayMs) { + Schedule(TDuration::MilliSeconds(delayMs), new TEvPQ::TEvStatsWakeup(++AggregatedStats.Round)); + } } void TPersQueueReadBalancer::GetACL(const TActorContext& ctx) { diff --git a/ydb/core/persqueue/read_balancer.h b/ydb/core/persqueue/read_balancer.h index d75782a810..fca831618c 100644 --- a/ydb/core/persqueue/read_balancer.h +++ b/ydb/core/persqueue/read_balancer.h @@ -9,6 +9,7 @@ #include <ydb/core/base/appdata.h> #include <library/cpp/actors/core/hfunc.h> #include <ydb/core/persqueue/events/global.h> +#include <ydb/core/persqueue/events/internal.h> #include <ydb/core/tablet_flat/flat_dbase_scheme.h> #include <ydb/core/tablet_flat/flat_cxx_database.h> #include <ydb/core/tx/scheme_cache/scheme_cache.h> @@ -71,9 +72,11 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa struct TabletId : Column<33, NScheme::NTypeIds::Uint64> {}; struct State : Column<34, NScheme::NTypeIds::Uint32> {}; + struct DataSize : Column<35, NScheme::NTypeIds::Uint64> {}; + struct UsedReserveSize : Column<36, NScheme::NTypeIds::Uint64> {}; using TKey = TableKey<Partition>; - using TColumns = TableColumns<Partition, TabletId, State>; + using TColumns = TableColumns<Partition, TabletId, State, DataSize, UsedReserveSize>; }; struct Groups : Table<34> { @@ -172,7 +175,6 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa void Complete(const TActorContext &ctx) override; }; - friend struct TTxWrite; void Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext &ctx) { @@ -251,6 +253,7 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa } RegisterEvents.clear(); + Y_VERIFY(0 < AppData(ctx)->PQConfig.GetBalancerWakeupIntervalSec()); ctx.Schedule(TDuration::Seconds(AppData(ctx)->PQConfig.GetBalancerWakeupIntervalSec()), new TEvents::TEvWakeup()); //TODO: remove it ctx.Send(ctx.SelfID, new TEvPersQueue::TEvUpdateACL()); } @@ -293,6 +296,7 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa const TActorContext &ctx); void CheckACL(const TEvPersQueue::TEvCheckACL::TPtr &request, const NACLib::TUserToken& token, const TActorContext &ctx); void GetStat(const TActorContext&); + TEvPersQueue::TEvPeriodicTopicStats* GetStatsEvent(); void GetACL(const TActorContext&); void AnswerWaitingRequests(const TActorContext& ctx); @@ -300,7 +304,9 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa void Handle(TEvents::TEvPoisonPill &ev, const TActorContext& ctx); void Handle(TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPQ::TEvStatsWakeup::TPtr& ev, const TActorContext& ctx); void Handle(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPersQueue::TEvStatus::TPtr& ev, const TActorContext& ctx); void RegisterSession(const TActorId& pipe, const TActorContext& ctx); struct TPipeInfo; @@ -443,18 +449,44 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa THashMap<ui64, TActorId> TabletPipes; - THashSet<ui64> WaitingForStat; bool WaitingForACL; - ui64 TotalAvgSpeedSec; - ui64 MaxAvgSpeedSec; - ui64 TotalAvgSpeedMin; - ui64 MaxAvgSpeedMin; - ui64 TotalAvgSpeedHour; - ui64 MaxAvgSpeedHour; - ui64 TotalAvgSpeedDay; - ui64 MaxAvgSpeedDay; - ui64 TotalDataSize; - ui64 TotalUsedReserveSize; + + struct TPartitionStats { + ui64 DataSize = 0; + ui64 UsedReserveSize = 0; + }; + + struct TPartitionMetrics { + ui64 TotalAvgWriteSpeedPerSec = 0; + ui64 MaxAvgWriteSpeedPerSec = 0; + ui64 TotalAvgWriteSpeedPerMin = 0; + ui64 MaxAvgWriteSpeedPerMin = 0; + ui64 TotalAvgWriteSpeedPerHour = 0; + ui64 MaxAvgWriteSpeedPerHour = 0; + ui64 TotalAvgWriteSpeedPerDay = 0; + ui64 MaxAvgWriteSpeedPerDay = 0; + }; + + struct TAggregatedStats { + THashMap<ui32, TPartitionStats> Stats; + THashMap<ui64, ui64> Cookies; + + ui64 TotalDataSize = 0; + ui64 TotalUsedReserveSize = 0; + + TPartitionMetrics Metrics; + TPartitionMetrics NewMetrics; + + ui64 Round = 0; + ui64 NextCookie = 0; + + void AggrStats(ui32 partition, ui64 dataSize, ui64 usedReserveSize); + void AggrStats(ui64 avgWriteSpeedPerSec, ui64 avgWriteSpeedPerMin, ui64 avgWriteSpeedPerHour, ui64 avgWriteSpeedPerDay); + }; + TAggregatedStats AggregatedStats; + + struct TTxWritePartitionStats; + bool TTxWritePartitionStatsScheduled = false; ui64 StatsReportRound; @@ -493,16 +525,6 @@ public: , NoGroupsInBase(true) , ResourceMetrics(nullptr) , WaitingForACL(false) - , TotalAvgSpeedSec(0) - , MaxAvgSpeedSec(0) - , TotalAvgSpeedMin(0) - , MaxAvgSpeedMin(0) - , TotalAvgSpeedHour(0) - , MaxAvgSpeedHour(0) - , TotalAvgSpeedDay(0) - , MaxAvgSpeedDay(0) - , TotalDataSize(0) - , TotalUsedReserveSize(0) , StatsReportRound(0) {} @@ -548,9 +570,11 @@ public: HFunc(TEvTabletPipe::TEvClientConnected, Handle); HFunc(TEvTabletPipe::TEvClientDestroyed, Handle); HFunc(TEvPersQueue::TEvStatusResponse, Handle); + HFunc(TEvPQ::TEvStatsWakeup, Handle); HFunc(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult, Handle); HFunc(NSchemeShard::TEvSchemeShard::TEvSubDomainPathIdFound, Handle); HFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle); + HFunc(TEvPersQueue::TEvStatus, Handle); default: HandleDefaultEvents(ev, ctx); diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.cpp b/ydb/core/persqueue/ut/common/pq_ut_common.cpp index b4d739612d..e59a7d1c2d 100644 --- a/ydb/core/persqueue/ut/common/pq_ut_common.cpp +++ b/ydb/core/persqueue/ut/common/pq_ut_common.cpp @@ -972,4 +972,14 @@ void FillDeprecatedUserInfo(NKikimrClient::TKeyValueRequest_TCmdWrite* write, co write->SetValue(idataDeprecated.Data(), idataDeprecated.Size()); } +TEvPersQueue::TEvPeriodicTopicStats* GetReadBalancerPeriodicTopicStats(TTestActorRuntime& runtime, ui64 balancerId) { + runtime.ResetScheduledCount(); + + TActorId sender = runtime.AllocateEdgeActor(); + runtime.SendToPipe(balancerId, sender, new TEvPersQueue::TEvStatus(), 0, GetPipeConfigWithRetries()); + + TAutoPtr<IEventHandle> handle; + return runtime.GrabEdgeEvent<TEvPersQueue::TEvPeriodicTopicStats>(handle, TDuration::Seconds(2)); +} + } // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.h b/ydb/core/persqueue/ut/common/pq_ut_common.h index 47166af066..fedb783c0f 100644 --- a/ydb/core/persqueue/ut/common/pq_ut_common.h +++ b/ydb/core/persqueue/ut/common/pq_ut_common.h @@ -482,4 +482,6 @@ void CmdWrite( bool treatBadOffsetAsError = true, bool disableDeduplication = false); +TEvPersQueue::TEvPeriodicTopicStats* GetReadBalancerPeriodicTopicStats(TTestActorRuntime& runtime, ui64 balancerId); + } // namespace NKikimr::NPQ diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index e95d4cb050..45c5319e05 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -189,6 +189,7 @@ message TPQConfig { optional TMoveTopicActorConfig MoveTopicActorConfig = 51; optional uint64 BalancerWakeupIntervalSec = 54 [default = 30]; + optional uint64 BalancerStatsWakeupIntervalSec = 55 [default = 5]; } message TChannelProfile { diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index c52b0d0c72..8ec2cf2a76 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -2496,16 +2496,14 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { TPathId pathId(selfId, localPathId); auto it = Self->Topics.find(pathId); - if (it == Self->Topics.end()) { - continue; - } + if (it != Self->Topics.end()) { + auto& topic = it->second; + topic->Stats.SeqNo = TMessageSeqNo(rowset.GetValue<Schema::PersQueueGroupStats::SeqNoGeneration>(), rowset.GetValue<Schema::PersQueueGroupStats::SeqNoRound>()); + topic->Stats.DataSize = rowset.GetValue<Schema::PersQueueGroupStats::DataSize>(); + topic->Stats.UsedReserveSize = rowset.GetValue<Schema::PersQueueGroupStats::UsedReserveSize>(); - auto& topic = it->second; - topic->Stats.SeqNo = TMessageSeqNo(rowset.GetValue<Schema::PersQueueGroupStats::SeqNoGeneration>(), rowset.GetValue<Schema::PersQueueGroupStats::SeqNoRound>()); - topic->Stats.DataSize = rowset.GetValue<Schema::PersQueueGroupStats::DataSize>(); - topic->Stats.UsedReserveSize = rowset.GetValue<Schema::PersQueueGroupStats::UsedReserveSize>(); - - Self->ResolveDomainInfo(pathId)->AggrDiskSpaceUsage(topic->Stats, {}); + Self->ResolveDomainInfo(pathId)->AggrDiskSpaceUsage(topic->Stats, {}); + } if (!rowset.Next()) { return false; diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.h b/ydb/core/tx/schemeshard/ut_helpers/helpers.h index 75c9b0bae0..f63741da1c 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/helpers.h +++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.h @@ -6,6 +6,7 @@ #include <library/cpp/testing/unittest/registar.h> #include <ydb/core/engine/mkql_engine_flat.h> +#include <ydb/core/persqueue/ut/common/pq_ut_common.h> #include <ydb/core/protos/tx_datashard.pb.h> #include <ydb/core/testlib/minikql_compile.h> #include <ydb/core/tx/datashard/datashard.h> diff --git a/ydb/core/tx/schemeshard/ut_stats.cpp b/ydb/core/tx/schemeshard/ut_stats.cpp index b0fd243a03..2139c6a3cb 100644 --- a/ydb/core/tx/schemeshard/ut_stats.cpp +++ b/ydb/core/tx/schemeshard/ut_stats.cpp @@ -498,12 +498,26 @@ Y_UNIT_TEST_SUITE(TSchemeshardStatsBatchingTest) { env.TestWaitNotification(runtime, txId); Assert(0, 0); // topic is empty + ui64 balancerId = DescribePath(runtime, "/MyRoot/Topic1").GetPathDescription().GetPersQueueGroup().GetBalancerTabletID(); + ui32 msgSeqNo = 100; WriteToTopic(runtime, topicPath, msgSeqNo, "Message 100"); env.SimulateSleep(runtime, TDuration::Seconds(3)); // Wait TEvPeriodicTopicStats - Assert(69, 0); // 67 - it is unstable value. it can change if internal message store change + Assert(69, 0); // 69 - it is unstable value. it can change if internal message store change + + auto stats = NPQ::GetReadBalancerPeriodicTopicStats(runtime, balancerId); + UNIT_ASSERT_EQUAL_C(69, stats->Record.GetDataSize(), "DataSize from ReadBalancer"); + UNIT_ASSERT_EQUAL_C(0, stats->Record.GetUsedReserveSize(), "UsedReserveSize from ReadBalancer"); + + appData.PQConfig.SetBalancerWakeupIntervalSec(30); + + GracefulRestartTablet(runtime, balancerId, sender); + + stats = NPQ::GetReadBalancerPeriodicTopicStats(runtime, balancerId); + UNIT_ASSERT_EQUAL_C(69, stats->Record.GetDataSize(), "DataSize from ReadBalancer after reload"); + UNIT_ASSERT_EQUAL_C(0, stats->Record.GetUsedReserveSize(), "UsedReserveSize from ReadBalancer after reload"); } Y_UNIT_TEST(PeriodicTopicStatsReload) { @@ -548,7 +562,6 @@ Y_UNIT_TEST_SUITE(TSchemeshardStatsBatchingTest) { env.TestWaitNotification(runtime, txId); AssertTopicSize(7, 0); - ui64 topic1Id = DescribePath(runtime, "/MyRoot/Topic1").GetPathDescription().GetSelf().GetPathId(); ui64 generation = 1; |