aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-02-09 10:04:25 +0300
committertesseract <tesseract@yandex-team.com>2023-02-09 10:04:25 +0300
commitaf24f1c243267a93328f66b5b3d67c5473e74f5b (patch)
tree881ca3dbb795b8103df2a7e9c68bc511f11c4b84
parentb989f9b4278df1d0326ff514f2f59193cfd4dbe1 (diff)
downloadydb-af24f1c243267a93328f66b5b3d67c5473e74f5b.tar.gz
Выделить методы получения текущего размера партиции топика
Небольшое выделение метода что бы было проще разбираться в коде Мотивация названия По коду ``` result.SetPartitionSize(Size());``` Т.е. это размер партиции, но метод добавляется и так к партиции, и partition.PartitionSize() звучит коряво (два раза partition). В итоге просто Size
-rw-r--r--ydb/core/persqueue/partition.cpp14
-rw-r--r--ydb/core/persqueue/partition.h7
2 files changed, 13 insertions, 8 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index eaab8534b86..25f4379bcb4 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -567,7 +567,7 @@ void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorCo
out << "CreationTime: " << CreationTime.ToStringLocalUpToSeconds(); res.push_back(out.Str()); out.Clear();
out << "InitDuration: " << InitDuration.ToString(); res.push_back(out.Str()); out.Clear();
out << "TotalCount: " << (Head.GetNextOffset() - StartOffset); res.push_back(out.Str()); out.Clear();
- out << "TotalSize: " << BodySize + Head.PackedSize; res.push_back(out.Str()); out.Clear();
+ out << "TotalSize: " << Size(); res.push_back(out.Str()); out.Clear();
out << "LastOffset: " << (Head.GetNextOffset()); res.push_back(out.Str()); out.Clear();
out << "HeadOffset: " << Head.Offset << ", count: " << Head.GetCount(); res.push_back(out.Str()); out.Clear();
out << "WriteInflightSize: " << WriteInflightSize; res.push_back(out.Str()); out.Clear();
@@ -1052,7 +1052,7 @@ void TPartition::HandleOnWrite(TEvPQ::TEvUpdateAvailableSize::TPtr&, const TActo
ui64 TPartition::GetUsedStorage(const TActorContext& ctx) {
auto duration = ctx.Now() - LastUsedStorageMeterTimestamp;
LastUsedStorageMeterTimestamp = ctx.Now();
- ui64 size = BodySize + Head.PackedSize;
+ ui64 size = Size();
if (DataKeysBody.size() > 0) {
size -= DataKeysBody.front().Size;
} else {
@@ -2048,7 +2048,7 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
if (DiskIsFull) {
result.SetStatus(NKikimrPQ::TStatusResponse::STATUS_DISK_IS_FULL);
} else if (EndOffset - StartOffset >= static_cast<ui64>(Config.GetPartitionConfig().GetMaxCountInPartition()) ||
- BodySize + Head.PackedSize >= static_cast<ui64>(Config.GetPartitionConfig().GetMaxSizeInPartition())) {
+ Size() >= static_cast<ui64>(Config.GetPartitionConfig().GetMaxSizeInPartition())) {
result.SetStatus(NKikimrPQ::TStatusResponse::STATUS_PARTITION_IS_FULL);
} else {
result.SetStatus(NKikimrPQ::TStatusResponse::STATUS_OK);
@@ -2136,7 +2136,7 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
result.SetReadBytesQuota(maxQuota);
- result.SetPartitionSize(BodySize + Head.PackedSize);
+ result.SetPartitionSize(Size());
result.SetStartOffset(StartOffset);
result.SetEndOffset(EndOffset);
@@ -3505,7 +3505,7 @@ void TPartition::ReportCounters(const TActorContext& ctx) {
}
}
- ui64 partSize = BodySize + Head.PackedSize;
+ ui64 partSize = Size();
if (partSize != PartitionCountersLabeled->GetCounters()[METRIC_TOTAL_PART_SIZE].Get()) {
haveChanges = true;
PartitionCountersLabeled->GetCounters()[METRIC_MAX_PART_SIZE].Set(partSize);
@@ -4752,12 +4752,12 @@ void TPartition::HandleOnWrite(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& c
const ui64 maxSize = Config.GetPartitionConfig().GetMaxSizeInPartition();
const ui64 maxCount = Config.GetPartitionConfig().GetMaxCountInPartition();
- if (EndOffset - StartOffset >= maxCount || BodySize + Head.PackedSize >= maxSize) {
+ if (EndOffset - StartOffset >= maxCount || Size() >= maxSize) {
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(ev->Get()->Msgs.size());
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(sz);
ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::WRITE_ERROR_PARTITION_IS_FULL,
- Sprintf("try later, partition is full - already have %" PRIu64" from %" PRIu64 " count, %" PRIu64 " from %" PRIu64 " size", EndOffset - StartOffset, maxCount, BodySize + Head.PackedSize, maxSize));
+ Sprintf("try later, partition is full - already have %" PRIu64" from %" PRIu64 " count, %" PRIu64 " from %" PRIu64 " size", EndOffset - StartOffset, maxCount, Size(), maxSize));
return;
}
ui64 size = 0;
diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h
index 815d83f8ee5..df1b1c3963e 100644
--- a/ydb/core/persqueue/partition.h
+++ b/ydb/core/persqueue/partition.h
@@ -248,7 +248,7 @@ private:
void AddCmdWriteUserInfos(NKikimrClient::TKeyValueRequest& request);
void AddCmdDeleteRange(NKikimrClient::TKeyValueRequest& request,
const TKeyPrefix& ikey, const TKeyPrefix& ikeyDeprecated);
-
+
TUserInfo& GetOrCreatePendingUser(const TString& user, const TActorContext& ctx, TMaybe<ui64> readRuleGeneration = {});
TUserInfo* GetPendingUserIfExists(const TString& user);
@@ -291,6 +291,11 @@ public:
void Bootstrap(const TActorContext& ctx);
+ ui64 Size() const {
+ return BodySize + Head.PackedSize;
+ }
+
+
//Bootstrap sends kvRead
//Become StateInit