aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-03-15 09:35:44 +0300
committertesseract <tesseract@yandex-team.com>2023-03-15 09:35:44 +0300
commit6adce5f78300a9c9a5ec82efe9b0b6076052c479 (patch)
treec321b0688caa1a64b2b9f092c37f04a5c8d5165a
parent1b4e8b8c23ea33cfab0574d701b6b9226a3d8b39 (diff)
downloadydb-6adce5f78300a9c9a5ec82efe9b0b6076052c479.tar.gz
Персистить статистику от Partitions в ReadBalancere-е
-rw-r--r--ydb/core/persqueue/events/internal.h9
-rw-r--r--ydb/core/persqueue/pq_impl.cpp14
-rw-r--r--ydb/core/persqueue/read_balancer.cpp191
-rw-r--r--ydb/core/persqueue/read_balancer.h70
-rw-r--r--ydb/core/persqueue/ut/common/pq_ut_common.cpp10
-rw-r--r--ydb/core/persqueue/ut/common/pq_ut_common.h2
-rw-r--r--ydb/core/protos/pqconfig.proto1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__init.cpp16
-rw-r--r--ydb/core/tx/schemeshard/ut_helpers/helpers.h1
-rw-r--r--ydb/core/tx/schemeshard/ut_stats.cpp17
10 files changed, 245 insertions, 86 deletions
diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h
index 31134e29a8..56d77cc2d4 100644
--- a/ydb/core/persqueue/events/internal.h
+++ b/ydb/core/persqueue/events/internal.h
@@ -127,6 +127,7 @@ struct TEvPQ {
EvTxRollback,
EvPartitionConfigChanged,
EvSubDomainStatus,
+ EvStatsWakeup,
EvEnd
};
@@ -785,6 +786,14 @@ struct TEvPQ {
bool SubDomainOutOfSpace() const { return Record.GetSubDomainOutOfSpace(); }
};
+
+ struct TEvStatsWakeup : public TEventLocal<TEvStatsWakeup, EvStatsWakeup> {
+ TEvStatsWakeup(ui64 round)
+ : Round(round)
+ {}
+
+ ui64 Round;
+ };
};
} //NKikimr
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index b229b79cbf..a27bff1cec 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -375,11 +375,12 @@ public:
return NKikimrServices::TActivity::PERSQUEUE_ANS_ACTOR;
}
- TBuilderProxy(const ui64 tabletId, const TActorId& sender, const ui32 count)
+ TBuilderProxy(const ui64 tabletId, const TActorId& sender, const ui32 count, const ui64 cookie)
: TabletId(tabletId)
, Sender(sender)
, Waiting(count)
, Result()
+ , Cookie(cookie)
{}
void Bootstrap(const TActorContext& ctx)
@@ -405,7 +406,7 @@ private:
for (const auto& p : Result) {
resp.AddPartResult()->CopyFrom(p);
}
- ctx.Send(Sender, res.Release());
+ ctx.Send(Sender, res.Release(), 0, Cookie);
TThis::Die(ctx);
}
@@ -434,6 +435,7 @@ private:
TActorId Sender;
ui32 Waiting;
TVector<typename T2::TPartResult> Result;
+ ui64 Cookie;
};
@@ -441,17 +443,17 @@ TActorId CreateOffsetsProxyActor(const ui64 tabletId, const TActorId& sender, co
{
return ctx.Register(new TBuilderProxy<TEvPQ::TEvPartitionOffsetsResponse,
NKikimrPQ::TOffsetsResponse,
- TEvPersQueue::TEvOffsetsResponse>(tabletId, sender, count));
+ TEvPersQueue::TEvOffsetsResponse>(tabletId, sender, count, 0));
}
/******************************************************* StatusProxy *********************************************************/
-TActorId CreateStatusProxyActor(const ui64 tabletId, const TActorId& sender, const ui32 count, const TActorContext& ctx)
+TActorId CreateStatusProxyActor(const ui64 tabletId, const TActorId& sender, const ui32 count, const ui64 cookie, const TActorContext& ctx)
{
return ctx.Register(new TBuilderProxy<TEvPQ::TEvPartitionStatusResponse,
NKikimrPQ::TStatusResponse,
- TEvPersQueue::TEvStatusResponse>(tabletId, sender, count));
+ TEvPersQueue::TEvStatusResponse>(tabletId, sender, count, cookie));
}
/******************************************************* MonitoringProxy *********************************************************/
@@ -1477,7 +1479,7 @@ void TPersQueue::Handle(TEvPersQueue::TEvStatus::TPtr& ev, const TActorContext&
cnt += p.second.InitDone;
}
- TActorId ans = CreateStatusProxyActor(TabletID(), ev->Sender, cnt, ctx);
+ TActorId ans = CreateStatusProxyActor(TabletID(), ev->Sender, cnt, ev->Cookie, ctx);
for (auto& p : Partitions) {
if (!p.second.InitDone)
continue;
diff --git a/ydb/core/persqueue/read_balancer.cpp b/ydb/core/persqueue/read_balancer.cpp
index 8bc0e419f3..e521e34232 100644
--- a/ydb/core/persqueue/read_balancer.cpp
+++ b/ydb/core/persqueue/read_balancer.cpp
@@ -68,6 +68,9 @@ bool TPersQueueReadBalancer::TTxInit::Execute(TTransactionContext& txc, const TA
ui32 part = partsRowset.GetValue<Schema::Partitions::Partition>();
ui64 tabletId = partsRowset.GetValue<Schema::Partitions::TabletId>();
Self->PartitionsInfo[part] = {tabletId, EPartitionState::EPS_FREE, TActorId(), part + 1};
+ Self->AggregatedStats.AggrStats(part, partsRowset.GetValue<Schema::Partitions::DataSize>(),
+ partsRowset.GetValue<Schema::Partitions::UsedReserveSize>());
+
if (!partsRowset.Next())
return false;
}
@@ -176,6 +179,37 @@ void TPersQueueReadBalancer::TTxWrite::Complete(const TActorContext &ctx) {
Self->InitDone(ctx);
}
+struct TPersQueueReadBalancer::TTxWritePartitionStats : public ITransaction {
+ TPersQueueReadBalancer * const Self;
+
+ TTxWritePartitionStats(TPersQueueReadBalancer *self)
+ : Self(self)
+ {}
+
+ bool Execute(TTransactionContext& txc, const TActorContext&) override {
+ Self->TTxWritePartitionStatsScheduled = false;
+
+ NIceDb::TNiceDb db(txc.DB);
+ for (auto& s : Self->AggregatedStats.Stats) {
+ auto partition = s.first;
+ auto& stats = s.second;
+
+ auto it = Self->PartitionsInfo.find(partition);
+ if (it == Self->PartitionsInfo.end()) {
+ continue;
+ }
+
+ db.Table<Schema::Partitions>().Key(partition).Update(
+ NIceDb::TUpdate<Schema::Partitions::DataSize>(stats.DataSize),
+ NIceDb::TUpdate<Schema::Partitions::UsedReserveSize>(stats.UsedReserveSize)
+ );
+ }
+
+ return true;
+ }
+
+ void Complete(const TActorContext&) override {};
+};
bool TPersQueueReadBalancer::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TActorContext& ctx) {
if (!ev) {
@@ -188,6 +222,7 @@ bool TPersQueueReadBalancer::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr e
}
TString TPersQueueReadBalancer::GenerateStat() {
+ auto& metrics = AggregatedStats.Metrics;
TStringStream str;
HTML(str) {
TAG(TH2) {str << "PersQueueReadBalancer Tablet";}
@@ -197,13 +232,13 @@ TString TPersQueueReadBalancer::GenerateStat() {
TAG(TH3) {str << "ActivePipes: " << PipesInfo.size();}
if (Inited) {
TAG(TH3) {str << "Active partitions: " << NumActiveParts;}
- TAG(TH3) {str << "[Total/Max/Avg]WriteSpeedSec: " << TotalAvgSpeedSec << "/" << MaxAvgSpeedSec << "/" << TotalAvgSpeedSec / NumActiveParts;}
- TAG(TH3) {str << "[Total/Max/Avg]WriteSpeedMin: " << TotalAvgSpeedMin << "/" << MaxAvgSpeedMin << "/" << TotalAvgSpeedMin / NumActiveParts;}
- TAG(TH3) {str << "[Total/Max/Avg]WriteSpeedHour: " << TotalAvgSpeedHour << "/" << MaxAvgSpeedHour << "/" << TotalAvgSpeedHour / NumActiveParts;}
- TAG(TH3) {str << "[Total/Max/Avg]WriteSpeedDay: " << TotalAvgSpeedDay << "/" << MaxAvgSpeedDay << "/" << TotalAvgSpeedDay / NumActiveParts;}
- TAG(TH3) {str << "TotalDataSize: " << TotalDataSize;}
+ TAG(TH3) {str << "[Total/Max/Avg]WriteSpeedSec: " << metrics.TotalAvgWriteSpeedPerSec << "/" << metrics.MaxAvgWriteSpeedPerSec << "/" << metrics.TotalAvgWriteSpeedPerSec / NumActiveParts;}
+ TAG(TH3) {str << "[Total/Max/Avg]WriteSpeedMin: " << metrics.TotalAvgWriteSpeedPerMin << "/" << metrics.MaxAvgWriteSpeedPerMin << "/" << metrics.TotalAvgWriteSpeedPerMin / NumActiveParts;}
+ TAG(TH3) {str << "[Total/Max/Avg]WriteSpeedHour: " << metrics.TotalAvgWriteSpeedPerHour << "/" << metrics.MaxAvgWriteSpeedPerHour << "/" << metrics.TotalAvgWriteSpeedPerHour / NumActiveParts;}
+ TAG(TH3) {str << "[Total/Max/Avg]WriteSpeedDay: " << metrics.TotalAvgWriteSpeedPerDay << "/" << metrics.MaxAvgWriteSpeedPerDay << "/" << metrics.TotalAvgWriteSpeedPerDay / NumActiveParts;}
+ TAG(TH3) {str << "TotalDataSize: " << AggregatedStats.TotalDataSize;}
TAG(TH3) {str << "ReserveSize: " << PartitionReserveSize();}
- TAG(TH3) {str << "TotalUsedReserveSize: " << TotalUsedReserveSize;}
+ TAG(TH3) {str << "TotalUsedReserveSize: " << AggregatedStats.TotalUsedReserveSize;}
}
UL_CLASS("nav nav-tabs") {
@@ -647,44 +682,92 @@ TActorId TPersQueueReadBalancer::GetPipeClient(const ui64 tabletId, const TActor
void TPersQueueReadBalancer::RequestTabletIfNeeded(const ui64 tabletId, const TActorContext& ctx)
{
if ((tabletId == SchemeShardId && !WaitingForACL) ||
- (tabletId != SchemeShardId && !WaitingForStat.contains(tabletId)))
+ (tabletId != SchemeShardId && AggregatedStats.Cookies.contains(tabletId))) {
return;
+ }
TActorId pipeClient = GetPipeClient(tabletId, ctx);
if (tabletId == SchemeShardId) {
NTabletPipe::SendData(ctx, pipeClient, new NSchemeShard::TEvSchemeShard::TEvDescribeScheme(tabletId, PathId));
} else {
- NTabletPipe::SendData(ctx, pipeClient, new TEvPersQueue::TEvStatus());
+ ui64 cookie = ++AggregatedStats.NextCookie;
+ AggregatedStats.Cookies[tabletId] = cookie;
+ NTabletPipe::SendData(ctx, pipeClient, new TEvPersQueue::TEvStatus(), cookie);
NTabletPipe::SendData(ctx, pipeClient, new TEvPQ::TEvSubDomainStatus(SubDomainOutOfSpace));
}
}
-void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx)
-{
+void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx) {
const auto& record = ev->Get()->Record;
ui64 tabletId = record.GetTabletId();
- bool res = WaitingForStat.erase(tabletId);
- if (!res) //ignore if already processed
+ ui64 cookie = ev->Cookie;
+
+ if ((0 != cookie && cookie != AggregatedStats.Cookies[tabletId]) || (0 == cookie && !AggregatedStats.Cookies.contains(tabletId))) {
return;
+ }
+
+ AggregatedStats.Cookies.erase(tabletId);
+
for (const auto& partRes : record.GetPartResult()) {
- TotalAvgSpeedSec += partRes.GetAvgWriteSpeedPerSec();
- MaxAvgSpeedSec = Max<ui64>(MaxAvgSpeedSec, partRes.GetAvgWriteSpeedPerSec());
- TotalAvgSpeedMin += partRes.GetAvgWriteSpeedPerMin();
- MaxAvgSpeedMin = Max<ui64>(MaxAvgSpeedMin, partRes.GetAvgWriteSpeedPerMin());
- TotalAvgSpeedHour += partRes.GetAvgWriteSpeedPerHour();
- MaxAvgSpeedHour = Max<ui64>(MaxAvgSpeedHour, partRes.GetAvgWriteSpeedPerHour());
- TotalAvgSpeedDay += partRes.GetAvgWriteSpeedPerDay();
- MaxAvgSpeedDay = Max<ui64>(MaxAvgSpeedDay, partRes.GetAvgWriteSpeedPerDay());
-
- TotalDataSize += partRes.GetPartitionSize();
- TotalUsedReserveSize += partRes.GetUsedReserveSize();
- }
- if (WaitingForStat.empty()) {
+ if (!PartitionsInfo.contains(partRes.GetPartition())) {
+ continue;
+ }
+
+ AggregatedStats.AggrStats(partRes.GetPartition(), partRes.GetPartitionSize(), partRes.GetUsedReserveSize());
+ AggregatedStats.AggrStats(partRes.GetAvgWriteSpeedPerSec(), partRes.GetAvgWriteSpeedPerMin(),
+ partRes.GetAvgWriteSpeedPerHour(), partRes.GetAvgWriteSpeedPerDay());
+ }
+ if (AggregatedStats.Cookies.empty()) {
CheckStat(ctx);
}
}
+void TPersQueueReadBalancer::Handle(TEvPQ::TEvStatsWakeup::TPtr& ev, const TActorContext& ctx) {
+ if (AggregatedStats.Round != ev->Get()->Round) {
+ // old message
+ return;
+ }
+
+ if (AggregatedStats.Cookies.empty()) {
+ return;
+ }
+
+ CheckStat(ctx);
+}
+
+void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvStatus::TPtr& ev, const TActorContext&) {
+ Send(ev.Get()->Sender, GetStatsEvent());
+}
+
+void TPersQueueReadBalancer::TAggregatedStats::AggrStats(ui32 partition, ui64 dataSize, ui64 usedReserveSize) {
+ Y_VERIFY(dataSize >= usedReserveSize);
+
+ auto& oldValue = Stats[partition];
+
+ TPartitionStats newValue;
+ newValue.DataSize = dataSize;
+ newValue.UsedReserveSize = usedReserveSize;
+
+ TotalDataSize += (newValue.DataSize - oldValue.DataSize);
+ TotalUsedReserveSize += (newValue.UsedReserveSize - oldValue.UsedReserveSize);
+
+ Y_VERIFY(TotalDataSize >= TotalUsedReserveSize);
+
+ oldValue = newValue;
+}
+
+void TPersQueueReadBalancer::TAggregatedStats::AggrStats(ui64 avgWriteSpeedPerSec, ui64 avgWriteSpeedPerMin, ui64 avgWriteSpeedPerHour, ui64 avgWriteSpeedPerDay) {
+ NewMetrics.TotalAvgWriteSpeedPerSec += avgWriteSpeedPerSec;
+ NewMetrics.MaxAvgWriteSpeedPerSec = Max<ui64>(NewMetrics.MaxAvgWriteSpeedPerSec, avgWriteSpeedPerSec);
+ NewMetrics.TotalAvgWriteSpeedPerMin += avgWriteSpeedPerMin;
+ NewMetrics.MaxAvgWriteSpeedPerMin = Max<ui64>(NewMetrics.MaxAvgWriteSpeedPerMin, avgWriteSpeedPerMin);
+ NewMetrics.TotalAvgWriteSpeedPerHour += avgWriteSpeedPerHour;
+ NewMetrics.MaxAvgWriteSpeedPerHour = Max<ui64>(NewMetrics.MaxAvgWriteSpeedPerHour, avgWriteSpeedPerHour);
+ NewMetrics.TotalAvgWriteSpeedPerDay += avgWriteSpeedPerDay;
+ NewMetrics.MaxAvgWriteSpeedPerDay = Max<ui64>(NewMetrics.MaxAvgWriteSpeedPerDay, avgWriteSpeedPerDay);
+}
+
void TPersQueueReadBalancer::AnswerWaitingRequests(const TActorContext& ctx) {
TVector<TEvPersQueue::TEvCheckACL::TPtr> ww;
ww.swap(WaitingACLRequests);
@@ -697,7 +780,6 @@ void TPersQueueReadBalancer::AnswerWaitingRequests(const TActorContext& ctx) {
for (auto& r : dr) {
Handle(r, ctx);
}
-
}
void TPersQueueReadBalancer::Handle(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult::TPtr& ev, const TActorContext& ctx) {
@@ -724,42 +806,59 @@ void TPersQueueReadBalancer::CheckStat(const TActorContext& ctx) {
//TODO: Deside about changing number of partitions and send request to SchemeShard
//TODO: make AlterTopic request via TX_PROXY
- TEvPersQueue::TEvPeriodicTopicStats* ev = new TEvPersQueue::TEvPeriodicTopicStats();
- auto& rec = ev->Record;
- rec.SetPathId(PathId);
- rec.SetGeneration(Generation);
- rec.SetRound(++StatsReportRound);
- rec.SetDataSize(TotalDataSize);
- rec.SetUsedReserveSize(TotalUsedReserveSize);
+ if (!TTxWritePartitionStatsScheduled) {
+ TTxWritePartitionStatsScheduled = true;
+ Execute(new TTxWritePartitionStats(this));
+ }
- Y_VERIFY(TotalDataSize >= TotalUsedReserveSize);
+ AggregatedStats.Metrics = AggregatedStats.NewMetrics;
+ TEvPersQueue::TEvPeriodicTopicStats* ev = GetStatsEvent() ;
LOG_DEBUG(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER,
TStringBuilder() << "Send TEvPeriodicTopicStats PathId: " << PathId
<< " Generation: " << Generation
<< " StatsReportRound: " << StatsReportRound
- << " DataSize: " << TotalDataSize
- << " UsedReserveSize: " << TotalUsedReserveSize);
+ << " DataSize: " << AggregatedStats.TotalDataSize
+ << " UsedReserveSize: " << AggregatedStats.TotalUsedReserveSize);
NTabletPipe::SendData(ctx, GetPipeClient(SchemeShardId, ctx), ev);
}
+TEvPersQueue::TEvPeriodicTopicStats* TPersQueueReadBalancer::GetStatsEvent() {
+ TEvPersQueue::TEvPeriodicTopicStats* ev = new TEvPersQueue::TEvPeriodicTopicStats();
+ auto& rec = ev->Record;
+ rec.SetPathId(PathId);
+ rec.SetGeneration(Generation);
+ rec.SetRound(++StatsReportRound);
+ rec.SetDataSize(AggregatedStats.TotalDataSize);
+ rec.SetUsedReserveSize(AggregatedStats.TotalUsedReserveSize);
+
+ return ev;
+}
+
void TPersQueueReadBalancer::GetStat(const TActorContext& ctx) {
- if (!WaitingForStat.empty()) //if there is request infly
- return;
- TotalAvgSpeedSec = MaxAvgSpeedSec = 0;
- TotalAvgSpeedMin = MaxAvgSpeedMin = 0;
- TotalAvgSpeedHour = MaxAvgSpeedHour = 0;
- TotalAvgSpeedDay = MaxAvgSpeedDay = 0;
- TotalDataSize = 0;
- TotalUsedReserveSize = 0;
+ if (!AggregatedStats.Cookies.empty()) {
+ AggregatedStats.Cookies.clear();
+ CheckStat(ctx);
+ }
+
+ TPartitionMetrics newMetrics;
+ AggregatedStats.NewMetrics = newMetrics;
+
for (auto& p : PartitionsInfo) {
const ui64& tabletId = p.second.TabletId;
- bool res = WaitingForStat.insert(tabletId).second;
- if (!res) //already asked stat
+ if (AggregatedStats.Cookies.contains(tabletId)) { //already asked stat
continue;
+ }
RequestTabletIfNeeded(tabletId, ctx);
}
+
+ // TEvStatsWakeup must processed before next TEvWakeup, which send next status request to TPersQueue
+ const auto& config = AppData(ctx)->PQConfig;
+ ui64 delayMs = std::min(config.GetBalancerStatsWakeupIntervalSec() * 1000, config.GetBalancerWakeupIntervalSec() * 500);
+ if (0 < delayMs) {
+ Schedule(TDuration::MilliSeconds(delayMs), new TEvPQ::TEvStatsWakeup(++AggregatedStats.Round));
+ }
}
void TPersQueueReadBalancer::GetACL(const TActorContext& ctx) {
diff --git a/ydb/core/persqueue/read_balancer.h b/ydb/core/persqueue/read_balancer.h
index d75782a810..fca831618c 100644
--- a/ydb/core/persqueue/read_balancer.h
+++ b/ydb/core/persqueue/read_balancer.h
@@ -9,6 +9,7 @@
#include <ydb/core/base/appdata.h>
#include <library/cpp/actors/core/hfunc.h>
#include <ydb/core/persqueue/events/global.h>
+#include <ydb/core/persqueue/events/internal.h>
#include <ydb/core/tablet_flat/flat_dbase_scheme.h>
#include <ydb/core/tablet_flat/flat_cxx_database.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
@@ -71,9 +72,11 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
struct TabletId : Column<33, NScheme::NTypeIds::Uint64> {};
struct State : Column<34, NScheme::NTypeIds::Uint32> {};
+ struct DataSize : Column<35, NScheme::NTypeIds::Uint64> {};
+ struct UsedReserveSize : Column<36, NScheme::NTypeIds::Uint64> {};
using TKey = TableKey<Partition>;
- using TColumns = TableColumns<Partition, TabletId, State>;
+ using TColumns = TableColumns<Partition, TabletId, State, DataSize, UsedReserveSize>;
};
struct Groups : Table<34> {
@@ -172,7 +175,6 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
void Complete(const TActorContext &ctx) override;
};
-
friend struct TTxWrite;
void Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext &ctx) {
@@ -251,6 +253,7 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
}
RegisterEvents.clear();
+ Y_VERIFY(0 < AppData(ctx)->PQConfig.GetBalancerWakeupIntervalSec());
ctx.Schedule(TDuration::Seconds(AppData(ctx)->PQConfig.GetBalancerWakeupIntervalSec()), new TEvents::TEvWakeup()); //TODO: remove it
ctx.Send(ctx.SelfID, new TEvPersQueue::TEvUpdateACL());
}
@@ -293,6 +296,7 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
const TActorContext &ctx);
void CheckACL(const TEvPersQueue::TEvCheckACL::TPtr &request, const NACLib::TUserToken& token, const TActorContext &ctx);
void GetStat(const TActorContext&);
+ TEvPersQueue::TEvPeriodicTopicStats* GetStatsEvent();
void GetACL(const TActorContext&);
void AnswerWaitingRequests(const TActorContext& ctx);
@@ -300,7 +304,9 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
void Handle(TEvents::TEvPoisonPill &ev, const TActorContext& ctx);
void Handle(TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPQ::TEvStatsWakeup::TPtr& ev, const TActorContext& ctx);
void Handle(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPersQueue::TEvStatus::TPtr& ev, const TActorContext& ctx);
void RegisterSession(const TActorId& pipe, const TActorContext& ctx);
struct TPipeInfo;
@@ -443,18 +449,44 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
THashMap<ui64, TActorId> TabletPipes;
- THashSet<ui64> WaitingForStat;
bool WaitingForACL;
- ui64 TotalAvgSpeedSec;
- ui64 MaxAvgSpeedSec;
- ui64 TotalAvgSpeedMin;
- ui64 MaxAvgSpeedMin;
- ui64 TotalAvgSpeedHour;
- ui64 MaxAvgSpeedHour;
- ui64 TotalAvgSpeedDay;
- ui64 MaxAvgSpeedDay;
- ui64 TotalDataSize;
- ui64 TotalUsedReserveSize;
+
+ struct TPartitionStats {
+ ui64 DataSize = 0;
+ ui64 UsedReserveSize = 0;
+ };
+
+ struct TPartitionMetrics {
+ ui64 TotalAvgWriteSpeedPerSec = 0;
+ ui64 MaxAvgWriteSpeedPerSec = 0;
+ ui64 TotalAvgWriteSpeedPerMin = 0;
+ ui64 MaxAvgWriteSpeedPerMin = 0;
+ ui64 TotalAvgWriteSpeedPerHour = 0;
+ ui64 MaxAvgWriteSpeedPerHour = 0;
+ ui64 TotalAvgWriteSpeedPerDay = 0;
+ ui64 MaxAvgWriteSpeedPerDay = 0;
+ };
+
+ struct TAggregatedStats {
+ THashMap<ui32, TPartitionStats> Stats;
+ THashMap<ui64, ui64> Cookies;
+
+ ui64 TotalDataSize = 0;
+ ui64 TotalUsedReserveSize = 0;
+
+ TPartitionMetrics Metrics;
+ TPartitionMetrics NewMetrics;
+
+ ui64 Round = 0;
+ ui64 NextCookie = 0;
+
+ void AggrStats(ui32 partition, ui64 dataSize, ui64 usedReserveSize);
+ void AggrStats(ui64 avgWriteSpeedPerSec, ui64 avgWriteSpeedPerMin, ui64 avgWriteSpeedPerHour, ui64 avgWriteSpeedPerDay);
+ };
+ TAggregatedStats AggregatedStats;
+
+ struct TTxWritePartitionStats;
+ bool TTxWritePartitionStatsScheduled = false;
ui64 StatsReportRound;
@@ -493,16 +525,6 @@ public:
, NoGroupsInBase(true)
, ResourceMetrics(nullptr)
, WaitingForACL(false)
- , TotalAvgSpeedSec(0)
- , MaxAvgSpeedSec(0)
- , TotalAvgSpeedMin(0)
- , MaxAvgSpeedMin(0)
- , TotalAvgSpeedHour(0)
- , MaxAvgSpeedHour(0)
- , TotalAvgSpeedDay(0)
- , MaxAvgSpeedDay(0)
- , TotalDataSize(0)
- , TotalUsedReserveSize(0)
, StatsReportRound(0)
{}
@@ -548,9 +570,11 @@ public:
HFunc(TEvTabletPipe::TEvClientConnected, Handle);
HFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
HFunc(TEvPersQueue::TEvStatusResponse, Handle);
+ HFunc(TEvPQ::TEvStatsWakeup, Handle);
HFunc(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult, Handle);
HFunc(NSchemeShard::TEvSchemeShard::TEvSubDomainPathIdFound, Handle);
HFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle);
+ HFunc(TEvPersQueue::TEvStatus, Handle);
default:
HandleDefaultEvents(ev, ctx);
diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.cpp b/ydb/core/persqueue/ut/common/pq_ut_common.cpp
index b4d739612d..e59a7d1c2d 100644
--- a/ydb/core/persqueue/ut/common/pq_ut_common.cpp
+++ b/ydb/core/persqueue/ut/common/pq_ut_common.cpp
@@ -972,4 +972,14 @@ void FillDeprecatedUserInfo(NKikimrClient::TKeyValueRequest_TCmdWrite* write, co
write->SetValue(idataDeprecated.Data(), idataDeprecated.Size());
}
+TEvPersQueue::TEvPeriodicTopicStats* GetReadBalancerPeriodicTopicStats(TTestActorRuntime& runtime, ui64 balancerId) {
+ runtime.ResetScheduledCount();
+
+ TActorId sender = runtime.AllocateEdgeActor();
+ runtime.SendToPipe(balancerId, sender, new TEvPersQueue::TEvStatus(), 0, GetPipeConfigWithRetries());
+
+ TAutoPtr<IEventHandle> handle;
+ return runtime.GrabEdgeEvent<TEvPersQueue::TEvPeriodicTopicStats>(handle, TDuration::Seconds(2));
+}
+
} // namespace NKikimr::NPQ
diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.h b/ydb/core/persqueue/ut/common/pq_ut_common.h
index 47166af066..fedb783c0f 100644
--- a/ydb/core/persqueue/ut/common/pq_ut_common.h
+++ b/ydb/core/persqueue/ut/common/pq_ut_common.h
@@ -482,4 +482,6 @@ void CmdWrite(
bool treatBadOffsetAsError = true,
bool disableDeduplication = false);
+TEvPersQueue::TEvPeriodicTopicStats* GetReadBalancerPeriodicTopicStats(TTestActorRuntime& runtime, ui64 balancerId);
+
} // namespace NKikimr::NPQ
diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto
index e95d4cb050..45c5319e05 100644
--- a/ydb/core/protos/pqconfig.proto
+++ b/ydb/core/protos/pqconfig.proto
@@ -189,6 +189,7 @@ message TPQConfig {
optional TMoveTopicActorConfig MoveTopicActorConfig = 51;
optional uint64 BalancerWakeupIntervalSec = 54 [default = 30];
+ optional uint64 BalancerStatsWakeupIntervalSec = 55 [default = 5];
}
message TChannelProfile {
diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp
index c52b0d0c72..8ec2cf2a76 100644
--- a/ydb/core/tx/schemeshard/schemeshard__init.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp
@@ -2496,16 +2496,14 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
TPathId pathId(selfId, localPathId);
auto it = Self->Topics.find(pathId);
- if (it == Self->Topics.end()) {
- continue;
- }
+ if (it != Self->Topics.end()) {
+ auto& topic = it->second;
+ topic->Stats.SeqNo = TMessageSeqNo(rowset.GetValue<Schema::PersQueueGroupStats::SeqNoGeneration>(), rowset.GetValue<Schema::PersQueueGroupStats::SeqNoRound>());
+ topic->Stats.DataSize = rowset.GetValue<Schema::PersQueueGroupStats::DataSize>();
+ topic->Stats.UsedReserveSize = rowset.GetValue<Schema::PersQueueGroupStats::UsedReserveSize>();
- auto& topic = it->second;
- topic->Stats.SeqNo = TMessageSeqNo(rowset.GetValue<Schema::PersQueueGroupStats::SeqNoGeneration>(), rowset.GetValue<Schema::PersQueueGroupStats::SeqNoRound>());
- topic->Stats.DataSize = rowset.GetValue<Schema::PersQueueGroupStats::DataSize>();
- topic->Stats.UsedReserveSize = rowset.GetValue<Schema::PersQueueGroupStats::UsedReserveSize>();
-
- Self->ResolveDomainInfo(pathId)->AggrDiskSpaceUsage(topic->Stats, {});
+ Self->ResolveDomainInfo(pathId)->AggrDiskSpaceUsage(topic->Stats, {});
+ }
if (!rowset.Next()) {
return false;
diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.h b/ydb/core/tx/schemeshard/ut_helpers/helpers.h
index 75c9b0bae0..f63741da1c 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/helpers.h
+++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.h
@@ -6,6 +6,7 @@
#include <library/cpp/testing/unittest/registar.h>
#include <ydb/core/engine/mkql_engine_flat.h>
+#include <ydb/core/persqueue/ut/common/pq_ut_common.h>
#include <ydb/core/protos/tx_datashard.pb.h>
#include <ydb/core/testlib/minikql_compile.h>
#include <ydb/core/tx/datashard/datashard.h>
diff --git a/ydb/core/tx/schemeshard/ut_stats.cpp b/ydb/core/tx/schemeshard/ut_stats.cpp
index b0fd243a03..2139c6a3cb 100644
--- a/ydb/core/tx/schemeshard/ut_stats.cpp
+++ b/ydb/core/tx/schemeshard/ut_stats.cpp
@@ -498,12 +498,26 @@ Y_UNIT_TEST_SUITE(TSchemeshardStatsBatchingTest) {
env.TestWaitNotification(runtime, txId);
Assert(0, 0); // topic is empty
+ ui64 balancerId = DescribePath(runtime, "/MyRoot/Topic1").GetPathDescription().GetPersQueueGroup().GetBalancerTabletID();
+
ui32 msgSeqNo = 100;
WriteToTopic(runtime, topicPath, msgSeqNo, "Message 100");
env.SimulateSleep(runtime, TDuration::Seconds(3)); // Wait TEvPeriodicTopicStats
- Assert(69, 0); // 67 - it is unstable value. it can change if internal message store change
+ Assert(69, 0); // 69 - it is unstable value. it can change if internal message store change
+
+ auto stats = NPQ::GetReadBalancerPeriodicTopicStats(runtime, balancerId);
+ UNIT_ASSERT_EQUAL_C(69, stats->Record.GetDataSize(), "DataSize from ReadBalancer");
+ UNIT_ASSERT_EQUAL_C(0, stats->Record.GetUsedReserveSize(), "UsedReserveSize from ReadBalancer");
+
+ appData.PQConfig.SetBalancerWakeupIntervalSec(30);
+
+ GracefulRestartTablet(runtime, balancerId, sender);
+
+ stats = NPQ::GetReadBalancerPeriodicTopicStats(runtime, balancerId);
+ UNIT_ASSERT_EQUAL_C(69, stats->Record.GetDataSize(), "DataSize from ReadBalancer after reload");
+ UNIT_ASSERT_EQUAL_C(0, stats->Record.GetUsedReserveSize(), "UsedReserveSize from ReadBalancer after reload");
}
Y_UNIT_TEST(PeriodicTopicStatsReload) {
@@ -548,7 +562,6 @@ Y_UNIT_TEST_SUITE(TSchemeshardStatsBatchingTest) {
env.TestWaitNotification(runtime, txId);
AssertTopicSize(7, 0);
-
ui64 topic1Id = DescribePath(runtime, "/MyRoot/Topic1").GetPathDescription().GetSelf().GetPathId();
ui64 generation = 1;