aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-03-01 11:23:46 +0300
committertesseract <tesseract@yandex-team.com>2023-03-01 11:23:46 +0300
commitc400b24651590ee3e66795b13cf40dcdc5fb50c9 (patch)
tree1975b0bde8e39325817978ed84d7bdeee81b6c75
parent22f99c1e277287a012a9b629919dbc8874d79912 (diff)
downloadydb-c400b24651590ee3e66795b13cf40dcdc5fb50c9.tar.gz
В schemeshard-е поддерживать статистику об используемом месте топиками пользователя
Мержить после https://a.yandex-team.ru/review/3501052/details До разделения на два ПР был https://a.yandex-team.ru/review/3465600/details
-rw-r--r--ydb/core/persqueue/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/persqueue/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/persqueue/CMakeLists.linux.txt1
-rw-r--r--ydb/core/persqueue/events/global.h4
-rw-r--r--ydb/core/persqueue/partition.cpp33
-rw-r--r--ydb/core/persqueue/partition.h10
-rw-r--r--ydb/core/persqueue/read_balancer.cpp45
-rw-r--r--ydb/core/persqueue/read_balancer.h20
-rw-r--r--ydb/core/persqueue/ut/common/pq_ut_common.cpp53
-rw-r--r--ydb/core/persqueue/ut/common/pq_ut_common.h46
-rw-r--r--ydb/core/persqueue/utils.cpp22
-rw-r--r--ydb/core/persqueue/utils.h10
-rw-r--r--ydb/core/protos/counters_schemeshard.proto15
-rw-r--r--ydb/core/protos/pqconfig.proto14
-rw-r--r--ydb/core/protos/subdomains.proto3
-rw-r--r--ydb/core/tx/schemeshard/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/schemeshard/CMakeLists.linux.txt1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_deallocate_pq.cpp1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp123
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__stats.h2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.cpp6
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.h9
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_info_types.h27
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_path_describer.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_path_describer.h2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_private.h5
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_utils.cpp19
-rw-r--r--ydb/core/tx/schemeshard/ut_base.cpp5
-rw-r--r--ydb/core/tx/schemeshard/ut_helpers/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/tx/schemeshard/ut_helpers/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/schemeshard/ut_helpers/CMakeLists.linux.txt1
-rw-r--r--ydb/core/tx/schemeshard/ut_helpers/helpers.cpp28
-rw-r--r--ydb/core/tx/schemeshard/ut_helpers/helpers.h3
-rw-r--r--ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp36
-rw-r--r--ydb/core/tx/schemeshard/ut_helpers/ls_checks.h4
-rw-r--r--ydb/core/tx/schemeshard/ut_stats.cpp208
38 files changed, 700 insertions, 67 deletions
diff --git a/ydb/core/persqueue/CMakeLists.darwin.txt b/ydb/core/persqueue/CMakeLists.darwin.txt
index e9cdb1f022..fc8c7d2dba 100644
--- a/ydb/core/persqueue/CMakeLists.darwin.txt
+++ b/ydb/core/persqueue/CMakeLists.darwin.txt
@@ -60,6 +60,7 @@ target_sources(ydb-core-persqueue PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/transaction.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/type_codecs_defs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/user_info.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/utils.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/write_meta.cpp
)
generate_enum_serilization(ydb-core-persqueue
diff --git a/ydb/core/persqueue/CMakeLists.linux-aarch64.txt b/ydb/core/persqueue/CMakeLists.linux-aarch64.txt
index 1c810979a4..0923957c1a 100644
--- a/ydb/core/persqueue/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/persqueue/CMakeLists.linux-aarch64.txt
@@ -61,6 +61,7 @@ target_sources(ydb-core-persqueue PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/transaction.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/type_codecs_defs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/user_info.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/utils.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/write_meta.cpp
)
generate_enum_serilization(ydb-core-persqueue
diff --git a/ydb/core/persqueue/CMakeLists.linux.txt b/ydb/core/persqueue/CMakeLists.linux.txt
index 1c810979a4..0923957c1a 100644
--- a/ydb/core/persqueue/CMakeLists.linux.txt
+++ b/ydb/core/persqueue/CMakeLists.linux.txt
@@ -61,6 +61,7 @@ target_sources(ydb-core-persqueue PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/transaction.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/type_codecs_defs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/user_info.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/utils.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/write_meta.cpp
)
generate_enum_serilization(ydb-core-persqueue
diff --git a/ydb/core/persqueue/events/global.h b/ydb/core/persqueue/events/global.h
index 15a7964fd8..7d40a67955 100644
--- a/ydb/core/persqueue/events/global.h
+++ b/ydb/core/persqueue/events/global.h
@@ -45,6 +45,7 @@ struct TEvPersQueue {
EvProposeTransaction,
EvProposeTransactionResult,
EvCancelTransactionProposal,
+ EvPeriodicTopicStats,
EvResponse = EvRequest + 256,
EvInternalEvents = EvResponse + 256,
EvEnd
@@ -235,5 +236,8 @@ struct TEvPersQueue {
struct TEvCancelTransactionProposal : public TEventPB<TEvCancelTransactionProposal, NKikimrPQ::TEvCancelTransactionProposal, EvCancelTransactionProposal> {
};
+ struct TEvPeriodicTopicStats : public TEventPB<TEvPeriodicTopicStats, NKikimrPQ::TEvPeriodicTopicStats, EvPeriodicTopicStats> {
+ };
+
};
} //NKikimr
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index d885758c10..f47a432d6e 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -1107,15 +1107,26 @@ void TPartition::HandleOnWrite(TEvPQ::TEvUpdateAvailableSize::TPtr&, const TActo
}
-ui64 TPartition::GetUsedStorage(const TActorContext& ctx) {
- auto duration = ctx.Now() - LastUsedStorageMeterTimestamp;
- LastUsedStorageMeterTimestamp = ctx.Now();
+ui64 TPartition::MeteringDataSize(const TActorContext& ctx) const {
ui64 size = Size();
- if (DataKeysBody.size() > 0) {
+ if (!DataKeysBody.empty()) {
size -= DataKeysBody.front().Size;
- } else {
- size = 0;
}
+ auto expired = ctx.Now() - TDuration::Seconds(Config.GetPartitionConfig().GetLifetimeSeconds());
+ for(size_t i = 0; i < HeadKeys.size(); ++i) {
+ auto& key = HeadKeys[i];
+ if (expired < key.Timestamp) {
+ break;
+ }
+ size -= key.Size;
+ }
+ return size;
+}
+
+ui64 TPartition::GetUsedStorage(const TActorContext& ctx) {
+ auto duration = ctx.Now() - LastUsedStorageMeterTimestamp;
+ LastUsedStorageMeterTimestamp = ctx.Now();
+ ui64 size = MeteringDataSize(ctx);
return size * duration.MilliSeconds() / 1000 / 1_MB; // mb*seconds
}
@@ -2203,7 +2214,8 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
result.SetReadBytesQuota(maxQuota);
- result.SetPartitionSize(Size());
+ result.SetPartitionSize(MeteringDataSize(ctx));
+ result.SetUsedReserveSize(UsedReserveSize());
result.SetStartOffset(StartOffset);
result.SetEndOffset(EndOffset);
@@ -2212,6 +2224,13 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
*result.MutableErrors() = {Errors.begin(), Errors.end()};
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE,
+ "Topic PartitionStatus PartitionSize: " << result.GetPartitionSize()
+ << " UsedReserveSize: " << result.GetUsedReserveSize()
+ << " ReserveSize: " << ReserveSize()
+ << " PartitionConfig" << Config.GetPartitionConfig();
+ );
+
ctx.Send(ev->Get()->Sender, new TEvPQ::TEvPartitionStatusResponse(result));
}
diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h
index 880e5aea9f..6379b7e14b 100644
--- a/ydb/core/persqueue/partition.h
+++ b/ydb/core/persqueue/partition.h
@@ -8,6 +8,7 @@
#include "sourceid.h"
#include "subscriber.h"
#include "user_info.h"
+#include "utils.h"
#include <ydb/core/keyvalue/keyvalue_events.h>
#include <ydb/library/persqueue/counter_time_keeper/counter_time_keeper.h>
@@ -308,6 +309,15 @@ public:
return BodySize + Head.PackedSize;
}
+ ui64 MeteringDataSize(const TActorContext& ctx) const;
+
+ ui64 UsedReserveSize() {
+ return std::min<ui64>(Size(), ReserveSize());
+ }
+
+ ui64 ReserveSize() {
+ return TopicPartitionReserveSize(Config);
+ }
//Bootstrap sends kvRead
diff --git a/ydb/core/persqueue/read_balancer.cpp b/ydb/core/persqueue/read_balancer.cpp
index 89676eef2e..89af42f079 100644
--- a/ydb/core/persqueue/read_balancer.cpp
+++ b/ydb/core/persqueue/read_balancer.cpp
@@ -199,6 +199,9 @@ TString TPersQueueReadBalancer::GenerateStat() {
TAG(TH3) {str << "[Total/Max/Avg]WriteSpeedMin: " << TotalAvgSpeedMin << "/" << MaxAvgSpeedMin << "/" << TotalAvgSpeedMin / NumActiveParts;}
TAG(TH3) {str << "[Total/Max/Avg]WriteSpeedHour: " << TotalAvgSpeedHour << "/" << MaxAvgSpeedHour << "/" << TotalAvgSpeedHour / NumActiveParts;}
TAG(TH3) {str << "[Total/Max/Avg]WriteSpeedDay: " << TotalAvgSpeedDay << "/" << MaxAvgSpeedDay << "/" << TotalAvgSpeedDay / NumActiveParts;}
+ TAG(TH3) {str << "TotalDataSize: " << TotalDataSize;}
+ TAG(TH3) {str << "ReserveSize: " << PartitionReserveSize();}
+ TAG(TH3) {str << "TotalUsedReserveSize: " << TotalUsedReserveSize;}
}
UL_CLASS("nav nav-tabs") {
@@ -624,15 +627,10 @@ void TPersQueueReadBalancer::RestartPipe(const ui64 tabletId, const TActorContex
}
}
-
-void TPersQueueReadBalancer::RequestTabletIfNeeded(const ui64 tabletId, const TActorContext& ctx)
-{
- if ((tabletId == SchemeShardId && !WaitingForACL) ||
- (tabletId != SchemeShardId && !WaitingForStat.contains(tabletId)))
- return;
+TActorId TPersQueueReadBalancer::GetPipeClient(const ui64 tabletId, const TActorContext& ctx) {
+ TActorId pipeClient;
auto it = TabletPipes.find(tabletId);
- TActorId pipeClient;
if (it == TabletPipes.end()) {
NTabletPipe::TClientConfig clientConfig;
pipeClient = ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, tabletId, clientConfig));
@@ -640,6 +638,17 @@ void TPersQueueReadBalancer::RequestTabletIfNeeded(const ui64 tabletId, const TA
} else {
pipeClient = it->second;
}
+
+ return pipeClient;
+}
+
+void TPersQueueReadBalancer::RequestTabletIfNeeded(const ui64 tabletId, const TActorContext& ctx)
+{
+ if ((tabletId == SchemeShardId && !WaitingForACL) ||
+ (tabletId != SchemeShardId && !WaitingForStat.contains(tabletId)))
+ return;
+
+ TActorId pipeClient = GetPipeClient(tabletId, ctx);
if (tabletId == SchemeShardId) {
NTabletPipe::SendData(ctx, pipeClient, new NSchemeShard::TEvSchemeShard::TEvDescribeScheme(tabletId, PathId));
} else {
@@ -664,6 +673,9 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvStatusResponse::TPtr& ev, c
MaxAvgSpeedHour = Max<ui64>(MaxAvgSpeedHour, partRes.GetAvgWriteSpeedPerHour());
TotalAvgSpeedDay += partRes.GetAvgWriteSpeedPerDay();
MaxAvgSpeedDay = Max<ui64>(MaxAvgSpeedDay, partRes.GetAvgWriteSpeedPerDay());
+
+ TotalDataSize += partRes.GetPartitionSize();
+ TotalUsedReserveSize += partRes.GetUsedReserveSize();
}
if (WaitingForStat.empty()) {
CheckStat(ctx);
@@ -708,6 +720,23 @@ void TPersQueueReadBalancer::CheckStat(const TActorContext& ctx) {
Y_UNUSED(ctx);
//TODO: Deside about changing number of partitions and send request to SchemeShard
//TODO: make AlterTopic request via TX_PROXY
+
+ TEvPersQueue::TEvPeriodicTopicStats* ev = new TEvPersQueue::TEvPeriodicTopicStats();
+ auto& rec = ev->Record;
+ rec.SetPathId(PathId);
+ rec.SetGeneration(Generation);
+ rec.SetRound(++StatsReportRound);
+ rec.SetDataSize(TotalDataSize);
+ rec.SetUsedReserveSize(TotalUsedReserveSize);
+
+ LOG_DEBUG(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER,
+ TStringBuilder() << "Send TEvPeriodicTopicStats PathId: " << PathId
+ << " Generation: " << Generation
+ << " StatsReportRound: " << StatsReportRound
+ << " DataSize: " << TotalDataSize
+ << " UsedReserveSize: " << TotalUsedReserveSize);
+
+ NTabletPipe::SendData(ctx, GetPipeClient(SchemeShardId, ctx), ev);
}
void TPersQueueReadBalancer::GetStat(const TActorContext& ctx) {
@@ -717,6 +746,8 @@ void TPersQueueReadBalancer::GetStat(const TActorContext& ctx) {
TotalAvgSpeedMin = MaxAvgSpeedMin = 0;
TotalAvgSpeedHour = MaxAvgSpeedHour = 0;
TotalAvgSpeedDay = MaxAvgSpeedDay = 0;
+ TotalDataSize = 0;
+ TotalUsedReserveSize = 0;
for (auto& p : PartitionsInfo) {
const ui64& tabletId = p.second.TabletId;
bool res = WaitingForStat.insert(tabletId).second;
diff --git a/ydb/core/persqueue/read_balancer.h b/ydb/core/persqueue/read_balancer.h
index cd74aa3a8d..7b0c945c02 100644
--- a/ydb/core/persqueue/read_balancer.h
+++ b/ydb/core/persqueue/read_balancer.h
@@ -1,5 +1,7 @@
#pragma once
+#include "utils.h"
+
#include <util/system/hp_timer.h>
#include <ydb/core/tablet_flat/tablet_flat_executed.h>
@@ -178,9 +180,11 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
}
void HandleWakeup(TEvents::TEvWakeup::TPtr&, const TActorContext &ctx) {
+ LOG_DEBUG(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, TStringBuilder() << "TPersQueueReadBalancer::HandleWakeup");
+
GetStat(ctx); //TODO: do it only on signals from outerspace right now
- ctx.Schedule(TDuration::Seconds(30), new TEvents::TEvWakeup()); //TODO: remove it
+ ctx.Schedule(TDuration::Seconds(AppData(ctx)->PQConfig.GetBalancerWakeupIntervalSec()), new TEvents::TEvWakeup()); //TODO: remove it
}
void HandleUpdateACL(TEvPersQueue::TEvUpdateACL::TPtr&, const TActorContext &ctx) {
@@ -242,7 +246,7 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
}
RegisterEvents.clear();
- ctx.Schedule(TDuration::Seconds(30), new TEvents::TEvWakeup()); //TODO: remove it
+ ctx.Schedule(TDuration::Seconds(AppData(ctx)->PQConfig.GetBalancerWakeupIntervalSec()), new TEvents::TEvWakeup()); //TODO: remove it
ctx.Send(ctx.SelfID, new TEvPersQueue::TEvUpdateACL());
}
@@ -270,6 +274,7 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
TStringBuilder GetPrefix() const;
+ TActorId GetPipeClient(const ui64 tabletId, const TActorContext&);
void RequestTabletIfNeeded(const ui64 tabletId, const TActorContext&);
void RestartPipe(const ui64 tabletId, const TActorContext&);
void CheckStat(const TActorContext&);
@@ -293,6 +298,10 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
struct TPipeInfo;
void UnregisterSession(const TActorId& pipe, const TActorContext& ctx);
void RebuildStructs();
+ ui64 PartitionReserveSize() {
+ return TopicPartitionReserveSize(TabletConfig);
+ }
+
bool Inited;
ui64 PathId;
@@ -431,6 +440,10 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
ui64 MaxAvgSpeedHour;
ui64 TotalAvgSpeedDay;
ui64 MaxAvgSpeedDay;
+ ui64 TotalDataSize;
+ ui64 TotalUsedReserveSize;
+
+ ui64 StatsReportRound;
std::deque<TAutoPtr<TEvPersQueue::TEvRegisterReadSession>> RegisterEvents;
std::deque<TAutoPtr<TEvPersQueue::TEvPersQueue::TEvUpdateBalancerConfig>> UpdateEvents;
@@ -467,6 +480,9 @@ public:
, MaxAvgSpeedHour(0)
, TotalAvgSpeedDay(0)
, MaxAvgSpeedDay(0)
+ , TotalDataSize(0)
+ , TotalUsedReserveSize(0)
+ , StatsReportRound(0)
{}
STFUNC(StateInit) {
diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.cpp b/ydb/core/persqueue/ut/common/pq_ut_common.cpp
index 515f65ddb0..b4d739612d 100644
--- a/ydb/core/persqueue/ut/common/pq_ut_common.cpp
+++ b/ydb/core/persqueue/ut/common/pq_ut_common.cpp
@@ -286,7 +286,11 @@ void PQTabletRestart(TTestActorRuntime& runtime, ui64 tabletId, TActorId edge) {
}
TActorId SetOwner(const ui32 partition, TTestContext& tc, const TString& owner, bool force) {
- TActorId pipeClient = tc.Runtime->ConnectToPipe(tc.TabletId, tc.Edge, 0, GetPipeConfigWithRetries());
+ return SetOwner(tc.Runtime.Get(), tc.TabletId, tc.Edge, partition, owner, force);
+}
+
+TActorId SetOwner(TTestActorRuntime* runtime, ui64 tabletId, const TActorId& sender, const ui32 partition, const TString& owner, bool force) {
+ TActorId pipeClient = runtime->ConnectToPipe(tabletId, sender, 0, GetPipeConfigWithRetries());
THolder<TEvPersQueue::TEvRequest> request;
@@ -297,7 +301,7 @@ TActorId SetOwner(const ui32 partition, TTestContext& tc, const TString& owner,
req->MutableCmdGetOwnership()->SetForce(force);
ActorIdToProto(pipeClient, req->MutablePipeClient());
- tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries(), pipeClient);
+ runtime->SendToPipe(tabletId, sender, request.Release(), 0, GetPipeConfigWithRetries(), pipeClient);
return pipeClient;
}
@@ -380,34 +384,38 @@ void WaitPartition(const TString &session, TTestContext& tc, ui32 partition, con
}
std::pair<TString, TActorId> CmdSetOwner(const ui32 partition, TTestContext& tc, const TString& owner, bool force) {
+ return CmdSetOwner(tc.Runtime.Get(), tc.TabletId, tc.Edge, partition, owner, force);
+}
+
+std::pair<TString, TActorId> CmdSetOwner(TTestActorRuntime* runtime, ui64 tabletId, const TActorId& sender, const ui32 partition, const TString& owner, bool force) {
TAutoPtr<IEventHandle> handle;
TEvPersQueue::TEvResponse *result;
TString cookie;
TActorId pipeClient;
for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) {
try {
- tc.Runtime->ResetScheduledCount();
+ runtime->ResetScheduledCount();
- pipeClient = SetOwner(partition, tc, owner, force);
+ pipeClient = SetOwner(runtime, tabletId, sender, partition, owner, force);
- result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle);
+ result = runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle);
UNIT_ASSERT(result);
UNIT_ASSERT(result->Record.HasStatus());
if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) {
- tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress
+ runtime->DispatchEvents(); // Dispatch events so that initialization can make progress
retriesLeft = 3;
continue;
}
if (result->Record.GetErrorReason().StartsWith("ownership session is killed by another session with id ")) {
- result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle);
+ result = runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle);
UNIT_ASSERT(result);
UNIT_ASSERT(result->Record.HasStatus());
}
if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) {
- tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress
+ runtime->DispatchEvents(); // Dispatch events so that initialization can make progress
retriesLeft = 3;
continue;
}
@@ -483,8 +491,13 @@ void WritePartDataWithBigMsg(const ui32 partition, const TString& sourceId, cons
void WriteData(const ui32 partition, const TString& sourceId, const TVector<std::pair<ui64, TString>> data, TTestContext& tc,
const TString& cookie, i32 msgSeqNo, i64 offset, bool disableDeduplication) {
+ WriteData(tc.Runtime.Get(), tc.TabletId, tc.Edge, partition, sourceId, data, cookie, msgSeqNo, offset, disableDeduplication);
+}
+
+void WriteData(TTestActorRuntime* runtime, ui64 tabletId, const TActorId& sender, const ui32 partition, const TString& sourceId,
+ const TVector<std::pair<ui64, TString>> data, const TString& cookie, i32 msgSeqNo, i64 offset, bool disableDeduplication) {
THolder<TEvPersQueue::TEvRequest> request;
- tc.Runtime->ResetScheduledCount();
+ runtime->ResetScheduledCount();
request.Reset(new TEvPersQueue::TEvRequest);
auto req = request->Record.MutablePartitionRequest();
req->SetPartition(partition);
@@ -499,7 +512,7 @@ void WriteData(const ui32 partition, const TString& sourceId, const TVector<std:
write->SetData(p.second);
write->SetDisableDeduplication(disableDeduplication);
}
- tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
+ runtime->SendToPipe(tabletId, sender, request.Release(), 0, GetPipeConfigWithRetries());
}
void CmdWrite(const ui32 partition, const TString& sourceId, const TVector<std::pair<ui64, TString>> data,
@@ -507,16 +520,26 @@ void CmdWrite(const ui32 partition, const TString& sourceId, const TVector<std::
bool isFirst, const TString& ownerCookie, i32 msn, i64 offset,
bool treatWrongCookieAsError, bool treatBadOffsetAsError,
bool disableDeduplication) {
+ CmdWrite(tc.Runtime.Get(), tc.TabletId, tc.Edge, partition, sourceId, tc.MsgSeqNoMap[partition],
+ data, error, alreadyWrittenSeqNo, isFirst, ownerCookie, msn, offset, treatWrongCookieAsError, treatBadOffsetAsError, disableDeduplication);
+
+}
+
+void CmdWrite(TTestActorRuntime* runtime, ui64 tabletId, const TActorId& sender, const ui32 partition,
+ const TString& sourceId, ui32& msgSeqNo, const TVector<std::pair<ui64, TString>> data,
+ bool error, const THashSet<ui32>& alreadyWrittenSeqNo,
+ bool isFirst, const TString& ownerCookie, i32 msn, i64 offset,
+ bool treatWrongCookieAsError, bool treatBadOffsetAsError,
+ bool disableDeduplication) {
TAutoPtr<IEventHandle> handle;
TEvPersQueue::TEvResponse *result;
- ui32& msgSeqNo = tc.MsgSeqNoMap[partition];
if (msn != -1) msgSeqNo = msn;
TString cookie = ownerCookie;
for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) {
try {
- WriteData(partition, sourceId, data, tc, cookie, msgSeqNo, offset, disableDeduplication);
- result = tc.Runtime->GrabEdgeEventIf<TEvPersQueue::TEvResponse>(handle,
+ WriteData(runtime, tabletId, sender, partition, sourceId, data, cookie, msgSeqNo, offset, disableDeduplication);
+ result = runtime->GrabEdgeEventIf<TEvPersQueue::TEvResponse>(handle,
[](const TEvPersQueue::TEvResponse& ev){
if (ev.Record.HasPartitionResponse() &&
ev.Record.GetPartitionResponse().CmdWriteResultSize() > 0 ||
@@ -528,14 +551,14 @@ void CmdWrite(const ui32 partition, const TString& sourceId, const TVector<std::
UNIT_ASSERT(result);
UNIT_ASSERT(result->Record.HasStatus());
if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) {
- tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress
+ runtime->DispatchEvents(); // Dispatch events so that initialization can make progress
retriesLeft = 3;
continue;
}
if (!treatWrongCookieAsError &&
result->Record.GetErrorCode() == NPersQueue::NErrorCode::WRONG_COOKIE) {
- cookie = CmdSetOwner(partition, tc).first;
+ cookie = CmdSetOwner(runtime, tabletId, sender, partition).first;
msgSeqNo = 0;
retriesLeft = 3;
continue;
diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.h b/ydb/core/persqueue/ut/common/pq_ut_common.h
index 3f62e1183f..47166af066 100644
--- a/ydb/core/persqueue/ut/common/pq_ut_common.h
+++ b/ydb/core/persqueue/ut/common/pq_ut_common.h
@@ -281,6 +281,14 @@ TActorId SetOwner(
const TString& owner,
bool force);
+TActorId SetOwner(
+ TTestActorRuntime* runtime,
+ ui64 tabletId,
+ const TActorId& sender,
+ const ui32 partition,
+ const TString& owner,
+ bool force);
+
void FillDeprecatedUserInfo(
NKikimrClient::TKeyValueRequest_TCmdWrite* write,
const TString& client,
@@ -326,6 +334,18 @@ void WriteData(
i64 offset,
bool disableDeduplication = false);
+void WriteData(
+ TTestActorRuntime* runtime,
+ ui64 tabletId,
+ const TActorId& sender,
+ const ui32 partition,
+ const TString& sourceId,
+ const TVector<std::pair<ui64, TString>> data,
+ const TString& cookie,
+ i32 msgSeqNo,
+ i64 offset,
+ bool disableDeduplication = false);
+
void WritePartData(
const ui32 partition,
const TString& sourceId,
@@ -363,6 +383,14 @@ std::pair<TString, TActorId> CmdSetOwner(
const TString& owner = "default",
bool force = true);
+std::pair<TString, TActorId> CmdSetOwner(
+ TTestActorRuntime* runtime,
+ ui64 tabletId,
+ const TActorId& sender,
+ const ui32 partition,
+ const TString& owner = "default",
+ bool force = true);
+
void CmdCreateSession(
const ui32 partition,
const TString& user,
@@ -436,4 +464,22 @@ void CmdWrite(
bool treatBadOffsetAsError = true,
bool disableDeduplication = false);
+void CmdWrite(
+ TTestActorRuntime* runtime,
+ ui64 tabletId,
+ const TActorId& sender,
+ const ui32 partition,
+ const TString& sourceId,
+ ui32& msgSeqNo,
+ const TVector<std::pair<ui64, TString>> data,
+ bool error = false,
+ const THashSet<ui32>& alreadyWrittenSeqNo = {},
+ bool isFirst = false,
+ const TString& ownerCookie = "",
+ i32 msn = -1,
+ i64 offset = -1,
+ bool treatWrongCookieAsError = false,
+ bool treatBadOffsetAsError = true,
+ bool disableDeduplication = false);
+
} // namespace NKikimr::NPQ
diff --git a/ydb/core/persqueue/utils.cpp b/ydb/core/persqueue/utils.cpp
new file mode 100644
index 0000000000..285640ffe6
--- /dev/null
+++ b/ydb/core/persqueue/utils.cpp
@@ -0,0 +1,22 @@
+#include "utils.h"
+
+namespace NKikimr::NPQ {
+
+ui64 TopicPartitionReserveSize(const NKikimrPQ::TPQTabletConfig& config) {
+ if (NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS == config.GetMeteringMode()) {
+ return 0;
+ }
+ if (config.GetPartitionConfig().HasStorageLimitBytes()) {
+ return config.GetPartitionConfig().GetStorageLimitBytes();
+ }
+ return config.GetPartitionConfig().GetLifetimeSeconds() * config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond();
+}
+
+ui64 TopicPartitionReserveThroughput(const NKikimrPQ::TPQTabletConfig& config) {
+ if (NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS == config.GetMeteringMode()) {
+ return 0;
+ }
+ return config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond();
+}
+
+} // NKikimr::NPQ
diff --git a/ydb/core/persqueue/utils.h b/ydb/core/persqueue/utils.h
new file mode 100644
index 0000000000..806e9950ca
--- /dev/null
+++ b/ydb/core/persqueue/utils.h
@@ -0,0 +1,10 @@
+#pragma once
+
+#include <ydb/core/protos/pqconfig.pb.h>
+
+namespace NKikimr::NPQ {
+
+ui64 TopicPartitionReserveSize(const NKikimrPQ::TPQTabletConfig& config);
+ui64 TopicPartitionReserveThroughput(const NKikimrPQ::TPQTabletConfig& config);
+
+} // NKikimr::NPQ
diff --git a/ydb/core/protos/counters_schemeshard.proto b/ydb/core/protos/counters_schemeshard.proto
index 0406f2c064..505a8ed6aa 100644
--- a/ydb/core/protos/counters_schemeshard.proto
+++ b/ydb/core/protos/counters_schemeshard.proto
@@ -183,6 +183,8 @@ enum ESimpleCounters {
COUNTER_IN_FLIGHT_OPS_TxCreateExternalDataSource = 148 [(CounterOpts) = {Name: "InFlightOps/CreateExternalDataSource"}];
COUNTER_IN_FLIGHT_OPS_TxDropExternalDataSource = 149 [(CounterOpts) = {Name: "InFlightOps/DropExternalDataSource"}];
COUNTER_IN_FLIGHT_OPS_TxAlterExternalDataSource = 150 [(CounterOpts) = {Name: "InFlightOps/AlterExternalDataSource"}];
+
+ COUNTER_PQ_STATS_QUEUE_SIZE = 151 [(CounterOpts) = {Name: "PQStatsQueueSize"}];
}
enum ECumulativeCounters {
@@ -297,6 +299,8 @@ enum ECumulativeCounters {
COUNTER_FINISHED_OPS_TxCreateExternalDataSource = 91 [(CounterOpts) = {Name: "FinishedOps/CreateExternalDataSource"}];
COUNTER_FINISHED_OPS_TxDropExternalDataSource = 92 [(CounterOpts) = {Name: "FinishedOps/DropExternalDataSource"}];
COUNTER_FINISHED_OPS_TxAlterExternalDataSource = 93 [(CounterOpts) = {Name: "FinishedOps/AlterExternalDataSource"}];
+
+ COUNTER_PQ_STATS_WRITTEN = 94 [(CounterOpts) = {Name: "PQStatsWritten"}];
}
enum EPercentileCounters {
@@ -399,6 +403,17 @@ enum EPercentileCounters {
Ranges: { Value: 500000 Name: "500 ms" }
Ranges: { Value: 1000000 Name: "1000 ms" }
}];
+
+ COUNTER_PQ_STATS_BATCH_LATENCY = 6 [(CounterOpts) = {
+ Name: "PQStatsBatchLatency",
+ Ranges: { Value: 1000 Name: "1 ms" }
+ Ranges: { Value: 10000 Name: "10 ms" }
+ Ranges: { Value: 50000 Name: "50 ms" }
+ Ranges: { Value: 100000 Name: "100 ms" }
+ Ranges: { Value: 200000 Name: "200 ms" }
+ Ranges: { Value: 500000 Name: "500 ms" }
+ Ranges: { Value: 1000000 Name: "1000 ms" }
+ }];
}
enum ETxTypes {
diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto
index 9aaf0bb96f..f218896b49 100644
--- a/ydb/core/protos/pqconfig.proto
+++ b/ydb/core/protos/pqconfig.proto
@@ -186,6 +186,8 @@ message TPQConfig {
}
optional TMoveTopicActorConfig MoveTopicActorConfig = 51;
+
+ optional uint64 BalancerWakeupIntervalSec = 54 [default = 30];
}
message TChannelProfile {
@@ -674,6 +676,8 @@ message TStatusResponse {
repeated TErrorMessage Errors = 29;
repeated TConsumerResult ConsumerResult = 30;
+
+ optional int64 UsedReserveSize = 31;
}
message TConsumerResult {
@@ -829,6 +833,16 @@ message TEvCancelTransactionProposal {
optional uint64 TxId = 1;
};
+message TEvPeriodicTopicStats {
+ required uint64 PathId = 1;
+
+ required uint64 Generation = 2;
+ required uint64 Round = 3;
+
+ required uint64 DataSize = 4;
+ required uint64 UsedReserveSize = 5;
+};
+
message TTransaction {
enum EState {
UNKNOWN = 0;
diff --git a/ydb/core/protos/subdomains.proto b/ydb/core/protos/subdomains.proto
index 4ac4a22cb5..77955cca5b 100644
--- a/ydb/core/protos/subdomains.proto
+++ b/ydb/core/protos/subdomains.proto
@@ -49,6 +49,9 @@ message TDiskSpaceUsage {
message TTopics {
// in bytes
optional uint64 ReserveSize = 1;
+ optional uint64 AccountSize = 2;
+ optional uint64 DataSize = 3;
+ optional uint64 UsedReserveSize = 4;
}
optional TTables Tables = 1;
diff --git a/ydb/core/tx/schemeshard/CMakeLists.darwin.txt b/ydb/core/tx/schemeshard/CMakeLists.darwin.txt
index abd0c5b9b2..7c311d3759 100644
--- a/ydb/core/tx/schemeshard/CMakeLists.darwin.txt
+++ b/ydb/core/tx/schemeshard/CMakeLists.darwin.txt
@@ -197,6 +197,7 @@ target_sources(core-tx-schemeshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_allocate_pq.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_deallocate_pq.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__publish_to_scheme_board.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__state_changed_reply.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
diff --git a/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt
index b24c64ba3a..d80441560c 100644
--- a/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt
@@ -198,6 +198,7 @@ target_sources(core-tx-schemeshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_allocate_pq.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_deallocate_pq.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__publish_to_scheme_board.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__state_changed_reply.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
diff --git a/ydb/core/tx/schemeshard/CMakeLists.linux.txt b/ydb/core/tx/schemeshard/CMakeLists.linux.txt
index b24c64ba3a..d80441560c 100644
--- a/ydb/core/tx/schemeshard/CMakeLists.linux.txt
+++ b/ydb/core/tx/schemeshard/CMakeLists.linux.txt
@@ -198,6 +198,7 @@ target_sources(core-tx-schemeshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_allocate_pq.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_deallocate_pq.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__publish_to_scheme_board.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__state_changed_reply.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_deallocate_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_deallocate_pq.cpp
index 11f8af0f9a..5284f79137 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_deallocate_pq.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_deallocate_pq.cpp
@@ -117,6 +117,7 @@ public:
domainInfo->DecPathsInside();
domainInfo->DecPQPartitionsInside(pqGroup->TotalPartitionCount);
domainInfo->DecPQReservedStorage(reserve.Storage);
+ domainInfo->AggrDiskSpaceUsage({}, pqGroup->Stats);
context.SS->TabletCounters->Simple()[COUNTER_STREAM_RESERVED_THROUGHPUT].Sub(reserve.Throughput);
context.SS->TabletCounters->Simple()[COUNTER_STREAM_RESERVED_STORAGE].Sub(reserve.Storage);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp
index 9926d0fc48..36ccb1fe09 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp
@@ -189,6 +189,7 @@ public:
domainInfo->DecPathsInside();
domainInfo->DecPQPartitionsInside(pqGroup->TotalPartitionCount);
domainInfo->DecPQReservedStorage(reserve.Storage);
+ domainInfo->AggrDiskSpaceUsage({}, pqGroup->Stats);
context.SS->TabletCounters->Simple()[COUNTER_STREAM_RESERVED_THROUGHPUT].Sub(reserve.Throughput);
context.SS->TabletCounters->Simple()[COUNTER_STREAM_RESERVED_STORAGE].Sub(reserve.Storage);
diff --git a/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp b/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp
new file mode 100644
index 0000000000..34f0f86dae
--- /dev/null
+++ b/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp
@@ -0,0 +1,123 @@
+#include "schemeshard_impl.h"
+#include "schemeshard__stats_impl.h"
+
+#include <ydb/core/base/appdata.h>
+#include <ydb/core/base/cputime.h>
+#include <ydb/core/protos/sys_view.pb.h>
+
+namespace NKikimr {
+namespace NSchemeShard {
+
+class TTxStoreTopicStats: public TTxStoreStats<TEvPersQueue::TEvPeriodicTopicStats> {
+ TSideEffects MergeOpSideEffects;
+
+public:
+ TTxStoreTopicStats(TSchemeShard* ss, TStatsQueue<TEvPersQueue::TEvPeriodicTopicStats>& queue, bool& persistStatsPending)
+ : TTxStoreStats(ss, queue, persistStatsPending)
+ {
+ }
+
+ virtual ~TTxStoreTopicStats() = default;
+
+ void Complete(const TActorContext& ) override {};
+
+ // returns true to continue batching
+ bool PersistSingleStats(const TPathId& pathId, const TStatsQueue<TEvPersQueue::TEvPeriodicTopicStats>::TItem& item, TTransactionContext& txc, const TActorContext& ctx) override;
+};
+
+
+bool TTxStoreTopicStats::PersistSingleStats(const TPathId& pathId, const TStatsQueueItem<TEvPersQueue::TEvPeriodicTopicStats>& item, TTransactionContext& txc, const TActorContext& ctx) {
+ const auto& rec = item.Ev->Get()->Record;
+
+ TTopicStats newStats;
+ newStats.SeqNo = TMessageSeqNo(rec.GetGeneration(), rec.GetRound());
+ newStats.DataSize = rec.GetDataSize();
+ newStats.UsedReserveSize = rec.GetUsedReserveSize();
+
+ auto& topic = Self->Topics[pathId];
+ auto& oldStats = topic->Stats;
+
+ if (newStats.SeqNo <= oldStats.SeqNo) {
+ // Ignore outdated message
+ return true;
+ }
+
+ auto subDomainInfo = Self->ResolveDomainInfo(pathId);
+ subDomainInfo->AggrDiskSpaceUsage(newStats, oldStats);
+
+ oldStats = newStats;
+
+ if (subDomainInfo->CheckDiskSpaceQuotas(Self)) {
+ NIceDb::TNiceDb db(txc.DB);
+
+ auto subDomainId = Self->ResolvePathIdForDomain(pathId);
+ Self->PersistSubDomainState(db, subDomainId, *subDomainInfo);
+
+ // Publish is done in a separate transaction, so we may call this directly
+ TDeque<TPathId> toPublish;
+ toPublish.push_back(subDomainId);
+ Self->PublishToSchemeBoard(TTxId(), std::move(toPublish), ctx);
+ }
+
+ return true;
+}
+
+
+void TSchemeShard::Handle(TEvPersQueue::TEvPeriodicTopicStats::TPtr& ev, const TActorContext& ctx) {
+ const auto& rec = ev->Get()->Record;
+
+ const TPathId pathId = TPathId(TabletID(), rec.GetPathId());
+
+ LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "Got periodic topic stats at partition " << pathId
+ << " DataSize " << rec.GetDataSize()
+ << " UsedReserveSize " << rec.GetUsedReserveSize());
+
+ TStatsId statsId(pathId);
+ switch(TopicStatsQueue.Add(statsId, ev.Release())) {
+ case READY:
+ ExecuteTopicStatsBatch(ctx);
+ break;
+
+ case NOT_READY:
+ ScheduleTopicStatsBatch(ctx);
+ break;
+
+ default:
+ Y_FAIL("Unknown batch status");
+ }
+}
+
+void TSchemeShard::Handle(TEvPrivate::TEvPersistTopicStats::TPtr&, const TActorContext& ctx) {
+ LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "Started TEvPersistStats at tablet " << TabletID() << ", queue size# " << TopicStatsQueue.Size());
+
+ TopicStatsBatchScheduled = false;
+ ExecuteTopicStatsBatch(ctx);
+}
+
+void TSchemeShard::ExecuteTopicStatsBatch(const TActorContext& ctx) {
+ if (!TopicPersistStatsPending && !TopicStatsQueue.Empty()) {
+ LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "Will execute TTxStoreStats, queue# " << TopicStatsQueue.Size());
+
+ TopicPersistStatsPending = true;
+ Execute(new TTxStoreTopicStats(this, TopicStatsQueue, TopicPersistStatsPending), ctx);
+
+ ScheduleTopicStatsBatch(ctx);
+ }
+}
+
+void TSchemeShard::ScheduleTopicStatsBatch(const TActorContext& ctx) {
+ if (!TopicStatsBatchScheduled && !TopicStatsQueue.Empty()) {
+ TDuration delay = TopicStatsQueue.Delay();
+ LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "Will delay TTxStoreTopicStats on# " << delay << ", queue# " << TopicStatsQueue.Size());
+
+ ctx.Schedule(delay, new TEvPrivate::TEvPersistTopicStats());
+ TopicStatsBatchScheduled = true;
+ }
+}
+
+} // NSchemeShard
+} // NKikimr
diff --git a/ydb/core/tx/schemeshard/schemeshard__stats.h b/ydb/core/tx/schemeshard/schemeshard__stats.h
index 659e57f087..b9abf7ab34 100644
--- a/ydb/core/tx/schemeshard/schemeshard__stats.h
+++ b/ydb/core/tx/schemeshard/schemeshard__stats.h
@@ -14,7 +14,7 @@ struct TStatsId {
TPathId PathId;
TTabletId Datashard;
- TStatsId(const TPathId& pathId, const TTabletId& datashard)
+ TStatsId(const TPathId& pathId, const TTabletId datashard = TTabletId(0))
: PathId(pathId)
, Datashard(datashard)
{
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index ab7f1a0c77..b061571cce 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -3967,6 +3967,10 @@ TSchemeShard::TSchemeShard(const TActorId &tablet, TTabletStorageInfo *info)
COUNTER_STATS_QUEUE_SIZE,
COUNTER_STATS_WRITTEN,
COUNTER_STATS_BATCH_LATENCY)
+ , TopicStatsQueue(this,
+ COUNTER_PQ_STATS_QUEUE_SIZE,
+ COUNTER_PQ_STATS_WRITTEN,
+ COUNTER_PQ_STATS_BATCH_LATENCY)
, AllowDataColumnForIndexTable(0, 0, 1)
{
TabletCountersPtr.Reset(new TProtobufTabletCounters<
@@ -4240,6 +4244,7 @@ void TSchemeShard::StateWork(STFUNC_SIG) {
HFuncTraced(TEvDataShard::TEvSplitAck, Handle);
HFuncTraced(TEvDataShard::TEvSplitPartitioningChangedAck, Handle);
HFuncTraced(TEvDataShard::TEvPeriodicTableStats, Handle);
+ HFuncTraced(TEvPersQueue::TEvPeriodicTopicStats, Handle);
HFuncTraced(TEvDataShard::TEvGetTableStatsResult, Handle);
//
@@ -4355,6 +4360,7 @@ void TSchemeShard::StateWork(STFUNC_SIG) {
HFuncTraced(TEvPrivate::TEvSubscribeToShardDeletion, Handle);
HFuncTraced(TEvPrivate::TEvPersistTableStats, Handle);
+ HFuncTraced(TEvPrivate::TEvPersistTopicStats, Handle);
HFuncTraced(TEvSchemeShard::TEvLogin, Handle);
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h
index ae0cf92367..4a8f33b8b5 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.h
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.h
@@ -286,6 +286,10 @@ public:
bool TablePersistStatsPending = false;
TStatsQueue<TEvDataShard::TEvPeriodicTableStats> TableStatsQueue;
+ bool TopicStatsBatchScheduled = false;
+ bool TopicPersistStatsPending = false;
+ TStatsQueue<TEvPersQueue::TEvPeriodicTopicStats> TopicStatsQueue;
+
TSet<TPathId> CleanDroppedPathsCandidates;
TSet<TPathId> CleanDroppedSubDomainsCandidates;
bool CleanDroppedPathsInFly = false;
@@ -989,6 +993,11 @@ public:
void Handle(TEvDataShard::TEvPeriodicTableStats::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvGetTableStatsResult::TPtr& ev, const TActorContext& ctx);
+ void ExecuteTopicStatsBatch(const TActorContext& ctx);
+ void ScheduleTopicStatsBatch(const TActorContext& ctx);
+ void Handle(TEvPrivate::TEvPersistTopicStats::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPersQueue::TEvPeriodicTopicStats::TPtr& ev, const TActorContext& ctx);
+
void Handle(TEvSchemeShard::TEvFindTabletSubDomainPathId::TPtr& ev, const TActorContext& ctx);
void ScheduleConditionalEraseRun(const TActorContext& ctx);
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h
index 023610be4e..1f566a98ed 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.h
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h
@@ -985,6 +985,20 @@ struct TColumnTableInfo : TSimpleRefCount<TColumnTableInfo> {
}
};
+struct TTopicStats {
+ TMessageSeqNo SeqNo;
+
+ ui64 DataSize = 0;
+ ui64 UsedReserveSize = 0;
+
+ TString ToString() const {
+ return TStringBuilder() << "TTopicStats {"
+ << " DataSize: " << DataSize
+ << " UsedReserveSize: " << UsedReserveSize
+ << " }";
+ }
+};
+
struct TTopicTabletInfo : TSimpleRefCount<TTopicTabletInfo> {
using TPtr = TIntrusivePtr<TTopicTabletInfo>;
using TKeySchema = TVector<NScheme::TTypeInfo>;
@@ -1177,6 +1191,8 @@ struct TTopicInfo : TSimpleRefCount<TTopicInfo> {
TString PreSerializedPathDescription; // Cached path description
TString PreSerializedPartitionsDescription; // Cached partition description
+ TTopicStats Stats;
+
bool FillKeySchema(const NKikimrPQ::TPQTabletConfig& tabletConfig, TString& error);
bool FillKeySchema(const TString& tabletConfig);
@@ -1322,6 +1338,12 @@ struct TSubDomainInfo: TSimpleRefCount<TSubDomainInfo> {
ui64 DataSize = 0;
ui64 IndexSize = 0;
} Tables;
+
+ struct TTopics {
+ ui64 DataSize = 0;
+ ui64 ReserveSize = 0;
+ ui64 UsedReserveSize = 0;
+ } Topics;
};
struct TDiskSpaceQuotas {
@@ -1835,6 +1857,11 @@ struct TSubDomainInfo: TSimpleRefCount<TSubDomainInfo> {
counters->ChangeDiskSpaceTablesTotalBytes(newTotalBytes - oldTotalBytes);
}
+ void AggrDiskSpaceUsage(const TTopicStats& newAggr, const TTopicStats& oldAggr = {}) {
+ DiskSpaceUsage.Topics.DataSize += (newAggr.DataSize - oldAggr.DataSize);
+ DiskSpaceUsage.Topics.UsedReserveSize += (newAggr.UsedReserveSize - oldAggr.UsedReserveSize);
+ }
+
const TDiskSpaceUsage& GetDiskSpaceUsage() const {
return DiskSpaceUsage;
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
index 3e81f0fb91..525c6b81e9 100644
--- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
@@ -674,6 +674,10 @@ void TPathDescriber::DescribeDomainRoot(TPathElement::TPtr pathEl) {
diskSpaceUsage->MutableTables()->SetDataSize(subDomainInfo->GetDiskSpaceUsage().Tables.DataSize);
diskSpaceUsage->MutableTables()->SetIndexSize(subDomainInfo->GetDiskSpaceUsage().Tables.IndexSize);
diskSpaceUsage->MutableTopics()->SetReserveSize(subDomainInfo->GetPQReservedStorage());
+ ui64 accountSize = subDomainInfo->GetDiskSpaceUsage().Topics.DataSize - subDomainInfo->GetDiskSpaceUsage().Topics.UsedReserveSize + subDomainInfo->GetPQReservedStorage();
+ diskSpaceUsage->MutableTopics()->SetAccountSize(accountSize);
+ diskSpaceUsage->MutableTopics()->SetDataSize(subDomainInfo->GetDiskSpaceUsage().Topics.DataSize);
+ diskSpaceUsage->MutableTopics()->SetUsedReserveSize(subDomainInfo->GetDiskSpaceUsage().Topics.UsedReserveSize);
if (subDomainInfo->GetDeclaredSchemeQuotas()) {
entry->MutableDeclaredSchemeQuotas()->CopyFrom(*subDomainInfo->GetDeclaredSchemeQuotas());
diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.h b/ydb/core/tx/schemeshard/schemeshard_path_describer.h
index 2e40315cfc..4fe210d5ae 100644
--- a/ydb/core/tx/schemeshard/schemeshard_path_describer.h
+++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.h
@@ -32,7 +32,7 @@ class TPathDescriber {
void DescribeSolomonVolume(TPathId pathId, TPathElement::TPtr pathEl, bool returnChannelsBinding);
void DescribeUserAttributes(TPathElement::TPtr pathEl);
void DescribePathVersion(const TPath& path);
- void DescribeDomain(TPathElement::TPtr pathEl);
+ void DescribeDomain(TPathElement::TPtr pathEl) ;
void DescribeDomainRoot(TPathElement::TPtr pathEl);
void DescribeDomainExtra(TPathElement::TPtr pathEl);
void DescribeRevertedMigrations(TPathElement::TPtr pathEl);
diff --git a/ydb/core/tx/schemeshard/schemeshard_private.h b/ydb/core/tx/schemeshard/schemeshard_private.h
index fbe6c1e04d..b1d7581f82 100644
--- a/ydb/core/tx/schemeshard/schemeshard_private.h
+++ b/ydb/core/tx/schemeshard/schemeshard_private.h
@@ -27,6 +27,7 @@ struct TEvPrivate {
EvPersistTableStats,
EvConsoleConfigsTimeout,
EvRunCdcStreamScan,
+ EvPersistTopicStats,
EvEnd
};
@@ -163,6 +164,10 @@ struct TEvPrivate {
TEvPersistTableStats() = default;
};
+ struct TEvPersistTopicStats: public TEventLocal<TEvPersistTopicStats, EvPersistTopicStats> {
+ TEvPersistTopicStats() = default;
+ };
+
struct TEvConsoleConfigsTimeout: public TEventLocal<TEvConsoleConfigsTimeout, EvConsoleConfigsTimeout> {
};
diff --git a/ydb/core/tx/schemeshard/schemeshard_utils.cpp b/ydb/core/tx/schemeshard/schemeshard_utils.cpp
index 84c6fd44d8..b94c2a79f8 100644
--- a/ydb/core/tx/schemeshard/schemeshard_utils.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_utils.cpp
@@ -1,6 +1,7 @@
#include "schemeshard_utils.h"
#include <ydb/core/mind/hive/hive.h>
+#include <ydb/core/persqueue/utils.h>
#include <ydb/core/protos/counters_schemeshard.pb.h>
namespace NKikimr {
@@ -200,22 +201,8 @@ void TSelfPinger::ScheduleSelfPingWakeup(const NActors::TActorContext &ctx) {
}
PQGroupReserve::PQGroupReserve(const ::NKikimrPQ::TPQTabletConfig& tabletConfig, ui64 partitions) {
- const auto& partitionConfig = tabletConfig.GetPartitionConfig();
-
- if (tabletConfig.GetMeteringMode() == NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY) {
- const ui64 throughput = partitions * partitionConfig.GetWriteSpeedInBytesPerSecond();
-
- Throughput = throughput;
-
- if (partitionConfig.HasStorageLimitBytes()) {
- Storage = partitions * partitionConfig.GetStorageLimitBytes();
- } else {
- Storage = throughput * partitionConfig.GetLifetimeSeconds();
- }
- } else {
- Throughput = 0;
- Storage = 0;
- }
+ Storage = partitions * NPQ::TopicPartitionReserveSize(tabletConfig);
+ Throughput = partitions * NPQ::TopicPartitionReserveThroughput(tabletConfig);
}
}
diff --git a/ydb/core/tx/schemeshard/ut_base.cpp b/ydb/core/tx/schemeshard/ut_base.cpp
index 569685595a..0cd7665641 100644
--- a/ydb/core/tx/schemeshard/ut_base.cpp
+++ b/ydb/core/tx/schemeshard/ut_base.cpp
@@ -10550,7 +10550,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
);
}
- Y_UNIT_TEST(TopicMeteringModeAndStorageSize) {
+ Y_UNIT_TEST(TopicReserveSize) {
TTestBasicRuntime runtime;
TTestEnv env(runtime);
ui64 txId = 100;
@@ -10558,7 +10558,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
const auto AssertReserve = [&] (TString path, ui64 expectedReservedStorage) {
TestDescribeResult(DescribePath(runtime, path),
{NLs::Finished,
- NLs::PQReservedStorage(expectedReservedStorage)});
+ NLs::TopicReservedStorage(expectedReservedStorage)});
};
// create with WriteSpeedInBytesPerSecond
@@ -10770,5 +10770,4 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
env.TestWaitNotification(runtime, txId);
AssertReserve("/MyRoot/Topic2", 3 * 17);
}
-
}
diff --git a/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.darwin.txt b/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.darwin.txt
index e31d395fe6..ff67c37abd 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.darwin.txt
+++ b/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.darwin.txt
@@ -22,6 +22,7 @@ target_link_libraries(tx-schemeshard-ut_helpers PUBLIC
core-engine-minikql
core-filestore-core
ydb-core-metering
+ persqueue-ut-common
ydb-core-protos
ydb-core-scheme
ydb-core-tablet_flat
diff --git a/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.linux-aarch64.txt b/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.linux-aarch64.txt
index c1c282cef4..2c4e9247b6 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.linux-aarch64.txt
@@ -23,6 +23,7 @@ target_link_libraries(tx-schemeshard-ut_helpers PUBLIC
core-engine-minikql
core-filestore-core
ydb-core-metering
+ persqueue-ut-common
ydb-core-protos
ydb-core-scheme
ydb-core-tablet_flat
diff --git a/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.linux.txt b/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.linux.txt
index c1c282cef4..2c4e9247b6 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.linux.txt
+++ b/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.linux.txt
@@ -23,6 +23,7 @@ target_link_libraries(tx-schemeshard-ut_helpers PUBLIC
core-engine-minikql
core-filestore-core
ydb-core-metering
+ persqueue-ut-common
ydb-core-protos
ydb-core-scheme
ydb-core-tablet_flat
diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp
index d5b550e6d3..f61c5a79e2 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp
+++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp
@@ -5,6 +5,7 @@
#include <ydb/core/tx/schemeshard/schemeshard.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/core/persqueue/events/global.h>
+#include <ydb/core/persqueue/ut/common/pq_ut_common.h>
#include <ydb/core/blockstore/core/blockstore.h>
@@ -2196,4 +2197,31 @@ namespace NSchemeShardUT_Private {
auto& rec = result->Record;
return rec;
}
+
+ void SendTEvPeriodicTopicStats(TTestActorRuntime& runtime, ui64 topicId, ui64 generation, ui64 round, ui64 dataSize, ui64 usedReserveSize) {
+ TActorId sender = runtime.AllocateEdgeActor();
+
+ TEvPersQueue::TEvPeriodicTopicStats* ev = new TEvPersQueue::TEvPeriodicTopicStats();
+ auto& rec = ev->Record;
+ rec.SetPathId(topicId);
+ rec.SetGeneration(generation);
+ rec.SetRound(round);
+ rec.SetDataSize(dataSize);
+ rec.SetUsedReserveSize(usedReserveSize);
+
+ ForwardToTablet(runtime, TTestTxConfig::SchemeShard, sender, ev);
+ }
+
+ void WriteToTopic(TTestActorRuntime& runtime, const TString& path, ui32& msgSeqNo, const TString& message) {
+ auto topicDescr = DescribePath(runtime, path).GetPathDescription().GetPersQueueGroup();
+ auto partitionId = topicDescr.GetPartitions()[0].GetPartitionId();
+ auto tabletId = topicDescr.GetPartitions()[0].GetTabletId();
+
+ const auto edge = runtime.AllocateEdgeActor();
+ TString cookie = NKikimr::NPQ::CmdSetOwner(&runtime, tabletId, edge, partitionId, "default", true).first;
+
+ TVector<std::pair<ui64, TString>> data;
+ data.push_back({1, message});
+ NKikimr::NPQ::CmdWrite(&runtime, tabletId, edge, partitionId, "sourceid0", msgSeqNo, data, false, {}, true, cookie, 0);
+ }
}
diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.h b/ydb/core/tx/schemeshard/ut_helpers/helpers.h
index c58fe76069..75c9b0bae0 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/helpers.h
+++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.h
@@ -534,4 +534,7 @@ namespace NSchemeShardUT_Private {
NKikimrPQ::TDescribeResponse GetDescribeFromPQBalancer(TTestActorRuntime& runtime, ui64 balancerId);
+ void SendTEvPeriodicTopicStats(TTestActorRuntime& runtime, ui64 topicId, ui64 generation, ui64 round, ui64 dataSize, ui64 usedReserveSize);
+ void WriteToTopic(TTestActorRuntime& runtime, const TString& path, ui32& msgSeqNo, const TString& message);
+
} //NSchemeShardUT_Private
diff --git a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp
index a8dd706e72..0dfe288f18 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp
+++ b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp
@@ -13,6 +13,23 @@ namespace NLs {
using namespace NKikimr;
+#define DESCRIBE_ASSERT_EQUAL(name, type, expression, description) \
+ TCheckFunc name(type expected) { \
+ return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) { \
+ UNIT_ASSERT_C(IsGoodDomainStatus(record.GetStatus()), "Unexpected status: " << record.GetStatus()); \
+ \
+ const auto& pathDescr = record.GetPathDescription(); \
+ const auto& subdomain = pathDescr.GetDomainDescription(); \
+ const auto& value = expression; \
+ \
+ UNIT_ASSERT_EQUAL_C(value, expected, \
+ description << " mismatch, subdomain with id " << subdomain.GetDomainKey().GetPathId() << \
+ " has value " << value << \
+ " but expected " << expected); \
+ }; \
+}
+
+
void NotInSubdomain(const NKikimrScheme::TEvDescribeSchemeResult& record) {
UNIT_ASSERT(record.HasPathDescription());
NKikimrSchemeOp::TPathDescription descr = record.GetPathDescription();
@@ -619,20 +636,9 @@ TCheckFunc PQPartitionsInsideDomain(ui64 count) {
};
}
-TCheckFunc PQReservedStorage(ui64 count) {
- return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
- UNIT_ASSERT_C(IsGoodDomainStatus(record.GetStatus()), "Unexpected status: " << record.GetStatus());
-
- const auto& pathDescr = record.GetPathDescription();
- const auto& domain = pathDescr.GetDomainDescription();
- const auto& curCount = domain.GetDiskSpaceUsage().GetTopics().GetReserveSize();
-
- UNIT_ASSERT_EQUAL_C(curCount, count,
- "pq reserved storage mismatch, domain with id " << domain.GetDomainKey().GetPathId() <<
- " has size " << curCount <<
- " but expected " << count);
- };
-}
+DESCRIBE_ASSERT_EQUAL(TopicReservedStorage, ui64, subdomain.GetDiskSpaceUsage().GetTopics().GetReserveSize(), "Topic ReserveSize")
+DESCRIBE_ASSERT_EQUAL(TopicAccountSize, ui64, subdomain.GetDiskSpaceUsage().GetTopics().GetAccountSize(), "Topic AccountSize")
+DESCRIBE_ASSERT_EQUAL(TopicUsedReserveSize, ui64, subdomain.GetDiskSpaceUsage().GetTopics().GetUsedReserveSize(), "Topic UsedReserveSize")
TCheckFunc PathsInsideDomainOneOf(TSet<ui64> variants) {
return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
@@ -1104,5 +1110,7 @@ TCheckFunc PartitionKeys(TVector<TString> lastShardKeys) {
};
}
+#undef DESCRIBE_ASSERT_EQUAL
+
} // NLs
} // NSchemeShardUT_Private
diff --git a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h
index 95fd8cc128..d76c74d41f 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h
+++ b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h
@@ -34,7 +34,9 @@ namespace NLs {
TCheckFunc PathsInsideDomain(ui64 count);
TCheckFunc PQPartitionsInsideDomain(ui64 count);
- TCheckFunc PQReservedStorage(ui64 count);
+ TCheckFunc TopicReservedStorage(ui64 expected);
+ TCheckFunc TopicAccountSize(ui64 expected);
+ TCheckFunc TopicUsedReserveSize(ui64 expected);
TCheckFunc PathsInsideDomainOneOf(TSet<ui64> variants);
TCheckFunc ShardsInsideDomain(ui64 count);
TCheckFunc ShardsInsideDomainOneOf(TSet<ui64> variants);
diff --git a/ydb/core/tx/schemeshard/ut_stats.cpp b/ydb/core/tx/schemeshard/ut_stats.cpp
index d2fbcaf251..51029221ef 100644
--- a/ydb/core/tx/schemeshard/ut_stats.cpp
+++ b/ydb/core/tx/schemeshard/ut_stats.cpp
@@ -297,4 +297,212 @@ Y_UNIT_TEST_SUITE(TSchemeshardStatsBatchingTest) {
WaitAndCheckStatPersisted(runtime, env, newRowsCount, batchTimeout, eventAction);
}
+
+ Y_UNIT_TEST(TopicAccountSizeAndUsedReserveSize) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+
+ runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE);
+
+ auto& appData = runtime.GetAppData();
+
+ ui64 txId = 100;
+
+ // disable batching
+ appData.SchemeShardConfig.SetStatsBatchTimeoutMs(0);
+ appData.SchemeShardConfig.SetStatsMaxBatchSize(0);
+
+ // apply config via reboot
+ TActorId sender = runtime.AllocateEdgeActor();
+ GracefulRestartTablet(runtime, TTestTxConfig::SchemeShard, sender);
+
+ const auto Assert = [&] (ui64 expectedAccountSize, ui64 expectedUsedReserveSize) {
+ TestDescribeResult(DescribePath(runtime, "/MyRoot/Topic1"),
+ {NLs::Finished,
+ NLs::TopicAccountSize(expectedAccountSize),
+ NLs::TopicUsedReserveSize(expectedUsedReserveSize)});
+ };
+
+ TestCreatePQGroup(runtime, ++txId, "/MyRoot", R"(
+ Name: "Topic1"
+ TotalGroupCount: 1
+ PartitionPerTablet: 1
+ PQTabletConfig {
+ PartitionConfig {
+ LifetimeSeconds: 13
+ WriteSpeedInBytesPerSecond : 19
+ }
+ MeteringMode: METERING_MODE_RESERVED_CAPACITY
+ }
+ )");
+ env.TestWaitNotification(runtime, txId);
+ Assert(1 * 13 * 19, 0); // 247, 0
+
+ TestCreatePQGroup(runtime, ++txId, "/MyRoot", R"(
+ Name: "Topic2"
+ TotalGroupCount: 3
+ PartitionPerTablet: 3
+ PQTabletConfig {
+ PartitionConfig {
+ LifetimeSeconds: 11
+ WriteSpeedInBytesPerSecond : 17
+ }
+ MeteringMode: METERING_MODE_RESERVED_CAPACITY
+ }
+ )");
+ env.TestWaitNotification(runtime, txId);
+ Assert(1 * 13 * 19 + 3 * 11 * 17, 0); // 247 + 561 = 808, 0
+
+ TestCreatePQGroup(runtime, ++txId, "/MyRoot", R"(
+ Name: "Topic3"
+ TotalGroupCount: 3
+ PartitionPerTablet: 3
+ PQTabletConfig {
+ PartitionConfig {
+ LifetimeSeconds: 11
+ WriteSpeedInBytesPerSecond : 17
+ }
+ MeteringMode: METERING_MODE_RESERVED_CAPACITY
+ }
+ )");
+ env.TestWaitNotification(runtime, txId);
+ Assert(1 * 13 * 19 + 3 * 11 * 17 + 3 * 11 * 17, 0); // 247 + 561 + 561 = 1369, 0
+
+ ui64 topic1Id = DescribePath(runtime, "/MyRoot/Topic1").GetPathDescription().GetSelf().GetPathId();
+ ui64 topic2Id = DescribePath(runtime, "/MyRoot/Topic2").GetPathDescription().GetSelf().GetPathId();
+ ui64 topic3Id = DescribePath(runtime, "/MyRoot/Topic3").GetPathDescription().GetSelf().GetPathId();
+
+ ui64 generation = 1;
+ ui64 round = 1;
+
+ SendTEvPeriodicTopicStats(runtime, topic1Id, generation, ++round, 101, 101);
+ Assert(1369, 101); // only reserve size
+
+ SendTEvPeriodicTopicStats(runtime, topic1Id, generation, ++round, 383, 247);
+ Assert(1369 + (383 - 247), 247); // 1505, 247 reserve + exceeding the limit
+
+ SendTEvPeriodicTopicStats(runtime, topic2Id, generation, ++round, 113, 113);
+ Assert(1369 + (383 - 247), 247 + 113); // 1505, 360
+
+ SendTEvPeriodicTopicStats(runtime, topic1Id, generation, ++round, 31, 31);
+ Assert(1369, 31 + 113); // only reserve, data size
+
+ TestDropPQGroup(runtime, ++txId, "/MyRoot", "Topic2");
+ env.TestWaitNotification(runtime, txId);
+ Assert(808, 31);
+
+ SendTEvPeriodicTopicStats(runtime, topic3Id, generation, ++round, 151, 151);
+ Assert(808, 31 + 151);
+
+ TestDeallocatePQ(runtime, ++txId, "/MyRoot", "Name: \"Topic3\"");
+ env.TestWaitNotification(runtime, txId);
+ Assert(247, 31);
+ }
+
+ Y_UNIT_TEST(TopicPeriodicStatMeteringModeReserved) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+
+ runtime.SetLogPriority(NKikimrServices::PERSQUEUE, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::PERSQUEUE_READ_BALANCER, NLog::PRI_TRACE);
+
+ auto& appData = runtime.GetAppData();
+
+ ui64 txId = 100;
+
+ // disable batching
+ appData.SchemeShardConfig.SetStatsBatchTimeoutMs(0);
+ appData.SchemeShardConfig.SetStatsMaxBatchSize(0);
+
+ appData.PQConfig.SetBalancerWakeupIntervalSec(1);
+
+ // apply config via reboot
+ TActorId sender = runtime.AllocateEdgeActor();
+ GracefulRestartTablet(runtime, TTestTxConfig::SchemeShard, sender);
+
+ TString topicPath = "/MyRoot/Topic1";
+
+ const auto Assert = [&] (ui64 expectedAccountSize, ui64 expectedUsedReserveSize) {
+ TestDescribeResult(DescribePath(runtime,topicPath),
+ {NLs::Finished,
+ NLs::TopicAccountSize(expectedAccountSize),
+ NLs::TopicUsedReserveSize(expectedUsedReserveSize)});
+ };
+
+
+ TestCreatePQGroup(runtime, ++txId, "/MyRoot", R"(
+ Name: "Topic1"
+ TotalGroupCount: 3
+ PartitionPerTablet: 3
+ PQTabletConfig {
+ PartitionConfig {
+ LifetimeSeconds: 11
+ WriteSpeedInBytesPerSecond : 17
+ }
+ MeteringMode: METERING_MODE_RESERVED_CAPACITY
+ }
+ )");
+ env.TestWaitNotification(runtime, txId);
+ Assert(3 * 11 * 17, 0); // 561, 0
+
+ ui32 msgSeqNo = 100;
+ WriteToTopic(runtime, topicPath, msgSeqNo, "Message 100");
+
+ env.SimulateSleep(runtime, TDuration::Seconds(3)); // Wait TEvPeriodicTopicStats
+
+ Assert(3 * 11 * 17, 69); // 69 - it is unstable value. it can change if internal message store change
+ }
+
+ Y_UNIT_TEST(TopicPeriodicStatMeteringModeRequest) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+
+ runtime.SetLogPriority(NKikimrServices::PERSQUEUE_READ_BALANCER, NLog::PRI_TRACE);
+
+ auto& appData = runtime.GetAppData();
+
+ ui64 txId = 100;
+
+ // disable batching
+ appData.SchemeShardConfig.SetStatsBatchTimeoutMs(0);
+ appData.SchemeShardConfig.SetStatsMaxBatchSize(0);
+
+ appData.PQConfig.SetBalancerWakeupIntervalSec(1);
+
+ // apply config via reboot
+ TActorId sender = runtime.AllocateEdgeActor();
+ GracefulRestartTablet(runtime, TTestTxConfig::SchemeShard, sender);
+
+ TString topicPath = "/MyRoot/Topic1";
+
+ const auto Assert = [&] (ui64 expectedAccountSize, ui64 expectedUsedReserveSize) {
+ TestDescribeResult(DescribePath(runtime,topicPath),
+ {NLs::Finished,
+ NLs::TopicAccountSize(expectedAccountSize),
+ NLs::TopicUsedReserveSize(expectedUsedReserveSize)});
+ };
+
+
+ TestCreatePQGroup(runtime, ++txId, "/MyRoot", R"(
+ Name: "Topic1"
+ TotalGroupCount: 3
+ PartitionPerTablet: 3
+ PQTabletConfig {
+ PartitionConfig {
+ LifetimeSeconds: 11
+ WriteSpeedInBytesPerSecond : 17
+ }
+ MeteringMode: METERING_MODE_REQUEST_UNITS
+ }
+ )");
+ env.TestWaitNotification(runtime, txId);
+ Assert(0, 0); // topic is empty
+
+ ui32 msgSeqNo = 100;
+ WriteToTopic(runtime, topicPath, msgSeqNo, "Message 100");
+
+ env.SimulateSleep(runtime, TDuration::Seconds(3)); // Wait TEvPeriodicTopicStats
+
+ Assert(69, 0); // 67 - it is unstable value. it can change if internal message store change
+ }
};