diff options
author | tesseract <tesseract@yandex-team.com> | 2023-03-01 11:23:46 +0300 |
---|---|---|
committer | tesseract <tesseract@yandex-team.com> | 2023-03-01 11:23:46 +0300 |
commit | c400b24651590ee3e66795b13cf40dcdc5fb50c9 (patch) | |
tree | 1975b0bde8e39325817978ed84d7bdeee81b6c75 | |
parent | 22f99c1e277287a012a9b629919dbc8874d79912 (diff) | |
download | ydb-c400b24651590ee3e66795b13cf40dcdc5fb50c9.tar.gz |
В schemeshard-е поддерживать статистику об используемом месте топиками пользователя
Мержить после https://a.yandex-team.ru/review/3501052/details
До разделения на два ПР был https://a.yandex-team.ru/review/3465600/details
38 files changed, 700 insertions, 67 deletions
diff --git a/ydb/core/persqueue/CMakeLists.darwin.txt b/ydb/core/persqueue/CMakeLists.darwin.txt index e9cdb1f022..fc8c7d2dba 100644 --- a/ydb/core/persqueue/CMakeLists.darwin.txt +++ b/ydb/core/persqueue/CMakeLists.darwin.txt @@ -60,6 +60,7 @@ target_sources(ydb-core-persqueue PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/transaction.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/type_codecs_defs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/user_info.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/utils.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/write_meta.cpp ) generate_enum_serilization(ydb-core-persqueue diff --git a/ydb/core/persqueue/CMakeLists.linux-aarch64.txt b/ydb/core/persqueue/CMakeLists.linux-aarch64.txt index 1c810979a4..0923957c1a 100644 --- a/ydb/core/persqueue/CMakeLists.linux-aarch64.txt +++ b/ydb/core/persqueue/CMakeLists.linux-aarch64.txt @@ -61,6 +61,7 @@ target_sources(ydb-core-persqueue PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/transaction.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/type_codecs_defs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/user_info.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/utils.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/write_meta.cpp ) generate_enum_serilization(ydb-core-persqueue diff --git a/ydb/core/persqueue/CMakeLists.linux.txt b/ydb/core/persqueue/CMakeLists.linux.txt index 1c810979a4..0923957c1a 100644 --- a/ydb/core/persqueue/CMakeLists.linux.txt +++ b/ydb/core/persqueue/CMakeLists.linux.txt @@ -61,6 +61,7 @@ target_sources(ydb-core-persqueue PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/transaction.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/type_codecs_defs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/user_info.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/utils.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/write_meta.cpp ) generate_enum_serilization(ydb-core-persqueue diff --git a/ydb/core/persqueue/events/global.h b/ydb/core/persqueue/events/global.h index 15a7964fd8..7d40a67955 100644 --- a/ydb/core/persqueue/events/global.h +++ b/ydb/core/persqueue/events/global.h @@ -45,6 +45,7 @@ struct TEvPersQueue { EvProposeTransaction, EvProposeTransactionResult, EvCancelTransactionProposal, + EvPeriodicTopicStats, EvResponse = EvRequest + 256, EvInternalEvents = EvResponse + 256, EvEnd @@ -235,5 +236,8 @@ struct TEvPersQueue { struct TEvCancelTransactionProposal : public TEventPB<TEvCancelTransactionProposal, NKikimrPQ::TEvCancelTransactionProposal, EvCancelTransactionProposal> { }; + struct TEvPeriodicTopicStats : public TEventPB<TEvPeriodicTopicStats, NKikimrPQ::TEvPeriodicTopicStats, EvPeriodicTopicStats> { + }; + }; } //NKikimr diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index d885758c10..f47a432d6e 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -1107,15 +1107,26 @@ void TPartition::HandleOnWrite(TEvPQ::TEvUpdateAvailableSize::TPtr&, const TActo } -ui64 TPartition::GetUsedStorage(const TActorContext& ctx) { - auto duration = ctx.Now() - LastUsedStorageMeterTimestamp; - LastUsedStorageMeterTimestamp = ctx.Now(); +ui64 TPartition::MeteringDataSize(const TActorContext& ctx) const { ui64 size = Size(); - if (DataKeysBody.size() > 0) { + if (!DataKeysBody.empty()) { size -= DataKeysBody.front().Size; - } else { - size = 0; } + auto expired = ctx.Now() - TDuration::Seconds(Config.GetPartitionConfig().GetLifetimeSeconds()); + for(size_t i = 0; i < HeadKeys.size(); ++i) { + auto& key = HeadKeys[i]; + if (expired < key.Timestamp) { + break; + } + size -= key.Size; + } + return size; +} + +ui64 TPartition::GetUsedStorage(const TActorContext& ctx) { + auto duration = ctx.Now() - LastUsedStorageMeterTimestamp; + LastUsedStorageMeterTimestamp = ctx.Now(); + ui64 size = MeteringDataSize(ctx); return size * duration.MilliSeconds() / 1000 / 1_MB; // mb*seconds } @@ -2203,7 +2214,8 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext result.SetReadBytesQuota(maxQuota); - result.SetPartitionSize(Size()); + result.SetPartitionSize(MeteringDataSize(ctx)); + result.SetUsedReserveSize(UsedReserveSize()); result.SetStartOffset(StartOffset); result.SetEndOffset(EndOffset); @@ -2212,6 +2224,13 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext *result.MutableErrors() = {Errors.begin(), Errors.end()}; + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, + "Topic PartitionStatus PartitionSize: " << result.GetPartitionSize() + << " UsedReserveSize: " << result.GetUsedReserveSize() + << " ReserveSize: " << ReserveSize() + << " PartitionConfig" << Config.GetPartitionConfig(); + ); + ctx.Send(ev->Get()->Sender, new TEvPQ::TEvPartitionStatusResponse(result)); } diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 880e5aea9f..6379b7e14b 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -8,6 +8,7 @@ #include "sourceid.h" #include "subscriber.h" #include "user_info.h" +#include "utils.h" #include <ydb/core/keyvalue/keyvalue_events.h> #include <ydb/library/persqueue/counter_time_keeper/counter_time_keeper.h> @@ -308,6 +309,15 @@ public: return BodySize + Head.PackedSize; } + ui64 MeteringDataSize(const TActorContext& ctx) const; + + ui64 UsedReserveSize() { + return std::min<ui64>(Size(), ReserveSize()); + } + + ui64 ReserveSize() { + return TopicPartitionReserveSize(Config); + } //Bootstrap sends kvRead diff --git a/ydb/core/persqueue/read_balancer.cpp b/ydb/core/persqueue/read_balancer.cpp index 89676eef2e..89af42f079 100644 --- a/ydb/core/persqueue/read_balancer.cpp +++ b/ydb/core/persqueue/read_balancer.cpp @@ -199,6 +199,9 @@ TString TPersQueueReadBalancer::GenerateStat() { TAG(TH3) {str << "[Total/Max/Avg]WriteSpeedMin: " << TotalAvgSpeedMin << "/" << MaxAvgSpeedMin << "/" << TotalAvgSpeedMin / NumActiveParts;} TAG(TH3) {str << "[Total/Max/Avg]WriteSpeedHour: " << TotalAvgSpeedHour << "/" << MaxAvgSpeedHour << "/" << TotalAvgSpeedHour / NumActiveParts;} TAG(TH3) {str << "[Total/Max/Avg]WriteSpeedDay: " << TotalAvgSpeedDay << "/" << MaxAvgSpeedDay << "/" << TotalAvgSpeedDay / NumActiveParts;} + TAG(TH3) {str << "TotalDataSize: " << TotalDataSize;} + TAG(TH3) {str << "ReserveSize: " << PartitionReserveSize();} + TAG(TH3) {str << "TotalUsedReserveSize: " << TotalUsedReserveSize;} } UL_CLASS("nav nav-tabs") { @@ -624,15 +627,10 @@ void TPersQueueReadBalancer::RestartPipe(const ui64 tabletId, const TActorContex } } - -void TPersQueueReadBalancer::RequestTabletIfNeeded(const ui64 tabletId, const TActorContext& ctx) -{ - if ((tabletId == SchemeShardId && !WaitingForACL) || - (tabletId != SchemeShardId && !WaitingForStat.contains(tabletId))) - return; +TActorId TPersQueueReadBalancer::GetPipeClient(const ui64 tabletId, const TActorContext& ctx) { + TActorId pipeClient; auto it = TabletPipes.find(tabletId); - TActorId pipeClient; if (it == TabletPipes.end()) { NTabletPipe::TClientConfig clientConfig; pipeClient = ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, tabletId, clientConfig)); @@ -640,6 +638,17 @@ void TPersQueueReadBalancer::RequestTabletIfNeeded(const ui64 tabletId, const TA } else { pipeClient = it->second; } + + return pipeClient; +} + +void TPersQueueReadBalancer::RequestTabletIfNeeded(const ui64 tabletId, const TActorContext& ctx) +{ + if ((tabletId == SchemeShardId && !WaitingForACL) || + (tabletId != SchemeShardId && !WaitingForStat.contains(tabletId))) + return; + + TActorId pipeClient = GetPipeClient(tabletId, ctx); if (tabletId == SchemeShardId) { NTabletPipe::SendData(ctx, pipeClient, new NSchemeShard::TEvSchemeShard::TEvDescribeScheme(tabletId, PathId)); } else { @@ -664,6 +673,9 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvStatusResponse::TPtr& ev, c MaxAvgSpeedHour = Max<ui64>(MaxAvgSpeedHour, partRes.GetAvgWriteSpeedPerHour()); TotalAvgSpeedDay += partRes.GetAvgWriteSpeedPerDay(); MaxAvgSpeedDay = Max<ui64>(MaxAvgSpeedDay, partRes.GetAvgWriteSpeedPerDay()); + + TotalDataSize += partRes.GetPartitionSize(); + TotalUsedReserveSize += partRes.GetUsedReserveSize(); } if (WaitingForStat.empty()) { CheckStat(ctx); @@ -708,6 +720,23 @@ void TPersQueueReadBalancer::CheckStat(const TActorContext& ctx) { Y_UNUSED(ctx); //TODO: Deside about changing number of partitions and send request to SchemeShard //TODO: make AlterTopic request via TX_PROXY + + TEvPersQueue::TEvPeriodicTopicStats* ev = new TEvPersQueue::TEvPeriodicTopicStats(); + auto& rec = ev->Record; + rec.SetPathId(PathId); + rec.SetGeneration(Generation); + rec.SetRound(++StatsReportRound); + rec.SetDataSize(TotalDataSize); + rec.SetUsedReserveSize(TotalUsedReserveSize); + + LOG_DEBUG(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, + TStringBuilder() << "Send TEvPeriodicTopicStats PathId: " << PathId + << " Generation: " << Generation + << " StatsReportRound: " << StatsReportRound + << " DataSize: " << TotalDataSize + << " UsedReserveSize: " << TotalUsedReserveSize); + + NTabletPipe::SendData(ctx, GetPipeClient(SchemeShardId, ctx), ev); } void TPersQueueReadBalancer::GetStat(const TActorContext& ctx) { @@ -717,6 +746,8 @@ void TPersQueueReadBalancer::GetStat(const TActorContext& ctx) { TotalAvgSpeedMin = MaxAvgSpeedMin = 0; TotalAvgSpeedHour = MaxAvgSpeedHour = 0; TotalAvgSpeedDay = MaxAvgSpeedDay = 0; + TotalDataSize = 0; + TotalUsedReserveSize = 0; for (auto& p : PartitionsInfo) { const ui64& tabletId = p.second.TabletId; bool res = WaitingForStat.insert(tabletId).second; diff --git a/ydb/core/persqueue/read_balancer.h b/ydb/core/persqueue/read_balancer.h index cd74aa3a8d..7b0c945c02 100644 --- a/ydb/core/persqueue/read_balancer.h +++ b/ydb/core/persqueue/read_balancer.h @@ -1,5 +1,7 @@ #pragma once +#include "utils.h" + #include <util/system/hp_timer.h> #include <ydb/core/tablet_flat/tablet_flat_executed.h> @@ -178,9 +180,11 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa } void HandleWakeup(TEvents::TEvWakeup::TPtr&, const TActorContext &ctx) { + LOG_DEBUG(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, TStringBuilder() << "TPersQueueReadBalancer::HandleWakeup"); + GetStat(ctx); //TODO: do it only on signals from outerspace right now - ctx.Schedule(TDuration::Seconds(30), new TEvents::TEvWakeup()); //TODO: remove it + ctx.Schedule(TDuration::Seconds(AppData(ctx)->PQConfig.GetBalancerWakeupIntervalSec()), new TEvents::TEvWakeup()); //TODO: remove it } void HandleUpdateACL(TEvPersQueue::TEvUpdateACL::TPtr&, const TActorContext &ctx) { @@ -242,7 +246,7 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa } RegisterEvents.clear(); - ctx.Schedule(TDuration::Seconds(30), new TEvents::TEvWakeup()); //TODO: remove it + ctx.Schedule(TDuration::Seconds(AppData(ctx)->PQConfig.GetBalancerWakeupIntervalSec()), new TEvents::TEvWakeup()); //TODO: remove it ctx.Send(ctx.SelfID, new TEvPersQueue::TEvUpdateACL()); } @@ -270,6 +274,7 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa TStringBuilder GetPrefix() const; + TActorId GetPipeClient(const ui64 tabletId, const TActorContext&); void RequestTabletIfNeeded(const ui64 tabletId, const TActorContext&); void RestartPipe(const ui64 tabletId, const TActorContext&); void CheckStat(const TActorContext&); @@ -293,6 +298,10 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa struct TPipeInfo; void UnregisterSession(const TActorId& pipe, const TActorContext& ctx); void RebuildStructs(); + ui64 PartitionReserveSize() { + return TopicPartitionReserveSize(TabletConfig); + } + bool Inited; ui64 PathId; @@ -431,6 +440,10 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa ui64 MaxAvgSpeedHour; ui64 TotalAvgSpeedDay; ui64 MaxAvgSpeedDay; + ui64 TotalDataSize; + ui64 TotalUsedReserveSize; + + ui64 StatsReportRound; std::deque<TAutoPtr<TEvPersQueue::TEvRegisterReadSession>> RegisterEvents; std::deque<TAutoPtr<TEvPersQueue::TEvPersQueue::TEvUpdateBalancerConfig>> UpdateEvents; @@ -467,6 +480,9 @@ public: , MaxAvgSpeedHour(0) , TotalAvgSpeedDay(0) , MaxAvgSpeedDay(0) + , TotalDataSize(0) + , TotalUsedReserveSize(0) + , StatsReportRound(0) {} STFUNC(StateInit) { diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.cpp b/ydb/core/persqueue/ut/common/pq_ut_common.cpp index 515f65ddb0..b4d739612d 100644 --- a/ydb/core/persqueue/ut/common/pq_ut_common.cpp +++ b/ydb/core/persqueue/ut/common/pq_ut_common.cpp @@ -286,7 +286,11 @@ void PQTabletRestart(TTestActorRuntime& runtime, ui64 tabletId, TActorId edge) { } TActorId SetOwner(const ui32 partition, TTestContext& tc, const TString& owner, bool force) { - TActorId pipeClient = tc.Runtime->ConnectToPipe(tc.TabletId, tc.Edge, 0, GetPipeConfigWithRetries()); + return SetOwner(tc.Runtime.Get(), tc.TabletId, tc.Edge, partition, owner, force); +} + +TActorId SetOwner(TTestActorRuntime* runtime, ui64 tabletId, const TActorId& sender, const ui32 partition, const TString& owner, bool force) { + TActorId pipeClient = runtime->ConnectToPipe(tabletId, sender, 0, GetPipeConfigWithRetries()); THolder<TEvPersQueue::TEvRequest> request; @@ -297,7 +301,7 @@ TActorId SetOwner(const ui32 partition, TTestContext& tc, const TString& owner, req->MutableCmdGetOwnership()->SetForce(force); ActorIdToProto(pipeClient, req->MutablePipeClient()); - tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries(), pipeClient); + runtime->SendToPipe(tabletId, sender, request.Release(), 0, GetPipeConfigWithRetries(), pipeClient); return pipeClient; } @@ -380,34 +384,38 @@ void WaitPartition(const TString &session, TTestContext& tc, ui32 partition, con } std::pair<TString, TActorId> CmdSetOwner(const ui32 partition, TTestContext& tc, const TString& owner, bool force) { + return CmdSetOwner(tc.Runtime.Get(), tc.TabletId, tc.Edge, partition, owner, force); +} + +std::pair<TString, TActorId> CmdSetOwner(TTestActorRuntime* runtime, ui64 tabletId, const TActorId& sender, const ui32 partition, const TString& owner, bool force) { TAutoPtr<IEventHandle> handle; TEvPersQueue::TEvResponse *result; TString cookie; TActorId pipeClient; for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) { try { - tc.Runtime->ResetScheduledCount(); + runtime->ResetScheduledCount(); - pipeClient = SetOwner(partition, tc, owner, force); + pipeClient = SetOwner(runtime, tabletId, sender, partition, owner, force); - result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle); + result = runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle); UNIT_ASSERT(result); UNIT_ASSERT(result->Record.HasStatus()); if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) { - tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress + runtime->DispatchEvents(); // Dispatch events so that initialization can make progress retriesLeft = 3; continue; } if (result->Record.GetErrorReason().StartsWith("ownership session is killed by another session with id ")) { - result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle); + result = runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle); UNIT_ASSERT(result); UNIT_ASSERT(result->Record.HasStatus()); } if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) { - tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress + runtime->DispatchEvents(); // Dispatch events so that initialization can make progress retriesLeft = 3; continue; } @@ -483,8 +491,13 @@ void WritePartDataWithBigMsg(const ui32 partition, const TString& sourceId, cons void WriteData(const ui32 partition, const TString& sourceId, const TVector<std::pair<ui64, TString>> data, TTestContext& tc, const TString& cookie, i32 msgSeqNo, i64 offset, bool disableDeduplication) { + WriteData(tc.Runtime.Get(), tc.TabletId, tc.Edge, partition, sourceId, data, cookie, msgSeqNo, offset, disableDeduplication); +} + +void WriteData(TTestActorRuntime* runtime, ui64 tabletId, const TActorId& sender, const ui32 partition, const TString& sourceId, + const TVector<std::pair<ui64, TString>> data, const TString& cookie, i32 msgSeqNo, i64 offset, bool disableDeduplication) { THolder<TEvPersQueue::TEvRequest> request; - tc.Runtime->ResetScheduledCount(); + runtime->ResetScheduledCount(); request.Reset(new TEvPersQueue::TEvRequest); auto req = request->Record.MutablePartitionRequest(); req->SetPartition(partition); @@ -499,7 +512,7 @@ void WriteData(const ui32 partition, const TString& sourceId, const TVector<std: write->SetData(p.second); write->SetDisableDeduplication(disableDeduplication); } - tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); + runtime->SendToPipe(tabletId, sender, request.Release(), 0, GetPipeConfigWithRetries()); } void CmdWrite(const ui32 partition, const TString& sourceId, const TVector<std::pair<ui64, TString>> data, @@ -507,16 +520,26 @@ void CmdWrite(const ui32 partition, const TString& sourceId, const TVector<std:: bool isFirst, const TString& ownerCookie, i32 msn, i64 offset, bool treatWrongCookieAsError, bool treatBadOffsetAsError, bool disableDeduplication) { + CmdWrite(tc.Runtime.Get(), tc.TabletId, tc.Edge, partition, sourceId, tc.MsgSeqNoMap[partition], + data, error, alreadyWrittenSeqNo, isFirst, ownerCookie, msn, offset, treatWrongCookieAsError, treatBadOffsetAsError, disableDeduplication); + +} + +void CmdWrite(TTestActorRuntime* runtime, ui64 tabletId, const TActorId& sender, const ui32 partition, + const TString& sourceId, ui32& msgSeqNo, const TVector<std::pair<ui64, TString>> data, + bool error, const THashSet<ui32>& alreadyWrittenSeqNo, + bool isFirst, const TString& ownerCookie, i32 msn, i64 offset, + bool treatWrongCookieAsError, bool treatBadOffsetAsError, + bool disableDeduplication) { TAutoPtr<IEventHandle> handle; TEvPersQueue::TEvResponse *result; - ui32& msgSeqNo = tc.MsgSeqNoMap[partition]; if (msn != -1) msgSeqNo = msn; TString cookie = ownerCookie; for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) { try { - WriteData(partition, sourceId, data, tc, cookie, msgSeqNo, offset, disableDeduplication); - result = tc.Runtime->GrabEdgeEventIf<TEvPersQueue::TEvResponse>(handle, + WriteData(runtime, tabletId, sender, partition, sourceId, data, cookie, msgSeqNo, offset, disableDeduplication); + result = runtime->GrabEdgeEventIf<TEvPersQueue::TEvResponse>(handle, [](const TEvPersQueue::TEvResponse& ev){ if (ev.Record.HasPartitionResponse() && ev.Record.GetPartitionResponse().CmdWriteResultSize() > 0 || @@ -528,14 +551,14 @@ void CmdWrite(const ui32 partition, const TString& sourceId, const TVector<std:: UNIT_ASSERT(result); UNIT_ASSERT(result->Record.HasStatus()); if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) { - tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress + runtime->DispatchEvents(); // Dispatch events so that initialization can make progress retriesLeft = 3; continue; } if (!treatWrongCookieAsError && result->Record.GetErrorCode() == NPersQueue::NErrorCode::WRONG_COOKIE) { - cookie = CmdSetOwner(partition, tc).first; + cookie = CmdSetOwner(runtime, tabletId, sender, partition).first; msgSeqNo = 0; retriesLeft = 3; continue; diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.h b/ydb/core/persqueue/ut/common/pq_ut_common.h index 3f62e1183f..47166af066 100644 --- a/ydb/core/persqueue/ut/common/pq_ut_common.h +++ b/ydb/core/persqueue/ut/common/pq_ut_common.h @@ -281,6 +281,14 @@ TActorId SetOwner( const TString& owner, bool force); +TActorId SetOwner( + TTestActorRuntime* runtime, + ui64 tabletId, + const TActorId& sender, + const ui32 partition, + const TString& owner, + bool force); + void FillDeprecatedUserInfo( NKikimrClient::TKeyValueRequest_TCmdWrite* write, const TString& client, @@ -326,6 +334,18 @@ void WriteData( i64 offset, bool disableDeduplication = false); +void WriteData( + TTestActorRuntime* runtime, + ui64 tabletId, + const TActorId& sender, + const ui32 partition, + const TString& sourceId, + const TVector<std::pair<ui64, TString>> data, + const TString& cookie, + i32 msgSeqNo, + i64 offset, + bool disableDeduplication = false); + void WritePartData( const ui32 partition, const TString& sourceId, @@ -363,6 +383,14 @@ std::pair<TString, TActorId> CmdSetOwner( const TString& owner = "default", bool force = true); +std::pair<TString, TActorId> CmdSetOwner( + TTestActorRuntime* runtime, + ui64 tabletId, + const TActorId& sender, + const ui32 partition, + const TString& owner = "default", + bool force = true); + void CmdCreateSession( const ui32 partition, const TString& user, @@ -436,4 +464,22 @@ void CmdWrite( bool treatBadOffsetAsError = true, bool disableDeduplication = false); +void CmdWrite( + TTestActorRuntime* runtime, + ui64 tabletId, + const TActorId& sender, + const ui32 partition, + const TString& sourceId, + ui32& msgSeqNo, + const TVector<std::pair<ui64, TString>> data, + bool error = false, + const THashSet<ui32>& alreadyWrittenSeqNo = {}, + bool isFirst = false, + const TString& ownerCookie = "", + i32 msn = -1, + i64 offset = -1, + bool treatWrongCookieAsError = false, + bool treatBadOffsetAsError = true, + bool disableDeduplication = false); + } // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/utils.cpp b/ydb/core/persqueue/utils.cpp new file mode 100644 index 0000000000..285640ffe6 --- /dev/null +++ b/ydb/core/persqueue/utils.cpp @@ -0,0 +1,22 @@ +#include "utils.h" + +namespace NKikimr::NPQ { + +ui64 TopicPartitionReserveSize(const NKikimrPQ::TPQTabletConfig& config) { + if (NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS == config.GetMeteringMode()) { + return 0; + } + if (config.GetPartitionConfig().HasStorageLimitBytes()) { + return config.GetPartitionConfig().GetStorageLimitBytes(); + } + return config.GetPartitionConfig().GetLifetimeSeconds() * config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond(); +} + +ui64 TopicPartitionReserveThroughput(const NKikimrPQ::TPQTabletConfig& config) { + if (NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS == config.GetMeteringMode()) { + return 0; + } + return config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond(); +} + +} // NKikimr::NPQ diff --git a/ydb/core/persqueue/utils.h b/ydb/core/persqueue/utils.h new file mode 100644 index 0000000000..806e9950ca --- /dev/null +++ b/ydb/core/persqueue/utils.h @@ -0,0 +1,10 @@ +#pragma once + +#include <ydb/core/protos/pqconfig.pb.h> + +namespace NKikimr::NPQ { + +ui64 TopicPartitionReserveSize(const NKikimrPQ::TPQTabletConfig& config); +ui64 TopicPartitionReserveThroughput(const NKikimrPQ::TPQTabletConfig& config); + +} // NKikimr::NPQ diff --git a/ydb/core/protos/counters_schemeshard.proto b/ydb/core/protos/counters_schemeshard.proto index 0406f2c064..505a8ed6aa 100644 --- a/ydb/core/protos/counters_schemeshard.proto +++ b/ydb/core/protos/counters_schemeshard.proto @@ -183,6 +183,8 @@ enum ESimpleCounters { COUNTER_IN_FLIGHT_OPS_TxCreateExternalDataSource = 148 [(CounterOpts) = {Name: "InFlightOps/CreateExternalDataSource"}]; COUNTER_IN_FLIGHT_OPS_TxDropExternalDataSource = 149 [(CounterOpts) = {Name: "InFlightOps/DropExternalDataSource"}]; COUNTER_IN_FLIGHT_OPS_TxAlterExternalDataSource = 150 [(CounterOpts) = {Name: "InFlightOps/AlterExternalDataSource"}]; + + COUNTER_PQ_STATS_QUEUE_SIZE = 151 [(CounterOpts) = {Name: "PQStatsQueueSize"}]; } enum ECumulativeCounters { @@ -297,6 +299,8 @@ enum ECumulativeCounters { COUNTER_FINISHED_OPS_TxCreateExternalDataSource = 91 [(CounterOpts) = {Name: "FinishedOps/CreateExternalDataSource"}]; COUNTER_FINISHED_OPS_TxDropExternalDataSource = 92 [(CounterOpts) = {Name: "FinishedOps/DropExternalDataSource"}]; COUNTER_FINISHED_OPS_TxAlterExternalDataSource = 93 [(CounterOpts) = {Name: "FinishedOps/AlterExternalDataSource"}]; + + COUNTER_PQ_STATS_WRITTEN = 94 [(CounterOpts) = {Name: "PQStatsWritten"}]; } enum EPercentileCounters { @@ -399,6 +403,17 @@ enum EPercentileCounters { Ranges: { Value: 500000 Name: "500 ms" } Ranges: { Value: 1000000 Name: "1000 ms" } }]; + + COUNTER_PQ_STATS_BATCH_LATENCY = 6 [(CounterOpts) = { + Name: "PQStatsBatchLatency", + Ranges: { Value: 1000 Name: "1 ms" } + Ranges: { Value: 10000 Name: "10 ms" } + Ranges: { Value: 50000 Name: "50 ms" } + Ranges: { Value: 100000 Name: "100 ms" } + Ranges: { Value: 200000 Name: "200 ms" } + Ranges: { Value: 500000 Name: "500 ms" } + Ranges: { Value: 1000000 Name: "1000 ms" } + }]; } enum ETxTypes { diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index 9aaf0bb96f..f218896b49 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -186,6 +186,8 @@ message TPQConfig { } optional TMoveTopicActorConfig MoveTopicActorConfig = 51; + + optional uint64 BalancerWakeupIntervalSec = 54 [default = 30]; } message TChannelProfile { @@ -674,6 +676,8 @@ message TStatusResponse { repeated TErrorMessage Errors = 29; repeated TConsumerResult ConsumerResult = 30; + + optional int64 UsedReserveSize = 31; } message TConsumerResult { @@ -829,6 +833,16 @@ message TEvCancelTransactionProposal { optional uint64 TxId = 1; }; +message TEvPeriodicTopicStats { + required uint64 PathId = 1; + + required uint64 Generation = 2; + required uint64 Round = 3; + + required uint64 DataSize = 4; + required uint64 UsedReserveSize = 5; +}; + message TTransaction { enum EState { UNKNOWN = 0; diff --git a/ydb/core/protos/subdomains.proto b/ydb/core/protos/subdomains.proto index 4ac4a22cb5..77955cca5b 100644 --- a/ydb/core/protos/subdomains.proto +++ b/ydb/core/protos/subdomains.proto @@ -49,6 +49,9 @@ message TDiskSpaceUsage { message TTopics { // in bytes optional uint64 ReserveSize = 1; + optional uint64 AccountSize = 2; + optional uint64 DataSize = 3; + optional uint64 UsedReserveSize = 4; } optional TTables Tables = 1; diff --git a/ydb/core/tx/schemeshard/CMakeLists.darwin.txt b/ydb/core/tx/schemeshard/CMakeLists.darwin.txt index abd0c5b9b2..7c311d3759 100644 --- a/ydb/core/tx/schemeshard/CMakeLists.darwin.txt +++ b/ydb/core/tx/schemeshard/CMakeLists.darwin.txt @@ -197,6 +197,7 @@ target_sources(core-tx-schemeshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_allocate_pq.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_deallocate_pq.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__publish_to_scheme_board.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__state_changed_reply.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp diff --git a/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt index b24c64ba3a..d80441560c 100644 --- a/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt @@ -198,6 +198,7 @@ target_sources(core-tx-schemeshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_allocate_pq.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_deallocate_pq.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__publish_to_scheme_board.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__state_changed_reply.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp diff --git a/ydb/core/tx/schemeshard/CMakeLists.linux.txt b/ydb/core/tx/schemeshard/CMakeLists.linux.txt index b24c64ba3a..d80441560c 100644 --- a/ydb/core/tx/schemeshard/CMakeLists.linux.txt +++ b/ydb/core/tx/schemeshard/CMakeLists.linux.txt @@ -198,6 +198,7 @@ target_sources(core-tx-schemeshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_allocate_pq.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__operation_deallocate_pq.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__publish_to_scheme_board.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__state_changed_reply.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_deallocate_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_deallocate_pq.cpp index 11f8af0f9a..5284f79137 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_deallocate_pq.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_deallocate_pq.cpp @@ -117,6 +117,7 @@ public: domainInfo->DecPathsInside(); domainInfo->DecPQPartitionsInside(pqGroup->TotalPartitionCount); domainInfo->DecPQReservedStorage(reserve.Storage); + domainInfo->AggrDiskSpaceUsage({}, pqGroup->Stats); context.SS->TabletCounters->Simple()[COUNTER_STREAM_RESERVED_THROUGHPUT].Sub(reserve.Throughput); context.SS->TabletCounters->Simple()[COUNTER_STREAM_RESERVED_STORAGE].Sub(reserve.Storage); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp index 9926d0fc48..36ccb1fe09 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp @@ -189,6 +189,7 @@ public: domainInfo->DecPathsInside(); domainInfo->DecPQPartitionsInside(pqGroup->TotalPartitionCount); domainInfo->DecPQReservedStorage(reserve.Storage); + domainInfo->AggrDiskSpaceUsage({}, pqGroup->Stats); context.SS->TabletCounters->Simple()[COUNTER_STREAM_RESERVED_THROUGHPUT].Sub(reserve.Throughput); context.SS->TabletCounters->Simple()[COUNTER_STREAM_RESERVED_STORAGE].Sub(reserve.Storage); diff --git a/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp b/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp new file mode 100644 index 0000000000..34f0f86dae --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp @@ -0,0 +1,123 @@ +#include "schemeshard_impl.h" +#include "schemeshard__stats_impl.h" + +#include <ydb/core/base/appdata.h> +#include <ydb/core/base/cputime.h> +#include <ydb/core/protos/sys_view.pb.h> + +namespace NKikimr { +namespace NSchemeShard { + +class TTxStoreTopicStats: public TTxStoreStats<TEvPersQueue::TEvPeriodicTopicStats> { + TSideEffects MergeOpSideEffects; + +public: + TTxStoreTopicStats(TSchemeShard* ss, TStatsQueue<TEvPersQueue::TEvPeriodicTopicStats>& queue, bool& persistStatsPending) + : TTxStoreStats(ss, queue, persistStatsPending) + { + } + + virtual ~TTxStoreTopicStats() = default; + + void Complete(const TActorContext& ) override {}; + + // returns true to continue batching + bool PersistSingleStats(const TPathId& pathId, const TStatsQueue<TEvPersQueue::TEvPeriodicTopicStats>::TItem& item, TTransactionContext& txc, const TActorContext& ctx) override; +}; + + +bool TTxStoreTopicStats::PersistSingleStats(const TPathId& pathId, const TStatsQueueItem<TEvPersQueue::TEvPeriodicTopicStats>& item, TTransactionContext& txc, const TActorContext& ctx) { + const auto& rec = item.Ev->Get()->Record; + + TTopicStats newStats; + newStats.SeqNo = TMessageSeqNo(rec.GetGeneration(), rec.GetRound()); + newStats.DataSize = rec.GetDataSize(); + newStats.UsedReserveSize = rec.GetUsedReserveSize(); + + auto& topic = Self->Topics[pathId]; + auto& oldStats = topic->Stats; + + if (newStats.SeqNo <= oldStats.SeqNo) { + // Ignore outdated message + return true; + } + + auto subDomainInfo = Self->ResolveDomainInfo(pathId); + subDomainInfo->AggrDiskSpaceUsage(newStats, oldStats); + + oldStats = newStats; + + if (subDomainInfo->CheckDiskSpaceQuotas(Self)) { + NIceDb::TNiceDb db(txc.DB); + + auto subDomainId = Self->ResolvePathIdForDomain(pathId); + Self->PersistSubDomainState(db, subDomainId, *subDomainInfo); + + // Publish is done in a separate transaction, so we may call this directly + TDeque<TPathId> toPublish; + toPublish.push_back(subDomainId); + Self->PublishToSchemeBoard(TTxId(), std::move(toPublish), ctx); + } + + return true; +} + + +void TSchemeShard::Handle(TEvPersQueue::TEvPeriodicTopicStats::TPtr& ev, const TActorContext& ctx) { + const auto& rec = ev->Get()->Record; + + const TPathId pathId = TPathId(TabletID(), rec.GetPathId()); + + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "Got periodic topic stats at partition " << pathId + << " DataSize " << rec.GetDataSize() + << " UsedReserveSize " << rec.GetUsedReserveSize()); + + TStatsId statsId(pathId); + switch(TopicStatsQueue.Add(statsId, ev.Release())) { + case READY: + ExecuteTopicStatsBatch(ctx); + break; + + case NOT_READY: + ScheduleTopicStatsBatch(ctx); + break; + + default: + Y_FAIL("Unknown batch status"); + } +} + +void TSchemeShard::Handle(TEvPrivate::TEvPersistTopicStats::TPtr&, const TActorContext& ctx) { + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "Started TEvPersistStats at tablet " << TabletID() << ", queue size# " << TopicStatsQueue.Size()); + + TopicStatsBatchScheduled = false; + ExecuteTopicStatsBatch(ctx); +} + +void TSchemeShard::ExecuteTopicStatsBatch(const TActorContext& ctx) { + if (!TopicPersistStatsPending && !TopicStatsQueue.Empty()) { + LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "Will execute TTxStoreStats, queue# " << TopicStatsQueue.Size()); + + TopicPersistStatsPending = true; + Execute(new TTxStoreTopicStats(this, TopicStatsQueue, TopicPersistStatsPending), ctx); + + ScheduleTopicStatsBatch(ctx); + } +} + +void TSchemeShard::ScheduleTopicStatsBatch(const TActorContext& ctx) { + if (!TopicStatsBatchScheduled && !TopicStatsQueue.Empty()) { + TDuration delay = TopicStatsQueue.Delay(); + LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "Will delay TTxStoreTopicStats on# " << delay << ", queue# " << TopicStatsQueue.Size()); + + ctx.Schedule(delay, new TEvPrivate::TEvPersistTopicStats()); + TopicStatsBatchScheduled = true; + } +} + +} // NSchemeShard +} // NKikimr diff --git a/ydb/core/tx/schemeshard/schemeshard__stats.h b/ydb/core/tx/schemeshard/schemeshard__stats.h index 659e57f087..b9abf7ab34 100644 --- a/ydb/core/tx/schemeshard/schemeshard__stats.h +++ b/ydb/core/tx/schemeshard/schemeshard__stats.h @@ -14,7 +14,7 @@ struct TStatsId { TPathId PathId; TTabletId Datashard; - TStatsId(const TPathId& pathId, const TTabletId& datashard) + TStatsId(const TPathId& pathId, const TTabletId datashard = TTabletId(0)) : PathId(pathId) , Datashard(datashard) { diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index ab7f1a0c77..b061571cce 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -3967,6 +3967,10 @@ TSchemeShard::TSchemeShard(const TActorId &tablet, TTabletStorageInfo *info) COUNTER_STATS_QUEUE_SIZE, COUNTER_STATS_WRITTEN, COUNTER_STATS_BATCH_LATENCY) + , TopicStatsQueue(this, + COUNTER_PQ_STATS_QUEUE_SIZE, + COUNTER_PQ_STATS_WRITTEN, + COUNTER_PQ_STATS_BATCH_LATENCY) , AllowDataColumnForIndexTable(0, 0, 1) { TabletCountersPtr.Reset(new TProtobufTabletCounters< @@ -4240,6 +4244,7 @@ void TSchemeShard::StateWork(STFUNC_SIG) { HFuncTraced(TEvDataShard::TEvSplitAck, Handle); HFuncTraced(TEvDataShard::TEvSplitPartitioningChangedAck, Handle); HFuncTraced(TEvDataShard::TEvPeriodicTableStats, Handle); + HFuncTraced(TEvPersQueue::TEvPeriodicTopicStats, Handle); HFuncTraced(TEvDataShard::TEvGetTableStatsResult, Handle); // @@ -4355,6 +4360,7 @@ void TSchemeShard::StateWork(STFUNC_SIG) { HFuncTraced(TEvPrivate::TEvSubscribeToShardDeletion, Handle); HFuncTraced(TEvPrivate::TEvPersistTableStats, Handle); + HFuncTraced(TEvPrivate::TEvPersistTopicStats, Handle); HFuncTraced(TEvSchemeShard::TEvLogin, Handle); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index ae0cf92367..4a8f33b8b5 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -286,6 +286,10 @@ public: bool TablePersistStatsPending = false; TStatsQueue<TEvDataShard::TEvPeriodicTableStats> TableStatsQueue; + bool TopicStatsBatchScheduled = false; + bool TopicPersistStatsPending = false; + TStatsQueue<TEvPersQueue::TEvPeriodicTopicStats> TopicStatsQueue; + TSet<TPathId> CleanDroppedPathsCandidates; TSet<TPathId> CleanDroppedSubDomainsCandidates; bool CleanDroppedPathsInFly = false; @@ -989,6 +993,11 @@ public: void Handle(TEvDataShard::TEvPeriodicTableStats::TPtr& ev, const TActorContext& ctx); void Handle(TEvDataShard::TEvGetTableStatsResult::TPtr& ev, const TActorContext& ctx); + void ExecuteTopicStatsBatch(const TActorContext& ctx); + void ScheduleTopicStatsBatch(const TActorContext& ctx); + void Handle(TEvPrivate::TEvPersistTopicStats::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPersQueue::TEvPeriodicTopicStats::TPtr& ev, const TActorContext& ctx); + void Handle(TEvSchemeShard::TEvFindTabletSubDomainPathId::TPtr& ev, const TActorContext& ctx); void ScheduleConditionalEraseRun(const TActorContext& ctx); diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 023610be4e..1f566a98ed 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -985,6 +985,20 @@ struct TColumnTableInfo : TSimpleRefCount<TColumnTableInfo> { } }; +struct TTopicStats { + TMessageSeqNo SeqNo; + + ui64 DataSize = 0; + ui64 UsedReserveSize = 0; + + TString ToString() const { + return TStringBuilder() << "TTopicStats {" + << " DataSize: " << DataSize + << " UsedReserveSize: " << UsedReserveSize + << " }"; + } +}; + struct TTopicTabletInfo : TSimpleRefCount<TTopicTabletInfo> { using TPtr = TIntrusivePtr<TTopicTabletInfo>; using TKeySchema = TVector<NScheme::TTypeInfo>; @@ -1177,6 +1191,8 @@ struct TTopicInfo : TSimpleRefCount<TTopicInfo> { TString PreSerializedPathDescription; // Cached path description TString PreSerializedPartitionsDescription; // Cached partition description + TTopicStats Stats; + bool FillKeySchema(const NKikimrPQ::TPQTabletConfig& tabletConfig, TString& error); bool FillKeySchema(const TString& tabletConfig); @@ -1322,6 +1338,12 @@ struct TSubDomainInfo: TSimpleRefCount<TSubDomainInfo> { ui64 DataSize = 0; ui64 IndexSize = 0; } Tables; + + struct TTopics { + ui64 DataSize = 0; + ui64 ReserveSize = 0; + ui64 UsedReserveSize = 0; + } Topics; }; struct TDiskSpaceQuotas { @@ -1835,6 +1857,11 @@ struct TSubDomainInfo: TSimpleRefCount<TSubDomainInfo> { counters->ChangeDiskSpaceTablesTotalBytes(newTotalBytes - oldTotalBytes); } + void AggrDiskSpaceUsage(const TTopicStats& newAggr, const TTopicStats& oldAggr = {}) { + DiskSpaceUsage.Topics.DataSize += (newAggr.DataSize - oldAggr.DataSize); + DiskSpaceUsage.Topics.UsedReserveSize += (newAggr.UsedReserveSize - oldAggr.UsedReserveSize); + } + const TDiskSpaceUsage& GetDiskSpaceUsage() const { return DiskSpaceUsage; } diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp index 3e81f0fb91..525c6b81e9 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp @@ -674,6 +674,10 @@ void TPathDescriber::DescribeDomainRoot(TPathElement::TPtr pathEl) { diskSpaceUsage->MutableTables()->SetDataSize(subDomainInfo->GetDiskSpaceUsage().Tables.DataSize); diskSpaceUsage->MutableTables()->SetIndexSize(subDomainInfo->GetDiskSpaceUsage().Tables.IndexSize); diskSpaceUsage->MutableTopics()->SetReserveSize(subDomainInfo->GetPQReservedStorage()); + ui64 accountSize = subDomainInfo->GetDiskSpaceUsage().Topics.DataSize - subDomainInfo->GetDiskSpaceUsage().Topics.UsedReserveSize + subDomainInfo->GetPQReservedStorage(); + diskSpaceUsage->MutableTopics()->SetAccountSize(accountSize); + diskSpaceUsage->MutableTopics()->SetDataSize(subDomainInfo->GetDiskSpaceUsage().Topics.DataSize); + diskSpaceUsage->MutableTopics()->SetUsedReserveSize(subDomainInfo->GetDiskSpaceUsage().Topics.UsedReserveSize); if (subDomainInfo->GetDeclaredSchemeQuotas()) { entry->MutableDeclaredSchemeQuotas()->CopyFrom(*subDomainInfo->GetDeclaredSchemeQuotas()); diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.h b/ydb/core/tx/schemeshard/schemeshard_path_describer.h index 2e40315cfc..4fe210d5ae 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path_describer.h +++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.h @@ -32,7 +32,7 @@ class TPathDescriber { void DescribeSolomonVolume(TPathId pathId, TPathElement::TPtr pathEl, bool returnChannelsBinding); void DescribeUserAttributes(TPathElement::TPtr pathEl); void DescribePathVersion(const TPath& path); - void DescribeDomain(TPathElement::TPtr pathEl); + void DescribeDomain(TPathElement::TPtr pathEl) ; void DescribeDomainRoot(TPathElement::TPtr pathEl); void DescribeDomainExtra(TPathElement::TPtr pathEl); void DescribeRevertedMigrations(TPathElement::TPtr pathEl); diff --git a/ydb/core/tx/schemeshard/schemeshard_private.h b/ydb/core/tx/schemeshard/schemeshard_private.h index fbe6c1e04d..b1d7581f82 100644 --- a/ydb/core/tx/schemeshard/schemeshard_private.h +++ b/ydb/core/tx/schemeshard/schemeshard_private.h @@ -27,6 +27,7 @@ struct TEvPrivate { EvPersistTableStats, EvConsoleConfigsTimeout, EvRunCdcStreamScan, + EvPersistTopicStats, EvEnd }; @@ -163,6 +164,10 @@ struct TEvPrivate { TEvPersistTableStats() = default; }; + struct TEvPersistTopicStats: public TEventLocal<TEvPersistTopicStats, EvPersistTopicStats> { + TEvPersistTopicStats() = default; + }; + struct TEvConsoleConfigsTimeout: public TEventLocal<TEvConsoleConfigsTimeout, EvConsoleConfigsTimeout> { }; diff --git a/ydb/core/tx/schemeshard/schemeshard_utils.cpp b/ydb/core/tx/schemeshard/schemeshard_utils.cpp index 84c6fd44d8..b94c2a79f8 100644 --- a/ydb/core/tx/schemeshard/schemeshard_utils.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_utils.cpp @@ -1,6 +1,7 @@ #include "schemeshard_utils.h" #include <ydb/core/mind/hive/hive.h> +#include <ydb/core/persqueue/utils.h> #include <ydb/core/protos/counters_schemeshard.pb.h> namespace NKikimr { @@ -200,22 +201,8 @@ void TSelfPinger::ScheduleSelfPingWakeup(const NActors::TActorContext &ctx) { } PQGroupReserve::PQGroupReserve(const ::NKikimrPQ::TPQTabletConfig& tabletConfig, ui64 partitions) { - const auto& partitionConfig = tabletConfig.GetPartitionConfig(); - - if (tabletConfig.GetMeteringMode() == NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY) { - const ui64 throughput = partitions * partitionConfig.GetWriteSpeedInBytesPerSecond(); - - Throughput = throughput; - - if (partitionConfig.HasStorageLimitBytes()) { - Storage = partitions * partitionConfig.GetStorageLimitBytes(); - } else { - Storage = throughput * partitionConfig.GetLifetimeSeconds(); - } - } else { - Throughput = 0; - Storage = 0; - } + Storage = partitions * NPQ::TopicPartitionReserveSize(tabletConfig); + Throughput = partitions * NPQ::TopicPartitionReserveThroughput(tabletConfig); } } diff --git a/ydb/core/tx/schemeshard/ut_base.cpp b/ydb/core/tx/schemeshard/ut_base.cpp index 569685595a..0cd7665641 100644 --- a/ydb/core/tx/schemeshard/ut_base.cpp +++ b/ydb/core/tx/schemeshard/ut_base.cpp @@ -10550,7 +10550,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) { ); } - Y_UNIT_TEST(TopicMeteringModeAndStorageSize) { + Y_UNIT_TEST(TopicReserveSize) { TTestBasicRuntime runtime; TTestEnv env(runtime); ui64 txId = 100; @@ -10558,7 +10558,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) { const auto AssertReserve = [&] (TString path, ui64 expectedReservedStorage) { TestDescribeResult(DescribePath(runtime, path), {NLs::Finished, - NLs::PQReservedStorage(expectedReservedStorage)}); + NLs::TopicReservedStorage(expectedReservedStorage)}); }; // create with WriteSpeedInBytesPerSecond @@ -10770,5 +10770,4 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) { env.TestWaitNotification(runtime, txId); AssertReserve("/MyRoot/Topic2", 3 * 17); } - } diff --git a/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.darwin.txt b/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.darwin.txt index e31d395fe6..ff67c37abd 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.darwin.txt +++ b/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.darwin.txt @@ -22,6 +22,7 @@ target_link_libraries(tx-schemeshard-ut_helpers PUBLIC core-engine-minikql core-filestore-core ydb-core-metering + persqueue-ut-common ydb-core-protos ydb-core-scheme ydb-core-tablet_flat diff --git a/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.linux-aarch64.txt b/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.linux-aarch64.txt index c1c282cef4..2c4e9247b6 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.linux-aarch64.txt @@ -23,6 +23,7 @@ target_link_libraries(tx-schemeshard-ut_helpers PUBLIC core-engine-minikql core-filestore-core ydb-core-metering + persqueue-ut-common ydb-core-protos ydb-core-scheme ydb-core-tablet_flat diff --git a/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.linux.txt b/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.linux.txt index c1c282cef4..2c4e9247b6 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.linux.txt +++ b/ydb/core/tx/schemeshard/ut_helpers/CMakeLists.linux.txt @@ -23,6 +23,7 @@ target_link_libraries(tx-schemeshard-ut_helpers PUBLIC core-engine-minikql core-filestore-core ydb-core-metering + persqueue-ut-common ydb-core-protos ydb-core-scheme ydb-core-tablet_flat diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp index d5b550e6d3..f61c5a79e2 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp @@ -5,6 +5,7 @@ #include <ydb/core/tx/schemeshard/schemeshard.h> #include <ydb/core/tx/tx_proxy/proxy.h> #include <ydb/core/persqueue/events/global.h> +#include <ydb/core/persqueue/ut/common/pq_ut_common.h> #include <ydb/core/blockstore/core/blockstore.h> @@ -2196,4 +2197,31 @@ namespace NSchemeShardUT_Private { auto& rec = result->Record; return rec; } + + void SendTEvPeriodicTopicStats(TTestActorRuntime& runtime, ui64 topicId, ui64 generation, ui64 round, ui64 dataSize, ui64 usedReserveSize) { + TActorId sender = runtime.AllocateEdgeActor(); + + TEvPersQueue::TEvPeriodicTopicStats* ev = new TEvPersQueue::TEvPeriodicTopicStats(); + auto& rec = ev->Record; + rec.SetPathId(topicId); + rec.SetGeneration(generation); + rec.SetRound(round); + rec.SetDataSize(dataSize); + rec.SetUsedReserveSize(usedReserveSize); + + ForwardToTablet(runtime, TTestTxConfig::SchemeShard, sender, ev); + } + + void WriteToTopic(TTestActorRuntime& runtime, const TString& path, ui32& msgSeqNo, const TString& message) { + auto topicDescr = DescribePath(runtime, path).GetPathDescription().GetPersQueueGroup(); + auto partitionId = topicDescr.GetPartitions()[0].GetPartitionId(); + auto tabletId = topicDescr.GetPartitions()[0].GetTabletId(); + + const auto edge = runtime.AllocateEdgeActor(); + TString cookie = NKikimr::NPQ::CmdSetOwner(&runtime, tabletId, edge, partitionId, "default", true).first; + + TVector<std::pair<ui64, TString>> data; + data.push_back({1, message}); + NKikimr::NPQ::CmdWrite(&runtime, tabletId, edge, partitionId, "sourceid0", msgSeqNo, data, false, {}, true, cookie, 0); + } } diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.h b/ydb/core/tx/schemeshard/ut_helpers/helpers.h index c58fe76069..75c9b0bae0 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/helpers.h +++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.h @@ -534,4 +534,7 @@ namespace NSchemeShardUT_Private { NKikimrPQ::TDescribeResponse GetDescribeFromPQBalancer(TTestActorRuntime& runtime, ui64 balancerId); + void SendTEvPeriodicTopicStats(TTestActorRuntime& runtime, ui64 topicId, ui64 generation, ui64 round, ui64 dataSize, ui64 usedReserveSize); + void WriteToTopic(TTestActorRuntime& runtime, const TString& path, ui32& msgSeqNo, const TString& message); + } //NSchemeShardUT_Private diff --git a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp index a8dd706e72..0dfe288f18 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp @@ -13,6 +13,23 @@ namespace NLs { using namespace NKikimr; +#define DESCRIBE_ASSERT_EQUAL(name, type, expression, description) \ + TCheckFunc name(type expected) { \ + return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) { \ + UNIT_ASSERT_C(IsGoodDomainStatus(record.GetStatus()), "Unexpected status: " << record.GetStatus()); \ + \ + const auto& pathDescr = record.GetPathDescription(); \ + const auto& subdomain = pathDescr.GetDomainDescription(); \ + const auto& value = expression; \ + \ + UNIT_ASSERT_EQUAL_C(value, expected, \ + description << " mismatch, subdomain with id " << subdomain.GetDomainKey().GetPathId() << \ + " has value " << value << \ + " but expected " << expected); \ + }; \ +} + + void NotInSubdomain(const NKikimrScheme::TEvDescribeSchemeResult& record) { UNIT_ASSERT(record.HasPathDescription()); NKikimrSchemeOp::TPathDescription descr = record.GetPathDescription(); @@ -619,20 +636,9 @@ TCheckFunc PQPartitionsInsideDomain(ui64 count) { }; } -TCheckFunc PQReservedStorage(ui64 count) { - return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) { - UNIT_ASSERT_C(IsGoodDomainStatus(record.GetStatus()), "Unexpected status: " << record.GetStatus()); - - const auto& pathDescr = record.GetPathDescription(); - const auto& domain = pathDescr.GetDomainDescription(); - const auto& curCount = domain.GetDiskSpaceUsage().GetTopics().GetReserveSize(); - - UNIT_ASSERT_EQUAL_C(curCount, count, - "pq reserved storage mismatch, domain with id " << domain.GetDomainKey().GetPathId() << - " has size " << curCount << - " but expected " << count); - }; -} +DESCRIBE_ASSERT_EQUAL(TopicReservedStorage, ui64, subdomain.GetDiskSpaceUsage().GetTopics().GetReserveSize(), "Topic ReserveSize") +DESCRIBE_ASSERT_EQUAL(TopicAccountSize, ui64, subdomain.GetDiskSpaceUsage().GetTopics().GetAccountSize(), "Topic AccountSize") +DESCRIBE_ASSERT_EQUAL(TopicUsedReserveSize, ui64, subdomain.GetDiskSpaceUsage().GetTopics().GetUsedReserveSize(), "Topic UsedReserveSize") TCheckFunc PathsInsideDomainOneOf(TSet<ui64> variants) { return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) { @@ -1104,5 +1110,7 @@ TCheckFunc PartitionKeys(TVector<TString> lastShardKeys) { }; } +#undef DESCRIBE_ASSERT_EQUAL + } // NLs } // NSchemeShardUT_Private diff --git a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h index 95fd8cc128..d76c74d41f 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h +++ b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h @@ -34,7 +34,9 @@ namespace NLs { TCheckFunc PathsInsideDomain(ui64 count); TCheckFunc PQPartitionsInsideDomain(ui64 count); - TCheckFunc PQReservedStorage(ui64 count); + TCheckFunc TopicReservedStorage(ui64 expected); + TCheckFunc TopicAccountSize(ui64 expected); + TCheckFunc TopicUsedReserveSize(ui64 expected); TCheckFunc PathsInsideDomainOneOf(TSet<ui64> variants); TCheckFunc ShardsInsideDomain(ui64 count); TCheckFunc ShardsInsideDomainOneOf(TSet<ui64> variants); diff --git a/ydb/core/tx/schemeshard/ut_stats.cpp b/ydb/core/tx/schemeshard/ut_stats.cpp index d2fbcaf251..51029221ef 100644 --- a/ydb/core/tx/schemeshard/ut_stats.cpp +++ b/ydb/core/tx/schemeshard/ut_stats.cpp @@ -297,4 +297,212 @@ Y_UNIT_TEST_SUITE(TSchemeshardStatsBatchingTest) { WaitAndCheckStatPersisted(runtime, env, newRowsCount, batchTimeout, eventAction); } + + Y_UNIT_TEST(TopicAccountSizeAndUsedReserveSize) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + + runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); + + auto& appData = runtime.GetAppData(); + + ui64 txId = 100; + + // disable batching + appData.SchemeShardConfig.SetStatsBatchTimeoutMs(0); + appData.SchemeShardConfig.SetStatsMaxBatchSize(0); + + // apply config via reboot + TActorId sender = runtime.AllocateEdgeActor(); + GracefulRestartTablet(runtime, TTestTxConfig::SchemeShard, sender); + + const auto Assert = [&] (ui64 expectedAccountSize, ui64 expectedUsedReserveSize) { + TestDescribeResult(DescribePath(runtime, "/MyRoot/Topic1"), + {NLs::Finished, + NLs::TopicAccountSize(expectedAccountSize), + NLs::TopicUsedReserveSize(expectedUsedReserveSize)}); + }; + + TestCreatePQGroup(runtime, ++txId, "/MyRoot", R"( + Name: "Topic1" + TotalGroupCount: 1 + PartitionPerTablet: 1 + PQTabletConfig { + PartitionConfig { + LifetimeSeconds: 13 + WriteSpeedInBytesPerSecond : 19 + } + MeteringMode: METERING_MODE_RESERVED_CAPACITY + } + )"); + env.TestWaitNotification(runtime, txId); + Assert(1 * 13 * 19, 0); // 247, 0 + + TestCreatePQGroup(runtime, ++txId, "/MyRoot", R"( + Name: "Topic2" + TotalGroupCount: 3 + PartitionPerTablet: 3 + PQTabletConfig { + PartitionConfig { + LifetimeSeconds: 11 + WriteSpeedInBytesPerSecond : 17 + } + MeteringMode: METERING_MODE_RESERVED_CAPACITY + } + )"); + env.TestWaitNotification(runtime, txId); + Assert(1 * 13 * 19 + 3 * 11 * 17, 0); // 247 + 561 = 808, 0 + + TestCreatePQGroup(runtime, ++txId, "/MyRoot", R"( + Name: "Topic3" + TotalGroupCount: 3 + PartitionPerTablet: 3 + PQTabletConfig { + PartitionConfig { + LifetimeSeconds: 11 + WriteSpeedInBytesPerSecond : 17 + } + MeteringMode: METERING_MODE_RESERVED_CAPACITY + } + )"); + env.TestWaitNotification(runtime, txId); + Assert(1 * 13 * 19 + 3 * 11 * 17 + 3 * 11 * 17, 0); // 247 + 561 + 561 = 1369, 0 + + ui64 topic1Id = DescribePath(runtime, "/MyRoot/Topic1").GetPathDescription().GetSelf().GetPathId(); + ui64 topic2Id = DescribePath(runtime, "/MyRoot/Topic2").GetPathDescription().GetSelf().GetPathId(); + ui64 topic3Id = DescribePath(runtime, "/MyRoot/Topic3").GetPathDescription().GetSelf().GetPathId(); + + ui64 generation = 1; + ui64 round = 1; + + SendTEvPeriodicTopicStats(runtime, topic1Id, generation, ++round, 101, 101); + Assert(1369, 101); // only reserve size + + SendTEvPeriodicTopicStats(runtime, topic1Id, generation, ++round, 383, 247); + Assert(1369 + (383 - 247), 247); // 1505, 247 reserve + exceeding the limit + + SendTEvPeriodicTopicStats(runtime, topic2Id, generation, ++round, 113, 113); + Assert(1369 + (383 - 247), 247 + 113); // 1505, 360 + + SendTEvPeriodicTopicStats(runtime, topic1Id, generation, ++round, 31, 31); + Assert(1369, 31 + 113); // only reserve, data size + + TestDropPQGroup(runtime, ++txId, "/MyRoot", "Topic2"); + env.TestWaitNotification(runtime, txId); + Assert(808, 31); + + SendTEvPeriodicTopicStats(runtime, topic3Id, generation, ++round, 151, 151); + Assert(808, 31 + 151); + + TestDeallocatePQ(runtime, ++txId, "/MyRoot", "Name: \"Topic3\""); + env.TestWaitNotification(runtime, txId); + Assert(247, 31); + } + + Y_UNIT_TEST(TopicPeriodicStatMeteringModeReserved) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + + runtime.SetLogPriority(NKikimrServices::PERSQUEUE, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::PERSQUEUE_READ_BALANCER, NLog::PRI_TRACE); + + auto& appData = runtime.GetAppData(); + + ui64 txId = 100; + + // disable batching + appData.SchemeShardConfig.SetStatsBatchTimeoutMs(0); + appData.SchemeShardConfig.SetStatsMaxBatchSize(0); + + appData.PQConfig.SetBalancerWakeupIntervalSec(1); + + // apply config via reboot + TActorId sender = runtime.AllocateEdgeActor(); + GracefulRestartTablet(runtime, TTestTxConfig::SchemeShard, sender); + + TString topicPath = "/MyRoot/Topic1"; + + const auto Assert = [&] (ui64 expectedAccountSize, ui64 expectedUsedReserveSize) { + TestDescribeResult(DescribePath(runtime,topicPath), + {NLs::Finished, + NLs::TopicAccountSize(expectedAccountSize), + NLs::TopicUsedReserveSize(expectedUsedReserveSize)}); + }; + + + TestCreatePQGroup(runtime, ++txId, "/MyRoot", R"( + Name: "Topic1" + TotalGroupCount: 3 + PartitionPerTablet: 3 + PQTabletConfig { + PartitionConfig { + LifetimeSeconds: 11 + WriteSpeedInBytesPerSecond : 17 + } + MeteringMode: METERING_MODE_RESERVED_CAPACITY + } + )"); + env.TestWaitNotification(runtime, txId); + Assert(3 * 11 * 17, 0); // 561, 0 + + ui32 msgSeqNo = 100; + WriteToTopic(runtime, topicPath, msgSeqNo, "Message 100"); + + env.SimulateSleep(runtime, TDuration::Seconds(3)); // Wait TEvPeriodicTopicStats + + Assert(3 * 11 * 17, 69); // 69 - it is unstable value. it can change if internal message store change + } + + Y_UNIT_TEST(TopicPeriodicStatMeteringModeRequest) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + + runtime.SetLogPriority(NKikimrServices::PERSQUEUE_READ_BALANCER, NLog::PRI_TRACE); + + auto& appData = runtime.GetAppData(); + + ui64 txId = 100; + + // disable batching + appData.SchemeShardConfig.SetStatsBatchTimeoutMs(0); + appData.SchemeShardConfig.SetStatsMaxBatchSize(0); + + appData.PQConfig.SetBalancerWakeupIntervalSec(1); + + // apply config via reboot + TActorId sender = runtime.AllocateEdgeActor(); + GracefulRestartTablet(runtime, TTestTxConfig::SchemeShard, sender); + + TString topicPath = "/MyRoot/Topic1"; + + const auto Assert = [&] (ui64 expectedAccountSize, ui64 expectedUsedReserveSize) { + TestDescribeResult(DescribePath(runtime,topicPath), + {NLs::Finished, + NLs::TopicAccountSize(expectedAccountSize), + NLs::TopicUsedReserveSize(expectedUsedReserveSize)}); + }; + + + TestCreatePQGroup(runtime, ++txId, "/MyRoot", R"( + Name: "Topic1" + TotalGroupCount: 3 + PartitionPerTablet: 3 + PQTabletConfig { + PartitionConfig { + LifetimeSeconds: 11 + WriteSpeedInBytesPerSecond : 17 + } + MeteringMode: METERING_MODE_REQUEST_UNITS + } + )"); + env.TestWaitNotification(runtime, txId); + Assert(0, 0); // topic is empty + + ui32 msgSeqNo = 100; + WriteToTopic(runtime, topicPath, msgSeqNo, "Message 100"); + + env.SimulateSleep(runtime, TDuration::Seconds(3)); // Wait TEvPeriodicTopicStats + + Assert(69, 0); // 67 - it is unstable value. it can change if internal message store change + } }; |