diff options
author | tesseract <tesseract@yandex-team.com> | 2023-04-10 09:38:17 +0300 |
---|---|---|
committer | tesseract <tesseract@yandex-team.com> | 2023-04-10 09:38:17 +0300 |
commit | 0898d2c2865676cf1622e78dd5a84f9e0b9daaf1 (patch) | |
tree | c5d1551e7c3b51dfce5dd46b6c8de7c470d8305b | |
parent | b5da19b8a2b0de7edae408ab328f3ae34acd421d (diff) | |
download | ydb-0898d2c2865676cf1622e78dd5a84f9e0b9daaf1.tar.gz |
Move initialization to a separate file. Refactor the initialization process.
1. Уношу всё, что связано с инициализацией, в отдельный файл partition_init.cpp.
2. Инициализация разбивается на явные последовательные шаги. Шаги не знают друг о друге.
3. Каждый шаг сам управляет тем, на какие event-ы реагировать.
-rw-r--r-- | ydb/core/persqueue/CMakeLists.darwin-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/persqueue/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | ydb/core/persqueue/CMakeLists.linux-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/persqueue/CMakeLists.windows-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 815 | ||||
-rw-r--r-- | ydb/core/persqueue/partition.h | 42 | ||||
-rw-r--r-- | ydb/core/persqueue/partition_init.cpp | 943 | ||||
-rw-r--r-- | ydb/core/persqueue/partition_init.h | 152 | ||||
-rw-r--r-- | ydb/core/persqueue/partition_util.h | 114 |
9 files changed, 1234 insertions, 836 deletions
diff --git a/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt b/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt index a2ae4317b1e..ee7cf6c9143 100644 --- a/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt @@ -52,6 +52,7 @@ target_sources(ydb-core-persqueue PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/metering_sink.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/mirrorer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ownerinfo.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_init.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/percentile_counter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq.cpp diff --git a/ydb/core/persqueue/CMakeLists.linux-aarch64.txt b/ydb/core/persqueue/CMakeLists.linux-aarch64.txt index 017ec968c3d..c5e43d10ce4 100644 --- a/ydb/core/persqueue/CMakeLists.linux-aarch64.txt +++ b/ydb/core/persqueue/CMakeLists.linux-aarch64.txt @@ -53,6 +53,7 @@ target_sources(ydb-core-persqueue PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/metering_sink.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/mirrorer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ownerinfo.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_init.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/percentile_counter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq.cpp diff --git a/ydb/core/persqueue/CMakeLists.linux-x86_64.txt b/ydb/core/persqueue/CMakeLists.linux-x86_64.txt index 017ec968c3d..c5e43d10ce4 100644 --- a/ydb/core/persqueue/CMakeLists.linux-x86_64.txt +++ b/ydb/core/persqueue/CMakeLists.linux-x86_64.txt @@ -53,6 +53,7 @@ target_sources(ydb-core-persqueue PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/metering_sink.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/mirrorer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ownerinfo.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_init.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition.cpp 
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/percentile_counter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq.cpp diff --git a/ydb/core/persqueue/CMakeLists.windows-x86_64.txt b/ydb/core/persqueue/CMakeLists.windows-x86_64.txt index a2ae4317b1e..ee7cf6c9143 100644 --- a/ydb/core/persqueue/CMakeLists.windows-x86_64.txt +++ b/ydb/core/persqueue/CMakeLists.windows-x86_64.txt @@ -52,6 +52,7 @@ target_sources(ydb-core-persqueue PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/metering_sink.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/mirrorer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ownerinfo.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition_init.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/partition.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/percentile_counter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq.cpp diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 92764223724..bfa2bdbc840 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -1,5 +1,6 @@ #include "event_helpers.h" #include "mirrorer.h" +#include "partition_util.h" #include "partition.h" #include "read.h" @@ -30,7 +31,6 @@ static const ui32 MAX_WRITE_CYCLE_SIZE = 16_MB; static const ui32 MAX_USER_ACTS = 1000; static const TDuration WAKE_TIMEOUT = TDuration::Seconds(5); static const ui32 MAX_INLINE_SIZE = 1000; -static const ui32 LEVEL0 = 32; static const TDuration UPDATE_AVAIL_SIZE_INTERVAL = TDuration::MilliSeconds(100); static const TString WRITE_QUOTA_ROOT_PATH = "write-quota"; static const ui32 MAX_USERS = 1000; @@ -79,109 +79,6 @@ struct TMirrorerInfo { TTabletCountersBase Baseline; }; -class TKeyLevel { -public: - friend IOutputStream& operator <<(IOutputStream& out, const TKeyLevel& value); - - TKeyLevel(ui32 border) - : Border_(border) - , Sum_(0) - , RecsCount_(0) - , InternalPartsCount_(0) {} - - void Clear() { - Keys_.clear(); - Sum_ = 0; - RecsCount_ = 0; - InternalPartsCount_ = 0; - } - - ui32 KeysCount() const { - return 
Keys_.size(); - } - - ui32 RecsCount() const { - return RecsCount_; - } - - ui16 InternalPartsCount() const { - return InternalPartsCount_; - } - - bool NeedCompaction() const { - return Sum_ >= Border_; - } - - std::pair<TKey, ui32> Compact() { - Y_VERIFY(!Keys_.empty()); - TKey tmp(Keys_.front().first); - tmp.SetCount(RecsCount_); - tmp.SetInternalPartsCount(InternalPartsCount_); - std::pair<TKey, ui32> res(tmp, Sum_); - Clear(); - return res; - } - - std::pair<TKey, ui32> PopFront() { - Y_VERIFY(!Keys_.empty()); - Sum_ -= Keys_.front().second; - RecsCount_ -= Keys_.front().first.GetCount(); - InternalPartsCount_ -= Keys_.front().first.GetInternalPartsCount(); - auto res = Keys_.front(); - Keys_.pop_front(); - return res; - } - - std::pair<TKey, ui32> PopBack() { - Y_VERIFY(!Keys_.empty()); - Sum_ -= Keys_.back().second; - RecsCount_ -= Keys_.back().first.GetCount(); - InternalPartsCount_ -= Keys_.back().first.GetInternalPartsCount(); - auto res = Keys_.back(); - Keys_.pop_back(); - return res; - } - - ui32 Sum() const { - return Sum_; - } - - const TKey& GetKey(const ui32 pos) const { - Y_VERIFY(pos < Keys_.size()); - return Keys_[pos].first; - } - - const ui32& GetSize(const ui32 pos) const { - Y_VERIFY(pos < Keys_.size()); - return Keys_[pos].second; - } - - void PushKeyToFront(const TKey& key, ui32 size) { - Sum_ += size; - RecsCount_ += key.GetCount(); - InternalPartsCount_ += key.GetInternalPartsCount(); - Keys_.push_front(std::make_pair(key, size)); - } - - void AddKey(const TKey& key, ui32 size) { - Sum_ += size; - RecsCount_ += key.GetCount(); - InternalPartsCount_ += key.GetInternalPartsCount(); - Keys_.push_back(std::make_pair(key, size)); - } - - ui32 Border() const { - return Border_; - } - -private: - const ui32 Border_; - std::deque<std::pair<TKey, ui32>> Keys_; - ui32 Sum_; - ui32 RecsCount_; - ui16 InternalPartsCount_; -}; - void HtmlOutput(IOutputStream& out, const TString& line, const std::deque<std::pair<TKey, ui32>>& keys) { HTML(out) { 
TABLE() { @@ -389,43 +286,10 @@ void AddCheckDiskRequest(TEvKeyValue::TEvRequest *request, ui32 numChannels) { } -static void RequestDiskStatus(const TActorContext& ctx, const TActorId& dst, ui32 numChannels) { - THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest); - - AddCheckDiskRequest(request.Get(), numChannels); - - ctx.Send(dst, request.Release()); -} - - void RequestInfoRange(const TActorContext& ctx, const TActorId& dst, ui32 partition, const TString& key) { RequestRange(ctx, dst, partition, TKeyPrefix::TypeInfo, true, key, key == ""); } -void RequestMetaRead(const TActorContext& ctx, const TActorId& dst, ui32 partition) { - auto addKey = [](NKikimrClient::TKeyValueRequest& request, TKeyPrefix::EType type, ui32 partition) { - auto read = request.AddCmdRead(); - TKeyPrefix key{type, partition}; - read->SetKey(key.Data(), key.Size()); - }; - - THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest); - - addKey(request->Record, TKeyPrefix::TypeMeta, partition); - addKey(request->Record, TKeyPrefix::TypeTxMeta, partition); - - ctx.Send(dst, request.Release()); -} - -void RequestData(const TActorContext& ctx, const TActorId& dst, const TVector<TString>& keys) { - THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest); - for (auto& key: keys) { - auto read = request->Record.AddCmdRead(); - read->SetKey(key); - } - ctx.Send(dst, request.Release()); -} - void RequestDataRange(const TActorContext& ctx, const TActorId& dst, ui32 partition, const TString& key) { RequestRange(ctx, dst, partition, TKeyPrefix::TypeData, false, key); } @@ -486,7 +350,8 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co const NKikimrPQ::TPQTabletConfig& tabletConfig, const TTabletCountersBase& counters, bool subDomainOutOfSpace, bool newPartition, TVector<TTransaction> distrTxs) - : TabletID(tabletId) + : Initializer(this) + , TabletID(tabletId) , Partition(partition) , TabletConfig(tabletConfig) , 
Counters(counters) @@ -498,7 +363,6 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co , WriteInflightSize(0) , Tablet(tablet) , BlobCache(blobCache) - , InitState(WaitConfig) , PartitionedBlob(partition, 0, 0, 0, 0, 0, Head, NewHead, true, false, 8_MB) , NewHeadKey{TKey{}, 0, TInstant::Zero(), 0} , BodySize(0) @@ -734,147 +598,6 @@ void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorCo ctx.Send(ev->Sender, new TEvPQ::TEvMonResponse(Partition, res, out.Str())); } -void TPartition::RequestConfig(const TActorContext& ctx) -{ - auto event = MakeHolder<TEvKeyValue::TEvRequest>(); - auto read = event->Record.AddCmdRead(); - read->SetKey(GetKeyConfig()); - ctx.Send(Tablet, event.Release()); -} - -void TPartition::HandleConfig(const NKikimrClient::TResponse& res, const TActorContext& ctx) -{ - auto& response = res.GetReadResult(0); - - switch (response.GetStatus()) { - case NKikimrProto::OK: - Y_VERIFY(Config.ParseFromString(response.GetValue())); - Y_VERIFY(Config.GetVersion() <= TabletConfig.GetVersion()); - if (Config.GetVersion() < TabletConfig.GetVersion()) { - auto event = MakeHolder<TEvPQ::TEvChangePartitionConfig>(TopicConverter, - TabletConfig); - PushFrontDistrTx(event.Release()); - } - break; - case NKikimrProto::NODATA: - Config = TabletConfig; - break; - case NKikimrProto::ERROR: - LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, - "Partition " << Partition << " can't read config"); - ctx.Send(Tablet, new TEvents::TEvPoisonPill()); - break; - default: - Cerr << "ERROR " << response.GetStatus() << "\n"; - Y_FAIL("bad status"); - }; - - InitState = WaitDiskStatus; - Initialize(ctx); -} - -void TPartition::Bootstrap(const TActorContext& ctx) { - Y_VERIFY(InitState == WaitConfig); - RequestConfig(ctx); - Become(&TThis::StateInit); -} - -void TPartition::Initialize(const TActorContext& ctx) { - CreationTime = ctx.Now(); - WriteCycleStartTime = ctx.Now(); - 
WriteQuota.ConstructInPlace(Config.GetPartitionConfig().GetBurstSize(), - Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond(), - ctx.Now()); - WriteTimestamp = ctx.Now(); - LastUsedStorageMeterTimestamp = ctx.Now(); - WriteTimestampEstimate = ManageWriteTimestampEstimate ? ctx.Now() : TInstant::Zero(); - - CloudId = Config.GetYcCloudId(); - DbId = Config.GetYdbDatabaseId(); - DbPath = Config.GetYdbDatabasePath(); - FolderId = Config.GetYcFolderId(); - - CalcTopicWriteQuotaParams(AppData()->PQConfig, - IsLocalDC, - TopicConverter, - TabletID, - ctx, - TopicWriteQuoterPath, - TopicWriteQuotaResourcePath); - - UsersInfoStorage.ConstructInPlace(DCId, - TabletID, - TopicConverter, - Partition, - Counters, - Config, - CloudId, - DbId, - Config.GetYdbDatabasePath(), - IsServerless, - FolderId); - TotalChannelWritesByHead.resize(Config.GetPartitionConfig().GetNumChannels()); - - if (Config.GetPartitionConfig().HasMirrorFrom()) { - ManageWriteTimestampEstimate = !Config.GetPartitionConfig().GetMirrorFrom().GetSyncWriteTime(); - } else { - ManageWriteTimestampEstimate = IsLocalDC; - } - - if (AppData()->PQConfig.GetTopicsAreFirstClassCitizen()) { - PartitionCountersLabeled.Reset(new TPartitionLabeledCounters(EscapeBadChars(TopicConverter->GetClientsideName()), - Partition, - Config.GetYdbDatabasePath())); - } else { - PartitionCountersLabeled.Reset(new TPartitionLabeledCounters(TopicConverter->GetClientsideName(), - Partition)); - } - - UsersInfoStorage->Init(Tablet, SelfId(), ctx); - - Y_VERIFY(AppData(ctx)->PQConfig.GetMaxBlobsPerLevel() > 0); - ui32 border = LEVEL0; - MaxSizeCheck = 0; - MaxBlobSize = AppData(ctx)->PQConfig.GetMaxBlobSize(); - PartitionedBlob = TPartitionedBlob(Partition, 0, 0, 0, 0, 0, Head, NewHead, true, false, MaxBlobSize); - for (ui32 i = 0; i < TotalLevels; ++i) { - CompactLevelBorder.push_back(border); - MaxSizeCheck += border; - Y_VERIFY(i + 1 < TotalLevels && border < MaxBlobSize || i + 1 == TotalLevels && border == MaxBlobSize); - border 
*= AppData(ctx)->PQConfig.GetMaxBlobsPerLevel(); - border = Min(border, MaxBlobSize); - } - TotalMaxCount = AppData(ctx)->PQConfig.GetMaxBlobsPerLevel() * TotalLevels; - - std::reverse(CompactLevelBorder.begin(), CompactLevelBorder.end()); - - for (ui32 i = 0; i < TotalLevels; ++i) { - DataKeysHead.push_back(TKeyLevel(CompactLevelBorder[i])); - } - - for (const auto& readQuota : Config.GetPartitionConfig().GetReadQuota()) { - auto &userInfo = UsersInfoStorage->GetOrCreate(readQuota.GetClientId(), ctx); - userInfo.ReadQuota.UpdateConfig(readQuota.GetBurstSize(), readQuota.GetSpeedInBytesPerSecond()); - } - - LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "bootstrapping " << Partition << " " << ctx.SelfID); - - if (NewPartition) { - InitComplete(ctx); - } else { - Y_VERIFY(InitState == WaitDiskStatus); - RequestDiskStatus(ctx, Tablet, Config.GetPartitionConfig().GetNumChannels()); - Become(&TThis::StateInit); - } - - if (AppData(ctx)->Counters) { - if (AppData()->PQConfig.GetTopicsAreFirstClassCitizen()) { - SetupStreamCounters(ctx); - } else { - SetupTopicCounters(ctx); - } - } -} void TPartition::EmplaceResponse(TMessage&& message, const TActorContext& ctx) { Responses.emplace_back( @@ -885,165 +608,6 @@ void TPartition::EmplaceResponse(TMessage&& message, const TActorContext& ctx) { ); } -void TPartition::SetupTopicCounters(const TActorContext& ctx) { - auto counters = AppData(ctx)->Counters; - auto labels = NPersQueue::GetLabels(TopicConverter); - const TString suffix = IsLocalDC ? 
"Original" : "Mirrored"; - - WriteBufferIsFullCounter.SetCounter( - NPersQueue::GetCounters(counters, "writingTime", TopicConverter), - {{"host", DCId}, - {"Partition", ToString<ui32>(Partition)}}, - {"sensor", "BufferFullTime" + suffix, true}); - - auto subGroup = GetServiceCounters(counters, "pqproxy|writeTimeLag"); - InputTimeLag = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter( - subGroup, labels, {{"sensor", "TimeLags" + suffix}}, "Interval", - TVector<std::pair<ui64, TString>>{ - {100, "100ms"}, {200, "200ms"}, {500, "500ms"}, {1000, "1000ms"}, - {2000, "2000ms"}, {5000, "5000ms"}, {10'000, "10000ms"}, {30'000, "30000ms"}, - {60'000, "60000ms"}, {180'000,"180000ms"}, {9'999'999, "999999ms"}}, true)); - - - subGroup = GetServiceCounters(counters, "pqproxy|writeInfo"); - MessageSize = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter( - subGroup, labels, {{"sensor", "MessageSize" + suffix}}, "Size", - TVector<std::pair<ui64, TString>>{ - {1_KB, "1kb"}, {5_KB, "5kb"}, {10_KB, "10kb"}, - {20_KB, "20kb"}, {50_KB, "50kb"}, {100_KB, "100kb"}, {200_KB, "200kb"}, - {512_KB, "512kb"},{1024_KB, "1024kb"}, {2048_KB,"2048kb"}, {5120_KB, "5120kb"}, - {10240_KB, "10240kb"}, {65536_KB, "65536kb"}, {999'999'999, "99999999kb"}}, true)); - - subGroup = GetServiceCounters(counters, "pqproxy|writeSession"); - BytesWrittenTotal = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"BytesWritten" + suffix}, true); - BytesWrittenUncompressed = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"UncompressedBytesWritten" + suffix}, true); - BytesWrittenComp = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"CompactedBytesWritten" + suffix}, true); - MsgsWrittenTotal = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"MessagesWritten" + suffix}, true); - - TVector<NPersQueue::TPQLabelsInfo> aggr = {{{{"Account", TopicConverter->GetAccount()}}, {"total"}}}; - ui32 border = AppData(ctx)->PQConfig.GetWriteLatencyBigMs(); - 
subGroup = GetServiceCounters(counters, "pqproxy|SLI"); - WriteLatency = NKikimr::NPQ::CreateSLIDurationCounter(subGroup, aggr, "Write", border, - {100, 200, 500, 1000, 1500, 2000, - 5000, 10'000, 30'000, 99'999'999}); - SLIBigLatency = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WriteBigLatency"}, true, "sensor", false); - WritesTotal = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WritesTotal"}, true, "sensor", false); - if (IsQuotingEnabled() && !TopicWriteQuotaResourcePath.empty()) { - TopicWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>( - new NKikimr::NPQ::TPercentileCounter( - GetServiceCounters(counters, "pqproxy|topicWriteQuotaWait"), labels, - {{"sensor", "TopicWriteQuotaWait" + suffix}}, "Interval", - TVector<std::pair<ui64, TString>>{ - {0, "0ms"}, {1, "1ms"}, {5, "5ms"}, {10, "10ms"}, - {20, "20ms"}, {50, "50ms"}, {100, "100ms"}, {500, "500ms"}, - {1000, "1000ms"}, {2500, "2500ms"}, {5000, "5000ms"}, - {10'000, "10000ms"}, {9'999'999, "999999ms"}}, true)); - } - - PartitionWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>( - new NKikimr::NPQ::TPercentileCounter(GetServiceCounters(counters, "pqproxy|partitionWriteQuotaWait"), - labels, {{"sensor", "PartitionWriteQuotaWait" + suffix}}, "Interval", - TVector<std::pair<ui64, TString>>{ - {0, "0ms"}, {1, "1ms"}, {5, "5ms"}, {10, "10ms"}, - {20, "20ms"}, {50, "50ms"}, {100, "100ms"}, {500, "500ms"}, - {1000, "1000ms"}, {2500, "2500ms"}, {5000, "5000ms"}, - {10'000, "10000ms"}, {9'999'999, "999999ms"}}, true)); -} - -void TPartition::SetupStreamCounters(const TActorContext& ctx) { - const auto topicName = TopicConverter->GetModernName(); - auto counters = AppData(ctx)->Counters; - auto subgroups = NPersQueue::GetSubgroupsForTopic(TopicConverter, CloudId, DbId, DbPath, FolderId); -/* - WriteBufferIsFullCounter.SetCounter( - NPersQueue::GetCountersForTopic(counters, IsServerless), - { - {"database", DbPath}, - {"cloud_id", CloudId}, - {"folder_id", FolderId}, - 
{"database_id", DbId}, - {"topic", TopicConverter->GetFederationPath()}, - {"host", DCId}, - {"partition", ToString<ui32>(Partition)}}, - {"name", "api.grpc.topic.stream_write.buffer_brimmed_milliseconds", true}); -*/ - - subgroups.push_back({"name", "topic.write.lag_milliseconds"}); - - InputTimeLag = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter( - NPersQueue::GetCountersForTopic(counters, IsServerless), {}, - subgroups, "bin", - TVector<std::pair<ui64, TString>>{ - {100, "100"}, {200, "200"}, {500, "500"}, - {1000, "1000"}, {2000, "2000"}, {5000, "5000"}, - {10'000, "10000"}, {30'000, "30000"}, {60'000, "60000"}, - {180'000,"180000"}, {9'999'999, "999999"}}, true)); - - subgroups.back().second = "topic.write.message_size_bytes"; - MessageSize = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter( - NPersQueue::GetCountersForTopic(counters, IsServerless), {}, - subgroups, "bin", - TVector<std::pair<ui64, TString>>{ - {1024, "1024"}, {5120, "5120"}, {10'240, "10240"}, - {20'480, "20480"}, {51'200, "51200"}, {102'400, "102400"}, - {204'800, "204800"}, {524'288, "524288"},{1'048'576, "1048576"}, - {2'097'152,"2097152"}, {5'242'880, "5242880"}, {10'485'760, "10485760"}, - {67'108'864, "67108864"}, {999'999'999, "99999999"}}, true)); - - subgroups.pop_back(); - BytesWrittenGrpc = NKikimr::NPQ::TMultiCounter( - NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups, - {"api.grpc.topic.stream_write.bytes"} , true, "name"); - BytesWrittenTotal = NKikimr::NPQ::TMultiCounter( - NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups, - {"topic.write.bytes"} , true, "name"); - - MsgsWrittenGrpc = NKikimr::NPQ::TMultiCounter( - NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups, - {"api.grpc.topic.stream_write.messages"}, true, "name"); - MsgsWrittenTotal = NKikimr::NPQ::TMultiCounter( - NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups, - 
{"topic.write.messages"}, true, "name"); - - - BytesWrittenUncompressed = NKikimr::NPQ::TMultiCounter( - - NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups, - {"topic.write.uncompressed_bytes"}, true, "name"); - - TVector<NPersQueue::TPQLabelsInfo> aggr = {{{{"Account", TopicConverter->GetAccount()}}, {"total"}}}; - ui32 border = AppData(ctx)->PQConfig.GetWriteLatencyBigMs(); - auto subGroup = GetServiceCounters(counters, "pqproxy|SLI"); - WriteLatency = NKikimr::NPQ::CreateSLIDurationCounter(subGroup, aggr, "Write", border, - {100, 200, 500, 1000, 1500, 2000, - 5000, 10'000, 30'000, 99'999'999}); - SLIBigLatency = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WriteBigLatency"}, true, "name", false); - WritesTotal = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WritesTotal"}, true, "name", false); - if (IsQuotingEnabled() && !TopicWriteQuotaResourcePath.empty()) { - subgroups.push_back({"name", "api.grpc.topic.stream_write.topic_throttled_milliseconds"}); - TopicWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>( - new NKikimr::NPQ::TPercentileCounter( - NPersQueue::GetCountersForTopic(counters, IsServerless), {}, - subgroups, "bin", - TVector<std::pair<ui64, TString>>{ - {0, "0"}, {1, "1"}, {5, "5"}, {10, "10"}, - {20, "20"}, {50, "50"}, {100, "100"}, {500, "500"}, - {1000, "1000"}, {2500, "2500"}, {5000, "5000"}, - {10'000, "10000"}, {9'999'999, "999999"}}, true)); - subgroups.pop_back(); - } - - subgroups.push_back({"name", "api.grpc.topic.stream_write.partition_throttled_milliseconds"}); - PartitionWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>( - new NKikimr::NPQ::TPercentileCounter( - NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups, "bin", - TVector<std::pair<ui64, TString>>{ - {0, "0"}, {1, "1"}, {5, "5"}, {10, "10"}, - {20, "20"}, {50, "50"}, {100, "100"}, {500, "500"}, - {1000, "1000"}, {2500, "2500"}, {5000, "5000"}, - {10'000, "10000"}, {9'999'999, "999999"}}, true)); 
-} - void TPartition::ProcessHasDataRequests(const TActorContext& ctx) { if (!InitDone) return; @@ -1451,7 +1015,6 @@ void TPartition::CancelAllWritesOnIdle(const TActorContext& ctx) { ProcessReserveRequests(ctx); } - void TPartition::FailBadClient(const TActorContext& ctx) { for (auto it = Owners.begin(); it != Owners.end();) { it = DropOwner(it, ctx); @@ -1487,373 +1050,6 @@ bool CheckDiskStatus(const TStorageStatusFlags status) { return !status.Check(NKikimrBlobStorage::StatusDiskSpaceLightYellowMove); } -void TPartition::HandleGetDiskStatus(const NKikimrClient::TResponse& response, const TActorContext& ctx) { - bool diskIsOk = true; - for (ui32 i = 0; i < response.GetStatusResultSize(); ++i) { - auto& res = response.GetGetStatusResult(i); - - if (res.GetStatus() != NKikimrProto::OK) { - LOG_ERROR_S( - ctx, NKikimrServices::PERSQUEUE, - "commands for topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition << - " are not processed at all, got KV error in CmdGetStatus " << res.GetStatus() - ); - ctx.Send(Tablet, new TEvents::TEvPoisonPill()); - return; - } - diskIsOk = diskIsOk && CheckDiskStatus(res.GetStatusFlags()); - } - DiskIsFull = !diskIsOk; - if (DiskIsFull) { - LogAndCollectError(NKikimrServices::PERSQUEUE, "disk is full", ctx); - } - - InitState = WaitMetaRead; - RequestMetaRead(ctx, Tablet, Partition); -} - -void TPartition::HandleMetaRead(const NKikimrClient::TResponse& response, const TActorContext& ctx) -{ - auto handleReadResult = [&](const NKikimrClient::TKeyValueResponse::TReadResult& response, - auto&& action) { - switch (response.GetStatus()) { - case NKikimrProto::OK: - action(response); - break; - case NKikimrProto::NODATA: - break; - case NKikimrProto::ERROR: - LOG_ERROR_S( - ctx, NKikimrServices::PERSQUEUE, - "read topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition << " error" - ); - ctx.Send(Tablet, new TEvents::TEvPoisonPill()); - break; - default: - Cerr << "ERROR " << 
response.GetStatus() << "\n"; - Y_FAIL("bad status"); - }; - }; - - auto loadMeta = [&](const NKikimrClient::TKeyValueResponse::TReadResult& response) { - NKikimrPQ::TPartitionMeta meta; - bool res = meta.ParseFromString(response.GetValue()); - Y_VERIFY(res); - /* Bring back later, when switch to 21-2 will be unable - StartOffset = meta.GetStartOffset(); - EndOffset = meta.GetEndOffset(); - if (StartOffset == EndOffset) { - NewHead.Offset = Head.Offset = EndOffset; - } - */ - if (CurrentStateFunc() == &TThis::StateInit) { - SubDomainOutOfSpace = meta.GetSubDomainOutOfSpace(); - } - }; - handleReadResult(response.GetReadResult(0), loadMeta); - - auto loadTxMeta = [this](const NKikimrClient::TKeyValueResponse::TReadResult& response) { - NKikimrPQ::TPartitionTxMeta meta; - bool res = meta.ParseFromString(response.GetValue()); - Y_VERIFY(res); - - if (meta.HasPlanStep()) { - PlanStep = meta.GetPlanStep(); - } - if (meta.HasTxId()) { - TxId = meta.GetTxId(); - } - }; - handleReadResult(response.GetReadResult(1), loadTxMeta); - - InitState = WaitInfoRange; - RequestInfoRange(ctx, Tablet, Partition, ""); -} - -void TPartition::HandleInfoRangeRead(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx) { - //megaqc check here all results - Y_VERIFY(range.HasStatus()); - const TString *key = nullptr; - switch (range.GetStatus()) { - case NKikimrProto::OK: - case NKikimrProto::OVERRUN: - for (ui32 i = 0; i < range.PairSize(); ++i) { - const auto& pair = range.GetPair(i); - Y_VERIFY(pair.HasStatus()); - if (pair.GetStatus() != NKikimrProto::OK) { - LOG_ERROR_S( - ctx, NKikimrServices::PERSQUEUE, - "read range error topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition - << " got status " << pair.GetStatus() << " for key " << (pair.HasKey() ? 
pair.GetKey() : "unknown") - ); - - ctx.Send(Tablet, new TEvents::TEvPoisonPill()); - return; - } - Y_VERIFY(pair.HasKey()); - Y_VERIFY(pair.HasValue()); - - key = &pair.GetKey(); - if ((*key)[TKeyPrefix::MarkPosition()] == TKeyPrefix::MarkSourceId) { - SourceIdStorage.LoadSourceIdInfo(*key, pair.GetValue(), ctx.Now()); - } else if ((*key)[TKeyPrefix::MarkPosition()] == TKeyPrefix::MarkProtoSourceId) { - SourceIdStorage.LoadSourceIdInfo(*key, pair.GetValue(), ctx.Now()); - } else if ((*key)[TKeyPrefix::MarkPosition()] == TKeyPrefix::MarkUser) { - UsersInfoStorage->Parse(*key, pair.GetValue(), ctx); - } else if ((*key)[TKeyPrefix::MarkPosition()] == TKeyPrefix::MarkUserDeprecated) { - UsersInfoStorage->ParseDeprecated(*key, pair.GetValue(), ctx); - } - } - //make next step - if (range.GetStatus() == NKikimrProto::OVERRUN) { - Y_VERIFY(key); - RequestInfoRange(ctx, Tablet, Partition, *key); - } else { - InitState = WaitDataRange; - RequestDataRange(ctx, Tablet, Partition, ""); - } - break; - case NKikimrProto::NODATA: - InitState = WaitDataRange; - RequestDataRange(ctx, Tablet, Partition, ""); - break; - case NKikimrProto::ERROR: - LOG_ERROR_S( - ctx, NKikimrServices::PERSQUEUE, - "read topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition << " error" - ); - ctx.Send(Tablet, new TEvents::TEvPoisonPill()); - break; - default: - Cerr << "ERROR " << range.GetStatus() << "\n"; - Y_FAIL("bad status"); - }; -} - -void TPartition::FillBlobsMetaData(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx) { - for (ui32 i = 0; i < range.PairSize(); ++i) { - auto pair = range.GetPair(i); - Y_VERIFY(pair.GetStatus() == NKikimrProto::OK); //this is readrange without keys, only OK could be here - TKey k(pair.GetKey()); - if (DataKeysBody.empty()) { //no data - this is first pair of first range - Head.Offset = EndOffset = StartOffset = k.GetOffset(); - if (k.GetPartNo() > 0) ++StartOffset; - Head.PartNo = 0; - } else { - 
Y_VERIFY(EndOffset <= k.GetOffset(), "%s", pair.GetKey().c_str()); - if (EndOffset < k.GetOffset()) { - GapOffsets.push_back(std::make_pair(EndOffset, k.GetOffset())); - GapSize += k.GetOffset() - EndOffset; - } - } - Y_VERIFY(k.GetCount() + k.GetInternalPartsCount() > 0); - Y_VERIFY(k.GetOffset() >= EndOffset); - EndOffset = k.GetOffset() + k.GetCount(); - //at this point EndOffset > StartOffset - if (!k.IsHead()) //head.Size will be filled after read or head blobs - BodySize += pair.GetValueSize(); - - LOG_DEBUG_S( - ctx, NKikimrServices::PERSQUEUE, - "Got data topic " << TopicConverter->GetClientsideName() << " partition " << k.GetPartition() - << " offset " << k.GetOffset() << " count " << k.GetCount() << " size " << pair.GetValueSize() - << " so " << StartOffset << " eo " << EndOffset << " " << pair.GetKey() - ); - DataKeysBody.push_back({k, pair.GetValueSize(), TInstant::Seconds(pair.GetCreationUnixTime()), DataKeysBody.empty() ? 0 : DataKeysBody.back().CumulativeSize + DataKeysBody.back().Size}); - } - - Y_VERIFY(EndOffset >= StartOffset); -} - -void TPartition::FormHeadAndProceed(const TActorContext& ctx) { - Head.Offset = EndOffset; - Head.PartNo = 0; - TVector<TString> keys; - while (DataKeysBody.size() > 0 && DataKeysBody.back().Key.IsHead()) { - Y_VERIFY(DataKeysBody.back().Key.GetOffset() + DataKeysBody.back().Key.GetCount() == Head.Offset); //no gaps in head allowed - HeadKeys.push_front(DataKeysBody.back()); - Head.Offset = DataKeysBody.back().Key.GetOffset(); - Head.PartNo = DataKeysBody.back().Key.GetPartNo(); - DataKeysBody.pop_back(); - } - for (const auto& p : DataKeysBody) { - Y_VERIFY(!p.Key.IsHead()); - } - - Y_VERIFY(HeadKeys.empty() || Head.Offset == HeadKeys.front().Key.GetOffset() && Head.PartNo == HeadKeys.front().Key.GetPartNo()); - Y_VERIFY(Head.Offset < EndOffset || Head.Offset == EndOffset && HeadKeys.empty()); - Y_VERIFY(Head.Offset >= StartOffset || Head.Offset == StartOffset - 1 && Head.PartNo > 0); - - //form head request - for 
(auto& p : HeadKeys) { - keys.push_back({p.Key.Data(), p.Key.Size()}); - } - Y_VERIFY(keys.size() < TotalMaxCount); - if (keys.empty()) { - InitComplete(ctx); - return; - } - InitState = WaitDataRead; - RequestData(ctx, Tablet, keys); -} - -void TPartition::HandleDataRangeRead(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx) { - Y_VERIFY(range.HasStatus()); - switch(range.GetStatus()) { - case NKikimrProto::OK: - case NKikimrProto::OVERRUN: - - FillBlobsMetaData(range, ctx); - - if (range.GetStatus() == NKikimrProto::OVERRUN) { //request rest of range - Y_VERIFY(range.PairSize()); - RequestDataRange(ctx, Tablet, Partition, range.GetPair(range.PairSize() - 1).GetKey()); - return; - } - FormHeadAndProceed(ctx); - break; - case NKikimrProto::NODATA: - InitComplete(ctx); - break; - default: - Cerr << "ERROR " << range.GetStatus() << "\n"; - Y_FAIL("bad status"); - }; -} - -void TPartition::HandleDataRead(const NKikimrClient::TResponse& response, const TActorContext& ctx) { - Y_VERIFY(InitState == WaitDataRead); - ui32 currentLevel = 0; - Y_VERIFY(HeadKeys.size() == response.ReadResultSize()); - for (ui32 i = 0; i < response.ReadResultSize(); ++i) { - auto& read = response.GetReadResult(i); - Y_VERIFY(read.HasStatus()); - switch(read.GetStatus()) { - case NKikimrProto::OK: { - const TKey& key = HeadKeys[i].Key; - ui32 size = HeadKeys[i].Size; - Y_VERIFY(key.IsHead()); - ui64 offset = key.GetOffset(); - while (currentLevel + 1 < TotalLevels && size < CompactLevelBorder[currentLevel + 1]) - ++currentLevel; - Y_VERIFY(size < CompactLevelBorder[currentLevel]); - - DataKeysHead[currentLevel].AddKey(key, size); - Y_VERIFY(DataKeysHead[currentLevel].KeysCount() < AppData(ctx)->PQConfig.GetMaxBlobsPerLevel()); - Y_VERIFY(!DataKeysHead[currentLevel].NeedCompaction()); - - LOG_DEBUG_S( - ctx, NKikimrServices::PERSQUEUE, - "read res partition topic '" << TopicConverter->GetClientsideName() - << "' parititon " << key.GetPartition() << " 
offset " << offset << " endOffset " << EndOffset - << " key " << key.GetOffset() << "," << key.GetCount() << " valuesize " << read.GetValue().size() - << " expected " << size - ); - - Y_VERIFY(offset + 1 >= StartOffset); - Y_VERIFY(offset < EndOffset); - Y_VERIFY(size == read.GetValue().size()); - - for (TBlobIterator it(key, read.GetValue()); it.IsValid(); it.Next()) { - Head.Batches.push_back(it.GetBatch()); - } - Head.PackedSize += size; - - break; - } - case NKikimrProto::OVERRUN: - Y_FAIL("implement overrun in readresult!!"); - return; - case NKikimrProto::NODATA: - Y_FAIL("NODATA can't be here"); - return; - case NKikimrProto::ERROR: - LOG_ERROR_S( - ctx, NKikimrServices::PERSQUEUE, - "tablet " << TabletID << " HandleOnInit topic '" << TopicConverter->GetClientsideName() - << "' partition " << Partition - << " ReadResult " << i << " status NKikimrProto::ERROR result message: \"" << read.GetMessage() - << " \" errorReason: \"" << response.GetErrorReason() << "\"" - ); - ctx.Send(Tablet, new TEvents::TEvPoisonPill()); - return; - default: - Cerr << "ERROR " << read.GetStatus() << " message: \"" << read.GetMessage() << "\"\n"; - Y_FAIL("bad status"); - - }; - } - - Y_VERIFY(Head.PackedSize > 0); - Y_VERIFY(Head.PackedSize < MaxBlobSize); - Y_VERIFY(Head.GetNextOffset() == EndOffset); - Y_VERIFY(std::accumulate(DataKeysHead.begin(), DataKeysHead.end(), 0u, - [](ui32 sum, const TKeyLevel& level){return sum + level.Sum();}) == Head.PackedSize); - - InitComplete(ctx); -} - - -void TPartition::HandleOnInit(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx) { - - auto& response = ev->Get()->Record; - if (response.GetStatus() != NMsgBusProxy::MSTATUS_OK) { - LOG_ERROR_S( - ctx, NKikimrServices::PERSQUEUE, - "commands for topic '" << TopicConverter->GetClientsideName() << " partition " << Partition - << " are not processed at all, got KV error " << response.GetStatus() - ); - ctx.Send(Tablet, new TEvents::TEvPoisonPill()); - return; - } - bool diskIsOk = 
true; - for (ui32 i = 0; i < response.GetStatusResultSize(); ++i) { - auto& res = response.GetGetStatusResult(i); - if (res.GetStatus() != NKikimrProto::OK) { - LOG_ERROR_S( - ctx, NKikimrServices::PERSQUEUE, - "commands for topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition << - " are not processed at all, got KV error in CmdGetStatus " << res.GetStatus() - ); - ctx.Send(Tablet, new TEvents::TEvPoisonPill()); - return; - } - diskIsOk = diskIsOk && CheckDiskStatus(res.GetStatusFlags()); - } - if (response.GetStatusResultSize()) - DiskIsFull = !diskIsOk; - - switch(InitState) { - case WaitConfig: - Y_VERIFY(response.ReadResultSize() == 1); - HandleConfig(response, ctx); - break; - case WaitDiskStatus: - Y_VERIFY(response.GetStatusResultSize()); - HandleGetDiskStatus(response, ctx); - break; - case WaitMetaRead: - Y_VERIFY(response.ReadResultSize() == 2); - HandleMetaRead(response, ctx); - break; - case WaitInfoRange: - Y_VERIFY(response.ReadRangeResultSize() == 1); - HandleInfoRangeRead(response.GetReadRangeResult(0), ctx); - break; - case WaitDataRange: - Y_VERIFY(response.ReadRangeResultSize() == 1); - HandleDataRangeRead(response.GetReadRangeResult(0), ctx); - break; - case WaitDataRead: - Y_VERIFY(response.ReadResultSize()); - HandleDataRead(response, ctx); - break; - default: - Y_FAIL("Unknown state"); - - }; -} - void TPartition::InitComplete(const TActorContext& ctx) { if (StartOffset == EndOffset && EndOffset == 0) { for (auto& [user, info] : UsersInfoStorage->GetAll()) { @@ -1937,7 +1133,6 @@ void TPartition::UpdateUserInfoEndOffset(const TInstant& now) { userInfo.second.EndOffset = (i64)EndOffset; userInfo.second.UpdateReadingTimeAndState(now); } - } void TPartition::ProcessChangeOwnerRequest(TAutoPtr<TEvPQ::TEvChangeOwner> ev, const TActorContext& ctx) { @@ -2112,8 +1307,6 @@ void TPartition::Handle(TEvPQ::TEvReserveBytes::TPtr& ev, const TActorContext& c } - - void TPartition::Handle(TEvPQ::TEvPartitionOffsets::TPtr& ev, const 
TActorContext& ctx) { NKikimrPQ::TOffsetsResponse::TPartResult result; result.SetPartition(Partition); @@ -2843,8 +2036,6 @@ TVector<TRequestedBlob> TPartition::GetReadRequestFromBody(const ui64 startOffse return blobs; } - - TVector<TClientBlob> TPartition::GetReadRequestFromHead(const ui64 startOffset, const ui16 partNo, const ui32 maxCount, const ui32 maxSize, const ui64 readTimestampMs, ui32* rcount, ui32* rsize, ui64* insideHeadOffset) { Y_UNUSED(readTimestampMs); ui32& count = *rcount; diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 8af1bb681ce..319b3d4355c 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -3,6 +3,7 @@ #include "blob.h" #include "header.h" #include "key.h" +#include "partition_init.h" #include "partition_types.h" #include "quota_tracker.h" #include "sourceid.h" @@ -73,6 +74,16 @@ struct TTransaction { }; class TPartition : public TActorBootstrapped<TPartition> { + friend TInitializer; + friend TInitializerStep; + friend TInitConfigStep; + friend TInitInternalFieldsStep; + friend TInitDiskStatusStep; + friend TInitMetaStep; + friend TInitInfoRangeStep; + friend TInitDataRangeStep; + friend TInitDataStep; + private: static const ui32 MAX_ERRORS_COUNT_TO_STORE = 10; @@ -98,10 +109,8 @@ private: void CreateMirrorerActor(); void DoRead(TEvPQ::TEvRead::TPtr ev, TDuration waitQuotaTime, const TActorContext& ctx); void FailBadClient(const TActorContext& ctx); - void FillBlobsMetaData(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx); void FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config, const TActorContext& ctx); void FilterDeadlinedWrites(const TActorContext& ctx); - void FormHeadAndProceed(const TActorContext& ctx); void Handle(NReadSpeedLimiterEvents::TEvCounters::TPtr& ev, const TActorContext& ctx); void Handle(NReadSpeedLimiterEvents::TEvResponse::TPtr& ev, const TActorContext& ctx); @@ -134,18 +143,12 @@ private: void 
Handle(TEvQuota::TEvClearance::TPtr& ev, const TActorContext& ctx); void Handle(TEvents::TEvPoisonPill::TPtr& ev, const TActorContext& ctx); void Handle(TEvPQ::TEvSubDomainStatus::TPtr& ev, const TActorContext& ctx); - void HandleDataRangeRead(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx); - void HandleDataRead(const NKikimrClient::TResponse& range, const TActorContext& ctx); - void HandleGetDiskStatus(const NKikimrClient::TResponse& res, const TActorContext& ctx); - void HandleInfoRangeRead(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx); - void HandleMetaRead(const NKikimrClient::TResponse& response, const TActorContext& ctx); void HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorContext& ctx); void HandleOnIdle(TEvPQ::TEvDeregisterMessageGroup::TPtr& ev, const TActorContext& ctx); void HandleOnIdle(TEvPQ::TEvRegisterMessageGroup::TPtr& ev, const TActorContext& ctx); void HandleOnIdle(TEvPQ::TEvSplitMessageGroup::TPtr& ev, const TActorContext& ctx); void HandleOnIdle(TEvPQ::TEvUpdateAvailableSize::TPtr& ev, const TActorContext& ctx); void HandleOnIdle(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& ctx); - void HandleOnInit(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx); void HandleOnInit(TEvPQ::TEvPartitionOffsets::TPtr& ev, const TActorContext& ctx); void HandleOnInit(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext& ctx); void HandleOnWrite(TEvPQ::TEvDeregisterMessageGroup::TPtr& ev, const TActorContext& ctx); @@ -302,8 +305,6 @@ private: void InitPendingUserInfoForImportantClients(const NKikimrPQ::TPQTabletConfig& config, const TActorContext& ctx); - void RequestConfig(const TActorContext& ctx); - void HandleConfig(const NKikimrClient::TResponse& res, const TActorContext& ctx); void Initialize(const TActorContext& ctx); template <typename T> @@ -375,6 +376,8 @@ private: return ss.Str(); } + TInitializer Initializer; + STFUNC(StateInit) { 
NPersQueue::TCounterTimeKeeper keeper(TabletCounters.Cumulative()[COUNTER_PQ_TABLET_CPU_USAGE]); @@ -384,7 +387,6 @@ private: TRACE_EVENT(NKikimrServices::PERSQUEUE); switch (ev->GetTypeRewrite()) { CFunc(TEvents::TSystem::Wakeup, HandleWakeup); - HFuncTraced(TEvKeyValue::TEvResponse, HandleOnInit); //result of reads HFuncTraced(TEvents::TEvPoisonPill, Handle); HFuncTraced(TEvPQ::TEvMonRequest, HandleMonitoring); HFuncTraced(TEvPQ::TEvChangePartitionConfig, Handle); @@ -401,7 +403,9 @@ private: HFuncTraced(TEvPQ::TEvTxRollback, HandleOnInit); HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle); default: - LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateInit", ev)); + if (!Initializer.Handle(ev, ctx)) { + LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateInit", ev)); + } break; }; } @@ -512,17 +516,8 @@ private: break; }; } -private: - enum EInitState { - WaitConfig, - WaitDiskStatus, - WaitInfoRange, - WaitDataRange, - WaitDataRead, - WaitMetaRead, - WaitTxInfo - }; +private: ui64 TabletID; ui32 Partition; NKikimrPQ::TPQTabletConfig Config; @@ -550,8 +545,6 @@ private: TActorId Tablet; TActorId BlobCache; - EInitState InitState; - std::deque<TMessage> Requests; std::deque<TMessage> Responses; @@ -697,4 +690,5 @@ private: TDeque<std::unique_ptr<IEventBase>> PendingEvents; }; + } // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/partition_init.cpp b/ydb/core/persqueue/partition_init.cpp new file mode 100644 index 00000000000..09a9341039c --- /dev/null +++ b/ydb/core/persqueue/partition_init.cpp @@ -0,0 +1,943 @@ +#include "partition.h" +#include "partition_util.h" + +namespace NKikimr::NPQ { + +static const ui32 LEVEL0 = 32; + +bool DiskIsFull(TEvKeyValue::TEvResponse::TPtr& ev); +bool ValidateResponse(const TInitializerStep& step, TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx); + +// +// TInitializer +// + +TInitializer::TInitializer(TPartition* partition) + : Partition(partition) + , 
InProgress(false) { + + Steps.push_back(MakeHolder<TInitConfigStep>(this)); + Steps.push_back(MakeHolder<TInitInternalFieldsStep>(this)); + Steps.push_back(MakeHolder<TInitDiskStatusStep>(this)); + Steps.push_back(MakeHolder<TInitMetaStep>(this)); + Steps.push_back(MakeHolder<TInitInfoRangeStep>(this)); + Steps.push_back(MakeHolder<TInitDataRangeStep>(this)); + Steps.push_back(MakeHolder<TInitDataStep>(this)); + + CurrentStep = Steps.begin(); +} + +void TInitializer::Execute(const TActorContext& ctx) { + Y_VERIFY(!InProgress, "Initialization already in progress"); + InProgress = true; + DoNext(ctx); +} + +bool TInitializer::Handle(STFUNC_SIG) { + Y_VERIFY(InProgress, "Initialization is not started"); + return CurrentStep->Get()->Handle(ev,ctx); +} + +void TInitializer::Next(const TActorContext& ctx) { + ++CurrentStep; + DoNext(ctx); +} + +void TInitializer::Done(const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Initializing topic '" << Partition->TopicConverter->GetClientsideName() + << "' partition " << Partition->Partition + << ". Completed."); + InProgress = false; + Partition->InitComplete(ctx); +} + +void TInitializer::DoNext(const TActorContext& ctx) { + if (CurrentStep == Steps.end()) { + Done(ctx); + return; + } + + if (Partition->NewPartition) { + while(CurrentStep->Get()->SkipNewPartition) { + if (++CurrentStep == Steps.end()) { + Done(ctx); + return; + } + } + } + + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Initializing topic '" << Partition->TopicConverter->GetClientsideName() + << "' partition " << Partition->Partition + << ". 
Step " << CurrentStep->Get()->Name); + CurrentStep->Get()->Execute(ctx); +} + + +// +// TInitializerStep +// + +TInitializerStep::TInitializerStep(TInitializer* initializer, TString name, bool skipNewPartition) + : Name(name) + , SkipNewPartition(skipNewPartition) + , Initializer(initializer) { +} + +void TInitializerStep::Done(const TActorContext& ctx) { + Initializer->Next(ctx); +} + +bool TInitializerStep::Handle(STFUNC_SIG) { + Y_UNUSED(ev); + Y_UNUSED(ctx); + + return false; +} + +TPartition* TInitializerStep::Partition() const { + return Initializer->Partition; +} + +ui32 TInitializerStep::PartitionId() const { + return Initializer->Partition->Partition; +} + +void TInitializerStep::PoisonPill(const TActorContext& ctx) { + ctx.Send(Partition()->Tablet, new TEvents::TEvPoisonPill()); +} + +TString TInitializerStep::TopicName() const { + return Partition()->TopicConverter->GetClientsideName(); +} + + +// +// TBaseKVStep +// + +TBaseKVStep::TBaseKVStep(TInitializer* initializer, TString name, bool skipNewPartition) + : TInitializerStep(initializer, name, skipNewPartition) { +} + +bool TBaseKVStep::Handle(STFUNC_SIG) { + switch(ev->GetTypeRewrite()) + { + HFunc(TEvKeyValue::TEvResponse, Handle); + default: + return false; + } + return true; +} + + +// +// TInitConfigStep +// + +TInitConfigStep::TInitConfigStep(TInitializer* initializer) + : TBaseKVStep(initializer, "TInitConfigStep", false) { +} + +void TInitConfigStep::Execute(const TActorContext& ctx) { + auto event = MakeHolder<TEvKeyValue::TEvRequest>(); + auto read = event->Record.AddCmdRead(); + read->SetKey(Partition()->GetKeyConfig()); + + ctx.Send(Partition()->Tablet, event.Release()); +} + +void TInitConfigStep::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx) { + if (!ValidateResponse(*this, ev, ctx)) { + PoisonPill(ctx); + return; + } + + auto& res = ev->Get()->Record; + Y_VERIFY(res.ReadResultSize() == 1); + + auto& response = res.GetReadResult(0); + + switch 
(response.GetStatus()) { + case NKikimrProto::OK: + Y_VERIFY(Partition()->Config.ParseFromString(response.GetValue())); + Y_VERIFY(Partition()->Config.GetVersion() <= Partition()->TabletConfig.GetVersion()); + + if (Partition()->Config.GetVersion() < Partition()->TabletConfig.GetVersion()) { + auto event = MakeHolder<TEvPQ::TEvChangePartitionConfig>(Partition()->TopicConverter, + Partition()->TabletConfig); + Partition()->PushFrontDistrTx(event.Release()); + } + break; + + case NKikimrProto::NODATA: + Partition()->Config = Partition()->TabletConfig; + break; + + case NKikimrProto::ERROR: + LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, + "Partition " << Partition()->Partition << " can't read config"); + PoisonPill(ctx); + return; + + default: + Cerr << "ERROR " << response.GetStatus() << "\n"; + Y_FAIL("bad status"); + }; + + Done(ctx); +} + + +// +// TInitInternalFieldsStep +// +TInitInternalFieldsStep::TInitInternalFieldsStep(TInitializer* initializer) + : TInitializerStep(initializer, "TInitializerStep", false) { +} + +void TInitInternalFieldsStep::Execute(const TActorContext &ctx) { + Partition()->Initialize(ctx); + + Done(ctx); +} + + +// +// InitDiskStatusStep +// + +TInitDiskStatusStep::TInitDiskStatusStep(TInitializer* initializer) + : TBaseKVStep(initializer, "TInitDiskStatusStep", true) { +} + +void TInitDiskStatusStep::Execute(const TActorContext& ctx) { + THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest); + + AddCheckDiskRequest(request.Get(), Partition()->Config.GetPartitionConfig().GetNumChannels()); + + ctx.Send(Partition()->Tablet, request.Release()); +} + +void TInitDiskStatusStep::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx) { + if (!ValidateResponse(*this, ev, ctx)) { + PoisonPill(ctx); + return; + } + + auto& response = ev->Get()->Record; + Y_VERIFY(response.GetStatusResultSize()); + + Partition()->DiskIsFull = DiskIsFull(ev); + if (!Partition()->DiskIsFull) { + 
Partition()->LogAndCollectError(NKikimrServices::PERSQUEUE, "disk is full", ctx); + } + + Done(ctx); +} + + +// +// TInitMetaStep +// + +TInitMetaStep::TInitMetaStep(TInitializer* initializer) + : TBaseKVStep(initializer, "TInitMetaStep", true) { +} + +void TInitMetaStep::Execute(const TActorContext& ctx) { + auto addKey = [](NKikimrClient::TKeyValueRequest& request, TKeyPrefix::EType type, ui32 partition) { + auto read = request.AddCmdRead(); + TKeyPrefix key{type, partition}; + read->SetKey(key.Data(), key.Size()); + }; + + THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest); + + addKey(request->Record, TKeyPrefix::TypeMeta, PartitionId()); + addKey(request->Record, TKeyPrefix::TypeTxMeta, PartitionId()); + + ctx.Send(Partition()->Tablet, request.Release()); +} + +void TInitMetaStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActorContext &ctx) { + if (!ValidateResponse(*this, ev, ctx)) { + PoisonPill(ctx); + return; + } + + auto& response = ev->Get()->Record; + Y_VERIFY(response.ReadResultSize() == 2); + + auto handleReadResult = [&](const NKikimrClient::TKeyValueResponse::TReadResult& response, auto&& action) { + switch (response.GetStatus()) { + case NKikimrProto::OK: + action(response); + break; + case NKikimrProto::NODATA: + break; + case NKikimrProto::ERROR: + LOG_ERROR_S( + ctx, NKikimrServices::PERSQUEUE, + "read topic '" << TopicName() << "' partition " << PartitionId() << " error" + ); + PoisonPill(ctx); + break; + default: + Cerr << "ERROR " << response.GetStatus() << "\n"; + Y_FAIL("bad status"); + }; + }; + + auto loadMeta = [&](const NKikimrClient::TKeyValueResponse::TReadResult& response) { + NKikimrPQ::TPartitionMeta meta; + bool res = meta.ParseFromString(response.GetValue()); + Y_VERIFY(res); + /* Bring back later, when switch to 21-2 will be unable + StartOffset = meta.GetStartOffset(); + EndOffset = meta.GetEndOffset(); + if (StartOffset == EndOffset) { + NewHead.Offset = Head.Offset = EndOffset; + } + */ + 
Partition()->SubDomainOutOfSpace = meta.GetSubDomainOutOfSpace(); + }; + handleReadResult(response.GetReadResult(0), loadMeta); + + auto loadTxMeta = [this](const NKikimrClient::TKeyValueResponse::TReadResult& response) { + NKikimrPQ::TPartitionTxMeta meta; + bool res = meta.ParseFromString(response.GetValue()); + Y_VERIFY(res); + + if (meta.HasPlanStep()) { + Partition()->PlanStep = meta.GetPlanStep(); + } + if (meta.HasTxId()) { + Partition()->TxId = meta.GetTxId(); + } + }; + handleReadResult(response.GetReadResult(1), loadTxMeta); + + Done(ctx); +} + + +// +// TInitInfoRangeStep +// + +TInitInfoRangeStep::TInitInfoRangeStep(TInitializer* initializer) + : TBaseKVStep(initializer, "TInitInfoRangeStep", true) { +} + +void TInitInfoRangeStep::Execute(const TActorContext &ctx) { + RequestInfoRange(ctx, Partition()->Tablet, PartitionId(), ""); +} + +void TInitInfoRangeStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActorContext &ctx) { + if (!ValidateResponse(*this, ev, ctx)) { + PoisonPill(ctx); + return; + } + + auto& response = ev->Get()->Record; + Y_VERIFY(response.ReadRangeResultSize() == 1); + + auto& range = response.GetReadRangeResult(0); + auto now = ctx.Now(); + + Y_VERIFY(response.ReadRangeResultSize() == 1); + //megaqc check here all results + Y_VERIFY(range.HasStatus()); + const TString *key = nullptr; + switch (range.GetStatus()) { + case NKikimrProto::OK: + case NKikimrProto::OVERRUN: { + auto& sourceIdStorage = Partition()->SourceIdStorage; + auto& usersInfoStorage = Partition()->UsersInfoStorage; + + for (ui32 i = 0; i < range.PairSize(); ++i) { + const auto& pair = range.GetPair(i); + Y_VERIFY(pair.HasStatus()); + if (pair.GetStatus() != NKikimrProto::OK) { + LOG_ERROR_S( + ctx, NKikimrServices::PERSQUEUE, + "read range error topic '" << TopicName() << "' partition " << PartitionId() + << " got status " << pair.GetStatus() << " for key " << (pair.HasKey() ? 
pair.GetKey() : "unknown") + ); + + PoisonPill(ctx); + return; + } + + Y_VERIFY(pair.HasKey()); + Y_VERIFY(pair.HasValue()); + + key = &pair.GetKey(); + const auto type = (*key)[TKeyPrefix::MarkPosition()]; + if (type == TKeyPrefix::MarkSourceId) { + sourceIdStorage.LoadSourceIdInfo(*key, pair.GetValue(), now); + } else if (type == TKeyPrefix::MarkProtoSourceId) { + sourceIdStorage.LoadSourceIdInfo(*key, pair.GetValue(), now); + } else if (type == TKeyPrefix::MarkUser) { + usersInfoStorage->Parse(*key, pair.GetValue(), ctx); + } else if (type == TKeyPrefix::MarkUserDeprecated) { + usersInfoStorage->ParseDeprecated(*key, pair.GetValue(), ctx); + } + } + //make next step + if (range.GetStatus() == NKikimrProto::OVERRUN) { + Y_VERIFY(key); + RequestInfoRange(ctx, Partition()->Tablet, PartitionId(), *key); + } else { + Done(ctx); + } + break; + } + case NKikimrProto::NODATA: + Done(ctx); + break; + case NKikimrProto::ERROR: + LOG_ERROR_S( + ctx, NKikimrServices::PERSQUEUE, + "read topic '" << TopicName() << "' partition " << PartitionId() << " error" + ); + PoisonPill(ctx); + break; + default: + Cerr << "ERROR " << range.GetStatus() << "\n"; + Y_FAIL("bad status"); + }; +} + + +// +// TInitDataRangeStep +// + +TInitDataRangeStep::TInitDataRangeStep(TInitializer* initializer) + : TBaseKVStep(initializer, "TInitDataRangeStep", true) { +} + +void TInitDataRangeStep::Execute(const TActorContext &ctx) { + RequestDataRange(ctx, Partition()->Tablet, PartitionId(), ""); +} + +void TInitDataRangeStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActorContext &ctx) { + if (!ValidateResponse(*this, ev, ctx)) { + PoisonPill(ctx); + return; + } + + auto& response = ev->Get()->Record; + Y_VERIFY(response.ReadRangeResultSize() == 1); + + auto& range = response.GetReadRangeResult(0); + + Y_VERIFY(range.HasStatus()); + switch(range.GetStatus()) { + case NKikimrProto::OK: + case NKikimrProto::OVERRUN: + + FillBlobsMetaData(range, ctx); + + if (range.GetStatus() == 
NKikimrProto::OVERRUN) { //request rest of range + Y_VERIFY(range.PairSize()); + RequestDataRange(ctx, Partition()->Tablet, PartitionId(), range.GetPair(range.PairSize() - 1).GetKey()); + return; + } + FormHeadAndProceed(); + + Done(ctx); + break; + case NKikimrProto::NODATA: + Done(ctx); + break; + default: + Cerr << "ERROR " << range.GetStatus() << "\n"; + Y_FAIL("bad status"); + }; +} + +void TInitDataRangeStep::FillBlobsMetaData(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx) { + auto& endOffset = Partition()->EndOffset; + auto& startOffset = Partition()->StartOffset; + auto& head = Partition()->Head; + auto& dataKeysBody = Partition()->DataKeysBody; + auto& gapOffsets = Partition()->GapOffsets; + auto& gapSize = Partition()->GapSize; + auto& bodySize = Partition()->BodySize; + + for (ui32 i = 0; i < range.PairSize(); ++i) { + auto pair = range.GetPair(i); + Y_VERIFY(pair.GetStatus() == NKikimrProto::OK); //this is readrange without keys, only OK could be here + TKey k(pair.GetKey()); + if (dataKeysBody.empty()) { //no data - this is first pair of first range + head.Offset = endOffset = startOffset = k.GetOffset(); + if (k.GetPartNo() > 0) ++startOffset; + head.PartNo = 0; + } else { + Y_VERIFY(endOffset <= k.GetOffset(), "%s", pair.GetKey().c_str()); + if (endOffset < k.GetOffset()) { + gapOffsets.push_back(std::make_pair(endOffset, k.GetOffset())); + gapSize += k.GetOffset() - endOffset; + } + } + Y_VERIFY(k.GetCount() + k.GetInternalPartsCount() > 0); + Y_VERIFY(k.GetOffset() >= endOffset); + endOffset = k.GetOffset() + k.GetCount(); + //at this point EndOffset > StartOffset + if (!k.IsHead()) //head.Size will be filled after read or head blobs + bodySize += pair.GetValueSize(); + + LOG_DEBUG_S( + ctx, NKikimrServices::PERSQUEUE, + "Got data topic " << TopicName() << " partition " << k.GetPartition() + << " offset " << k.GetOffset() << " count " << k.GetCount() << " size " << pair.GetValueSize() + << " so " << 
startOffset << " eo " << endOffset << " " << pair.GetKey() + ); + dataKeysBody.push_back({k, pair.GetValueSize(), + TInstant::Seconds(pair.GetCreationUnixTime()), + dataKeysBody.empty() ? 0 : dataKeysBody.back().CumulativeSize + dataKeysBody.back().Size}); + } + + Y_VERIFY(endOffset >= startOffset); +} + + +void TInitDataRangeStep::FormHeadAndProceed() { + auto& endOffset = Partition()->EndOffset; + auto& startOffset = Partition()->StartOffset; + auto& head = Partition()->Head; + auto& headKeys = Partition()->HeadKeys; + auto& dataKeysBody = Partition()->DataKeysBody; + + head.Offset = endOffset; + head.PartNo = 0; + + while (dataKeysBody.size() > 0 && dataKeysBody.back().Key.IsHead()) { + Y_VERIFY(dataKeysBody.back().Key.GetOffset() + dataKeysBody.back().Key.GetCount() == head.Offset); //no gaps in head allowed + headKeys.push_front(dataKeysBody.back()); + head.Offset = dataKeysBody.back().Key.GetOffset(); + head.PartNo = dataKeysBody.back().Key.GetPartNo(); + dataKeysBody.pop_back(); + } + for (const auto& p : dataKeysBody) { + Y_VERIFY(!p.Key.IsHead()); + } + + Y_VERIFY(headKeys.empty() || head.Offset == headKeys.front().Key.GetOffset() && head.PartNo == headKeys.front().Key.GetPartNo()); + Y_VERIFY(head.Offset < endOffset || head.Offset == endOffset && headKeys.empty()); + Y_VERIFY(head.Offset >= startOffset || head.Offset == startOffset - 1 && head.PartNo > 0); +} + + +// +// TInitDataStep +// + +TInitDataStep::TInitDataStep(TInitializer* initializer) + : TBaseKVStep(initializer, "TInitDataStep", true) { +} + +void TInitDataStep::Execute(const TActorContext &ctx) { + TVector<TString> keys; + //form head request + for (auto& p : Partition()->HeadKeys) { + keys.push_back({p.Key.Data(), p.Key.Size()}); + } + Y_VERIFY(keys.size() < Partition()->TotalMaxCount); + if (keys.empty()) { + Done(ctx); + return; + } + + THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest); + for (auto& key: keys) { + auto read = request->Record.AddCmdRead(); + 
read->SetKey(key); + } + ctx.Send(Partition()->Tablet, request.Release()); +} + +void TInitDataStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActorContext &ctx) { + if (!ValidateResponse(*this, ev, ctx)) { + PoisonPill(ctx); + return; + } + + auto& response = ev->Get()->Record; + Y_VERIFY(response.ReadResultSize()); + + auto& head = Partition()->Head; + auto& headKeys = Partition()->HeadKeys; + auto& dataKeysHead = Partition()->DataKeysHead; + auto& compactLevelBorder = Partition()->CompactLevelBorder; + auto totalLevels = Partition()->TotalLevels; + + ui32 currentLevel = 0; + Y_VERIFY(headKeys.size() == response.ReadResultSize()); + for (ui32 i = 0; i < response.ReadResultSize(); ++i) { + auto& read = response.GetReadResult(i); + Y_VERIFY(read.HasStatus()); + switch(read.GetStatus()) { + case NKikimrProto::OK: { + const TKey& key = headKeys[i].Key; + Y_VERIFY(key.IsHead()); + + ui32 size = headKeys[i].Size; + ui64 offset = key.GetOffset(); + while (currentLevel + 1 < totalLevels && size < compactLevelBorder[currentLevel + 1]) + ++currentLevel; + Y_VERIFY(size < compactLevelBorder[currentLevel]); + + dataKeysHead[currentLevel].AddKey(key, size); + Y_VERIFY(dataKeysHead[currentLevel].KeysCount() < AppData(ctx)->PQConfig.GetMaxBlobsPerLevel()); + Y_VERIFY(!dataKeysHead[currentLevel].NeedCompaction()); + + LOG_DEBUG_S( + ctx, NKikimrServices::PERSQUEUE, + "read res partition topic '" << TopicName() + << "' parititon " << key.GetPartition() << " offset " << offset << " endOffset " << Partition()->EndOffset + << " key " << key.GetOffset() << "," << key.GetCount() << " valuesize " << read.GetValue().size() + << " expected " << size + ); + + Y_VERIFY(offset + 1 >= Partition()->StartOffset); + Y_VERIFY(offset < Partition()->EndOffset); + Y_VERIFY(size == read.GetValue().size()); + + for (TBlobIterator it(key, read.GetValue()); it.IsValid(); it.Next()) { + head.Batches.push_back(it.GetBatch()); + } + head.PackedSize += size; + + break; + } + case 
NKikimrProto::OVERRUN: + Y_FAIL("implement overrun in readresult!!"); + return; + case NKikimrProto::NODATA: + Y_FAIL("NODATA can't be here"); + return; + case NKikimrProto::ERROR: + LOG_ERROR_S( + ctx, NKikimrServices::PERSQUEUE, + "tablet " << Partition()->TabletID << " HandleOnInit topic '" << TopicName() + << "' partition " << PartitionId() + << " ReadResult " << i << " status NKikimrProto::ERROR result message: \"" << read.GetMessage() + << " \" errorReason: \"" << response.GetErrorReason() << "\"" + ); + PoisonPill(ctx); + return; + default: + Cerr << "ERROR " << read.GetStatus() << " message: \"" << read.GetMessage() << "\"\n"; + Y_FAIL("bad status"); + + }; + } + + Done(ctx); +} + + +// +// TPartition +// + +void TPartition::Bootstrap(const TActorContext& ctx) { + Become(&TThis::StateInit); + Initializer.Execute(ctx); +} + +void TPartition::Initialize(const TActorContext& ctx) { + CreationTime = ctx.Now(); + WriteCycleStartTime = ctx.Now(); + WriteQuota.ConstructInPlace(Config.GetPartitionConfig().GetBurstSize(), + Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond(), + ctx.Now()); + WriteTimestamp = ctx.Now(); + LastUsedStorageMeterTimestamp = ctx.Now(); + WriteTimestampEstimate = ManageWriteTimestampEstimate ? 
ctx.Now() : TInstant::Zero(); + + CloudId = Config.GetYcCloudId(); + DbId = Config.GetYdbDatabaseId(); + DbPath = Config.GetYdbDatabasePath(); + FolderId = Config.GetYcFolderId(); + + CalcTopicWriteQuotaParams(AppData()->PQConfig, + IsLocalDC, + TopicConverter, + TabletID, + ctx, + TopicWriteQuoterPath, + TopicWriteQuotaResourcePath); + + UsersInfoStorage.ConstructInPlace(DCId, + TabletID, + TopicConverter, + Partition, + Counters, + Config, + CloudId, + DbId, + Config.GetYdbDatabasePath(), + IsServerless, + FolderId); + TotalChannelWritesByHead.resize(Config.GetPartitionConfig().GetNumChannels()); + + if (Config.GetPartitionConfig().HasMirrorFrom()) { + ManageWriteTimestampEstimate = !Config.GetPartitionConfig().GetMirrorFrom().GetSyncWriteTime(); + } else { + ManageWriteTimestampEstimate = IsLocalDC; + } + + if (AppData()->PQConfig.GetTopicsAreFirstClassCitizen()) { + PartitionCountersLabeled.Reset(new TPartitionLabeledCounters(EscapeBadChars(TopicConverter->GetClientsideName()), + Partition, + Config.GetYdbDatabasePath())); + } else { + PartitionCountersLabeled.Reset(new TPartitionLabeledCounters(TopicConverter->GetClientsideName(), + Partition)); + } + + UsersInfoStorage->Init(Tablet, SelfId(), ctx); + + Y_VERIFY(AppData(ctx)->PQConfig.GetMaxBlobsPerLevel() > 0); + ui32 border = LEVEL0; + MaxSizeCheck = 0; + MaxBlobSize = AppData(ctx)->PQConfig.GetMaxBlobSize(); + PartitionedBlob = TPartitionedBlob(Partition, 0, 0, 0, 0, 0, Head, NewHead, true, false, MaxBlobSize); + for (ui32 i = 0; i < TotalLevels; ++i) { + CompactLevelBorder.push_back(border); + MaxSizeCheck += border; + Y_VERIFY(i + 1 < TotalLevels && border < MaxBlobSize || i + 1 == TotalLevels && border == MaxBlobSize); + border *= AppData(ctx)->PQConfig.GetMaxBlobsPerLevel(); + border = Min(border, MaxBlobSize); + } + TotalMaxCount = AppData(ctx)->PQConfig.GetMaxBlobsPerLevel() * TotalLevels; + + std::reverse(CompactLevelBorder.begin(), CompactLevelBorder.end()); + + for (ui32 i = 0; i < TotalLevels; 
++i) { + DataKeysHead.push_back(TKeyLevel(CompactLevelBorder[i])); + } + + for (const auto& readQuota : Config.GetPartitionConfig().GetReadQuota()) { + auto &userInfo = UsersInfoStorage->GetOrCreate(readQuota.GetClientId(), ctx); + userInfo.ReadQuota.UpdateConfig(readQuota.GetBurstSize(), readQuota.GetSpeedInBytesPerSecond()); + } + + LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "bootstrapping " << Partition << " " << ctx.SelfID); + + if (AppData(ctx)->Counters) { + if (AppData()->PQConfig.GetTopicsAreFirstClassCitizen()) { + SetupStreamCounters(ctx); + } else { + SetupTopicCounters(ctx); + } + } +} + +void TPartition::SetupTopicCounters(const TActorContext& ctx) { + auto counters = AppData(ctx)->Counters; + auto labels = NPersQueue::GetLabels(TopicConverter); + const TString suffix = IsLocalDC ? "Original" : "Mirrored"; + + WriteBufferIsFullCounter.SetCounter( + NPersQueue::GetCounters(counters, "writingTime", TopicConverter), + {{"host", DCId}, + {"Partition", ToString<ui32>(Partition)}}, + {"sensor", "BufferFullTime" + suffix, true}); + + auto subGroup = GetServiceCounters(counters, "pqproxy|writeTimeLag"); + InputTimeLag = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter( + subGroup, labels, {{"sensor", "TimeLags" + suffix}}, "Interval", + TVector<std::pair<ui64, TString>>{ + {100, "100ms"}, {200, "200ms"}, {500, "500ms"}, {1000, "1000ms"}, + {2000, "2000ms"}, {5000, "5000ms"}, {10'000, "10000ms"}, {30'000, "30000ms"}, + {60'000, "60000ms"}, {180'000,"180000ms"}, {9'999'999, "999999ms"}}, true)); + + + subGroup = GetServiceCounters(counters, "pqproxy|writeInfo"); + MessageSize = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter( + subGroup, labels, {{"sensor", "MessageSize" + suffix}}, "Size", + TVector<std::pair<ui64, TString>>{ + {1_KB, "1kb"}, {5_KB, "5kb"}, {10_KB, "10kb"}, + {20_KB, "20kb"}, {50_KB, "50kb"}, {100_KB, "100kb"}, {200_KB, "200kb"}, + {512_KB, "512kb"},{1024_KB, "1024kb"}, 
{2048_KB,"2048kb"}, {5120_KB, "5120kb"}, + {10240_KB, "10240kb"}, {65536_KB, "65536kb"}, {999'999'999, "99999999kb"}}, true)); + + subGroup = GetServiceCounters(counters, "pqproxy|writeSession"); + BytesWrittenTotal = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"BytesWritten" + suffix}, true); + BytesWrittenUncompressed = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"UncompressedBytesWritten" + suffix}, true); + BytesWrittenComp = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"CompactedBytesWritten" + suffix}, true); + MsgsWrittenTotal = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"MessagesWritten" + suffix}, true); + + TVector<NPersQueue::TPQLabelsInfo> aggr = {{{{"Account", TopicConverter->GetAccount()}}, {"total"}}}; + ui32 border = AppData(ctx)->PQConfig.GetWriteLatencyBigMs(); + subGroup = GetServiceCounters(counters, "pqproxy|SLI"); + WriteLatency = NKikimr::NPQ::CreateSLIDurationCounter(subGroup, aggr, "Write", border, + {100, 200, 500, 1000, 1500, 2000, + 5000, 10'000, 30'000, 99'999'999}); + SLIBigLatency = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WriteBigLatency"}, true, "sensor", false); + WritesTotal = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WritesTotal"}, true, "sensor", false); + if (IsQuotingEnabled() && !TopicWriteQuotaResourcePath.empty()) { + TopicWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>( + new NKikimr::NPQ::TPercentileCounter( + GetServiceCounters(counters, "pqproxy|topicWriteQuotaWait"), labels, + {{"sensor", "TopicWriteQuotaWait" + suffix}}, "Interval", + TVector<std::pair<ui64, TString>>{ + {0, "0ms"}, {1, "1ms"}, {5, "5ms"}, {10, "10ms"}, + {20, "20ms"}, {50, "50ms"}, {100, "100ms"}, {500, "500ms"}, + {1000, "1000ms"}, {2500, "2500ms"}, {5000, "5000ms"}, + {10'000, "10000ms"}, {9'999'999, "999999ms"}}, true)); + } + + PartitionWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>( + new NKikimr::NPQ::TPercentileCounter(GetServiceCounters(counters, 
"pqproxy|partitionWriteQuotaWait"), + labels, {{"sensor", "PartitionWriteQuotaWait" + suffix}}, "Interval", + TVector<std::pair<ui64, TString>>{ + {0, "0ms"}, {1, "1ms"}, {5, "5ms"}, {10, "10ms"}, + {20, "20ms"}, {50, "50ms"}, {100, "100ms"}, {500, "500ms"}, + {1000, "1000ms"}, {2500, "2500ms"}, {5000, "5000ms"}, + {10'000, "10000ms"}, {9'999'999, "999999ms"}}, true)); +} + +void TPartition::SetupStreamCounters(const TActorContext& ctx) { + const auto topicName = TopicConverter->GetModernName(); + auto counters = AppData(ctx)->Counters; + auto subgroups = NPersQueue::GetSubgroupsForTopic(TopicConverter, CloudId, DbId, DbPath, FolderId); +/* + WriteBufferIsFullCounter.SetCounter( + NPersQueue::GetCountersForTopic(counters, IsServerless), + { + {"database", DbPath}, + {"cloud_id", CloudId}, + {"folder_id", FolderId}, + {"database_id", DbId}, + {"topic", TopicConverter->GetFederationPath()}, + {"host", DCId}, + {"partition", ToString<ui32>(Partition)}}, + {"name", "api.grpc.topic.stream_write.buffer_brimmed_milliseconds", true}); +*/ + + subgroups.push_back({"name", "topic.write.lag_milliseconds"}); + + InputTimeLag = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter( + NPersQueue::GetCountersForTopic(counters, IsServerless), {}, + subgroups, "bin", + TVector<std::pair<ui64, TString>>{ + {100, "100"}, {200, "200"}, {500, "500"}, + {1000, "1000"}, {2000, "2000"}, {5000, "5000"}, + {10'000, "10000"}, {30'000, "30000"}, {60'000, "60000"}, + {180'000,"180000"}, {9'999'999, "999999"}}, true)); + + subgroups.back().second = "topic.write.message_size_bytes"; + MessageSize = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter( + NPersQueue::GetCountersForTopic(counters, IsServerless), {}, + subgroups, "bin", + TVector<std::pair<ui64, TString>>{ + {1024, "1024"}, {5120, "5120"}, {10'240, "10240"}, + {20'480, "20480"}, {51'200, "51200"}, {102'400, "102400"}, + {204'800, "204800"}, {524'288, "524288"},{1'048'576, 
"1048576"}, + {2'097'152,"2097152"}, {5'242'880, "5242880"}, {10'485'760, "10485760"}, + {67'108'864, "67108864"}, {999'999'999, "99999999"}}, true)); + + subgroups.pop_back(); + BytesWrittenGrpc = NKikimr::NPQ::TMultiCounter( + NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups, + {"api.grpc.topic.stream_write.bytes"} , true, "name"); + BytesWrittenTotal = NKikimr::NPQ::TMultiCounter( + NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups, + {"topic.write.bytes"} , true, "name"); + + MsgsWrittenGrpc = NKikimr::NPQ::TMultiCounter( + NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups, + {"api.grpc.topic.stream_write.messages"}, true, "name"); + MsgsWrittenTotal = NKikimr::NPQ::TMultiCounter( + NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups, + {"topic.write.messages"}, true, "name"); + + + BytesWrittenUncompressed = NKikimr::NPQ::TMultiCounter( + + NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups, + {"topic.write.uncompressed_bytes"}, true, "name"); + + TVector<NPersQueue::TPQLabelsInfo> aggr = {{{{"Account", TopicConverter->GetAccount()}}, {"total"}}}; + ui32 border = AppData(ctx)->PQConfig.GetWriteLatencyBigMs(); + auto subGroup = GetServiceCounters(counters, "pqproxy|SLI"); + WriteLatency = NKikimr::NPQ::CreateSLIDurationCounter(subGroup, aggr, "Write", border, + {100, 200, 500, 1000, 1500, 2000, + 5000, 10'000, 30'000, 99'999'999}); + SLIBigLatency = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WriteBigLatency"}, true, "name", false); + WritesTotal = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WritesTotal"}, true, "name", false); + if (IsQuotingEnabled() && !TopicWriteQuotaResourcePath.empty()) { + subgroups.push_back({"name", "api.grpc.topic.stream_write.topic_throttled_milliseconds"}); + TopicWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>( + new NKikimr::NPQ::TPercentileCounter( + NPersQueue::GetCountersForTopic(counters, 
IsServerless), {}, + subgroups, "bin", + TVector<std::pair<ui64, TString>>{ + {0, "0"}, {1, "1"}, {5, "5"}, {10, "10"}, + {20, "20"}, {50, "50"}, {100, "100"}, {500, "500"}, + {1000, "1000"}, {2500, "2500"}, {5000, "5000"}, + {10'000, "10000"}, {9'999'999, "999999"}}, true)); + subgroups.pop_back(); + } + + subgroups.push_back({"name", "api.grpc.topic.stream_write.partition_throttled_milliseconds"}); + PartitionWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>( + new NKikimr::NPQ::TPercentileCounter( + NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups, "bin", + TVector<std::pair<ui64, TString>>{ + {0, "0"}, {1, "1"}, {5, "5"}, {10, "10"}, + {20, "20"}, {50, "50"}, {100, "100"}, {500, "500"}, + {1000, "1000"}, {2500, "2500"}, {5000, "5000"}, + {10'000, "10000"}, {9'999'999, "999999"}}, true)); +} + + +// +// Functions +// + +bool ValidateResponse(const TInitializerStep& step, TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx) { + auto& response = ev->Get()->Record; + if (response.GetStatus() != NMsgBusProxy::MSTATUS_OK) { + LOG_ERROR_S( + ctx, NKikimrServices::PERSQUEUE, + "commands for topic '" << step.TopicName() << " partition " << step.PartitionId() + << " are not processed at all, got KV error " << response.GetStatus() + ); + return false; + } + + for (ui32 i = 0; i < response.GetStatusResultSize(); ++i) { + auto& res = response.GetGetStatusResult(i); + if (res.GetStatus() != NKikimrProto::OK) { + LOG_ERROR_S( + ctx, NKikimrServices::PERSQUEUE, + "commands for topic '" << step.TopicName() << "' partition " << step.PartitionId() << + " are not processed at all, got KV error in CmdGetStatus " << res.GetStatus() + ); + return false; + } + } + + return true; +} + +bool DiskIsFull(TEvKeyValue::TEvResponse::TPtr& ev) { + auto& response = ev->Get()->Record; + + bool diskIsOk = true; + for (ui32 i = 0; i < response.GetStatusResultSize(); ++i) { + auto& res = response.GetGetStatusResult(i); + TStorageStatusFlags status = 
res.GetStatusFlags(); + diskIsOk = diskIsOk && !status.Check(NKikimrBlobStorage::StatusDiskSpaceLightYellowMove); + } + return !diskIsOk; +} + +} // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/partition_init.h b/ydb/core/persqueue/partition_init.h new file mode 100644 index 00000000000..ddabde99b3a --- /dev/null +++ b/ydb/core/persqueue/partition_init.h @@ -0,0 +1,152 @@ +#pragma once + +#include "header.h" +#include "utils.h" + +#include <ydb/core/keyvalue/keyvalue_events.h> +#include <ydb/library/persqueue/counter_time_keeper/counter_time_keeper.h> + +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/core/log.h> + +#include <util/generic/set.h> + + +namespace NKikimr::NPQ { + +class TInitializerStep; +class TPartition; + + +/** + * This class executes the independent steps of partition actor initialization. + * Each initialization step makes its own decision whether to perform it or not. + */ +class TInitializer { + friend TInitializerStep; + +public: + TInitializer(TPartition* partition); + + void Execute(const TActorContext& ctx); + + bool Handle(STFUNC_SIG); + +protected: + void Next(const TActorContext& ctx); + void Done(const TActorContext& ctx); + +private: + void DoNext(const TActorContext& ctx); + + TPartition* Partition; + + bool InProgress; + + TVector<THolder<TInitializerStep>> Steps; + std::vector<THolder<TInitializerStep>>::iterator CurrentStep; + +}; + +/** + * This is an independent initialization step. + * A step begins execution when its Execute method is called and ends after its Done method is called.
+ */ +class TInitializerStep { +public: + TInitializerStep(TInitializer* initializer, TString name, bool skipNewPartition); + virtual ~TInitializerStep() = default; + + virtual void Execute(const TActorContext& ctx) = 0; + virtual bool Handle(STFUNC_SIG); + + TPartition* Partition() const; + ui32 PartitionId() const; + TString TopicName() const; + + const TString Name; + const bool SkipNewPartition; + +protected: + void Done(const TActorContext& ctx); + void PoisonPill(const TActorContext& ctx); + +private: + TInitializer* Initializer; +}; + + +class TBaseKVStep: public TInitializerStep { +public: + TBaseKVStep(TInitializer* initializer, TString name, bool skipNewPartition); + + bool Handle(STFUNC_SIG) override; + virtual void Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx) = 0; +}; + + +// +// Initialization steps +// + +class TInitConfigStep: public TBaseKVStep { +public: + TInitConfigStep(TInitializer* initializer); + + void Execute(const TActorContext& ctx) override; + void Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx) override; +}; + +class TInitInternalFieldsStep: public TInitializerStep { +public: + TInitInternalFieldsStep(TInitializer* initializer); + + void Execute(const TActorContext& ctx) override; +}; + +class TInitDiskStatusStep: public TBaseKVStep { +public: + TInitDiskStatusStep(TInitializer* initializer); + + void Execute(const TActorContext& ctx) override; + void Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx) override; +}; + +class TInitMetaStep: public TBaseKVStep { +public: + TInitMetaStep(TInitializer* initializer); + + void Execute(const TActorContext& ctx) override; + void Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx) override; +}; + +class TInitInfoRangeStep: public TBaseKVStep { +public: + TInitInfoRangeStep(TInitializer* initializer); + + void Execute(const TActorContext& ctx) override; + void Handle(TEvKeyValue::TEvResponse::TPtr& ev, const 
TActorContext& ctx) override; +}; + +class TInitDataRangeStep: public TBaseKVStep { +public: + TInitDataRangeStep(TInitializer* initializer); + + void Execute(const TActorContext& ctx) override; + void Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx) override; + +private: + void FillBlobsMetaData(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx); + void FormHeadAndProceed(); +}; + +class TInitDataStep: public TBaseKVStep { +public: + TInitDataStep(TInitializer* initializer); + + void Execute(const TActorContext& ctx) override; + void Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx) override; +}; + +} // NKikimr::NPQ diff --git a/ydb/core/persqueue/partition_util.h b/ydb/core/persqueue/partition_util.h new file mode 100644 index 00000000000..e19e75434da --- /dev/null +++ b/ydb/core/persqueue/partition_util.h @@ -0,0 +1,114 @@ +#pragma once + +#include "partition.h" + +namespace NKikimr::NPQ { + +class TKeyLevel { +public: + friend IOutputStream& operator <<(IOutputStream& out, const TKeyLevel& value); + + TKeyLevel(ui32 border) + : Border_(border) + , Sum_(0) + , RecsCount_(0) + , InternalPartsCount_(0) {} + + void Clear() { + Keys_.clear(); + Sum_ = 0; + RecsCount_ = 0; + InternalPartsCount_ = 0; + } + + ui32 KeysCount() const { + return Keys_.size(); + } + + ui32 RecsCount() const { + return RecsCount_; + } + + ui16 InternalPartsCount() const { + return InternalPartsCount_; + } + + bool NeedCompaction() const { + return Sum_ >= Border_; + } + + std::pair<TKey, ui32> Compact() { + Y_VERIFY(!Keys_.empty()); + TKey tmp(Keys_.front().first); + tmp.SetCount(RecsCount_); + tmp.SetInternalPartsCount(InternalPartsCount_); + std::pair<TKey, ui32> res(tmp, Sum_); + Clear(); + return res; + } + + std::pair<TKey, ui32> PopFront() { + Y_VERIFY(!Keys_.empty()); + Sum_ -= Keys_.front().second; + RecsCount_ -= Keys_.front().first.GetCount(); + InternalPartsCount_ -= 
Keys_.front().first.GetInternalPartsCount(); + auto res = Keys_.front(); + Keys_.pop_front(); + return res; + } + + std::pair<TKey, ui32> PopBack() { + Y_VERIFY(!Keys_.empty()); + Sum_ -= Keys_.back().second; + RecsCount_ -= Keys_.back().first.GetCount(); + InternalPartsCount_ -= Keys_.back().first.GetInternalPartsCount(); + auto res = Keys_.back(); + Keys_.pop_back(); + return res; + } + + ui32 Sum() const { + return Sum_; + } + + const TKey& GetKey(const ui32 pos) const { + Y_VERIFY(pos < Keys_.size()); + return Keys_[pos].first; + } + + const ui32& GetSize(const ui32 pos) const { + Y_VERIFY(pos < Keys_.size()); + return Keys_[pos].second; + } + + void PushKeyToFront(const TKey& key, ui32 size) { + Sum_ += size; + RecsCount_ += key.GetCount(); + InternalPartsCount_ += key.GetInternalPartsCount(); + Keys_.push_front(std::make_pair(key, size)); + } + + void AddKey(const TKey& key, ui32 size) { + Sum_ += size; + RecsCount_ += key.GetCount(); + InternalPartsCount_ += key.GetInternalPartsCount(); + Keys_.push_back(std::make_pair(key, size)); + } + + ui32 Border() const { + return Border_; + } + +private: + const ui32 Border_; + std::deque<std::pair<TKey, ui32>> Keys_; + ui32 Sum_; + ui32 RecsCount_; + ui16 InternalPartsCount_; +}; + +void AddCheckDiskRequest(TEvKeyValue::TEvRequest *request, ui32 numChannels); +void RequestInfoRange(const TActorContext& ctx, const TActorId& dst, ui32 partition, const TString& key); +void RequestDataRange(const TActorContext& ctx, const TActorId& dst, ui32 partition, const TString& key); + +} // namespace NKikimr::NPQ |