aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-04-10 09:38:17 +0300
committertesseract <tesseract@yandex-team.com>2023-04-10 09:38:17 +0300
commit0898d2c2865676cf1622e78dd5a84f9e0b9daaf1 (patch)
treec5d1551e7c3b51dfce5dd46b6c8de7c470d8305b
parentb5da19b8a2b0de7edae408ab328f3ae34acd421d (diff)
downloadydb-0898d2c2865676cf1622e78dd5a84f9e0b9daaf1.tar.gz
Move initialization to a separate file. Refactor the initialization process
1. Уношу всё, что связано с инициализацией, в отдельный файл partition_init.cpp. 2. Инициализация разбивается на явные последовательные шаги. Шаги не знают друг о друге. 3. Каждый шаг сам управляет тем, на какие event-ы реагировать.
-rw-r--r--ydb/core/persqueue/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/persqueue/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/persqueue/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/persqueue/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/persqueue/partition.cpp815
-rw-r--r--ydb/core/persqueue/partition.h42
-rw-r--r--ydb/core/persqueue/partition_init.cpp943
-rw-r--r--ydb/core/persqueue/partition_init.h152
-rw-r--r--ydb/core/persqueue/partition_util.h114
9 files changed, 1234 insertions, 836 deletions
diff --git a/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt b/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt
index a2ae4317b1e..ee7cf6c9143 100644
--- a/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt
@@ -52,6 +52,7 @@ target_sources(ydb-core-persqueue PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/metering_sink.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/mirrorer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ownerinfo.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_init.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/percentile_counter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq.cpp
diff --git a/ydb/core/persqueue/CMakeLists.linux-aarch64.txt b/ydb/core/persqueue/CMakeLists.linux-aarch64.txt
index 017ec968c3d..c5e43d10ce4 100644
--- a/ydb/core/persqueue/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/persqueue/CMakeLists.linux-aarch64.txt
@@ -53,6 +53,7 @@ target_sources(ydb-core-persqueue PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/metering_sink.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/mirrorer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ownerinfo.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_init.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/percentile_counter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq.cpp
diff --git a/ydb/core/persqueue/CMakeLists.linux-x86_64.txt b/ydb/core/persqueue/CMakeLists.linux-x86_64.txt
index 017ec968c3d..c5e43d10ce4 100644
--- a/ydb/core/persqueue/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/persqueue/CMakeLists.linux-x86_64.txt
@@ -53,6 +53,7 @@ target_sources(ydb-core-persqueue PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/metering_sink.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/mirrorer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ownerinfo.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_init.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/percentile_counter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq.cpp
diff --git a/ydb/core/persqueue/CMakeLists.windows-x86_64.txt b/ydb/core/persqueue/CMakeLists.windows-x86_64.txt
index a2ae4317b1e..ee7cf6c9143 100644
--- a/ydb/core/persqueue/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/persqueue/CMakeLists.windows-x86_64.txt
@@ -52,6 +52,7 @@ target_sources(ydb-core-persqueue PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/metering_sink.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/mirrorer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ownerinfo.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_init.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/percentile_counter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq.cpp
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 92764223724..bfa2bdbc840 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -1,5 +1,6 @@
#include "event_helpers.h"
#include "mirrorer.h"
+#include "partition_util.h"
#include "partition.h"
#include "read.h"
@@ -30,7 +31,6 @@ static const ui32 MAX_WRITE_CYCLE_SIZE = 16_MB;
static const ui32 MAX_USER_ACTS = 1000;
static const TDuration WAKE_TIMEOUT = TDuration::Seconds(5);
static const ui32 MAX_INLINE_SIZE = 1000;
-static const ui32 LEVEL0 = 32;
static const TDuration UPDATE_AVAIL_SIZE_INTERVAL = TDuration::MilliSeconds(100);
static const TString WRITE_QUOTA_ROOT_PATH = "write-quota";
static const ui32 MAX_USERS = 1000;
@@ -79,109 +79,6 @@ struct TMirrorerInfo {
TTabletCountersBase Baseline;
};
-class TKeyLevel {
-public:
- friend IOutputStream& operator <<(IOutputStream& out, const TKeyLevel& value);
-
- TKeyLevel(ui32 border)
- : Border_(border)
- , Sum_(0)
- , RecsCount_(0)
- , InternalPartsCount_(0) {}
-
- void Clear() {
- Keys_.clear();
- Sum_ = 0;
- RecsCount_ = 0;
- InternalPartsCount_ = 0;
- }
-
- ui32 KeysCount() const {
- return Keys_.size();
- }
-
- ui32 RecsCount() const {
- return RecsCount_;
- }
-
- ui16 InternalPartsCount() const {
- return InternalPartsCount_;
- }
-
- bool NeedCompaction() const {
- return Sum_ >= Border_;
- }
-
- std::pair<TKey, ui32> Compact() {
- Y_VERIFY(!Keys_.empty());
- TKey tmp(Keys_.front().first);
- tmp.SetCount(RecsCount_);
- tmp.SetInternalPartsCount(InternalPartsCount_);
- std::pair<TKey, ui32> res(tmp, Sum_);
- Clear();
- return res;
- }
-
- std::pair<TKey, ui32> PopFront() {
- Y_VERIFY(!Keys_.empty());
- Sum_ -= Keys_.front().second;
- RecsCount_ -= Keys_.front().first.GetCount();
- InternalPartsCount_ -= Keys_.front().first.GetInternalPartsCount();
- auto res = Keys_.front();
- Keys_.pop_front();
- return res;
- }
-
- std::pair<TKey, ui32> PopBack() {
- Y_VERIFY(!Keys_.empty());
- Sum_ -= Keys_.back().second;
- RecsCount_ -= Keys_.back().first.GetCount();
- InternalPartsCount_ -= Keys_.back().first.GetInternalPartsCount();
- auto res = Keys_.back();
- Keys_.pop_back();
- return res;
- }
-
- ui32 Sum() const {
- return Sum_;
- }
-
- const TKey& GetKey(const ui32 pos) const {
- Y_VERIFY(pos < Keys_.size());
- return Keys_[pos].first;
- }
-
- const ui32& GetSize(const ui32 pos) const {
- Y_VERIFY(pos < Keys_.size());
- return Keys_[pos].second;
- }
-
- void PushKeyToFront(const TKey& key, ui32 size) {
- Sum_ += size;
- RecsCount_ += key.GetCount();
- InternalPartsCount_ += key.GetInternalPartsCount();
- Keys_.push_front(std::make_pair(key, size));
- }
-
- void AddKey(const TKey& key, ui32 size) {
- Sum_ += size;
- RecsCount_ += key.GetCount();
- InternalPartsCount_ += key.GetInternalPartsCount();
- Keys_.push_back(std::make_pair(key, size));
- }
-
- ui32 Border() const {
- return Border_;
- }
-
-private:
- const ui32 Border_;
- std::deque<std::pair<TKey, ui32>> Keys_;
- ui32 Sum_;
- ui32 RecsCount_;
- ui16 InternalPartsCount_;
-};
-
void HtmlOutput(IOutputStream& out, const TString& line, const std::deque<std::pair<TKey, ui32>>& keys) {
HTML(out) {
TABLE() {
@@ -389,43 +286,10 @@ void AddCheckDiskRequest(TEvKeyValue::TEvRequest *request, ui32 numChannels) {
}
-static void RequestDiskStatus(const TActorContext& ctx, const TActorId& dst, ui32 numChannels) {
- THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
-
- AddCheckDiskRequest(request.Get(), numChannels);
-
- ctx.Send(dst, request.Release());
-}
-
-
void RequestInfoRange(const TActorContext& ctx, const TActorId& dst, ui32 partition, const TString& key) {
RequestRange(ctx, dst, partition, TKeyPrefix::TypeInfo, true, key, key == "");
}
-void RequestMetaRead(const TActorContext& ctx, const TActorId& dst, ui32 partition) {
- auto addKey = [](NKikimrClient::TKeyValueRequest& request, TKeyPrefix::EType type, ui32 partition) {
- auto read = request.AddCmdRead();
- TKeyPrefix key{type, partition};
- read->SetKey(key.Data(), key.Size());
- };
-
- THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
-
- addKey(request->Record, TKeyPrefix::TypeMeta, partition);
- addKey(request->Record, TKeyPrefix::TypeTxMeta, partition);
-
- ctx.Send(dst, request.Release());
-}
-
-void RequestData(const TActorContext& ctx, const TActorId& dst, const TVector<TString>& keys) {
- THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
- for (auto& key: keys) {
- auto read = request->Record.AddCmdRead();
- read->SetKey(key);
- }
- ctx.Send(dst, request.Release());
-}
-
void RequestDataRange(const TActorContext& ctx, const TActorId& dst, ui32 partition, const TString& key) {
RequestRange(ctx, dst, partition, TKeyPrefix::TypeData, false, key);
}
@@ -486,7 +350,8 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co
const NKikimrPQ::TPQTabletConfig& tabletConfig, const TTabletCountersBase& counters, bool subDomainOutOfSpace,
bool newPartition,
TVector<TTransaction> distrTxs)
- : TabletID(tabletId)
+ : Initializer(this)
+ , TabletID(tabletId)
, Partition(partition)
, TabletConfig(tabletConfig)
, Counters(counters)
@@ -498,7 +363,6 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co
, WriteInflightSize(0)
, Tablet(tablet)
, BlobCache(blobCache)
- , InitState(WaitConfig)
, PartitionedBlob(partition, 0, 0, 0, 0, 0, Head, NewHead, true, false, 8_MB)
, NewHeadKey{TKey{}, 0, TInstant::Zero(), 0}
, BodySize(0)
@@ -734,147 +598,6 @@ void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorCo
ctx.Send(ev->Sender, new TEvPQ::TEvMonResponse(Partition, res, out.Str()));
}
-void TPartition::RequestConfig(const TActorContext& ctx)
-{
- auto event = MakeHolder<TEvKeyValue::TEvRequest>();
- auto read = event->Record.AddCmdRead();
- read->SetKey(GetKeyConfig());
- ctx.Send(Tablet, event.Release());
-}
-
-void TPartition::HandleConfig(const NKikimrClient::TResponse& res, const TActorContext& ctx)
-{
- auto& response = res.GetReadResult(0);
-
- switch (response.GetStatus()) {
- case NKikimrProto::OK:
- Y_VERIFY(Config.ParseFromString(response.GetValue()));
- Y_VERIFY(Config.GetVersion() <= TabletConfig.GetVersion());
- if (Config.GetVersion() < TabletConfig.GetVersion()) {
- auto event = MakeHolder<TEvPQ::TEvChangePartitionConfig>(TopicConverter,
- TabletConfig);
- PushFrontDistrTx(event.Release());
- }
- break;
- case NKikimrProto::NODATA:
- Config = TabletConfig;
- break;
- case NKikimrProto::ERROR:
- LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE,
- "Partition " << Partition << " can't read config");
- ctx.Send(Tablet, new TEvents::TEvPoisonPill());
- break;
- default:
- Cerr << "ERROR " << response.GetStatus() << "\n";
- Y_FAIL("bad status");
- };
-
- InitState = WaitDiskStatus;
- Initialize(ctx);
-}
-
-void TPartition::Bootstrap(const TActorContext& ctx) {
- Y_VERIFY(InitState == WaitConfig);
- RequestConfig(ctx);
- Become(&TThis::StateInit);
-}
-
-void TPartition::Initialize(const TActorContext& ctx) {
- CreationTime = ctx.Now();
- WriteCycleStartTime = ctx.Now();
- WriteQuota.ConstructInPlace(Config.GetPartitionConfig().GetBurstSize(),
- Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond(),
- ctx.Now());
- WriteTimestamp = ctx.Now();
- LastUsedStorageMeterTimestamp = ctx.Now();
- WriteTimestampEstimate = ManageWriteTimestampEstimate ? ctx.Now() : TInstant::Zero();
-
- CloudId = Config.GetYcCloudId();
- DbId = Config.GetYdbDatabaseId();
- DbPath = Config.GetYdbDatabasePath();
- FolderId = Config.GetYcFolderId();
-
- CalcTopicWriteQuotaParams(AppData()->PQConfig,
- IsLocalDC,
- TopicConverter,
- TabletID,
- ctx,
- TopicWriteQuoterPath,
- TopicWriteQuotaResourcePath);
-
- UsersInfoStorage.ConstructInPlace(DCId,
- TabletID,
- TopicConverter,
- Partition,
- Counters,
- Config,
- CloudId,
- DbId,
- Config.GetYdbDatabasePath(),
- IsServerless,
- FolderId);
- TotalChannelWritesByHead.resize(Config.GetPartitionConfig().GetNumChannels());
-
- if (Config.GetPartitionConfig().HasMirrorFrom()) {
- ManageWriteTimestampEstimate = !Config.GetPartitionConfig().GetMirrorFrom().GetSyncWriteTime();
- } else {
- ManageWriteTimestampEstimate = IsLocalDC;
- }
-
- if (AppData()->PQConfig.GetTopicsAreFirstClassCitizen()) {
- PartitionCountersLabeled.Reset(new TPartitionLabeledCounters(EscapeBadChars(TopicConverter->GetClientsideName()),
- Partition,
- Config.GetYdbDatabasePath()));
- } else {
- PartitionCountersLabeled.Reset(new TPartitionLabeledCounters(TopicConverter->GetClientsideName(),
- Partition));
- }
-
- UsersInfoStorage->Init(Tablet, SelfId(), ctx);
-
- Y_VERIFY(AppData(ctx)->PQConfig.GetMaxBlobsPerLevel() > 0);
- ui32 border = LEVEL0;
- MaxSizeCheck = 0;
- MaxBlobSize = AppData(ctx)->PQConfig.GetMaxBlobSize();
- PartitionedBlob = TPartitionedBlob(Partition, 0, 0, 0, 0, 0, Head, NewHead, true, false, MaxBlobSize);
- for (ui32 i = 0; i < TotalLevels; ++i) {
- CompactLevelBorder.push_back(border);
- MaxSizeCheck += border;
- Y_VERIFY(i + 1 < TotalLevels && border < MaxBlobSize || i + 1 == TotalLevels && border == MaxBlobSize);
- border *= AppData(ctx)->PQConfig.GetMaxBlobsPerLevel();
- border = Min(border, MaxBlobSize);
- }
- TotalMaxCount = AppData(ctx)->PQConfig.GetMaxBlobsPerLevel() * TotalLevels;
-
- std::reverse(CompactLevelBorder.begin(), CompactLevelBorder.end());
-
- for (ui32 i = 0; i < TotalLevels; ++i) {
- DataKeysHead.push_back(TKeyLevel(CompactLevelBorder[i]));
- }
-
- for (const auto& readQuota : Config.GetPartitionConfig().GetReadQuota()) {
- auto &userInfo = UsersInfoStorage->GetOrCreate(readQuota.GetClientId(), ctx);
- userInfo.ReadQuota.UpdateConfig(readQuota.GetBurstSize(), readQuota.GetSpeedInBytesPerSecond());
- }
-
- LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "bootstrapping " << Partition << " " << ctx.SelfID);
-
- if (NewPartition) {
- InitComplete(ctx);
- } else {
- Y_VERIFY(InitState == WaitDiskStatus);
- RequestDiskStatus(ctx, Tablet, Config.GetPartitionConfig().GetNumChannels());
- Become(&TThis::StateInit);
- }
-
- if (AppData(ctx)->Counters) {
- if (AppData()->PQConfig.GetTopicsAreFirstClassCitizen()) {
- SetupStreamCounters(ctx);
- } else {
- SetupTopicCounters(ctx);
- }
- }
-}
void TPartition::EmplaceResponse(TMessage&& message, const TActorContext& ctx) {
Responses.emplace_back(
@@ -885,165 +608,6 @@ void TPartition::EmplaceResponse(TMessage&& message, const TActorContext& ctx) {
);
}
-void TPartition::SetupTopicCounters(const TActorContext& ctx) {
- auto counters = AppData(ctx)->Counters;
- auto labels = NPersQueue::GetLabels(TopicConverter);
- const TString suffix = IsLocalDC ? "Original" : "Mirrored";
-
- WriteBufferIsFullCounter.SetCounter(
- NPersQueue::GetCounters(counters, "writingTime", TopicConverter),
- {{"host", DCId},
- {"Partition", ToString<ui32>(Partition)}},
- {"sensor", "BufferFullTime" + suffix, true});
-
- auto subGroup = GetServiceCounters(counters, "pqproxy|writeTimeLag");
- InputTimeLag = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter(
- subGroup, labels, {{"sensor", "TimeLags" + suffix}}, "Interval",
- TVector<std::pair<ui64, TString>>{
- {100, "100ms"}, {200, "200ms"}, {500, "500ms"}, {1000, "1000ms"},
- {2000, "2000ms"}, {5000, "5000ms"}, {10'000, "10000ms"}, {30'000, "30000ms"},
- {60'000, "60000ms"}, {180'000,"180000ms"}, {9'999'999, "999999ms"}}, true));
-
-
- subGroup = GetServiceCounters(counters, "pqproxy|writeInfo");
- MessageSize = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter(
- subGroup, labels, {{"sensor", "MessageSize" + suffix}}, "Size",
- TVector<std::pair<ui64, TString>>{
- {1_KB, "1kb"}, {5_KB, "5kb"}, {10_KB, "10kb"},
- {20_KB, "20kb"}, {50_KB, "50kb"}, {100_KB, "100kb"}, {200_KB, "200kb"},
- {512_KB, "512kb"},{1024_KB, "1024kb"}, {2048_KB,"2048kb"}, {5120_KB, "5120kb"},
- {10240_KB, "10240kb"}, {65536_KB, "65536kb"}, {999'999'999, "99999999kb"}}, true));
-
- subGroup = GetServiceCounters(counters, "pqproxy|writeSession");
- BytesWrittenTotal = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"BytesWritten" + suffix}, true);
- BytesWrittenUncompressed = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"UncompressedBytesWritten" + suffix}, true);
- BytesWrittenComp = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"CompactedBytesWritten" + suffix}, true);
- MsgsWrittenTotal = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"MessagesWritten" + suffix}, true);
-
- TVector<NPersQueue::TPQLabelsInfo> aggr = {{{{"Account", TopicConverter->GetAccount()}}, {"total"}}};
- ui32 border = AppData(ctx)->PQConfig.GetWriteLatencyBigMs();
- subGroup = GetServiceCounters(counters, "pqproxy|SLI");
- WriteLatency = NKikimr::NPQ::CreateSLIDurationCounter(subGroup, aggr, "Write", border,
- {100, 200, 500, 1000, 1500, 2000,
- 5000, 10'000, 30'000, 99'999'999});
- SLIBigLatency = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WriteBigLatency"}, true, "sensor", false);
- WritesTotal = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WritesTotal"}, true, "sensor", false);
- if (IsQuotingEnabled() && !TopicWriteQuotaResourcePath.empty()) {
- TopicWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
- new NKikimr::NPQ::TPercentileCounter(
- GetServiceCounters(counters, "pqproxy|topicWriteQuotaWait"), labels,
- {{"sensor", "TopicWriteQuotaWait" + suffix}}, "Interval",
- TVector<std::pair<ui64, TString>>{
- {0, "0ms"}, {1, "1ms"}, {5, "5ms"}, {10, "10ms"},
- {20, "20ms"}, {50, "50ms"}, {100, "100ms"}, {500, "500ms"},
- {1000, "1000ms"}, {2500, "2500ms"}, {5000, "5000ms"},
- {10'000, "10000ms"}, {9'999'999, "999999ms"}}, true));
- }
-
- PartitionWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
- new NKikimr::NPQ::TPercentileCounter(GetServiceCounters(counters, "pqproxy|partitionWriteQuotaWait"),
- labels, {{"sensor", "PartitionWriteQuotaWait" + suffix}}, "Interval",
- TVector<std::pair<ui64, TString>>{
- {0, "0ms"}, {1, "1ms"}, {5, "5ms"}, {10, "10ms"},
- {20, "20ms"}, {50, "50ms"}, {100, "100ms"}, {500, "500ms"},
- {1000, "1000ms"}, {2500, "2500ms"}, {5000, "5000ms"},
- {10'000, "10000ms"}, {9'999'999, "999999ms"}}, true));
-}
-
-void TPartition::SetupStreamCounters(const TActorContext& ctx) {
- const auto topicName = TopicConverter->GetModernName();
- auto counters = AppData(ctx)->Counters;
- auto subgroups = NPersQueue::GetSubgroupsForTopic(TopicConverter, CloudId, DbId, DbPath, FolderId);
-/*
- WriteBufferIsFullCounter.SetCounter(
- NPersQueue::GetCountersForTopic(counters, IsServerless),
- {
- {"database", DbPath},
- {"cloud_id", CloudId},
- {"folder_id", FolderId},
- {"database_id", DbId},
- {"topic", TopicConverter->GetFederationPath()},
- {"host", DCId},
- {"partition", ToString<ui32>(Partition)}},
- {"name", "api.grpc.topic.stream_write.buffer_brimmed_milliseconds", true});
-*/
-
- subgroups.push_back({"name", "topic.write.lag_milliseconds"});
-
- InputTimeLag = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter(
- NPersQueue::GetCountersForTopic(counters, IsServerless), {},
- subgroups, "bin",
- TVector<std::pair<ui64, TString>>{
- {100, "100"}, {200, "200"}, {500, "500"},
- {1000, "1000"}, {2000, "2000"}, {5000, "5000"},
- {10'000, "10000"}, {30'000, "30000"}, {60'000, "60000"},
- {180'000,"180000"}, {9'999'999, "999999"}}, true));
-
- subgroups.back().second = "topic.write.message_size_bytes";
- MessageSize = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter(
- NPersQueue::GetCountersForTopic(counters, IsServerless), {},
- subgroups, "bin",
- TVector<std::pair<ui64, TString>>{
- {1024, "1024"}, {5120, "5120"}, {10'240, "10240"},
- {20'480, "20480"}, {51'200, "51200"}, {102'400, "102400"},
- {204'800, "204800"}, {524'288, "524288"},{1'048'576, "1048576"},
- {2'097'152,"2097152"}, {5'242'880, "5242880"}, {10'485'760, "10485760"},
- {67'108'864, "67108864"}, {999'999'999, "99999999"}}, true));
-
- subgroups.pop_back();
- BytesWrittenGrpc = NKikimr::NPQ::TMultiCounter(
- NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups,
- {"api.grpc.topic.stream_write.bytes"} , true, "name");
- BytesWrittenTotal = NKikimr::NPQ::TMultiCounter(
- NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups,
- {"topic.write.bytes"} , true, "name");
-
- MsgsWrittenGrpc = NKikimr::NPQ::TMultiCounter(
- NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups,
- {"api.grpc.topic.stream_write.messages"}, true, "name");
- MsgsWrittenTotal = NKikimr::NPQ::TMultiCounter(
- NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups,
- {"topic.write.messages"}, true, "name");
-
-
- BytesWrittenUncompressed = NKikimr::NPQ::TMultiCounter(
-
- NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups,
- {"topic.write.uncompressed_bytes"}, true, "name");
-
- TVector<NPersQueue::TPQLabelsInfo> aggr = {{{{"Account", TopicConverter->GetAccount()}}, {"total"}}};
- ui32 border = AppData(ctx)->PQConfig.GetWriteLatencyBigMs();
- auto subGroup = GetServiceCounters(counters, "pqproxy|SLI");
- WriteLatency = NKikimr::NPQ::CreateSLIDurationCounter(subGroup, aggr, "Write", border,
- {100, 200, 500, 1000, 1500, 2000,
- 5000, 10'000, 30'000, 99'999'999});
- SLIBigLatency = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WriteBigLatency"}, true, "name", false);
- WritesTotal = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WritesTotal"}, true, "name", false);
- if (IsQuotingEnabled() && !TopicWriteQuotaResourcePath.empty()) {
- subgroups.push_back({"name", "api.grpc.topic.stream_write.topic_throttled_milliseconds"});
- TopicWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
- new NKikimr::NPQ::TPercentileCounter(
- NPersQueue::GetCountersForTopic(counters, IsServerless), {},
- subgroups, "bin",
- TVector<std::pair<ui64, TString>>{
- {0, "0"}, {1, "1"}, {5, "5"}, {10, "10"},
- {20, "20"}, {50, "50"}, {100, "100"}, {500, "500"},
- {1000, "1000"}, {2500, "2500"}, {5000, "5000"},
- {10'000, "10000"}, {9'999'999, "999999"}}, true));
- subgroups.pop_back();
- }
-
- subgroups.push_back({"name", "api.grpc.topic.stream_write.partition_throttled_milliseconds"});
- PartitionWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
- new NKikimr::NPQ::TPercentileCounter(
- NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups, "bin",
- TVector<std::pair<ui64, TString>>{
- {0, "0"}, {1, "1"}, {5, "5"}, {10, "10"},
- {20, "20"}, {50, "50"}, {100, "100"}, {500, "500"},
- {1000, "1000"}, {2500, "2500"}, {5000, "5000"},
- {10'000, "10000"}, {9'999'999, "999999"}}, true));
-}
-
void TPartition::ProcessHasDataRequests(const TActorContext& ctx) {
if (!InitDone)
return;
@@ -1451,7 +1015,6 @@ void TPartition::CancelAllWritesOnIdle(const TActorContext& ctx) {
ProcessReserveRequests(ctx);
}
-
void TPartition::FailBadClient(const TActorContext& ctx) {
for (auto it = Owners.begin(); it != Owners.end();) {
it = DropOwner(it, ctx);
@@ -1487,373 +1050,6 @@ bool CheckDiskStatus(const TStorageStatusFlags status) {
return !status.Check(NKikimrBlobStorage::StatusDiskSpaceLightYellowMove);
}
-void TPartition::HandleGetDiskStatus(const NKikimrClient::TResponse& response, const TActorContext& ctx) {
- bool diskIsOk = true;
- for (ui32 i = 0; i < response.GetStatusResultSize(); ++i) {
- auto& res = response.GetGetStatusResult(i);
-
- if (res.GetStatus() != NKikimrProto::OK) {
- LOG_ERROR_S(
- ctx, NKikimrServices::PERSQUEUE,
- "commands for topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition <<
- " are not processed at all, got KV error in CmdGetStatus " << res.GetStatus()
- );
- ctx.Send(Tablet, new TEvents::TEvPoisonPill());
- return;
- }
- diskIsOk = diskIsOk && CheckDiskStatus(res.GetStatusFlags());
- }
- DiskIsFull = !diskIsOk;
- if (DiskIsFull) {
- LogAndCollectError(NKikimrServices::PERSQUEUE, "disk is full", ctx);
- }
-
- InitState = WaitMetaRead;
- RequestMetaRead(ctx, Tablet, Partition);
-}
-
-void TPartition::HandleMetaRead(const NKikimrClient::TResponse& response, const TActorContext& ctx)
-{
- auto handleReadResult = [&](const NKikimrClient::TKeyValueResponse::TReadResult& response,
- auto&& action) {
- switch (response.GetStatus()) {
- case NKikimrProto::OK:
- action(response);
- break;
- case NKikimrProto::NODATA:
- break;
- case NKikimrProto::ERROR:
- LOG_ERROR_S(
- ctx, NKikimrServices::PERSQUEUE,
- "read topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition << " error"
- );
- ctx.Send(Tablet, new TEvents::TEvPoisonPill());
- break;
- default:
- Cerr << "ERROR " << response.GetStatus() << "\n";
- Y_FAIL("bad status");
- };
- };
-
- auto loadMeta = [&](const NKikimrClient::TKeyValueResponse::TReadResult& response) {
- NKikimrPQ::TPartitionMeta meta;
- bool res = meta.ParseFromString(response.GetValue());
- Y_VERIFY(res);
- /* Bring back later, when switch to 21-2 will be unable
- StartOffset = meta.GetStartOffset();
- EndOffset = meta.GetEndOffset();
- if (StartOffset == EndOffset) {
- NewHead.Offset = Head.Offset = EndOffset;
- }
- */
- if (CurrentStateFunc() == &TThis::StateInit) {
- SubDomainOutOfSpace = meta.GetSubDomainOutOfSpace();
- }
- };
- handleReadResult(response.GetReadResult(0), loadMeta);
-
- auto loadTxMeta = [this](const NKikimrClient::TKeyValueResponse::TReadResult& response) {
- NKikimrPQ::TPartitionTxMeta meta;
- bool res = meta.ParseFromString(response.GetValue());
- Y_VERIFY(res);
-
- if (meta.HasPlanStep()) {
- PlanStep = meta.GetPlanStep();
- }
- if (meta.HasTxId()) {
- TxId = meta.GetTxId();
- }
- };
- handleReadResult(response.GetReadResult(1), loadTxMeta);
-
- InitState = WaitInfoRange;
- RequestInfoRange(ctx, Tablet, Partition, "");
-}
-
-void TPartition::HandleInfoRangeRead(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx) {
- //megaqc check here all results
- Y_VERIFY(range.HasStatus());
- const TString *key = nullptr;
- switch (range.GetStatus()) {
- case NKikimrProto::OK:
- case NKikimrProto::OVERRUN:
- for (ui32 i = 0; i < range.PairSize(); ++i) {
- const auto& pair = range.GetPair(i);
- Y_VERIFY(pair.HasStatus());
- if (pair.GetStatus() != NKikimrProto::OK) {
- LOG_ERROR_S(
- ctx, NKikimrServices::PERSQUEUE,
- "read range error topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition
- << " got status " << pair.GetStatus() << " for key " << (pair.HasKey() ? pair.GetKey() : "unknown")
- );
-
- ctx.Send(Tablet, new TEvents::TEvPoisonPill());
- return;
- }
- Y_VERIFY(pair.HasKey());
- Y_VERIFY(pair.HasValue());
-
- key = &pair.GetKey();
- if ((*key)[TKeyPrefix::MarkPosition()] == TKeyPrefix::MarkSourceId) {
- SourceIdStorage.LoadSourceIdInfo(*key, pair.GetValue(), ctx.Now());
- } else if ((*key)[TKeyPrefix::MarkPosition()] == TKeyPrefix::MarkProtoSourceId) {
- SourceIdStorage.LoadSourceIdInfo(*key, pair.GetValue(), ctx.Now());
- } else if ((*key)[TKeyPrefix::MarkPosition()] == TKeyPrefix::MarkUser) {
- UsersInfoStorage->Parse(*key, pair.GetValue(), ctx);
- } else if ((*key)[TKeyPrefix::MarkPosition()] == TKeyPrefix::MarkUserDeprecated) {
- UsersInfoStorage->ParseDeprecated(*key, pair.GetValue(), ctx);
- }
- }
- //make next step
- if (range.GetStatus() == NKikimrProto::OVERRUN) {
- Y_VERIFY(key);
- RequestInfoRange(ctx, Tablet, Partition, *key);
- } else {
- InitState = WaitDataRange;
- RequestDataRange(ctx, Tablet, Partition, "");
- }
- break;
- case NKikimrProto::NODATA:
- InitState = WaitDataRange;
- RequestDataRange(ctx, Tablet, Partition, "");
- break;
- case NKikimrProto::ERROR:
- LOG_ERROR_S(
- ctx, NKikimrServices::PERSQUEUE,
- "read topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition << " error"
- );
- ctx.Send(Tablet, new TEvents::TEvPoisonPill());
- break;
- default:
- Cerr << "ERROR " << range.GetStatus() << "\n";
- Y_FAIL("bad status");
- };
-}
-
-void TPartition::FillBlobsMetaData(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx) {
- for (ui32 i = 0; i < range.PairSize(); ++i) {
- auto pair = range.GetPair(i);
- Y_VERIFY(pair.GetStatus() == NKikimrProto::OK); //this is readrange without keys, only OK could be here
- TKey k(pair.GetKey());
- if (DataKeysBody.empty()) { //no data - this is first pair of first range
- Head.Offset = EndOffset = StartOffset = k.GetOffset();
- if (k.GetPartNo() > 0) ++StartOffset;
- Head.PartNo = 0;
- } else {
- Y_VERIFY(EndOffset <= k.GetOffset(), "%s", pair.GetKey().c_str());
- if (EndOffset < k.GetOffset()) {
- GapOffsets.push_back(std::make_pair(EndOffset, k.GetOffset()));
- GapSize += k.GetOffset() - EndOffset;
- }
- }
- Y_VERIFY(k.GetCount() + k.GetInternalPartsCount() > 0);
- Y_VERIFY(k.GetOffset() >= EndOffset);
- EndOffset = k.GetOffset() + k.GetCount();
- //at this point EndOffset > StartOffset
- if (!k.IsHead()) //head.Size will be filled after read or head blobs
- BodySize += pair.GetValueSize();
-
- LOG_DEBUG_S(
- ctx, NKikimrServices::PERSQUEUE,
- "Got data topic " << TopicConverter->GetClientsideName() << " partition " << k.GetPartition()
- << " offset " << k.GetOffset() << " count " << k.GetCount() << " size " << pair.GetValueSize()
- << " so " << StartOffset << " eo " << EndOffset << " " << pair.GetKey()
- );
- DataKeysBody.push_back({k, pair.GetValueSize(), TInstant::Seconds(pair.GetCreationUnixTime()), DataKeysBody.empty() ? 0 : DataKeysBody.back().CumulativeSize + DataKeysBody.back().Size});
- }
-
- Y_VERIFY(EndOffset >= StartOffset);
-}
-
-void TPartition::FormHeadAndProceed(const TActorContext& ctx) {
- Head.Offset = EndOffset;
- Head.PartNo = 0;
- TVector<TString> keys;
- while (DataKeysBody.size() > 0 && DataKeysBody.back().Key.IsHead()) {
- Y_VERIFY(DataKeysBody.back().Key.GetOffset() + DataKeysBody.back().Key.GetCount() == Head.Offset); //no gaps in head allowed
- HeadKeys.push_front(DataKeysBody.back());
- Head.Offset = DataKeysBody.back().Key.GetOffset();
- Head.PartNo = DataKeysBody.back().Key.GetPartNo();
- DataKeysBody.pop_back();
- }
- for (const auto& p : DataKeysBody) {
- Y_VERIFY(!p.Key.IsHead());
- }
-
- Y_VERIFY(HeadKeys.empty() || Head.Offset == HeadKeys.front().Key.GetOffset() && Head.PartNo == HeadKeys.front().Key.GetPartNo());
- Y_VERIFY(Head.Offset < EndOffset || Head.Offset == EndOffset && HeadKeys.empty());
- Y_VERIFY(Head.Offset >= StartOffset || Head.Offset == StartOffset - 1 && Head.PartNo > 0);
-
- //form head request
- for (auto& p : HeadKeys) {
- keys.push_back({p.Key.Data(), p.Key.Size()});
- }
- Y_VERIFY(keys.size() < TotalMaxCount);
- if (keys.empty()) {
- InitComplete(ctx);
- return;
- }
- InitState = WaitDataRead;
- RequestData(ctx, Tablet, keys);
-}
-
-void TPartition::HandleDataRangeRead(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx) {
- Y_VERIFY(range.HasStatus());
- switch(range.GetStatus()) {
- case NKikimrProto::OK:
- case NKikimrProto::OVERRUN:
-
- FillBlobsMetaData(range, ctx);
-
- if (range.GetStatus() == NKikimrProto::OVERRUN) { //request rest of range
- Y_VERIFY(range.PairSize());
- RequestDataRange(ctx, Tablet, Partition, range.GetPair(range.PairSize() - 1).GetKey());
- return;
- }
- FormHeadAndProceed(ctx);
- break;
- case NKikimrProto::NODATA:
- InitComplete(ctx);
- break;
- default:
- Cerr << "ERROR " << range.GetStatus() << "\n";
- Y_FAIL("bad status");
- };
-}
-
-void TPartition::HandleDataRead(const NKikimrClient::TResponse& response, const TActorContext& ctx) {
- Y_VERIFY(InitState == WaitDataRead);
- ui32 currentLevel = 0;
- Y_VERIFY(HeadKeys.size() == response.ReadResultSize());
- for (ui32 i = 0; i < response.ReadResultSize(); ++i) {
- auto& read = response.GetReadResult(i);
- Y_VERIFY(read.HasStatus());
- switch(read.GetStatus()) {
- case NKikimrProto::OK: {
- const TKey& key = HeadKeys[i].Key;
- ui32 size = HeadKeys[i].Size;
- Y_VERIFY(key.IsHead());
- ui64 offset = key.GetOffset();
- while (currentLevel + 1 < TotalLevels && size < CompactLevelBorder[currentLevel + 1])
- ++currentLevel;
- Y_VERIFY(size < CompactLevelBorder[currentLevel]);
-
- DataKeysHead[currentLevel].AddKey(key, size);
- Y_VERIFY(DataKeysHead[currentLevel].KeysCount() < AppData(ctx)->PQConfig.GetMaxBlobsPerLevel());
- Y_VERIFY(!DataKeysHead[currentLevel].NeedCompaction());
-
- LOG_DEBUG_S(
- ctx, NKikimrServices::PERSQUEUE,
- "read res partition topic '" << TopicConverter->GetClientsideName()
- << "' parititon " << key.GetPartition() << " offset " << offset << " endOffset " << EndOffset
- << " key " << key.GetOffset() << "," << key.GetCount() << " valuesize " << read.GetValue().size()
- << " expected " << size
- );
-
- Y_VERIFY(offset + 1 >= StartOffset);
- Y_VERIFY(offset < EndOffset);
- Y_VERIFY(size == read.GetValue().size());
-
- for (TBlobIterator it(key, read.GetValue()); it.IsValid(); it.Next()) {
- Head.Batches.push_back(it.GetBatch());
- }
- Head.PackedSize += size;
-
- break;
- }
- case NKikimrProto::OVERRUN:
- Y_FAIL("implement overrun in readresult!!");
- return;
- case NKikimrProto::NODATA:
- Y_FAIL("NODATA can't be here");
- return;
- case NKikimrProto::ERROR:
- LOG_ERROR_S(
- ctx, NKikimrServices::PERSQUEUE,
- "tablet " << TabletID << " HandleOnInit topic '" << TopicConverter->GetClientsideName()
- << "' partition " << Partition
- << " ReadResult " << i << " status NKikimrProto::ERROR result message: \"" << read.GetMessage()
- << " \" errorReason: \"" << response.GetErrorReason() << "\""
- );
- ctx.Send(Tablet, new TEvents::TEvPoisonPill());
- return;
- default:
- Cerr << "ERROR " << read.GetStatus() << " message: \"" << read.GetMessage() << "\"\n";
- Y_FAIL("bad status");
-
- };
- }
-
- Y_VERIFY(Head.PackedSize > 0);
- Y_VERIFY(Head.PackedSize < MaxBlobSize);
- Y_VERIFY(Head.GetNextOffset() == EndOffset);
- Y_VERIFY(std::accumulate(DataKeysHead.begin(), DataKeysHead.end(), 0u,
- [](ui32 sum, const TKeyLevel& level){return sum + level.Sum();}) == Head.PackedSize);
-
- InitComplete(ctx);
-}
-
-
-void TPartition::HandleOnInit(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx) {
-
- auto& response = ev->Get()->Record;
- if (response.GetStatus() != NMsgBusProxy::MSTATUS_OK) {
- LOG_ERROR_S(
- ctx, NKikimrServices::PERSQUEUE,
- "commands for topic '" << TopicConverter->GetClientsideName() << " partition " << Partition
- << " are not processed at all, got KV error " << response.GetStatus()
- );
- ctx.Send(Tablet, new TEvents::TEvPoisonPill());
- return;
- }
- bool diskIsOk = true;
- for (ui32 i = 0; i < response.GetStatusResultSize(); ++i) {
- auto& res = response.GetGetStatusResult(i);
- if (res.GetStatus() != NKikimrProto::OK) {
- LOG_ERROR_S(
- ctx, NKikimrServices::PERSQUEUE,
- "commands for topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition <<
- " are not processed at all, got KV error in CmdGetStatus " << res.GetStatus()
- );
- ctx.Send(Tablet, new TEvents::TEvPoisonPill());
- return;
- }
- diskIsOk = diskIsOk && CheckDiskStatus(res.GetStatusFlags());
- }
- if (response.GetStatusResultSize())
- DiskIsFull = !diskIsOk;
-
- switch(InitState) {
- case WaitConfig:
- Y_VERIFY(response.ReadResultSize() == 1);
- HandleConfig(response, ctx);
- break;
- case WaitDiskStatus:
- Y_VERIFY(response.GetStatusResultSize());
- HandleGetDiskStatus(response, ctx);
- break;
- case WaitMetaRead:
- Y_VERIFY(response.ReadResultSize() == 2);
- HandleMetaRead(response, ctx);
- break;
- case WaitInfoRange:
- Y_VERIFY(response.ReadRangeResultSize() == 1);
- HandleInfoRangeRead(response.GetReadRangeResult(0), ctx);
- break;
- case WaitDataRange:
- Y_VERIFY(response.ReadRangeResultSize() == 1);
- HandleDataRangeRead(response.GetReadRangeResult(0), ctx);
- break;
- case WaitDataRead:
- Y_VERIFY(response.ReadResultSize());
- HandleDataRead(response, ctx);
- break;
- default:
- Y_FAIL("Unknown state");
-
- };
-}
-
void TPartition::InitComplete(const TActorContext& ctx) {
if (StartOffset == EndOffset && EndOffset == 0) {
for (auto& [user, info] : UsersInfoStorage->GetAll()) {
@@ -1937,7 +1133,6 @@ void TPartition::UpdateUserInfoEndOffset(const TInstant& now) {
userInfo.second.EndOffset = (i64)EndOffset;
userInfo.second.UpdateReadingTimeAndState(now);
}
-
}
void TPartition::ProcessChangeOwnerRequest(TAutoPtr<TEvPQ::TEvChangeOwner> ev, const TActorContext& ctx) {
@@ -2112,8 +1307,6 @@ void TPartition::Handle(TEvPQ::TEvReserveBytes::TPtr& ev, const TActorContext& c
}
-
-
void TPartition::Handle(TEvPQ::TEvPartitionOffsets::TPtr& ev, const TActorContext& ctx) {
NKikimrPQ::TOffsetsResponse::TPartResult result;
result.SetPartition(Partition);
@@ -2843,8 +2036,6 @@ TVector<TRequestedBlob> TPartition::GetReadRequestFromBody(const ui64 startOffse
return blobs;
}
-
-
TVector<TClientBlob> TPartition::GetReadRequestFromHead(const ui64 startOffset, const ui16 partNo, const ui32 maxCount, const ui32 maxSize, const ui64 readTimestampMs, ui32* rcount, ui32* rsize, ui64* insideHeadOffset) {
Y_UNUSED(readTimestampMs);
ui32& count = *rcount;
diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h
index 8af1bb681ce..319b3d4355c 100644
--- a/ydb/core/persqueue/partition.h
+++ b/ydb/core/persqueue/partition.h
@@ -3,6 +3,7 @@
#include "blob.h"
#include "header.h"
#include "key.h"
+#include "partition_init.h"
#include "partition_types.h"
#include "quota_tracker.h"
#include "sourceid.h"
@@ -73,6 +74,16 @@ struct TTransaction {
};
class TPartition : public TActorBootstrapped<TPartition> {
+ friend TInitializer;
+ friend TInitializerStep;
+ friend TInitConfigStep;
+ friend TInitInternalFieldsStep;
+ friend TInitDiskStatusStep;
+ friend TInitMetaStep;
+ friend TInitInfoRangeStep;
+ friend TInitDataRangeStep;
+ friend TInitDataStep;
+
private:
static const ui32 MAX_ERRORS_COUNT_TO_STORE = 10;
@@ -98,10 +109,8 @@ private:
void CreateMirrorerActor();
void DoRead(TEvPQ::TEvRead::TPtr ev, TDuration waitQuotaTime, const TActorContext& ctx);
void FailBadClient(const TActorContext& ctx);
- void FillBlobsMetaData(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx);
void FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config, const TActorContext& ctx);
void FilterDeadlinedWrites(const TActorContext& ctx);
- void FormHeadAndProceed(const TActorContext& ctx);
void Handle(NReadSpeedLimiterEvents::TEvCounters::TPtr& ev, const TActorContext& ctx);
void Handle(NReadSpeedLimiterEvents::TEvResponse::TPtr& ev, const TActorContext& ctx);
@@ -134,18 +143,12 @@ private:
void Handle(TEvQuota::TEvClearance::TPtr& ev, const TActorContext& ctx);
void Handle(TEvents::TEvPoisonPill::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPQ::TEvSubDomainStatus::TPtr& ev, const TActorContext& ctx);
- void HandleDataRangeRead(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx);
- void HandleDataRead(const NKikimrClient::TResponse& range, const TActorContext& ctx);
- void HandleGetDiskStatus(const NKikimrClient::TResponse& res, const TActorContext& ctx);
- void HandleInfoRangeRead(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx);
- void HandleMetaRead(const NKikimrClient::TResponse& response, const TActorContext& ctx);
void HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorContext& ctx);
void HandleOnIdle(TEvPQ::TEvDeregisterMessageGroup::TPtr& ev, const TActorContext& ctx);
void HandleOnIdle(TEvPQ::TEvRegisterMessageGroup::TPtr& ev, const TActorContext& ctx);
void HandleOnIdle(TEvPQ::TEvSplitMessageGroup::TPtr& ev, const TActorContext& ctx);
void HandleOnIdle(TEvPQ::TEvUpdateAvailableSize::TPtr& ev, const TActorContext& ctx);
void HandleOnIdle(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& ctx);
- void HandleOnInit(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx);
void HandleOnInit(TEvPQ::TEvPartitionOffsets::TPtr& ev, const TActorContext& ctx);
void HandleOnInit(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext& ctx);
void HandleOnWrite(TEvPQ::TEvDeregisterMessageGroup::TPtr& ev, const TActorContext& ctx);
@@ -302,8 +305,6 @@ private:
void InitPendingUserInfoForImportantClients(const NKikimrPQ::TPQTabletConfig& config,
const TActorContext& ctx);
- void RequestConfig(const TActorContext& ctx);
- void HandleConfig(const NKikimrClient::TResponse& res, const TActorContext& ctx);
void Initialize(const TActorContext& ctx);
template <typename T>
@@ -375,6 +376,8 @@ private:
return ss.Str();
}
+ TInitializer Initializer;
+
STFUNC(StateInit)
{
NPersQueue::TCounterTimeKeeper keeper(TabletCounters.Cumulative()[COUNTER_PQ_TABLET_CPU_USAGE]);
@@ -384,7 +387,6 @@ private:
TRACE_EVENT(NKikimrServices::PERSQUEUE);
switch (ev->GetTypeRewrite()) {
CFunc(TEvents::TSystem::Wakeup, HandleWakeup);
- HFuncTraced(TEvKeyValue::TEvResponse, HandleOnInit); //result of reads
HFuncTraced(TEvents::TEvPoisonPill, Handle);
HFuncTraced(TEvPQ::TEvMonRequest, HandleMonitoring);
HFuncTraced(TEvPQ::TEvChangePartitionConfig, Handle);
@@ -401,7 +403,9 @@ private:
HFuncTraced(TEvPQ::TEvTxRollback, HandleOnInit);
HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle);
default:
- LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateInit", ev));
+ if (!Initializer.Handle(ev, ctx)) {
+ LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateInit", ev));
+ }
break;
};
}
@@ -512,17 +516,8 @@ private:
break;
};
}
-private:
- enum EInitState {
- WaitConfig,
- WaitDiskStatus,
- WaitInfoRange,
- WaitDataRange,
- WaitDataRead,
- WaitMetaRead,
- WaitTxInfo
- };
+private:
ui64 TabletID;
ui32 Partition;
NKikimrPQ::TPQTabletConfig Config;
@@ -550,8 +545,6 @@ private:
TActorId Tablet;
TActorId BlobCache;
- EInitState InitState;
-
std::deque<TMessage> Requests;
std::deque<TMessage> Responses;
@@ -697,4 +690,5 @@ private:
TDeque<std::unique_ptr<IEventBase>> PendingEvents;
};
+
} // namespace NKikimr::NPQ
diff --git a/ydb/core/persqueue/partition_init.cpp b/ydb/core/persqueue/partition_init.cpp
new file mode 100644
index 00000000000..09a9341039c
--- /dev/null
+++ b/ydb/core/persqueue/partition_init.cpp
@@ -0,0 +1,943 @@
+#include "partition.h"
+#include "partition_util.h"
+
+namespace NKikimr::NPQ {
+
+// Starting value of the level border that TPartition::Initialize() uses when it fills CompactLevelBorder.
+static const ui32 LEVEL0 = 32;
+
+// Forward declarations of helpers used by the key-value based steps below
+// (their definitions are not part of the code shown before this point).
+bool DiskIsFull(TEvKeyValue::TEvResponse::TPtr& ev);
+bool ValidateResponse(const TInitializerStep& step, TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx);
+
+//
+// TInitializer
+//
+
+// Creates the initializer for the given partition and registers every initialization step.
+// The registration order below is the order in which the steps are executed.
+TInitializer::TInitializer(TPartition* partition)
+    : Partition(partition)
+    , InProgress(false)
+{
+    Steps.emplace_back(MakeHolder<TInitConfigStep>(this));
+    Steps.emplace_back(MakeHolder<TInitInternalFieldsStep>(this));
+    Steps.emplace_back(MakeHolder<TInitDiskStatusStep>(this));
+    Steps.emplace_back(MakeHolder<TInitMetaStep>(this));
+    Steps.emplace_back(MakeHolder<TInitInfoRangeStep>(this));
+    Steps.emplace_back(MakeHolder<TInitDataRangeStep>(this));
+    Steps.emplace_back(MakeHolder<TInitDataStep>(this));
+
+    // Start from the first registered step.
+    CurrentStep = Steps.begin();
+}
+
+// Starts the initialization: marks it as in progress and runs the current step.
+// Must not be called while an initialization is already running (verified).
+void TInitializer::Execute(const TActorContext& ctx) {
+ Y_VERIFY(!InProgress, "Initialization already in progress");
+ InProgress = true;
+ DoNext(ctx);
+}
+
+// Forwards an incoming event to the current step.
+// Returns whatever the step's Handle returns; requires the initialization to be in progress (verified).
+bool TInitializer::Handle(STFUNC_SIG) {
+ Y_VERIFY(InProgress, "Initialization is not started");
+ return CurrentStep->Get()->Handle(ev,ctx);
+}
+
+// Advances to the step after the current one and runs it (or finishes if there is none left).
+void TInitializer::Next(const TActorContext& ctx) {
+ ++CurrentStep;
+ DoNext(ctx);
+}
+
+// Finishes the initialization: logs completion, clears the in-progress flag
+// and hands control back to the partition via InitComplete().
+void TInitializer::Done(const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Initializing topic '" << Partition->TopicConverter->GetClientsideName()
+ << "' partition " << Partition->Partition
+ << ". Completed.");
+ InProgress = false;
+ Partition->InitComplete(ctx);
+}
+
+// Runs the step CurrentStep points to. For a new partition every step flagged with
+// SkipNewPartition is passed over first. When no step is left, the initialization is finished.
+void TInitializer::DoNext(const TActorContext& ctx) {
+    if (Partition->NewPartition) {
+        // Nothing is stored for a new partition yet, so skippable steps are not executed.
+        while (CurrentStep != Steps.end() && CurrentStep->Get()->SkipNewPartition) {
+            ++CurrentStep;
+        }
+    }
+
+    if (CurrentStep == Steps.end()) {
+        Done(ctx);
+        return;
+    }
+
+    auto* step = CurrentStep->Get();
+    LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Initializing topic '" << Partition->TopicConverter->GetClientsideName()
+                                                    << "' partition " << Partition->Partition
+                                                    << ". Step " << step->Name);
+    step->Execute(ctx);
+}
+
+
+//
+// TInitializerStep
+//
+
+// Base constructor of every initialization step.
+//   initializer      - the owning initializer, used to reach the partition and to move to the next step;
+//   name             - step name, written to the log by TInitializer::DoNext;
+//   skipNewPartition - when true, TInitializer::DoNext skips this step if the partition is new.
+TInitializerStep::TInitializerStep(TInitializer* initializer, TString name, bool skipNewPartition)
+ : Name(name)
+ , SkipNewPartition(skipNewPartition)
+ , Initializer(initializer) {
+}
+
+// Reports that this step is finished; the initializer proceeds with the next one.
+void TInitializerStep::Done(const TActorContext& ctx) {
+ Initializer->Next(ctx);
+}
+
+// Default event handler of a step: handles nothing and returns false,
+// i.e. the event is reported to the caller as not processed.
+bool TInitializerStep::Handle(STFUNC_SIG) {
+ Y_UNUSED(ev);
+ Y_UNUSED(ctx);
+
+ return false;
+}
+
+// Returns the partition being initialized (taken from the owning initializer).
+TPartition* TInitializerStep::Partition() const {
+ return Initializer->Partition;
+}
+
+// Returns the numeric id of the partition being initialized.
+ui32 TInitializerStep::PartitionId() const {
+ return Initializer->Partition->Partition;
+}
+
+// Sends TEvPoisonPill to the actor stored in the partition's Tablet field; used by the steps on unrecoverable errors.
+void TInitializerStep::PoisonPill(const TActorContext& ctx) {
+ ctx.Send(Partition()->Tablet, new TEvents::TEvPoisonPill());
+}
+
+// Returns the client-side topic name of the partition; used in log messages.
+TString TInitializerStep::TopicName() const {
+ return Partition()->TopicConverter->GetClientsideName();
+}
+
+
+//
+// TBaseKVStep
+//
+
+// Base class constructor for steps that wait for a TEvKeyValue::TEvResponse; forwards all arguments to TInitializerStep.
+TBaseKVStep::TBaseKVStep(TInitializer* initializer, TString name, bool skipNewPartition)
+ : TInitializerStep(initializer, name, skipNewPartition) {
+}
+
+// Dispatches an event by type: TEvKeyValue::TEvResponse goes to the typed Handle overload of the concrete step.
+// Returns false for any other event type, true otherwise.
+bool TBaseKVStep::Handle(STFUNC_SIG) {
+ switch(ev->GetTypeRewrite())
+ {
+ HFunc(TEvKeyValue::TEvResponse, Handle);
+ default:
+ return false;
+ }
+ return true;
+}
+
+
+//
+// TInitConfigStep
+//
+
+// Step that reads the stored partition config. It is not skipped for new partitions (skipNewPartition = false).
+TInitConfigStep::TInitConfigStep(TInitializer* initializer)
+ : TBaseKVStep(initializer, "TInitConfigStep", false) {
+}
+
+// Asks the tablet to read the value stored under the partition's config key.
+// The reply is processed in Handle().
+void TInitConfigStep::Execute(const TActorContext& ctx) {
+    THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
+    request->Record.AddCmdRead()->SetKey(Partition()->GetKeyConfig());
+
+    ctx.Send(Partition()->Tablet, request.Release());
+}
+
+// Processes the reply to the config read.
+//   OK     - the stored config is parsed; if it is older than the tablet config,
+//            a TEvChangePartitionConfig is queued in front of the partition's distributed transactions;
+//   NODATA - nothing is stored, the tablet config is taken as is;
+//   ERROR  - the error is logged and a poison pill is sent; the step does not complete.
+// Any other status aborts the process.
+void TInitConfigStep::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx) {
+    if (!ValidateResponse(*this, ev, ctx)) {
+        PoisonPill(ctx);
+        return;
+    }
+
+    auto& res = ev->Get()->Record;
+    Y_VERIFY(res.ReadResultSize() == 1);
+
+    auto& response = res.GetReadResult(0);
+    const auto status = response.GetStatus();
+
+    if (status == NKikimrProto::OK) {
+        Y_VERIFY(Partition()->Config.ParseFromString(response.GetValue()));
+        Y_VERIFY(Partition()->Config.GetVersion() <= Partition()->TabletConfig.GetVersion());
+
+        const bool storedConfigIsOutdated =
+            Partition()->Config.GetVersion() < Partition()->TabletConfig.GetVersion();
+        if (storedConfigIsOutdated) {
+            auto event = MakeHolder<TEvPQ::TEvChangePartitionConfig>(Partition()->TopicConverter,
+                                                                     Partition()->TabletConfig);
+            Partition()->PushFrontDistrTx(event.Release());
+        }
+    } else if (status == NKikimrProto::NODATA) {
+        Partition()->Config = Partition()->TabletConfig;
+    } else if (status == NKikimrProto::ERROR) {
+        LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE,
+                    "Partition " << Partition()->Partition << " can't read config");
+        PoisonPill(ctx);
+        return;
+    } else {
+        Cerr << "ERROR " << response.GetStatus() << "\n";
+        Y_FAIL("bad status");
+    }
+
+    Done(ctx);
+}
+
+
+//
+// TInitInternalFieldsStep
+//
+// Step that fills the partition's internal fields from the loaded config.
+// It is registered under its own class name (like every other step), so the "Step ..." line
+// logged by TInitializer::DoNext identifies it correctly. Not skipped for new partitions.
+TInitInternalFieldsStep::TInitInternalFieldsStep(TInitializer* initializer)
+    : TInitializerStep(initializer, "TInitInternalFieldsStep", false) {
+}
+
+// Runs TPartition::Initialize() and completes immediately: this step does not wait for any event.
+void TInitInternalFieldsStep::Execute(const TActorContext &ctx) {
+ Partition()->Initialize(ctx);
+
+ Done(ctx);
+}
+
+
+//
+// TInitDiskStatusStep
+//
+
+// Step that requests the disk status. Skipped for new partitions (skipNewPartition = true).
+TInitDiskStatusStep::TInitDiskStatusStep(TInitializer* initializer)
+ : TBaseKVStep(initializer, "TInitDiskStatusStep", true) {
+}
+
+// Sends a disk status check request covering the number of channels configured for the partition.
+// The reply is processed in Handle().
+void TInitDiskStatusStep::Execute(const TActorContext& ctx) {
+    auto request = MakeHolder<TEvKeyValue::TEvRequest>();
+    const auto numChannels = Partition()->Config.GetPartitionConfig().GetNumChannels();
+
+    AddCheckDiskRequest(request.Get(), numChannels);
+
+    ctx.Send(Partition()->Tablet, request.Release());
+}
+
+// Processes the reply to the disk status request: stores in the partition whether the disk is full,
+// reports an error when it is, and moves on to the next step.
+// An invalid response results in a poison pill and the step does not complete.
+void TInitDiskStatusStep::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx) {
+    if (!ValidateResponse(*this, ev, ctx)) {
+        PoisonPill(ctx);
+        return;
+    }
+
+    auto& response = ev->Get()->Record;
+    Y_VERIFY(response.GetStatusResultSize());
+
+    Partition()->DiskIsFull = DiskIsFull(ev);
+    // Report the problem only when the disk really is full
+    // (the condition was inverted and reported "disk is full" for a healthy disk).
+    if (Partition()->DiskIsFull) {
+        Partition()->LogAndCollectError(NKikimrServices::PERSQUEUE, "disk is full", ctx);
+    }
+
+    Done(ctx);
+}
+
+
+//
+// TInitMetaStep
+//
+
+// Step that reads the partition meta and transaction meta records. Skipped for new partitions.
+TInitMetaStep::TInitMetaStep(TInitializer* initializer)
+ : TBaseKVStep(initializer, "TInitMetaStep", true) {
+}
+
+// Requests two values in one read: the record keyed by TypeMeta and the record keyed by TypeTxMeta
+// for this partition, in that order. Handle() relies on this order.
+void TInitMetaStep::Execute(const TActorContext& ctx) {
+    auto request = MakeHolder<TEvKeyValue::TEvRequest>();
+
+    for (const auto type : {TKeyPrefix::TypeMeta, TKeyPrefix::TypeTxMeta}) {
+        TKeyPrefix key{type, PartitionId()};
+        request->Record.AddCmdRead()->SetKey(key.Data(), key.Size());
+    }
+
+    ctx.Send(Partition()->Tablet, request.Release());
+}
+
+// Processes the reply to the meta read. Result 0 is the partition meta, result 1 is the transaction meta.
+// For each result: OK - the value is parsed and applied; NODATA - nothing to apply;
+// ERROR - the error is logged, a poison pill is sent and the step stops without completing
+// (same as the other steps do after requesting a poison pill). Any other status aborts the process.
+void TInitMetaStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActorContext &ctx) {
+    if (!ValidateResponse(*this, ev, ctx)) {
+        PoisonPill(ctx);
+        return;
+    }
+
+    auto& response = ev->Get()->Record;
+    Y_VERIFY(response.ReadResultSize() == 2);
+
+    // Applies `action` to a successfully read result.
+    // Returns false when the read failed and the initialization must not go on.
+    auto handleReadResult = [&](const NKikimrClient::TKeyValueResponse::TReadResult& result, auto&& action) -> bool {
+        switch (result.GetStatus()) {
+        case NKikimrProto::OK:
+            action(result);
+            return true;
+        case NKikimrProto::NODATA:
+            return true;
+        case NKikimrProto::ERROR:
+            LOG_ERROR_S(
+                    ctx, NKikimrServices::PERSQUEUE,
+                    "read topic '" << TopicName() << "' partition " << PartitionId() << " error"
+            );
+            PoisonPill(ctx);
+            return false;
+        default:
+            Cerr << "ERROR " << result.GetStatus() << "\n";
+            Y_FAIL("bad status");
+        };
+        return false;
+    };
+
+    auto loadMeta = [&](const NKikimrClient::TKeyValueResponse::TReadResult& response) {
+        NKikimrPQ::TPartitionMeta meta;
+        bool res = meta.ParseFromString(response.GetValue());
+        Y_VERIFY(res);
+        /* Bring back later, when switch to 21-2 will be unable
+           StartOffset = meta.GetStartOffset();
+           EndOffset = meta.GetEndOffset();
+           if (StartOffset == EndOffset) {
+           NewHead.Offset = Head.Offset = EndOffset;
+           }
+           */
+        Partition()->SubDomainOutOfSpace = meta.GetSubDomainOutOfSpace();
+    };
+    if (!handleReadResult(response.GetReadResult(0), loadMeta)) {
+        return;
+    }
+
+    auto loadTxMeta = [this](const NKikimrClient::TKeyValueResponse::TReadResult& response) {
+        NKikimrPQ::TPartitionTxMeta meta;
+        bool res = meta.ParseFromString(response.GetValue());
+        Y_VERIFY(res);
+
+        // Only fields present in the stored record overwrite the partition's values.
+        if (meta.HasPlanStep()) {
+            Partition()->PlanStep = meta.GetPlanStep();
+        }
+        if (meta.HasTxId()) {
+            Partition()->TxId = meta.GetTxId();
+        }
+    };
+    if (!handleReadResult(response.GetReadResult(1), loadTxMeta)) {
+        return;
+    }
+
+    Done(ctx);
+}
+
+
+//
+// TInitInfoRangeStep
+//
+
+// Step that reads the info range (source ids and users info). Skipped for new partitions.
+TInitInfoRangeStep::TInitInfoRangeStep(TInitializer* initializer)
+ : TBaseKVStep(initializer, "TInitInfoRangeStep", true) {
+}
+
+// Requests the info range for this partition, passing an empty key as the starting point.
+void TInitInfoRangeStep::Execute(const TActorContext &ctx) {
+ RequestInfoRange(ctx, Partition()->Tablet, PartitionId(), "");
+}
+
+// Processes one portion of the info range.
+//   NODATA        - nothing is stored, the step completes;
+//   ERROR         - the error is logged and a poison pill is sent;
+//   OK / OVERRUN  - every pair is loaded into the source id storage or the users info storage
+//                   depending on the mark character of its key; on OVERRUN the rest of the range
+//                   is requested starting from the last key seen, otherwise the step completes.
+// Any other status aborts the process.
+void TInitInfoRangeStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActorContext &ctx) {
+    if (!ValidateResponse(*this, ev, ctx)) {
+        PoisonPill(ctx);
+        return;
+    }
+
+    auto& response = ev->Get()->Record;
+    Y_VERIFY(response.ReadRangeResultSize() == 1);
+
+    auto& range = response.GetReadRangeResult(0);
+    const auto now = ctx.Now();
+
+    //megaqc check here all results
+    Y_VERIFY(range.HasStatus());
+    const auto rangeStatus = range.GetStatus();
+
+    if (rangeStatus == NKikimrProto::NODATA) {
+        Done(ctx);
+        return;
+    }
+    if (rangeStatus == NKikimrProto::ERROR) {
+        LOG_ERROR_S(
+                ctx, NKikimrServices::PERSQUEUE,
+                "read topic '" << TopicName() << "' partition " << PartitionId() << " error"
+        );
+        PoisonPill(ctx);
+        return;
+    }
+    if (rangeStatus != NKikimrProto::OK && rangeStatus != NKikimrProto::OVERRUN) {
+        Cerr << "ERROR " << range.GetStatus() << "\n";
+        Y_FAIL("bad status");
+    }
+
+    auto& sourceIdStorage = Partition()->SourceIdStorage;
+    auto& usersInfoStorage = Partition()->UsersInfoStorage;
+
+    // Points to the key of the last processed pair; used to continue the read after OVERRUN.
+    const TString *key = nullptr;
+    for (ui32 i = 0; i < range.PairSize(); ++i) {
+        const auto& pair = range.GetPair(i);
+        Y_VERIFY(pair.HasStatus());
+        if (pair.GetStatus() != NKikimrProto::OK) {
+            LOG_ERROR_S(
+                    ctx, NKikimrServices::PERSQUEUE,
+                    "read range error topic '" << TopicName() << "' partition " << PartitionId()
+                        << " got status " << pair.GetStatus() << " for key " << (pair.HasKey() ? pair.GetKey() : "unknown")
+            );
+
+            PoisonPill(ctx);
+            return;
+        }
+
+        Y_VERIFY(pair.HasKey());
+        Y_VERIFY(pair.HasValue());
+
+        key = &pair.GetKey();
+        const auto type = (*key)[TKeyPrefix::MarkPosition()];
+        if (type == TKeyPrefix::MarkSourceId || type == TKeyPrefix::MarkProtoSourceId) {
+            sourceIdStorage.LoadSourceIdInfo(*key, pair.GetValue(), now);
+        } else if (type == TKeyPrefix::MarkUser) {
+            usersInfoStorage->Parse(*key, pair.GetValue(), ctx);
+        } else if (type == TKeyPrefix::MarkUserDeprecated) {
+            usersInfoStorage->ParseDeprecated(*key, pair.GetValue(), ctx);
+        }
+    }
+
+    //make next step
+    if (rangeStatus == NKikimrProto::OVERRUN) {
+        Y_VERIFY(key);
+        RequestInfoRange(ctx, Partition()->Tablet, PartitionId(), *key);
+    } else {
+        Done(ctx);
+    }
+}
+
+
+//
+// TInitDataRangeStep
+//
+
+// Step that reads the range of data blob keys. Skipped for new partitions.
+TInitDataRangeStep::TInitDataRangeStep(TInitializer* initializer)
+ : TBaseKVStep(initializer, "TInitDataRangeStep", true) {
+}
+
+// Requests the data range for this partition, passing an empty key as the starting point.
+void TInitDataRangeStep::Execute(const TActorContext &ctx) {
+ RequestDataRange(ctx, Partition()->Tablet, PartitionId(), "");
+}
+
+// Processes one portion of the data range.
+//   OK / OVERRUN - the keys are registered via FillBlobsMetaData(); on OVERRUN the rest of the range
+//                  is requested starting from the last returned key, otherwise the head is formed
+//                  and the step completes;
+//   NODATA       - nothing is stored, the step completes.
+// NOTE(review): unlike TInitInfoRangeStep::Handle there is no separate ERROR case here,
+// so an ERROR status ends up in the default branch and aborts the process - confirm this is intended.
+void TInitDataRangeStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActorContext &ctx) {
+ if (!ValidateResponse(*this, ev, ctx)) {
+ PoisonPill(ctx);
+ return;
+ }
+
+ auto& response = ev->Get()->Record;
+ Y_VERIFY(response.ReadRangeResultSize() == 1);
+
+ auto& range = response.GetReadRangeResult(0);
+
+ Y_VERIFY(range.HasStatus());
+ switch(range.GetStatus()) {
+ case NKikimrProto::OK:
+ case NKikimrProto::OVERRUN:
+
+ FillBlobsMetaData(range, ctx);
+
+ if (range.GetStatus() == NKikimrProto::OVERRUN) { //request rest of range
+ Y_VERIFY(range.PairSize());
+ RequestDataRange(ctx, Partition()->Tablet, PartitionId(), range.GetPair(range.PairSize() - 1).GetKey());
+ return;
+ }
+ // The whole range has been received: separate the head keys from the body keys.
+ FormHeadAndProceed();
+
+ Done(ctx);
+ break;
+ case NKikimrProto::NODATA:
+ Done(ctx);
+ break;
+ default:
+ Cerr << "ERROR " << range.GetStatus() << "\n";
+ Y_FAIL("bad status");
+ };
+}
+
+// Registers the blob keys of one data range portion in the partition's DataKeysBody and updates
+// StartOffset / EndOffset, the gap bookkeeping (GapOffsets, GapSize) and BodySize accordingly.
+// Called once per received portion, so DataKeysBody may already contain keys from earlier portions.
+void TInitDataRangeStep::FillBlobsMetaData(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx) {
+    auto& endOffset = Partition()->EndOffset;
+    auto& startOffset = Partition()->StartOffset;
+    auto& head = Partition()->Head;
+    auto& dataKeysBody = Partition()->DataKeysBody;
+    auto& gapOffsets = Partition()->GapOffsets;
+    auto& gapSize = Partition()->GapSize;
+    auto& bodySize = Partition()->BodySize;
+
+    for (ui32 i = 0; i < range.PairSize(); ++i) {
+        // The pair is only read here, so it is bound by reference instead of being copied.
+        const auto& pair = range.GetPair(i);
+        Y_VERIFY(pair.GetStatus() == NKikimrProto::OK); //this is readrange without keys, only OK could be here
+        TKey k(pair.GetKey());
+        if (dataKeysBody.empty()) { //no data - this is first pair of first range
+            head.Offset = endOffset = startOffset = k.GetOffset();
+            // The first blob starts in the middle of a message: the first whole message is the next one.
+            if (k.GetPartNo() > 0) ++startOffset;
+            head.PartNo = 0;
+        } else {
+            Y_VERIFY(endOffset <= k.GetOffset(), "%s", pair.GetKey().c_str());
+            if (endOffset < k.GetOffset()) {
+                // Offsets between the previous blob and this one are missing: remember the gap.
+                gapOffsets.push_back(std::make_pair(endOffset, k.GetOffset()));
+                gapSize += k.GetOffset() - endOffset;
+            }
+        }
+        Y_VERIFY(k.GetCount() + k.GetInternalPartsCount() > 0);
+        Y_VERIFY(k.GetOffset() >= endOffset);
+        endOffset = k.GetOffset() + k.GetCount();
+        //at this point EndOffset > StartOffset
+        if (!k.IsHead()) //head.Size will be filled after read or head blobs
+            bodySize += pair.GetValueSize();
+
+        LOG_DEBUG_S(
+                ctx, NKikimrServices::PERSQUEUE,
+                "Got data topic " << TopicName() << " partition " << k.GetPartition()
+                    << " offset " << k.GetOffset() << " count " << k.GetCount() << " size " << pair.GetValueSize()
+                    << " so " << startOffset << " eo " << endOffset << " " << pair.GetKey()
+        );
+        // The last field is the cumulative size of all keys registered before this one.
+        dataKeysBody.push_back({k, pair.GetValueSize(),
+                        TInstant::Seconds(pair.GetCreationUnixTime()),
+                        dataKeysBody.empty() ? 0 : dataKeysBody.back().CumulativeSize + dataKeysBody.back().Size});
+    }
+
+    Y_VERIFY(endOffset >= startOffset);
+}
+
+
+// Separates head keys from body keys once the whole data range has been loaded:
+// the trailing keys of DataKeysBody that are marked as head are moved to HeadKeys (keeping their order),
+// and Head.Offset / Head.PartNo are set to the position of the first head key
+// (or to EndOffset / 0 when there are no head keys).
+void TInitDataRangeStep::FormHeadAndProceed() {
+ auto& endOffset = Partition()->EndOffset;
+ auto& startOffset = Partition()->StartOffset;
+ auto& head = Partition()->Head;
+ auto& headKeys = Partition()->HeadKeys;
+ auto& dataKeysBody = Partition()->DataKeysBody;
+
+ head.Offset = endOffset;
+ head.PartNo = 0;
+
+ // Walk backwards from the end of the body while the keys are head keys.
+ while (dataKeysBody.size() > 0 && dataKeysBody.back().Key.IsHead()) {
+ Y_VERIFY(dataKeysBody.back().Key.GetOffset() + dataKeysBody.back().Key.GetCount() == head.Offset); //no gaps in head allowed
+ headKeys.push_front(dataKeysBody.back());
+ head.Offset = dataKeysBody.back().Key.GetOffset();
+ head.PartNo = dataKeysBody.back().Key.GetPartNo();
+ dataKeysBody.pop_back();
+ }
+ // Head keys may only form the tail of the range: none may be left in the body.
+ for (const auto& p : dataKeysBody) {
+ Y_VERIFY(!p.Key.IsHead());
+ }
+
+ // Consistency checks: the head starts at the first head key, lies before EndOffset unless it is empty,
+ // and does not start before StartOffset (except for the partial message right before it).
+ Y_VERIFY(headKeys.empty() || head.Offset == headKeys.front().Key.GetOffset() && head.PartNo == headKeys.front().Key.GetPartNo());
+ Y_VERIFY(head.Offset < endOffset || head.Offset == endOffset && headKeys.empty());
+ Y_VERIFY(head.Offset >= startOffset || head.Offset == startOffset - 1 && head.PartNo > 0);
+}
+
+
+//
+// TInitDataStep
+//
+
+// Step that reads the contents of the head blobs. Skipped for new partitions.
+TInitDataStep::TInitDataStep(TInitializer* initializer)
+ : TBaseKVStep(initializer, "TInitDataStep", true) {
+}
+
+// Requests the values of all head keys in a single read, one read command per key
+// in the order of HeadKeys. Completes immediately when there are no head keys.
+void TInitDataStep::Execute(const TActorContext &ctx) {
+    //form head request
+    TVector<TString> keys;
+    for (auto& headKey : Partition()->HeadKeys) {
+        keys.emplace_back(headKey.Key.Data(), headKey.Key.Size());
+    }
+    Y_VERIFY(keys.size() < Partition()->TotalMaxCount);
+
+    if (keys.empty()) {
+        Done(ctx);
+        return;
+    }
+
+    auto request = MakeHolder<TEvKeyValue::TEvRequest>();
+    for (const auto& key : keys) {
+        request->Record.AddCmdRead()->SetKey(key);
+    }
+    ctx.Send(Partition()->Tablet, request.Release());
+}
+
+// Processes the reply with the head blobs. The i-th read result corresponds to the i-th entry of HeadKeys.
+// Every successfully read blob is registered in the matching level of DataKeysHead and its batches
+// are appended to Head. On ERROR the error is logged and a poison pill is sent;
+// OVERRUN, NODATA and unknown statuses abort the process.
+void TInitDataStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActorContext &ctx) {
+ if (!ValidateResponse(*this, ev, ctx)) {
+ PoisonPill(ctx);
+ return;
+ }
+
+ auto& response = ev->Get()->Record;
+ Y_VERIFY(response.ReadResultSize());
+
+ auto& head = Partition()->Head;
+ auto& headKeys = Partition()->HeadKeys;
+ auto& dataKeysHead = Partition()->DataKeysHead;
+ auto& compactLevelBorder = Partition()->CompactLevelBorder;
+ auto totalLevels = Partition()->TotalLevels;
+
+ // currentLevel is never reset between blobs: it only moves forward while the loop goes through the keys.
+ ui32 currentLevel = 0;
+ Y_VERIFY(headKeys.size() == response.ReadResultSize());
+ for (ui32 i = 0; i < response.ReadResultSize(); ++i) {
+ auto& read = response.GetReadResult(i);
+ Y_VERIFY(read.HasStatus());
+ switch(read.GetStatus()) {
+ case NKikimrProto::OK: {
+ const TKey& key = headKeys[i].Key;
+ Y_VERIFY(key.IsHead());
+
+ ui32 size = headKeys[i].Size;
+ ui64 offset = key.GetOffset();
+ // Advance to the level whose border still exceeds the blob size.
+ while (currentLevel + 1 < totalLevels && size < compactLevelBorder[currentLevel + 1])
+ ++currentLevel;
+ Y_VERIFY(size < compactLevelBorder[currentLevel]);
+
+ dataKeysHead[currentLevel].AddKey(key, size);
+ Y_VERIFY(dataKeysHead[currentLevel].KeysCount() < AppData(ctx)->PQConfig.GetMaxBlobsPerLevel());
+ Y_VERIFY(!dataKeysHead[currentLevel].NeedCompaction());
+
+ LOG_DEBUG_S(
+ ctx, NKikimrServices::PERSQUEUE,
+ "read res partition topic '" << TopicName()
+ << "' parititon " << key.GetPartition() << " offset " << offset << " endOffset " << Partition()->EndOffset
+ << " key " << key.GetOffset() << "," << key.GetCount() << " valuesize " << read.GetValue().size()
+ << " expected " << size
+ );
+
+ Y_VERIFY(offset + 1 >= Partition()->StartOffset);
+ Y_VERIFY(offset < Partition()->EndOffset);
+ // The size recorded for the key during the range read must match the size of the value actually read.
+ Y_VERIFY(size == read.GetValue().size());
+
+ // Unpack the blob into batches and append them to the head.
+ for (TBlobIterator it(key, read.GetValue()); it.IsValid(); it.Next()) {
+ head.Batches.push_back(it.GetBatch());
+ }
+ head.PackedSize += size;
+
+ break;
+ }
+ case NKikimrProto::OVERRUN:
+ Y_FAIL("implement overrun in readresult!!");
+ return;
+ case NKikimrProto::NODATA:
+ Y_FAIL("NODATA can't be here");
+ return;
+ case NKikimrProto::ERROR:
+ LOG_ERROR_S(
+ ctx, NKikimrServices::PERSQUEUE,
+ "tablet " << Partition()->TabletID << " HandleOnInit topic '" << TopicName()
+ << "' partition " << PartitionId()
+ << " ReadResult " << i << " status NKikimrProto::ERROR result message: \"" << read.GetMessage()
+ << " \" errorReason: \"" << response.GetErrorReason() << "\""
+ );
+ PoisonPill(ctx);
+ return;
+ default:
+ Cerr << "ERROR " << read.GetStatus() << " message: \"" << read.GetMessage() << "\"\n";
+ Y_FAIL("bad status");
+
+ };
+ }
+
+ Done(ctx);
+}
+
+
+//
+// TPartition
+//
+
+// Actor entry point: switches to the StateInit state and starts the step-by-step initialization.
+void TPartition::Bootstrap(const TActorContext& ctx) {
+ Become(&TThis::StateInit);
+ Initializer.Execute(ctx);
+}
+
+// Fills the partition's internal fields from Config: timestamps, write quota, cloud/database ids,
+// users info storage, labeled counters, compaction level borders and per-client read quotas.
+// Called from TInitInternalFieldsStep::Execute().
+void TPartition::Initialize(const TActorContext& ctx) {
+ CreationTime = ctx.Now();
+ WriteCycleStartTime = ctx.Now();
+ WriteQuota.ConstructInPlace(Config.GetPartitionConfig().GetBurstSize(),
+ Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond(),
+ ctx.Now());
+ WriteTimestamp = ctx.Now();
+ LastUsedStorageMeterTimestamp = ctx.Now();
+ // NOTE(review): ManageWriteTimestampEstimate is read here but assigned only further down in this
+ // function, so the value used is whatever the field held before the call - confirm this is intended.
+ WriteTimestampEstimate = ManageWriteTimestampEstimate ? ctx.Now() : TInstant::Zero();
+
+ CloudId = Config.GetYcCloudId();
+ DbId = Config.GetYdbDatabaseId();
+ DbPath = Config.GetYdbDatabasePath();
+ FolderId = Config.GetYcFolderId();
+
+ CalcTopicWriteQuotaParams(AppData()->PQConfig,
+ IsLocalDC,
+ TopicConverter,
+ TabletID,
+ ctx,
+ TopicWriteQuoterPath,
+ TopicWriteQuotaResourcePath);
+
+ UsersInfoStorage.ConstructInPlace(DCId,
+ TabletID,
+ TopicConverter,
+ Partition,
+ Counters,
+ Config,
+ CloudId,
+ DbId,
+ Config.GetYdbDatabasePath(),
+ IsServerless,
+ FolderId);
+ TotalChannelWritesByHead.resize(Config.GetPartitionConfig().GetNumChannels());
+
+ // With mirroring configured the flag is the negation of SyncWriteTime; otherwise it follows IsLocalDC.
+ if (Config.GetPartitionConfig().HasMirrorFrom()) {
+ ManageWriteTimestampEstimate = !Config.GetPartitionConfig().GetMirrorFrom().GetSyncWriteTime();
+ } else {
+ ManageWriteTimestampEstimate = IsLocalDC;
+ }
+
+ if (AppData()->PQConfig.GetTopicsAreFirstClassCitizen()) {
+ PartitionCountersLabeled.Reset(new TPartitionLabeledCounters(EscapeBadChars(TopicConverter->GetClientsideName()),
+ Partition,
+ Config.GetYdbDatabasePath()));
+ } else {
+ PartitionCountersLabeled.Reset(new TPartitionLabeledCounters(TopicConverter->GetClientsideName(),
+ Partition));
+ }
+
+ UsersInfoStorage->Init(Tablet, SelfId(), ctx);
+
+ // Build the level borders: start from LEVEL0 and multiply by MaxBlobsPerLevel on every level,
+ // capped by MaxBlobSize. The check inside the loop requires every level but the last to be
+ // below MaxBlobSize and the last level to be exactly MaxBlobSize.
+ Y_VERIFY(AppData(ctx)->PQConfig.GetMaxBlobsPerLevel() > 0);
+ ui32 border = LEVEL0;
+ MaxSizeCheck = 0;
+ MaxBlobSize = AppData(ctx)->PQConfig.GetMaxBlobSize();
+ PartitionedBlob = TPartitionedBlob(Partition, 0, 0, 0, 0, 0, Head, NewHead, true, false, MaxBlobSize);
+ for (ui32 i = 0; i < TotalLevels; ++i) {
+ CompactLevelBorder.push_back(border);
+ MaxSizeCheck += border;
+ Y_VERIFY(i + 1 < TotalLevels && border < MaxBlobSize || i + 1 == TotalLevels && border == MaxBlobSize);
+ border *= AppData(ctx)->PQConfig.GetMaxBlobsPerLevel();
+ border = Min(border, MaxBlobSize);
+ }
+ TotalMaxCount = AppData(ctx)->PQConfig.GetMaxBlobsPerLevel() * TotalLevels;
+
+ // After the reversal the borders go from the largest to the smallest.
+ std::reverse(CompactLevelBorder.begin(), CompactLevelBorder.end());
+
+ for (ui32 i = 0; i < TotalLevels; ++i) {
+ DataKeysHead.push_back(TKeyLevel(CompactLevelBorder[i]));
+ }
+
+ // Apply the read quotas listed in the config to the corresponding users (created on demand).
+ for (const auto& readQuota : Config.GetPartitionConfig().GetReadQuota()) {
+ auto &userInfo = UsersInfoStorage->GetOrCreate(readQuota.GetClientId(), ctx);
+ userInfo.ReadQuota.UpdateConfig(readQuota.GetBurstSize(), readQuota.GetSpeedInBytesPerSecond());
+ }
+
+ LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "bootstrapping " << Partition << " " << ctx.SelfID);
+
+ // Counters are set up only when a counters root is available in the application data.
+ if (AppData(ctx)->Counters) {
+ if (AppData()->PQConfig.GetTopicsAreFirstClassCitizen()) {
+ SetupStreamCounters(ctx);
+ } else {
+ SetupTopicCounters(ctx);
+ }
+ }
+}
+
+// Creates the write-side counters of the partition for the case when topics are not
+// first-class citizens (see the caller, TPartition::Initialize()).
+// Sensor names get the suffix "Original" when IsLocalDC is set and "Mirrored" otherwise.
+void TPartition::SetupTopicCounters(const TActorContext& ctx) {
+ auto counters = AppData(ctx)->Counters;
+ auto labels = NPersQueue::GetLabels(TopicConverter);
+ const TString suffix = IsLocalDC ? "Original" : "Mirrored";
+
+ WriteBufferIsFullCounter.SetCounter(
+ NPersQueue::GetCounters(counters, "writingTime", TopicConverter),
+ {{"host", DCId},
+ {"Partition", ToString<ui32>(Partition)}},
+ {"sensor", "BufferFullTime" + suffix, true});
+
+ // NOTE(review): the label of the last bucket ("999999ms") has fewer digits than its bound (9'999'999).
+ // The label is a runtime sensor name, so it is left as is - confirm whether the mismatch matters.
+ auto subGroup = GetServiceCounters(counters, "pqproxy|writeTimeLag");
+ InputTimeLag = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter(
+ subGroup, labels, {{"sensor", "TimeLags" + suffix}}, "Interval",
+ TVector<std::pair<ui64, TString>>{
+ {100, "100ms"}, {200, "200ms"}, {500, "500ms"}, {1000, "1000ms"},
+ {2000, "2000ms"}, {5000, "5000ms"}, {10'000, "10000ms"}, {30'000, "30000ms"},
+ {60'000, "60000ms"}, {180'000,"180000ms"}, {9'999'999, "999999ms"}}, true));
+
+
+ // NOTE(review): the last bucket's bound is 999'999'999 while its label is "99999999kb" - same remark as above.
+ subGroup = GetServiceCounters(counters, "pqproxy|writeInfo");
+ MessageSize = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter(
+ subGroup, labels, {{"sensor", "MessageSize" + suffix}}, "Size",
+ TVector<std::pair<ui64, TString>>{
+ {1_KB, "1kb"}, {5_KB, "5kb"}, {10_KB, "10kb"},
+ {20_KB, "20kb"}, {50_KB, "50kb"}, {100_KB, "100kb"}, {200_KB, "200kb"},
+ {512_KB, "512kb"},{1024_KB, "1024kb"}, {2048_KB,"2048kb"}, {5120_KB, "5120kb"},
+ {10240_KB, "10240kb"}, {65536_KB, "65536kb"}, {999'999'999, "99999999kb"}}, true));
+
+ subGroup = GetServiceCounters(counters, "pqproxy|writeSession");
+ BytesWrittenTotal = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"BytesWritten" + suffix}, true);
+ BytesWrittenUncompressed = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"UncompressedBytesWritten" + suffix}, true);
+ BytesWrittenComp = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"CompactedBytesWritten" + suffix}, true);
+ MsgsWrittenTotal = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"MessagesWritten" + suffix}, true);
+
+ // SLI counters are aggregated by account rather than by the per-topic labels used above.
+ TVector<NPersQueue::TPQLabelsInfo> aggr = {{{{"Account", TopicConverter->GetAccount()}}, {"total"}}};
+ ui32 border = AppData(ctx)->PQConfig.GetWriteLatencyBigMs();
+ subGroup = GetServiceCounters(counters, "pqproxy|SLI");
+ WriteLatency = NKikimr::NPQ::CreateSLIDurationCounter(subGroup, aggr, "Write", border,
+ {100, 200, 500, 1000, 1500, 2000,
+ 5000, 10'000, 30'000, 99'999'999});
+ SLIBigLatency = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WriteBigLatency"}, true, "sensor", false);
+ WritesTotal = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WritesTotal"}, true, "sensor", false);
+ // The topic-level quota wait counter exists only when quoting is enabled and a quota resource path is set.
+ if (IsQuotingEnabled() && !TopicWriteQuotaResourcePath.empty()) {
+ TopicWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
+ new NKikimr::NPQ::TPercentileCounter(
+ GetServiceCounters(counters, "pqproxy|topicWriteQuotaWait"), labels,
+ {{"sensor", "TopicWriteQuotaWait" + suffix}}, "Interval",
+ TVector<std::pair<ui64, TString>>{
+ {0, "0ms"}, {1, "1ms"}, {5, "5ms"}, {10, "10ms"},
+ {20, "20ms"}, {50, "50ms"}, {100, "100ms"}, {500, "500ms"},
+ {1000, "1000ms"}, {2500, "2500ms"}, {5000, "5000ms"},
+ {10'000, "10000ms"}, {9'999'999, "999999ms"}}, true));
+ }
+
+ // The partition-level quota wait counter is created unconditionally.
+ PartitionWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
+ new NKikimr::NPQ::TPercentileCounter(GetServiceCounters(counters, "pqproxy|partitionWriteQuotaWait"),
+ labels, {{"sensor", "PartitionWriteQuotaWait" + suffix}}, "Interval",
+ TVector<std::pair<ui64, TString>>{
+ {0, "0ms"}, {1, "1ms"}, {5, "5ms"}, {10, "10ms"},
+ {20, "20ms"}, {50, "50ms"}, {100, "100ms"}, {500, "500ms"},
+ {1000, "1000ms"}, {2500, "2500ms"}, {5000, "5000ms"},
+ {10'000, "10000ms"}, {9'999'999, "999999ms"}}, true));
+}
+
+void TPartition::SetupStreamCounters(const TActorContext& ctx) { // Creates the topic-level counters: lag/size percentile counters, byte/message multi-counters, SLI counters and quota-wait counters.
+ const auto topicName = TopicConverter->GetModernName(); // NOTE(review): topicName is not referenced again in this function — confirm it can be dropped
+ auto counters = AppData(ctx)->Counters;
+ auto subgroups = NPersQueue::GetSubgroupsForTopic(TopicConverter, CloudId, DbId, DbPath, FolderId); // base label set; a trailing {"name", ...} entry is pushed/replaced/popped below
+/*
+ WriteBufferIsFullCounter.SetCounter(
+ NPersQueue::GetCountersForTopic(counters, IsServerless),
+ {
+ {"database", DbPath},
+ {"cloud_id", CloudId},
+ {"folder_id", FolderId},
+ {"database_id", DbId},
+ {"topic", TopicConverter->GetFederationPath()},
+ {"host", DCId},
+ {"partition", ToString<ui32>(Partition)}},
+ {"name", "api.grpc.topic.stream_write.buffer_brimmed_milliseconds", true});
+*/
+
+ subgroups.push_back({"name", "topic.write.lag_milliseconds"}); // the last entry of subgroups now names the percentile counter created next
+
+ InputTimeLag = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter(
+ NPersQueue::GetCountersForTopic(counters, IsServerless), {},
+ subgroups, "bin",
+ TVector<std::pair<ui64, TString>>{
+ {100, "100"}, {200, "200"}, {500, "500"},
+ {1000, "1000"}, {2000, "2000"}, {5000, "5000"},
+ {10'000, "10000"}, {30'000, "30000"}, {60'000, "60000"},
+ {180'000,"180000"}, {9'999'999, "999999"}}, true)); // NOTE(review): the last label "999999" has fewer digits than its bound 9'999'999 — confirm it is intentional before touching it
+
+ subgroups.back().second = "topic.write.message_size_bytes"; // reuse the trailing "name" entry for the next percentile counter
+ MessageSize = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter(
+ NPersQueue::GetCountersForTopic(counters, IsServerless), {},
+ subgroups, "bin",
+ TVector<std::pair<ui64, TString>>{
+ {1024, "1024"}, {5120, "5120"}, {10'240, "10240"},
+ {20'480, "20480"}, {51'200, "51200"}, {102'400, "102400"},
+ {204'800, "204800"}, {524'288, "524288"},{1'048'576, "1048576"},
+ {2'097'152,"2097152"}, {5'242'880, "5242880"}, {10'485'760, "10485760"},
+ {67'108'864, "67108864"}, {999'999'999, "99999999"}}, true)); // NOTE(review): the last label "99999999" has fewer digits than its bound 999'999'999 — confirm it is intentional before touching it
+
+ subgroups.pop_back(); // remove the trailing "name" entry: the multi-counters below receive their names as a separate argument
+ BytesWrittenGrpc = NKikimr::NPQ::TMultiCounter(
+ NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups,
+ {"api.grpc.topic.stream_write.bytes"} , true, "name");
+ BytesWrittenTotal = NKikimr::NPQ::TMultiCounter(
+ NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups,
+ {"topic.write.bytes"} , true, "name");
+
+ MsgsWrittenGrpc = NKikimr::NPQ::TMultiCounter(
+ NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups,
+ {"api.grpc.topic.stream_write.messages"}, true, "name");
+ MsgsWrittenTotal = NKikimr::NPQ::TMultiCounter(
+ NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups,
+ {"topic.write.messages"}, true, "name");
+
+
+ BytesWrittenUncompressed = NKikimr::NPQ::TMultiCounter(
+
+ NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups,
+ {"topic.write.uncompressed_bytes"}, true, "name");
+
+ TVector<NPersQueue::TPQLabelsInfo> aggr = {{{{"Account", TopicConverter->GetAccount()}}, {"total"}}}; // label info used by the three "pqproxy|SLI" counters below
+ ui32 border = AppData(ctx)->PQConfig.GetWriteLatencyBigMs(); // read from PQConfig and passed to CreateSLIDurationCounter as the "border" argument
+ auto subGroup = GetServiceCounters(counters, "pqproxy|SLI");
+ WriteLatency = NKikimr::NPQ::CreateSLIDurationCounter(subGroup, aggr, "Write", border,
+ {100, 200, 500, 1000, 1500, 2000,
+ 5000, 10'000, 30'000, 99'999'999});
+ SLIBigLatency = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WriteBigLatency"}, true, "name", false);
+ WritesTotal = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WritesTotal"}, true, "name", false);
+ if (IsQuotingEnabled() && !TopicWriteQuotaResourcePath.empty()) { // the topic-level quota wait counter is created only when quoting is enabled and a quota resource path is set
+ subgroups.push_back({"name", "api.grpc.topic.stream_write.topic_throttled_milliseconds"});
+ TopicWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
+ new NKikimr::NPQ::TPercentileCounter(
+ NPersQueue::GetCountersForTopic(counters, IsServerless), {},
+ subgroups, "bin",
+ TVector<std::pair<ui64, TString>>{
+ {0, "0"}, {1, "1"}, {5, "5"}, {10, "10"},
+ {20, "20"}, {50, "50"}, {100, "100"}, {500, "500"},
+ {1000, "1000"}, {2500, "2500"}, {5000, "5000"},
+ {10'000, "10000"}, {9'999'999, "999999"}}, true)); // NOTE(review): same label/bound mismatch as in the lag counter above
+ subgroups.pop_back(); // restore subgroups so the partition counter below pushes its own "name" entry
+ }
+
+ subgroups.push_back({"name", "api.grpc.topic.stream_write.partition_throttled_milliseconds"}); // the partition-level quota wait counter is always created
+ PartitionWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
+ new NKikimr::NPQ::TPercentileCounter(
+ NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups, "bin",
+ TVector<std::pair<ui64, TString>>{
+ {0, "0"}, {1, "1"}, {5, "5"}, {10, "10"},
+ {20, "20"}, {50, "50"}, {100, "100"}, {500, "500"},
+ {1000, "1000"}, {2500, "2500"}, {5000, "5000"},
+ {10'000, "10000"}, {9'999'999, "999999"}}, true)); // NOTE(review): same label/bound mismatch as in the lag counter above
+}
+
+
+//
+// Functions
+//
+
+/**
+ * Checks that a KV response and every CmdGetStatus result inside it succeeded.
+ *
+ * @param step  initialization step that issued the request; only used to put
+ *              the topic name and partition id into the error message
+ * @param ev    KV response event to inspect
+ * @param ctx   actor context used for logging
+ * @return true when the overall status is MSTATUS_OK and every status result
+ *         is NKikimrProto::OK; false (after logging an error) otherwise
+ */
+bool ValidateResponse(const TInitializerStep& step, TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx) {
+    const auto& response = ev->Get()->Record;
+    if (response.GetStatus() != NMsgBusProxy::MSTATUS_OK) {
+        // The topic name is quoted on both sides, same as in the message below.
+        LOG_ERROR_S(
+            ctx, NKikimrServices::PERSQUEUE,
+            "commands for topic '" << step.TopicName() << "' partition " << step.PartitionId()
+                << " are not processed at all, got KV error " << response.GetStatus()
+        );
+        return false;
+    }
+
+    for (ui32 i = 0; i < response.GetStatusResultSize(); ++i) {
+        const auto& res = response.GetGetStatusResult(i);
+        if (res.GetStatus() != NKikimrProto::OK) {
+            LOG_ERROR_S(
+                ctx, NKikimrServices::PERSQUEUE,
+                "commands for topic '" << step.TopicName() << "' partition " << step.PartitionId() <<
+                    " are not processed at all, got KV error in CmdGetStatus " << res.GetStatus()
+            );
+            return false;
+        }
+    }
+
+    return true;
+}
+
+/**
+ * Reports whether at least one status result in the KV response has the
+ * StatusDiskSpaceLightYellowMove flag set.
+ *
+ * @param ev KV response event whose status results are inspected
+ * @return true if any result carries the flag, false otherwise (including
+ *         the case of a response without status results)
+ */
+bool DiskIsFull(TEvKeyValue::TEvResponse::TPtr& ev) {
+    auto& record = ev->Get()->Record;
+
+    const ui32 resultsCount = record.GetStatusResultSize();
+    for (ui32 idx = 0; idx < resultsCount; ++idx) {
+        TStorageStatusFlags flags = record.GetGetStatusResult(idx).GetStatusFlags();
+        if (flags.Check(NKikimrBlobStorage::StatusDiskSpaceLightYellowMove)) {
+            // One flagged result is enough to consider the disk full.
+            return true;
+        }
+    }
+    return false;
+}
+
+} // namespace NKikimr::NPQ
diff --git a/ydb/core/persqueue/partition_init.h b/ydb/core/persqueue/partition_init.h
new file mode 100644
index 00000000000..ddabde99b3a
--- /dev/null
+++ b/ydb/core/persqueue/partition_init.h
@@ -0,0 +1,152 @@
+#pragma once
+
+#include "header.h"
+#include "utils.h"
+
+#include <ydb/core/keyvalue/keyvalue_events.h>
+#include <ydb/library/persqueue/counter_time_keeper/counter_time_keeper.h>
+
+#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/core/log.h>
+
+#include <util/generic/set.h>
+
+
+namespace NKikimr::NPQ {
+
+class TInitializerStep;
+class TPartition;
+
+
+/**
+ * This class executes the independent steps of partition actor initialization.
+ * Each initialization step makes its own decision whether to perform it or not.
+ */
+class TInitializer {
+    // Steps need access to the protected Next()/Done() members.
+    friend TInitializerStep;
+
+public:
+    TInitializer(TPartition* partition);
+
+    void Execute(const TActorContext& ctx);
+
+    bool Handle(STFUNC_SIG);
+
+protected:
+    void Next(const TActorContext& ctx);
+    void Done(const TActorContext& ctx);
+
+private:
+    void DoNext(const TActorContext& ctx);
+
+    TPartition* Partition;
+
+    bool InProgress;
+
+    TVector<THolder<TInitializerStep>> Steps;
+    // The iterator type is taken from the same container type as Steps,
+    // so the two declarations cannot drift apart.
+    TVector<THolder<TInitializerStep>>::iterator CurrentStep;
+
+};
+
+/**
+ * A single independent initialization step.
+ * A step begins execution when the Execute method is called and ends after the Done method is called.
+ */
+class TInitializerStep {
+public:
+ TInitializerStep(TInitializer* initializer, TString name, bool skipNewPartition);
+ virtual ~TInitializerStep() = default; // virtual destructor: the class is used as a polymorphic base
+
+ virtual void Execute(const TActorContext& ctx) = 0; // pure virtual: every concrete step provides its own body
+ virtual bool Handle(STFUNC_SIG); // virtual event handler; TBaseKVStep overrides it
+
+ TPartition* Partition() const;
+ ui32 PartitionId() const;
+ TString TopicName() const;
+
+ const TString Name; // presumably set from the constructor's name argument — TODO confirm in the .cpp
+ const bool SkipNewPartition; // presumably set from the constructor's skipNewPartition argument — TODO confirm in the .cpp
+
+protected:
+ void Done(const TActorContext& ctx);
+ void PoisonPill(const TActorContext& ctx);
+
+private:
+ TInitializer* Initializer; // non-owning raw pointer; presumably the initializer passed to the constructor — TODO confirm
+};
+
+
+class TBaseKVStep: public TInitializerStep { // Base class for steps that process a TEvKeyValue::TEvResponse
+public:
+ TBaseKVStep(TInitializer* initializer, TString name, bool skipNewPartition);
+
+ bool Handle(STFUNC_SIG) override; // overrides the generic handler of TInitializerStep
+ virtual void Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx) = 0; // pure virtual: each KV step handles the response itself
+};
+
+
+//
+// Initialization steps
+//
+
+class TInitConfigStep: public TBaseKVStep { // Concrete KV step: overrides Execute() and the TEvKeyValue::TEvResponse handler
+public:
+ TInitConfigStep(TInitializer* initializer);
+
+ void Execute(const TActorContext& ctx) override;
+ void Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx) override;
+};
+
+class TInitInternalFieldsStep: public TInitializerStep { // Concrete step derived directly from TInitializerStep: overrides Execute() only, no KV response handler
+public:
+ TInitInternalFieldsStep(TInitializer* initializer);
+
+ void Execute(const TActorContext& ctx) override;
+};
+
+class TInitDiskStatusStep: public TBaseKVStep { // Concrete KV step: overrides Execute() and the TEvKeyValue::TEvResponse handler
+public:
+ TInitDiskStatusStep(TInitializer* initializer);
+
+ void Execute(const TActorContext& ctx) override;
+ void Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx) override;
+};
+
+class TInitMetaStep: public TBaseKVStep { // Concrete KV step: overrides Execute() and the TEvKeyValue::TEvResponse handler
+public:
+ TInitMetaStep(TInitializer* initializer);
+
+ void Execute(const TActorContext& ctx) override;
+ void Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx) override;
+};
+
+class TInitInfoRangeStep: public TBaseKVStep { // Concrete KV step: overrides Execute() and the TEvKeyValue::TEvResponse handler
+public:
+ TInitInfoRangeStep(TInitializer* initializer);
+
+ void Execute(const TActorContext& ctx) override;
+ void Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx) override;
+};
+
+class TInitDataRangeStep: public TBaseKVStep { // Concrete KV step: overrides Execute() and the TEvKeyValue::TEvResponse handler, plus two private helpers
+public:
+ TInitDataRangeStep(TInitializer* initializer);
+
+ void Execute(const TActorContext& ctx) override;
+ void Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx) override;
+
+private:
+ void FillBlobsMetaData(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx); // takes one read-range result of a KV response; presumably called from Handle — TODO confirm
+ void FormHeadAndProceed(); // NOTE(review): takes no actor context, unlike the other members — confirm this is intended
+};
+
+class TInitDataStep: public TBaseKVStep { // Concrete KV step: overrides Execute() and the TEvKeyValue::TEvResponse handler
+public:
+ TInitDataStep(TInitializer* initializer);
+
+ void Execute(const TActorContext& ctx) override;
+ void Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx) override;
+};
+
+} // NKikimr::NPQ
diff --git a/ydb/core/persqueue/partition_util.h b/ydb/core/persqueue/partition_util.h
new file mode 100644
index 00000000000..e19e75434da
--- /dev/null
+++ b/ydb/core/persqueue/partition_util.h
@@ -0,0 +1,114 @@
+#pragma once
+
+#include "partition.h"
+
+namespace NKikimr::NPQ {
+
+/**
+ * Ordered collection of (key, size) entries with running totals:
+ * the sum of sizes, the number of records and the number of internal parts.
+ * The totals are kept in sync on every insertion and removal.
+ */
+class TKeyLevel {
+public:
+    friend IOutputStream& operator <<(IOutputStream& out, const TKeyLevel& value);
+
+    TKeyLevel(ui32 border)
+        : Border_(border)
+        , Sum_(0)
+        , RecsCount_(0)
+        , InternalPartsCount_(0)
+    {
+    }
+
+    // Drops all entries and resets every total to zero.
+    void Clear() {
+        Keys_.clear();
+        Sum_ = 0;
+        RecsCount_ = 0;
+        InternalPartsCount_ = 0;
+    }
+
+    ui32 KeysCount() const {
+        return Keys_.size();
+    }
+
+    ui32 RecsCount() const {
+        return RecsCount_;
+    }
+
+    ui16 InternalPartsCount() const {
+        return InternalPartsCount_;
+    }
+
+    // True once the accumulated size has reached the border.
+    bool NeedCompaction() const {
+        return !(Sum_ < Border_);
+    }
+
+    // Returns a copy of the first key carrying the accumulated record and
+    // internal part counts, paired with the total size; the level is emptied.
+    std::pair<TKey, ui32> Compact() {
+        Y_VERIFY(!Keys_.empty());
+        std::pair<TKey, ui32> merged(Keys_.front().first, Sum_);
+        merged.first.SetCount(RecsCount_);
+        merged.first.SetInternalPartsCount(InternalPartsCount_);
+        Clear();
+        return merged;
+    }
+
+    // Removes and returns the first entry.
+    std::pair<TKey, ui32> PopFront() {
+        Y_VERIFY(!Keys_.empty());
+        auto removed = Keys_.front();
+        Discount(removed);
+        Keys_.pop_front();
+        return removed;
+    }
+
+    // Removes and returns the last entry.
+    std::pair<TKey, ui32> PopBack() {
+        Y_VERIFY(!Keys_.empty());
+        auto removed = Keys_.back();
+        Discount(removed);
+        Keys_.pop_back();
+        return removed;
+    }
+
+    ui32 Sum() const {
+        return Sum_;
+    }
+
+    const TKey& GetKey(const ui32 pos) const {
+        Y_VERIFY(pos < Keys_.size());
+        return Keys_[pos].first;
+    }
+
+    const ui32& GetSize(const ui32 pos) const {
+        Y_VERIFY(pos < Keys_.size());
+        return Keys_[pos].second;
+    }
+
+    // Inserts an entry before all existing ones.
+    void PushKeyToFront(const TKey& key, ui32 size) {
+        Account(key, size);
+        Keys_.emplace_front(key, size);
+    }
+
+    // Appends an entry after all existing ones.
+    void AddKey(const TKey& key, ui32 size) {
+        Account(key, size);
+        Keys_.emplace_back(key, size);
+    }
+
+    ui32 Border() const {
+        return Border_;
+    }
+
+private:
+    // Adds the contribution of one entry to the running totals.
+    void Account(const TKey& key, ui32 size) {
+        Sum_ += size;
+        RecsCount_ += key.GetCount();
+        InternalPartsCount_ += key.GetInternalPartsCount();
+    }
+
+    // Subtracts the contribution of one stored entry from the running totals.
+    void Discount(const std::pair<TKey, ui32>& entry) {
+        Sum_ -= entry.second;
+        RecsCount_ -= entry.first.GetCount();
+        InternalPartsCount_ -= entry.first.GetInternalPartsCount();
+    }
+
+private:
+    const ui32 Border_;
+    std::deque<std::pair<TKey, ui32>> Keys_;
+    ui32 Sum_;
+    ui32 RecsCount_;
+    ui16 InternalPartsCount_;
+};
+
+void AddCheckDiskRequest(TEvKeyValue::TEvRequest *request, ui32 numChannels); // declaration only; presumably adds disk status commands for numChannels channels to request — TODO confirm against the definition
+void RequestInfoRange(const TActorContext& ctx, const TActorId& dst, ui32 partition, const TString& key); // declaration only; presumably sends an info range read for the partition to dst — TODO confirm against the definition
+void RequestDataRange(const TActorContext& ctx, const TActorId& dst, ui32 partition, const TString& key); // declaration only; presumably sends a data range read for the partition to dst — TODO confirm against the definition
+
+} // namespace NKikimr::NPQ