diff options
author | mokhotskii <mokhotskii@ydb.tech> | 2022-07-28 18:33:00 +0300 |
---|---|---|
committer | mokhotskii <mokhotskii@ydb.tech> | 2022-07-28 18:33:00 +0300 |
commit | b9a10e543a5624c857f7c598b2605e161e01d688 (patch) | |
tree | bf40bf6efe67f9e2a5f0b5517ed639efc04cb9bf | |
parent | b4e0b9fbb7463d8633199df9ae0fe44f8712c770 (diff) | |
download | ydb-b9a10e543a5624c857f7c598b2605e161e01d688.tar.gz |
Clean up pq partition.h
Clean up pq partition.h/cpp:
- move all types from partition.h to partition_types.h
- move and sort TPartition methods within a class definition
- rename TPartition::Counters -> TabletCounters
- rename TPartition::PartitionLabeledCounters -> PartitionCounters
- move every { to the previous line
- move helper test functions to pq_ut_impl.cpp
- clean up pq_ut.h
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 579 | ||||
-rw-r--r-- | ydb/core/persqueue/partition.h | 418 | ||||
-rw-r--r-- | ydb/core/persqueue/partition_types.h | 130 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_ut.cpp | 12 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_ut.h | 1111 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_ut_impl.cpp | 908 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/CMakeLists.darwin.txt | 1 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/CMakeLists.linux.txt | 1 | ||||
-rw-r--r-- | ydb/core/persqueue/ut_slow/CMakeLists.darwin.txt | 1 | ||||
-rw-r--r-- | ydb/core/persqueue/ut_slow/CMakeLists.linux.txt | 1 |
10 files changed, 1605 insertions, 1557 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 197c154968..a4151cb566 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -1,9 +1,7 @@ -#include "partition.h" #include "event_helpers.h" -#include "read.h" -#include "sourceid.h" -#include "ownerinfo.h" #include "mirrorer.h" +#include "partition.h" +#include "read.h" #include <ydb/core/base/appdata.h> #include <ydb/core/base/blobstorage.h> @@ -25,23 +23,15 @@ Y_VERIFY(!blob.Data.empty(), "Empty data. SourceId: %s, SeqNo: %" PRIu64, blob.SourceId.data(), blob.SeqNo); \ Y_VERIFY(blob.SeqNo <= (ui64)Max<i64>(), "SeqNo is too big: %" PRIu64, blob.SeqNo); -namespace NKikimr { -namespace NPQ { - -static const ui32 BATCH_UNPACK_SIZE_BORDER = 500_KB; //500kb - -static const ui32 MAX_WRITE_CYCLE_SIZE = 16_MB; //16MB +namespace NKikimr::NPQ { +static const ui32 BATCH_UNPACK_SIZE_BORDER = 500_KB; +static const ui32 MAX_WRITE_CYCLE_SIZE = 16_MB; static const ui32 MAX_USER_ACTS = 1000; - static const TDuration WAKE_TIMEOUT = TDuration::Seconds(5); - static const ui32 MAX_INLINE_SIZE = 1000; - static const ui32 LEVEL0 = 32; - static const TDuration UPDATE_AVAIL_SIZE_INTERVAL = TDuration::MilliSeconds(100); - static const TString WRITE_QUOTA_ROOT_PATH = "write-quota"; struct TPartition::THasDataReq { @@ -67,6 +57,15 @@ struct TPartition::THasDataDeadline { } }; +struct TMirrorerInfo { + TMirrorerInfo(const TActorId& actor, const TTabletCountersBase& baseline) + : Actor(actor) { + Baseline.Populate(baseline); + } + + TActorId Actor; + TTabletCountersBase Baseline; +}; class TKeyLevel { public: @@ -76,8 +75,7 @@ public: : Border_(border) , Sum_(0) , RecsCount_(0) - , InternalPartsCount_(0) - {} + , InternalPartsCount_(0) {} void Clear() { Keys_.clear(); @@ -132,7 +130,6 @@ public: return res; } - ui32 Sum() const { return Sum_; } @@ -146,6 +143,7 @@ public: Y_VERIFY(pos < Keys_.size()); return Keys_[pos].second; } + void PushKeyToFront(const TKey& key, ui32 size) { Sum_ += size; RecsCount_ += key.GetCount(); @@ -172,9 +170,7 @@ private: ui16 InternalPartsCount_; }; - -void HtmlOutput(IOutputStream& out, const TString& line, const std::deque<std::pair<TKey, ui32>>& keys) -{ +void HtmlOutput(IOutputStream& out, const TString& line, const std::deque<std::pair<TKey, ui32>>& keys) { HTML(out) { TABLE() { TABLEHEAD() { @@ -206,7 +202,6 @@ void HtmlOutput(IOutputStream& out, const TString& line, const std::deque<std::p } } - IOutputStream& operator <<(IOutputStream& out, const TKeyLevel& value) { TStringStream str; str << "count=" << value.Keys_.size() << " sum=" << value.Sum_ << " border=" << value.Border_ << " recs= " << value.RecsCount_ << ":"; @@ -228,27 +223,14 @@ ui64 GetOffsetEstimate(const std::deque<TDataKey>& container, TInstant timestamp } } -struct TMirrorerInfo { - TMirrorerInfo(const TActorId& actor, const TTabletCountersBase& baseline) - : Actor(actor) - { - Baseline.Populate(baseline); - } - - TActorId Actor; - TTabletCountersBase Baseline; -}; - -void TPartition::ReplyError(const TActorContext& ctx, const ui64 dst, NPersQueue::NErrorCode::EErrorCode errorCode, const TString& error) -{ +void TPartition::ReplyError(const TActorContext& ctx, const ui64 dst, NPersQueue::NErrorCode::EErrorCode errorCode, const TString& error) { ReplyPersQueueError( - dst == 0 ? ctx.SelfID : Tablet, ctx, TabletID, TopicConverter->GetClientsideName(), Partition, Counters, - NKikimrServices::PERSQUEUE, dst, errorCode, error, true + dst == 0 ? ctx.SelfID : Tablet, ctx, TabletID, TopicConverter->GetClientsideName(), Partition, + TabletCounters, NKikimrServices::PERSQUEUE, dst, errorCode, error, true ); } -void TPartition::ReplyOk(const TActorContext& ctx, const ui64 dst) -{ +void TPartition::ReplyOk(const TActorContext& ctx, const ui64 dst) { THolder<TEvPQ::TEvProxyResponse> response = MakeHolder<TEvPQ::TEvProxyResponse>(dst); NKikimrClient::TResponse& resp = response->Response; resp.SetStatus(NMsgBusProxy::MSTATUS_OK); @@ -256,8 +238,7 @@ void TPartition::ReplyOk(const TActorContext& ctx, const ui64 dst) ctx.Send(Tablet, response.Release()); } -void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& cookie) -{ +void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& cookie) { THolder<TEvPQ::TEvProxyResponse> response = MakeHolder<TEvPQ::TEvProxyResponse>(dst); NKikimrClient::TResponse& resp = response->Response; resp.SetStatus(NMsgBusProxy::MSTATUS_OK); @@ -269,8 +250,7 @@ void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TS void TPartition::ReplyWrite( const TActorContext& ctx, const ui64 dst, const TString& sourceId, const ui64 seqNo, const ui16 partNo, const ui16 totalParts, const ui64 offset, const TInstant writeTimestamp, bool already, const ui64 maxSeqNo, - const ui64 partitionQuotedTime, const TDuration topicQuotedTime, const ui64 queueTime, const ui64 writeTime) -{ + const ui64 partitionQuotedTime, const TDuration topicQuotedTime, const ui64 queueTime, const ui64 writeTime) { Y_VERIFY(offset <= (ui64)Max<i64>(), "Offset is too big: %" PRIu64, offset); Y_VERIFY(seqNo <= (ui64)Max<i64>(), "SeqNo is too big: %" PRIu64, seqNo); @@ -299,8 +279,7 @@ void TPartition::ReplyWrite( void TPartition::ReplyGetClientOffsetOk(const TActorContext& ctx, const ui64 dst, const i64 offset, - const TInstant writeTimestamp, const TInstant createTimestamp) -{ + const TInstant writeTimestamp, const TInstant createTimestamp) { THolder<TEvPQ::TEvProxyResponse> response = MakeHolder<TEvPQ::TEvProxyResponse>(dst); NKikimrClient::TResponse& resp = response->Response; resp.SetStatus(NMsgBusProxy::MSTATUS_OK); @@ -323,8 +302,7 @@ void TPartition::ReplyGetClientOffsetOk(const TActorContext& ctx, const ui64 dst static void RequestRange(const TActorContext& ctx, const TActorId& dst, ui32 partition, - TKeyPrefix::EType c, bool includeData = false, const TString& key = "", bool dropTmp = false) -{ + TKeyPrefix::EType c, bool includeData = false, const TString& key = "", bool dropTmp = false) { THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest); auto read = request->Record.AddCmdReadRange(); auto range = read->MutableRange(); @@ -356,8 +334,7 @@ static void RequestRange(const TActorContext& ctx, const TActorId& dst, ui32 par } -NKikimrClient::TKeyValueRequest::EStorageChannel GetChannel(ui32 i) -{ +NKikimrClient::TKeyValueRequest::EStorageChannel GetChannel(ui32 i) { return NKikimrClient::TKeyValueRequest::EStorageChannel(NKikimrClient::TKeyValueRequest::MAIN + i); } @@ -369,8 +346,7 @@ void AddCheckDiskRequest(TEvKeyValue::TEvRequest *request, ui32 numChannels) { } -static void RequestDiskStatus(const TActorContext& ctx, const TActorId& dst, ui32 numChannels) -{ +static void RequestDiskStatus(const TActorContext& ctx, const TActorId& dst, ui32 numChannels) { THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest); AddCheckDiskRequest(request.Get(), numChannels); @@ -379,13 +355,11 @@ static void RequestDiskStatus(const TActorContext& ctx, const TActorId& dst, ui3 } -void RequestInfoRange(const TActorContext& ctx, const TActorId& dst, ui32 partition, const TString& key) -{ +void RequestInfoRange(const TActorContext& ctx, const TActorId& dst, ui32 partition, const TString& key) { RequestRange(ctx, dst, partition, TKeyPrefix::TypeInfo, true, key, key == ""); } -void RequestMetaRead(const TActorContext& ctx, const TActorId& dst, ui32 partition) -{ +void RequestMetaRead(const TActorContext& ctx, const TActorId& dst, ui32 partition) { THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest); auto read = request->Record.AddCmdRead(); TKeyPrefix key{TKeyPrefix::TypeMeta, partition}; @@ -393,8 +367,7 @@ void RequestMetaRead(const TActorContext& ctx, const TActorId& dst, ui32 partiti ctx.Send(dst, request.Release()); } -void RequestData(const TActorContext& ctx, const TActorId& dst, const TVector<TString>& keys) -{ +void RequestData(const TActorContext& ctx, const TActorId& dst, const TVector<TString>& keys) { THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest); for (auto& key: keys) { auto read = request->Record.AddCmdRead(); @@ -403,8 +376,7 @@ void RequestData(const TActorContext& ctx, const TActorId& dst, const TVector<TS ctx.Send(dst, request.Release()); } -void RequestDataRange(const TActorContext& ctx, const TActorId& dst, ui32 partition, const TString& key) -{ +void RequestDataRange(const TActorContext& ctx, const TActorId& dst, ui32 partition, const TString& key) { RequestRange(ctx, dst, partition, TKeyPrefix::TypeData, false, key); } @@ -488,11 +460,11 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co , InitDuration(TDuration::Zero()) , InitDone(false) , NewPartition(newPartition) - // ToDo ToReview - Which name to use here? It verifies in tablet_counters_protobuf.h:633 on proper path - , PartitionLabeledCounters( + // TODO: ToReview - Which name to use here? It verifies in tablet_counters_protobuf.h:633 on proper path + , PartitionCounters( topicConverter->IsFirstClass() ? nullptr : new TPartitionLabeledCounters(topicConverter->GetClientsideName(), partition)) - , Subscriber(partition, Counters, Tablet) + , Subscriber(partition, TabletCounters, Tablet) , WriteCycleStartTime(ctx.Now()) , WriteCycleSize(0) , WriteNewSize(0) @@ -510,8 +482,7 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co , TotalChannelWritesByHead(Config.GetPartitionConfig().GetNumChannels(), 0) , WriteBufferIsFullCounter(nullptr) , WriteTimestamp(ctx.Now()) - , WriteLagMs(TDuration::Minutes(1), 100) -{ + , WriteLagMs(TDuration::Minutes(1), 100) { if (Config.GetPartitionConfig().HasMirrorFrom()) { ManageWriteTimestampEstimate = !Config.GetPartitionConfig().GetMirrorFrom().GetSyncWriteTime(); } else { @@ -522,11 +493,10 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co CalcTopicWriteQuotaParams(); - Counters.Populate(counters); + TabletCounters.Populate(counters); } -void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorContext& ctx) -{ +void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorContext& ctx) { TVector<TString> res; TString str; if (CurrentStateFunc() == &TThis::StateInit) { @@ -570,8 +540,7 @@ void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorCo res.push_back(out.Str()); out.Clear(); } out << Config.DebugString(); res.push_back(out.Str()); out.Clear(); - HTML(out) - { + HTML(out) { DIV_CLASS_ID("tab-pane fade", Sprintf("partition_%u", ui32(Partition))) { TABLE_SORTABLE_CLASS("table") { TABLEHEAD() { @@ -721,8 +690,7 @@ void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorCo } -void TPartition::Bootstrap(const TActorContext& ctx) -{ +void TPartition::Bootstrap(const TActorContext& ctx) { UsersInfoStorage.Init(Tablet, SelfId()); Y_VERIFY(AppData(ctx)->PQConfig.GetMaxBlobsPerLevel() > 0); @@ -741,7 +709,6 @@ void TPartition::Bootstrap(const TActorContext& ctx) std::reverse(CompactLevelBorder.begin(), CompactLevelBorder.end()); - for (ui32 i = 0; i < TotalLevels; ++i) { DataKeysHead.push_back(TKeyLevel(CompactLevelBorder[i])); } @@ -762,7 +729,6 @@ void TPartition::Bootstrap(const TActorContext& ctx) } if (AppData(ctx)->Counters) { - TVector<NPersQueue::TPQLabelsInfo> labels; if (AppData()->PQConfig.GetTopicsAreFirstClassCitizen()) { SetupStreamCounters(ctx); } else { @@ -966,7 +932,6 @@ void TPartition::ProcessHasDataRequests(const TActorContext& ctx) { void TPartition::UpdateAvailableSize(const TActorContext& ctx) { - FilterDeadlinedWrites(ctx); auto now = ctx.Now(); @@ -987,7 +952,7 @@ void TPartition::UpdateAvailableSize(const TActorContext& ctx) { } } ScheduleUpdateAvailableSize(ctx); - ReportLabeledCounters(ctx); + ReportCounters(ctx); } void TPartition::HandleOnIdle(TEvPQ::TEvUpdateAvailableSize::TPtr&, const TActorContext& ctx) { @@ -1004,9 +969,9 @@ void TPartition::HandleWakeup(const TActorContext& ctx) { FilterDeadlinedWrites(ctx); ctx.Schedule(WAKE_TIMEOUT, new TEvents::TEvWakeup()); - ctx.Send(Tablet, new TEvPQ::TEvPartitionCounters(Partition, Counters)); + ctx.Send(Tablet, new TEvPQ::TEvPartitionCounters(Partition, TabletCounters)); - ReportLabeledCounters(ctx); + ReportCounters(ctx); ProcessHasDataRequests(ctx); @@ -1231,7 +1196,7 @@ void TPartition::Handle(TEvPersQueue::TEvHasDataInfo::TPtr& ev, const TActorCont void TPartition::Handle(TEvPQ::TEvMirrorerCounters::TPtr& ev, const TActorContext& /*ctx*/) { if (Mirrorer) { auto diff = ev->Get()->Counters.MakeDiffForAggr(Mirrorer->Baseline); - Counters.Populate(*diff.Get()); + TabletCounters.Populate(*diff.Get()); ev->Get()->Counters.RememberCurrentStateAsBaseline(Mirrorer->Baseline); } } @@ -1240,7 +1205,7 @@ void TPartition::Handle(NReadSpeedLimiterEvents::TEvCounters::TPtr& ev, const TA auto userInfo = UsersInfoStorage.GetIfExists(ev->Get()->User); if (userInfo && userInfo->ReadSpeedLimiter) { auto diff = ev->Get()->Counters.MakeDiffForAggr(userInfo->ReadSpeedLimiter->Baseline); - Counters.Populate(*diff.Get()); + TabletCounters.Populate(*diff.Get()); ev->Get()->Counters.RememberCurrentStateAsBaseline(userInfo->ReadSpeedLimiter->Baseline); } } @@ -1276,14 +1241,13 @@ void TPartition::Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx) Die(ctx); } -void TPartition::CancelAllWritesOnIdle(const TActorContext& ctx) -{ +void TPartition::CancelAllWritesOnIdle(const TActorContext& ctx) { for (const auto& w : Requests) { ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::WRITE_ERROR_DISK_IS_FULL, "Disk is full"); if (w.IsWrite()) { const auto& msg = w.GetWrite().Msg; - Counters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1); - Counters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(msg.Data.size() + msg.SourceId.size()); + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1); + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(msg.Data.size() + msg.SourceId.size()); WriteInflightSize -= msg.Data.size(); } } @@ -1298,8 +1262,7 @@ void TPartition::CancelAllWritesOnIdle(const TActorContext& ctx) } -void TPartition::FailBadClient(const TActorContext& ctx) -{ +void TPartition::FailBadClient(const TActorContext& ctx) { for (auto it = Owners.begin(); it != Owners.end();) { it = DropOwner(it, ctx); } @@ -1310,8 +1273,8 @@ void TPartition::FailBadClient(const TActorContext& ctx) ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::BAD_REQUEST, "previous write request failed"); if (w.IsWrite()) { const auto& msg = w.GetWrite().Msg; - Counters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1); - Counters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(msg.Data.size() + msg.SourceId.size()); + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1); + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(msg.Data.size() + msg.SourceId.size()); WriteInflightSize -= msg.Data.size(); } } @@ -1321,9 +1284,9 @@ void TPartition::FailBadClient(const TActorContext& ctx) for (const auto& w : Responses) { ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::BAD_REQUEST, "previous write request failed"); if (w.IsWrite()) - Counters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1); + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1); } - Counters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(WriteNewSize); + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(WriteNewSize); Responses.clear(); ProcessChangeOwnerRequests(ctx); @@ -1331,13 +1294,11 @@ void TPartition::FailBadClient(const TActorContext& ctx) } -bool CheckDiskStatus(const TStorageStatusFlags status) -{ +bool CheckDiskStatus(const TStorageStatusFlags status) { return !status.Check(NKikimrBlobStorage::StatusDiskSpaceLightYellowMove); } -void TPartition::HandleGetDiskStatus(const NKikimrClient::TResponse& response, const TActorContext& ctx) -{ +void TPartition::HandleGetDiskStatus(const NKikimrClient::TResponse& response, const TActorContext& ctx) { bool diskIsOk = true; for (ui32 i = 0; i < response.GetStatusResultSize(); ++i) { auto& res = response.GetGetStatusResult(i); @@ -1362,8 +1323,7 @@ void TPartition::HandleGetDiskStatus(const NKikimrClient::TResponse& response, c RequestMetaRead(ctx, Tablet, Partition); } -void TPartition::HandleMetaRead(const NKikimrClient::TKeyValueResponse::TReadResult& response, const TActorContext& ctx) -{ +void TPartition::HandleMetaRead(const NKikimrClient::TKeyValueResponse::TReadResult& response, const TActorContext& ctx) { NKikimrPQ::TPartitionMeta meta; switch (response.GetStatus()) { case NKikimrProto::OK: { @@ -1398,8 +1358,7 @@ void TPartition::HandleMetaRead(const NKikimrClient::TKeyValueResponse::TReadRes -void TPartition::HandleInfoRangeRead(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx) -{ +void TPartition::HandleInfoRangeRead(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx) { //megaqc check here all results Y_VERIFY(range.HasStatus()); const TString *key = nullptr; @@ -1459,8 +1418,7 @@ void TPartition::HandleInfoRangeRead(const NKikimrClient::TKeyValueResponse::TRe }; } -void TPartition::FillBlobsMetaData(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx) -{ +void TPartition::FillBlobsMetaData(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx) { for (ui32 i = 0; i < range.PairSize(); ++i) { auto pair = range.GetPair(i); Y_VERIFY(pair.GetStatus() == NKikimrProto::OK); //this is readrange without keys, only OK could be here @@ -1495,8 +1453,7 @@ void TPartition::FillBlobsMetaData(const NKikimrClient::TKeyValueResponse::TRead Y_VERIFY(EndOffset >= StartOffset); } -void TPartition::FormHeadAndProceed(const TActorContext& ctx) -{ +void TPartition::FormHeadAndProceed(const TActorContext& ctx) { Head.Offset = EndOffset; Head.PartNo = 0; TVector<TString> keys; @@ -1528,8 +1485,7 @@ void TPartition::FormHeadAndProceed(const TActorContext& ctx) RequestData(ctx, Tablet, keys); } -void TPartition::HandleDataRangeRead(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx) -{ +void TPartition::HandleDataRangeRead(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx) { Y_VERIFY(range.HasStatus()); switch(range.GetStatus()) { case NKikimrProto::OK: @@ -1553,8 +1509,7 @@ void TPartition::HandleDataRangeRead(const NKikimrClient::TKeyValueResponse::TRe }; } -void TPartition::HandleDataRead(const NKikimrClient::TResponse& response, const TActorContext& ctx) -{ +void TPartition::HandleDataRead(const NKikimrClient::TResponse& response, const TActorContext& ctx) { Y_VERIFY(InitState == WaitDataRead); ui32 currentLevel = 0; Y_VERIFY(HeadKeys.size() == response.ReadResultSize()); @@ -1716,7 +1671,7 @@ void TPartition::InitComplete(const TActorContext& ctx) { Become(&TThis::StateIdle); InitDuration = ctx.Now() - CreationTime; InitDone = true; - Counters.Percentile()[COUNTER_LATENCY_PQ_INIT].IncrementFor(InitDuration.MilliSeconds()); + TabletCounters.Percentile()[COUNTER_LATENCY_PQ_INIT].IncrementFor(InitDuration.MilliSeconds()); FillReadFromTimestamps(Config, ctx); @@ -1744,11 +1699,11 @@ void TPartition::InitComplete(const TActorContext& ctx) { Y_VERIFY(userInfoPair.second.Offset >= 0); ReadTimestampForOffset(userInfoPair.first, userInfoPair.second, ctx); } - if (PartitionLabeledCounters) { - PartitionLabeledCounters->GetCounters()[METRIC_INIT_TIME] = InitDuration.MilliSeconds(); - PartitionLabeledCounters->GetCounters()[METRIC_LIFE_TIME] = CreationTime.MilliSeconds(); - PartitionLabeledCounters->GetCounters()[METRIC_PARTITIONS] = 1; - ctx.Send(Tablet, new TEvPQ::TEvPartitionLabeledCounters(Partition, *PartitionLabeledCounters)); + if (PartitionCounters) { + PartitionCounters->GetCounters()[METRIC_INIT_TIME] = InitDuration.MilliSeconds(); + PartitionCounters->GetCounters()[METRIC_LIFE_TIME] = CreationTime.MilliSeconds(); + PartitionCounters->GetCounters()[METRIC_PARTITIONS] = 1; + ctx.Send(Tablet, new TEvPQ::TEvPartitionLabeledCounters(Partition, *PartitionCounters)); } UpdateUserInfoEndOffset(ctx.Now()); @@ -1785,7 +1740,7 @@ void TPartition::ProcessChangeOwnerRequest(TAutoPtr<TEvPQ::TEvChangeOwner> ev, c //cookie is generated. but answer will be sent when all inflight writes will be done - they in the same queue 'Requests' Requests.emplace_back(TOwnershipMsg{ev->Cookie, it->second.OwnerCookie}, WriteQuota.GetQuotedTime(), ctx.Now().MilliSeconds(), 0); - Counters.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Set(ReservedSize); + TabletCounters.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Set(ReservedSize); UpdateWriteBufferIsFullState(ctx.Now()); ProcessReserveRequests(ctx); @@ -1799,7 +1754,7 @@ THashMap<TString, NKikimr::NPQ::TOwnerInfo>::iterator TPartition::DropOwner(THas Y_VERIFY(ReservedSize >= it->second.ReservedSize); ReservedSize -= it->second.ReservedSize; UpdateWriteBufferIsFullState(ctx.Now()); - Counters.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Set(ReservedSize); + TabletCounters.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Set(ReservedSize); for (auto& ev : it->second.WaitToChangeOwner) { //this request maybe could be done right now WaitToChangeOwner.push_back(THolder<TEvPQ::TEvChangeOwner>(ev.Release())); } @@ -1939,7 +1894,7 @@ void TPartition::ProcessReserveRequests(const TActorContext& ctx) { break; } UpdateWriteBufferIsFullState(ctx.Now()); - Counters.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Set(ReservedSize); + TabletCounters.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Set(ReservedSize); } void TPartition::UpdateWriteBufferIsFullState(const TInstant& now) { @@ -2155,25 +2110,32 @@ void TPartition::LogAndCollectError(NKikimrServices::EServiceKikimr service, con LogAndCollectError(error, ctx); } -std::pair<TInstant, TInstant> TPartition::GetTime(const TUserInfo& userInfo, ui64 offset) const -{ +std::pair<TInstant, TInstant> TPartition::GetTime(const TUserInfo& userInfo, ui64 offset) const { TInstant wtime = userInfo.WriteTimestamp > TInstant::Zero() ? userInfo.WriteTimestamp : GetWriteTimeEstimate(offset); return std::make_pair(wtime, userInfo.CreateTimestamp); } //zero means no such record -TInstant TPartition::GetWriteTimeEstimate(ui64 offset) const -{ +TInstant TPartition::GetWriteTimeEstimate(ui64 offset) const { if (offset < StartOffset) offset = StartOffset; if (offset >= EndOffset) return TInstant::Zero(); - const std::deque<TDataKey>& container = (offset < Head.Offset || offset == Head.Offset && Head.PartNo > 0) ? DataKeysBody : HeadKeys; + const std::deque<TDataKey>& container = + (offset < Head.Offset || offset == Head.Offset && Head.PartNo > 0) ? DataKeysBody : HeadKeys; Y_VERIFY(!container.empty()); auto it = std::upper_bound(container.begin(), container.end(), offset, - [](const ui64 offset, const TDataKey& p) { return offset < p.Key.GetOffset() || offset == p.Key.GetOffset() && p.Key.GetPartNo() > 0;}); - Y_VERIFY(it != container.begin(),"Tablet %lu StartOffset %lu, HeadOffset %lu, offset %lu, containter size %lu, first-elem: %s", - TabletID, StartOffset, Head.Offset, offset, container.size(), container.front().Key.ToString().c_str()); //always greater - Y_VERIFY(it == container.end() || it->Key.GetOffset() > offset || it->Key.GetOffset() == offset && it->Key.GetPartNo() > 0); + [](const ui64 offset, const TDataKey& p) { + return offset < p.Key.GetOffset() || + offset == p.Key.GetOffset() && p.Key.GetPartNo() > 0; + }); + // Always greater + Y_VERIFY(it != container.begin(), + "Tablet %lu StartOffset %lu, HeadOffset %lu, offset %lu, containter size %lu, first-elem: %s", + TabletID, StartOffset, Head.Offset, offset, container.size(), + container.front().Key.ToString().c_str()); + Y_VERIFY(it == container.end() || + it->Key.GetOffset() > offset || + it->Key.GetOffset() == offset && it->Key.GetPartNo() > 0); --it; if (it != container.begin()) --it; @@ -2186,7 +2148,7 @@ void TPartition::Handle(TEvPQ::TEvGetClientOffset::TPtr& ev, const TActorContext Y_VERIFY(userInfo.Offset >= -1, "Unexpected Offset: %" PRIi64, userInfo.Offset); ui64 offset = Max<i64>(userInfo.Offset, 0); auto ts = GetTime(userInfo, offset); - Counters.Cumulative()[COUNTER_PQ_GET_CLIENT_OFFSET_OK].Increment(1); + TabletCounters.Cumulative()[COUNTER_PQ_GET_CLIENT_OFFSET_OK].Increment(1); ReplyGetClientOffsetOk(ctx, ev->Get()->Cookie, userInfo.Offset, ts.first, ts.second); } @@ -2207,7 +2169,7 @@ void TPartition::Handle(TEvPQ::TEvSetClientInfo::TPtr& ev, const TActorContext& auto& userInfo = UsersInfoStorage.GetOrCreate(user, ctx); if (userInfo.UserActs.size() > MAX_USER_ACTS) { - Counters.Cumulative()[COUNTER_PQ_SET_CLIENT_OFFSET_ERROR].Increment(1); + TabletCounters.Cumulative()[COUNTER_PQ_SET_CLIENT_OFFSET_ERROR].Increment(1); ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::OVERLOAD, TStringBuilder() << "too big inflight: " << userInfo.UserActs.size()); return; @@ -2222,8 +2184,7 @@ void TPartition::Handle(TEvPQ::TEvSetClientInfo::TPtr& ev, const TActorContext& ProcessUserActs(userInfo, ctx); } -void TPartition::ProcessUserActs(TUserInfo& userInfo, const TActorContext& ctx) -{ +void TPartition::ProcessUserActs(TUserInfo& userInfo, const TActorContext& ctx) { if (userInfo.WriteInProgress || userInfo.UserActs.empty()) return; ui64 cookie = ++SetOffsetCookie; @@ -2280,28 +2241,27 @@ void TPartition::Handle(TEvPQ::TEvBlobResponse::TPtr& ev, const TActorContext& c if (HasError(*ev->Get())) { if (info.IsSubscription) { - Counters.Cumulative()[COUNTER_PQ_READ_SUBSCRIPTION_ERROR].Increment(1); + TabletCounters.Cumulative()[COUNTER_PQ_READ_SUBSCRIPTION_ERROR].Increment(1); } - Counters.Cumulative()[COUNTER_PQ_READ_ERROR].Increment(1); - Counters.Percentile()[COUNTER_LATENCY_PQ_READ_ERROR].IncrementFor((ctx.Now() - info.Timestamp).MilliSeconds()); + TabletCounters.Cumulative()[COUNTER_PQ_READ_ERROR].Increment(1); + TabletCounters.Percentile()[COUNTER_LATENCY_PQ_READ_ERROR].IncrementFor((ctx.Now() - info.Timestamp).MilliSeconds()); } else { if (info.IsSubscription) { - Counters.Cumulative()[COUNTER_PQ_READ_SUBSCRIPTION_OK].Increment(1); + TabletCounters.Cumulative()[COUNTER_PQ_READ_SUBSCRIPTION_OK].Increment(1); } const auto& resp = dynamic_cast<TEvPQ::TEvProxyResponse*>(answer.Event.Get())->Response; - Counters.Cumulative()[COUNTER_PQ_READ_OK].Increment(1); - Counters.Percentile()[COUNTER_LATENCY_PQ_READ_OK].IncrementFor((ctx.Now() - info.Timestamp).MilliSeconds()); - Counters.Cumulative()[COUNTER_PQ_READ_BYTES].Increment(resp.ByteSize()); + TabletCounters.Cumulative()[COUNTER_PQ_READ_OK].Increment(1); + TabletCounters.Percentile()[COUNTER_LATENCY_PQ_READ_OK].IncrementFor((ctx.Now() - info.Timestamp).MilliSeconds()); + TabletCounters.Cumulative()[COUNTER_PQ_READ_BYTES].Increment(resp.ByteSize()); } ctx.Send(info.Destination != 0 ? Tablet : ctx.SelfID, answer.Event.Release()); - ReportLabeledCounters(ctx); + ReportCounters(ctx); OnReadRequestFinished(std::move(info), answer.Size); } template <typename T> // TCmdReadResult -static void AddResultBlob(T* read, const TClientBlob& blob, ui64 offset) -{ +static void AddResultBlob(T* read, const TClientBlob& blob, ui64 offset) { auto cc = read->AddResult(); cc->SetOffset(offset); cc->SetData(blob.Data); @@ -2381,7 +2341,7 @@ TReadAnswer TReadInfo::FormAnswer( ui32 lastBlobSize = 0; const TVector<TRequestedBlob>& blobs = response->GetBlobs(); - auto updateUsage = [&](const TClientBlob& blob){ + auto updateUsage = [&](const TClientBlob& blob) { size += blob.GetBlobSize(); lastBlobSize += blob.GetBlobSize(); if (blob.IsLastPart()) { @@ -2542,8 +2502,7 @@ void TPartition::Handle(TEvPQ::TEvReadTimeout::TPtr& ev, const TActorContext& ct } -TVector<TRequestedBlob> TPartition::GetReadRequestFromBody(const ui64 startOffset, const ui16 partNo, const ui32 maxCount, const ui32 maxSize, ui32* rcount, ui32* rsize) -{ +TVector<TRequestedBlob> TPartition::GetReadRequestFromBody(const ui64 startOffset, const ui16 partNo, const ui32 maxCount, const ui32 maxSize, ui32* rcount, ui32* rsize) { Y_VERIFY(rcount && rsize); ui32& count = *rcount; ui32& size = *rsize; @@ -2590,8 +2549,7 @@ TVector<TRequestedBlob> TPartition::GetReadRequestFromBody(const ui64 startOffse -TVector<TClientBlob> TPartition::GetReadRequestFromHead(const ui64 startOffset, const ui16 partNo, const ui32 maxCount, const ui32 maxSize, const ui64 readTimestampMs, ui32* rcount, ui32* rsize, ui64* insideHeadOffset) -{ +TVector<TClientBlob> TPartition::GetReadRequestFromHead(const ui64 startOffset, const ui16 partNo, const ui32 maxCount, const ui32 maxSize, const ui64 readTimestampMs, ui32* rcount, ui32* rsize, ui64* insideHeadOffset) { Y_UNUSED(readTimestampMs); ui32& count = *rcount; ui32& size = *rsize; @@ -2603,8 +2561,7 @@ TVector<TClientBlob> TPartition::GetReadRequestFromHead(const ui64 startOffset, Y_VERIFY(pos != Max<ui32>()); } ui32 lastBlobSize = 0; - for (;pos < Head.Batches.size(); ++pos) - { + for (;pos < Head.Batches.size(); ++pos) { TVector<TClientBlob> blobs; Head.Batches[pos].UnpackTo(&blobs); @@ -2659,13 +2616,13 @@ void TPartition::Handle(TEvPQ::TEvRead::TPtr& ev, const TActorContext& ctx) { auto read = ev->Get(); if (read->Count == 0) { - Counters.Cumulative()[COUNTER_PQ_READ_ERROR].Increment(1); - Counters.Percentile()[COUNTER_LATENCY_PQ_READ_ERROR].IncrementFor(0); + TabletCounters.Cumulative()[COUNTER_PQ_READ_ERROR].Increment(1); + TabletCounters.Percentile()[COUNTER_LATENCY_PQ_READ_ERROR].IncrementFor(0); ReplyError(ctx, read->Cookie, NPersQueue::NErrorCode::BAD_REQUEST, "no infinite flows allowed - count is not set or 0"); return; } if (read->Offset < StartOffset) { - Counters.Cumulative()[COUNTER_PQ_READ_ERROR_SMALL_OFFSET].Increment(1); + TabletCounters.Cumulative()[COUNTER_PQ_READ_ERROR_SMALL_OFFSET].Increment(1); read->Offset = StartOffset; if (read->PartNo > 0) { LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, @@ -2680,8 +2637,8 @@ void TPartition::Handle(TEvPQ::TEvRead::TPtr& ev, const TActorContext& ctx) { } } if (read->Offset > EndOffset || read->Offset == EndOffset && read->PartNo > 0) { - Counters.Cumulative()[COUNTER_PQ_READ_ERROR_BIG_OFFSET].Increment(1); - Counters.Percentile()[COUNTER_LATENCY_PQ_READ_ERROR].IncrementFor(0); + TabletCounters.Cumulative()[COUNTER_PQ_READ_ERROR_BIG_OFFSET].Increment(1); + TabletCounters.Percentile()[COUNTER_LATENCY_PQ_READ_ERROR].IncrementFor(0); LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, "reading from too big offset - topic " << TopicConverter->GetClientsideName() << " partition " << Partition << @@ -2702,8 +2659,8 @@ void TPartition::Handle(TEvPQ::TEvRead::TPtr& ev, const TActorContext& ctx) { if (!read->SessionId.empty()) { if (userInfo.Session != read->SessionId) { - Counters.Cumulative()[COUNTER_PQ_READ_ERROR_NO_SESSION].Increment(1); - Counters.Percentile()[COUNTER_LATENCY_PQ_READ_ERROR].IncrementFor(0); + TabletCounters.Cumulative()[COUNTER_PQ_READ_ERROR_NO_SESSION].Increment(1); + TabletCounters.Percentile()[COUNTER_LATENCY_PQ_READ_ERROR].IncrementFor(0); ReplyError(ctx, read->Cookie, NPersQueue::NErrorCode::READ_ERROR_NO_SESSION, TStringBuilder() << "no such session '" << read->SessionId << "'"); return; @@ -2783,8 +2740,7 @@ void TPartition::OnReadRequestFinished(TReadInfo&& info, ui64 answerSize) { } } -void TPartition::AnswerCurrentWrites(const TActorContext& ctx) -{ +void TPartition::AnswerCurrentWrites(const TActorContext& ctx) { ui64 offset = EndOffset; while (!Responses.empty()) { const ui64 quotedTime = Responses.front().QuotedTime; @@ -2822,13 +2778,13 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) } if (!already && partNo + 1 == totalParts) { if (it == SourceIdStorage.GetInMemorySourceIds().end()) { - Counters.Cumulative()[COUNTER_PQ_SID_CREATED].Increment(1); + TabletCounters.Cumulative()[COUNTER_PQ_SID_CREATED].Increment(1); SourceIdStorage.RegisterSourceId(s, writeResponse.Msg.SeqNo, offset, CurrentTimestamp); } else { SourceIdStorage.RegisterSourceId(s, it->second.Updated(writeResponse.Msg.SeqNo, offset, CurrentTimestamp)); } - Counters.Cumulative()[COUNTER_PQ_WRITE_OK].Increment(1); + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_OK].Increment(1); } ReplyWrite( ctx, writeResponse.Cookie, s, seqNo, partNo, totalParts, @@ -2838,10 +2794,11 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) LOG_DEBUG_S( ctx, NKikimrServices::PERSQUEUE, - "Answering for message sourceid: '" << EscapeC(s) << "', Topic: '" << TopicConverter->GetClientsideName() - << "', Partition: " << Partition - << ", SeqNo: " << seqNo << ", partNo: " << partNo << ", Offset: " << offset << " is " - << (already ? "already written" : "stored on disk") + "Answering for message sourceid: '" << EscapeC(s) << + "', Topic: '" << TopicConverter->GetClientsideName() << + "', Partition: " << Partition << + ", SeqNo: " << seqNo << ", partNo: " << partNo << + ", Offset: " << offset << " is " << (already ? "already written" : "stored on disk") ); if (PartitionWriteQuotaWaitCounter) { PartitionWriteQuotaWaitCounter->IncFor(quotedTime); @@ -2900,17 +2857,16 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) } -void TPartition::ReadTimestampForOffset(const TString& user, TUserInfo& userInfo, const TActorContext& ctx) -{ +void TPartition::ReadTimestampForOffset(const TString& user, TUserInfo& userInfo, const TActorContext& ctx) { if (userInfo.ReadScheduled) return; userInfo.ReadScheduled = true; LOG_DEBUG_S( ctx, NKikimrServices::PERSQUEUE, - "Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition - << " user " << user << " readTimeStamp for offset " << userInfo.Offset << " initiated " - << " queuesize " << UpdateUserInfoTimestamp.size() << " startOffset " << StartOffset - << " ReadingTimestamp " << ReadingTimestamp << " rrg " << userInfo.ReadRuleGeneration + "Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition << + " user " << user << " readTimeStamp for offset " << userInfo.Offset << " initiated " << + " queuesize " << UpdateUserInfoTimestamp.size() << " startOffset " << StartOffset << + " ReadingTimestamp " << ReadingTimestamp << " rrg " << userInfo.ReadRuleGeneration ); if (ReadingTimestamp) { @@ -2929,14 +2885,14 @@ void TPartition::ReadTimestampForOffset(const TString& user, TUserInfo& userInfo userInfo.ReadWriteTimestamp = userInfo.WriteTimestamp; } - Counters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_OFFSET_IS_LOST].Increment(1); - ReportLabeledCounters(ctx); + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_OFFSET_IS_LOST].Increment(1); + ReportCounters(ctx); return; } if (userInfo.Offset >= (i64)EndOffset || StartOffset == EndOffset) { userInfo.ReadScheduled = false; - ReportLabeledCounters(ctx); + ReportCounters(ctx); return; } @@ -2965,10 +2921,9 @@ void TPartition::ReadTimestampForOffset(const TString& user, TUserInfo& userInfo false); ctx.Send(ctx.SelfID, event.Release()); - Counters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_CACHE_MISS].Increment(1); + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_CACHE_MISS].Increment(1); } - void TPartition::ProcessTimestampsForNewData(const ui64 prevEndOffset, const TActorContext& ctx) { for (auto& userInfoPair : UsersInfoStorage.GetAll()) { if (userInfoPair.second.Offset >= (i64)prevEndOffset && userInfoPair.second.Offset < (i64)EndOffset) { @@ -2977,16 +2932,16 @@ void TPartition::ProcessTimestampsForNewData(const ui64 prevEndOffset, const TAc } } - -void TPartition::Handle(TEvPQ::TEvProxyResponse::TPtr& ev, const TActorContext& ctx) -{ +void TPartition::Handle(TEvPQ::TEvProxyResponse::TPtr& ev, const TActorContext& ctx) { ReadingTimestamp = false; auto userInfo = UsersInfoStorage.GetIfExists(ReadingForUser); if (!userInfo || userInfo->ReadRuleGeneration != ReadingForUserReadRuleGeneration) { LOG_INFO_S( ctx, NKikimrServices::PERSQUEUE, - "Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition - << " user " << ReadingForUser << " readTimeStamp for other generation or no client info at all" + "Topic '" << TopicConverter->GetClientsideName() << "'" << + " partition " << Partition << + " user " << ReadingForUser << + " readTimeStamp for other generation or no client info at all" ); ProcessTimestampRead(ctx); @@ -2995,9 +2950,12 @@ void TPartition::Handle(TEvPQ::TEvProxyResponse::TPtr& ev, const TActorContext& LOG_DEBUG_S( ctx, NKikimrServices::PERSQUEUE, - "Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition - << " user " << ReadingForUser << " readTimeStamp done, result " << userInfo->WriteTimestamp.MilliSeconds() - << " queuesize " << UpdateUserInfoTimestamp.size() << " startOffset " << StartOffset + "Topic '" << TopicConverter->GetClientsideName() << "'" << + " partition " << Partition << + " user " << ReadingForUser << + " readTimeStamp done, result " << userInfo->WriteTimestamp.MilliSeconds() << + " queuesize " << UpdateUserInfoTimestamp.size() << + " startOffset " << StartOffset ); Y_VERIFY(userInfo->ReadScheduled); userInfo->ReadScheduled = false; @@ -3009,10 +2967,11 @@ void TPartition::Handle(TEvPQ::TEvProxyResponse::TPtr& ev, const TActorContext& NKikimrServices::PERSQUEUE, "Reading Timestamp failed for offset " << ReadingForOffset << " ( "<< userInfo->Offset << " ) " << ev->Get()->Response.DebugString() ); - if (ev->Get()->Response.GetStatus() == NMsgBusProxy::MSTATUS_OK && ev->Get()->Response.GetErrorCode() == NPersQueue::NErrorCode::OK - && ev->Get()->Response.GetPartitionResponse().HasCmdReadResult() - && ev->Get()->Response.GetPartitionResponse().GetCmdReadResult().ResultSize() > 0 - && (i64)ev->Get()->Response.GetPartitionResponse().GetCmdReadResult().GetResult(0).GetOffset() >= userInfo->Offset) { + if (ev->Get()->Response.GetStatus() == NMsgBusProxy::MSTATUS_OK && + ev->Get()->Response.GetErrorCode() == NPersQueue::NErrorCode::OK && + ev->Get()->Response.GetPartitionResponse().HasCmdReadResult() && + ev->Get()->Response.GetPartitionResponse().GetCmdReadResult().ResultSize() > 0 && + (i64)ev->Get()->Response.GetPartitionResponse().GetCmdReadResult().GetResult(0).GetOffset() >= userInfo->Offset) { //offsets is inside gap - return timestamp of first record after gap const auto& res = ev->Get()->Response.GetPartitionResponse().GetCmdReadResult().GetResult(0); userInfo->WriteTimestamp = TInstant::MilliSeconds(res.GetWriteTimestampMS()); @@ -3027,7 +2986,7 @@ void TPartition::Handle(TEvPQ::TEvProxyResponse::TPtr& ev, const TActorContext& UpdateUserInfoTimestamp.push_back(std::make_pair(ReadingForUser, ReadingForUserReadRuleGeneration)); userInfo->ReadScheduled = true; } - Counters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_ERROR].Increment(1); + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_ERROR].Increment(1); } ProcessTimestampRead(ctx); } @@ -3050,12 +3009,11 @@ void TPartition::ProcessTimestampRead(const TActorContext& ctx) { ReadTimestampForOffset(user, *userInfo, ctx); } Y_VERIFY(ReadingTimestamp || UpdateUserInfoTimestamp.empty()); - ReportLabeledCounters(ctx); + ReportCounters(ctx); } -void TPartition::Handle(TEvPQ::TEvError::TPtr& ev, const TActorContext& ctx) -{ +void TPartition::Handle(TEvPQ::TEvError::TPtr& ev, const TActorContext& ctx) { ReadingTimestamp = false; auto userInfo = UsersInfoStorage.GetIfExists(ReadingForUser); if (!userInfo || userInfo->ReadRuleGeneration != ReadingForUserReadRuleGeneration) { @@ -3077,8 +3035,7 @@ void TPartition::Handle(TEvPQ::TEvError::TPtr& ev, const TActorContext& ctx) } -void TPartition::CheckHeadConsistency() const -{ +void TPartition::CheckHeadConsistency() const { ui32 p = 0; for (ui32 j = 0; j < DataKeysHead.size(); ++j) { ui32 s = 0; @@ -3092,7 +3049,8 @@ void TPartition::CheckHeadConsistency() const } Y_VERIFY(s < DataKeysHead[j].Border()); } - Y_VERIFY(DataKeysBody.empty() || Head.Offset >= DataKeysBody.back().Key.GetOffset() + DataKeysBody.back().Key.GetCount()); + Y_VERIFY(DataKeysBody.empty() || + Head.Offset >= DataKeysBody.back().Key.GetOffset() + DataKeysBody.back().Key.GetCount()); Y_VERIFY(p == HeadKeys.size()); if (!HeadKeys.empty()) { Y_VERIFY(HeadKeys.size() <= TotalMaxCount); @@ -3106,8 +3064,7 @@ void TPartition::CheckHeadConsistency() const } -void TPartition::SyncMemoryStateWithKVState(const TActorContext& ctx) -{ +void TPartition::SyncMemoryStateWithKVState(const TActorContext& ctx) { if (!CompactedKeys.empty()) HeadKeys.clear(); @@ -3174,8 +3131,7 @@ void TPartition::SyncMemoryStateWithKVState(const TActorContext& ctx) } -ui64 TPartition::GetSizeLag(i64 offset) -{ +ui64 TPartition::GetSizeLag(i64 offset) { ui64 sizeLag = 0; if (!DataKeysBody.empty() && (offset < (i64)Head.Offset || offset == (i64)Head.Offset && Head.PartNo > 0)) { //there will be something in body auto it = std::upper_bound(DataKeysBody.begin(), DataKeysBody.end(), std::make_pair(offset, 0), @@ -3194,9 +3150,8 @@ ui64 TPartition::GetSizeLag(i64 offset) } -void TPartition::ReportLabeledCounters(const TActorContext& ctx) -{ - if (!PartitionLabeledCounters) { +void TPartition::ReportCounters(const TActorContext& ctx) { + if (!PartitionCounters) { return; } //per client counters @@ -3261,23 +3216,25 @@ void TPartition::ReportLabeledCounters(const TActorContext& ctx) userInfo.LabeledCounters->GetCounters()[METRIC_COMMIT_MESSAGE_LAG].Set(EndOffset - userInfo.Offset); } - if (userInfo.LabeledCounters->GetCounters()[METRIC_READ_MESSAGE_LAG].Get() != EndOffset - off) { haveChanges = true; userInfo.LabeledCounters->GetCounters()[METRIC_READ_MESSAGE_LAG].Set(EndOffset - off); userInfo.LabeledCounters->GetCounters()[METRIC_READ_TOTAL_MESSAGE_LAG].Set(EndOffset - off); } + ui64 sizeLag = GetSizeLag(userInfo.Offset); - ui64 sizeLagRead = GetSizeLag(userInfo.ReadOffset); if (userInfo.LabeledCounters->GetCounters()[METRIC_COMMIT_SIZE_LAG].Get() != sizeLag) { haveChanges = true; userInfo.LabeledCounters->GetCounters()[METRIC_COMMIT_SIZE_LAG].Set(sizeLag); } + + ui64 sizeLagRead = GetSizeLag(userInfo.ReadOffset); if (userInfo.LabeledCounters->GetCounters()[METRIC_READ_SIZE_LAG].Get() != sizeLagRead) { haveChanges = true; userInfo.LabeledCounters->GetCounters()[METRIC_READ_SIZE_LAG].Set(sizeLagRead); userInfo.LabeledCounters->GetCounters()[METRIC_READ_TOTAL_SIZE_LAG].Set(sizeLag); } + if (userInfo.LabeledCounters->GetCounters()[METRIC_USER_PARTITIONS].Get() == 0) { haveChanges = true; userInfo.LabeledCounters->GetCounters()[METRIC_USER_PARTITIONS].Set(1); @@ -3329,60 +3286,59 @@ void TPartition::ReportLabeledCounters(const TActorContext& ctx) ctx.Send(Tablet, new TEvPQ::TEvPartitionLabeledCounters(Partition, *userInfo.LabeledCounters)); } } - //Partition counters bool haveChanges = false; - if (SourceIdStorage.GetInMemorySourceIds().size() != PartitionLabeledCounters->GetCounters()[METRIC_MAX_NUM_SIDS].Get()) { + if (SourceIdStorage.GetInMemorySourceIds().size() != PartitionCounters->GetCounters()[METRIC_MAX_NUM_SIDS].Get()) { haveChanges = true; - PartitionLabeledCounters->GetCounters()[METRIC_MAX_NUM_SIDS].Set(SourceIdStorage.GetInMemorySourceIds().size()); - PartitionLabeledCounters->GetCounters()[METRIC_NUM_SIDS].Set(SourceIdStorage.GetInMemorySourceIds().size()); + PartitionCounters->GetCounters()[METRIC_MAX_NUM_SIDS].Set(SourceIdStorage.GetInMemorySourceIds().size()); + PartitionCounters->GetCounters()[METRIC_NUM_SIDS].Set(SourceIdStorage.GetInMemorySourceIds().size()); } TDuration lifetimeNow = ctx.Now() - SourceIdStorage.MinAvailableTimestamp(ctx.Now()); - if (lifetimeNow.MilliSeconds() != PartitionLabeledCounters->GetCounters()[METRIC_MIN_SID_LIFETIME].Get()) { + if (lifetimeNow.MilliSeconds() != PartitionCounters->GetCounters()[METRIC_MIN_SID_LIFETIME].Get()) { haveChanges = true; - PartitionLabeledCounters->GetCounters()[METRIC_MIN_SID_LIFETIME].Set(lifetimeNow.MilliSeconds()); + PartitionCounters->GetCounters()[METRIC_MIN_SID_LIFETIME].Set(lifetimeNow.MilliSeconds()); } - ui64 headGapSize = DataKeysBody.empty() ? 0 : (Head.Offset - (DataKeysBody.back().Key.GetOffset() + DataKeysBody.back().Key.GetCount())); - ui64 gapSize = GapSize + headGapSize; - ui32 gapsCount = GapOffsets.size() + (headGapSize ? 1 : 0); - - if (gapSize != PartitionLabeledCounters->GetCounters()[METRIC_GAPS_SIZE].Get()) { + const ui64 headGapSize = DataKeysBody.empty() ? 0 : (Head.Offset - (DataKeysBody.back().Key.GetOffset() + DataKeysBody.back().Key.GetCount())); + const ui64 gapSize = GapSize + headGapSize; + if (gapSize != PartitionCounters->GetCounters()[METRIC_GAPS_SIZE].Get()) { haveChanges = true; - PartitionLabeledCounters->GetCounters()[METRIC_MAX_GAPS_SIZE].Set(gapSize); - PartitionLabeledCounters->GetCounters()[METRIC_GAPS_SIZE].Set(gapSize); + PartitionCounters->GetCounters()[METRIC_MAX_GAPS_SIZE].Set(gapSize); + PartitionCounters->GetCounters()[METRIC_GAPS_SIZE].Set(gapSize); } - if (gapsCount != PartitionLabeledCounters->GetCounters()[METRIC_GAPS_COUNT].Get()) { + + const ui32 gapsCount = GapOffsets.size() + (headGapSize ? 1 : 0); + if (gapsCount != PartitionCounters->GetCounters()[METRIC_GAPS_COUNT].Get()) { haveChanges = true; - PartitionLabeledCounters->GetCounters()[METRIC_MAX_GAPS_COUNT].Set(gapsCount); - PartitionLabeledCounters->GetCounters()[METRIC_GAPS_COUNT].Set(gapsCount); + PartitionCounters->GetCounters()[METRIC_MAX_GAPS_COUNT].Set(gapsCount); + PartitionCounters->GetCounters()[METRIC_GAPS_COUNT].Set(gapsCount); } ui64 speed = WriteQuota.GetTotalSpeed(); - if (speed != PartitionLabeledCounters->GetCounters()[METRIC_WRITE_QUOTA_BYTES].Get()) { + if (speed != PartitionCounters->GetCounters()[METRIC_WRITE_QUOTA_BYTES].Get()) { haveChanges = true; - PartitionLabeledCounters->GetCounters()[METRIC_WRITE_QUOTA_BYTES].Set(speed); + PartitionCounters->GetCounters()[METRIC_WRITE_QUOTA_BYTES].Set(speed); } ui64 availSec = WriteQuota.GetAvailableAvgSec(ctx.Now()); - if (availSec != PartitionLabeledCounters->GetCounters()[METRIC_MIN_WRITE_QUOTA_BYTES_AVAIL_SEC].Get()) { + if (availSec != PartitionCounters->GetCounters()[METRIC_MIN_WRITE_QUOTA_BYTES_AVAIL_SEC].Get()) { haveChanges = true; - PartitionLabeledCounters->GetCounters()[METRIC_MIN_WRITE_QUOTA_BYTES_AVAIL_SEC].Set(availSec); + PartitionCounters->GetCounters()[METRIC_MIN_WRITE_QUOTA_BYTES_AVAIL_SEC].Set(availSec); } ui64 availMin = WriteQuota.GetAvailableAvgMin(ctx.Now()); - if (availMin != PartitionLabeledCounters->GetCounters()[METRIC_MIN_WRITE_QUOTA_BYTES_AVAIL_MIN].Get()) { + if (availMin != PartitionCounters->GetCounters()[METRIC_MIN_WRITE_QUOTA_BYTES_AVAIL_MIN].Get()) { haveChanges = true; - PartitionLabeledCounters->GetCounters()[METRIC_MIN_WRITE_QUOTA_BYTES_AVAIL_MIN].Set(availMin); + PartitionCounters->GetCounters()[METRIC_MIN_WRITE_QUOTA_BYTES_AVAIL_MIN].Set(availMin); } ui32 id = METRIC_TOTAL_WRITE_SPEED_1; for (ui32 i = 0; i < AvgWriteBytes.size(); ++i) { ui64 avg = AvgWriteBytes[i].GetValue(); - if (avg != PartitionLabeledCounters->GetCounters()[id].Get()) { + if (avg != PartitionCounters->GetCounters()[id].Get()) { haveChanges = true; - PartitionLabeledCounters->GetCounters()[id].Set(avg); //total - PartitionLabeledCounters->GetCounters()[id + 1].Set(avg); //max + PartitionCounters->GetCounters()[id].Set(avg); //total + PartitionCounters->GetCounters()[id + 1].Set(avg); //max } id += 2; } @@ -3392,10 +3348,10 @@ void TPartition::ReportLabeledCounters(const TActorContext& ctx) id = METRIC_TOTAL_QUOTA_SPEED_1; for (ui32 i = 0; i < AvgQuotaBytes.size(); ++i) { ui64 avg = AvgQuotaBytes[i].GetValue(); - if (avg != PartitionLabeledCounters->GetCounters()[id].Get()) { + if (avg != PartitionCounters->GetCounters()[id].Get()) { haveChanges = true; - PartitionLabeledCounters->GetCounters()[id].Set(avg); //total - PartitionLabeledCounters->GetCounters()[id + 1].Set(avg); //max + PartitionCounters->GetCounters()[id].Set(avg); //total + PartitionCounters->GetCounters()[id + 1].Set(avg); //max } id += 2; } @@ -3403,34 +3359,33 @@ void TPartition::ReportLabeledCounters(const TActorContext& ctx) if (WriteQuota.GetTotalSpeed()) { ui64 quotaUsage = ui64(AvgQuotaBytes[1].GetValue()) * 1000000 / WriteQuota.GetTotalSpeed() / 60; - if (quotaUsage != PartitionLabeledCounters->GetCounters()[METRIC_WRITE_QUOTA_USAGE].Get()) { + if (quotaUsage != PartitionCounters->GetCounters()[METRIC_WRITE_QUOTA_USAGE].Get()) { haveChanges = true; - PartitionLabeledCounters->GetCounters()[METRIC_WRITE_QUOTA_USAGE].Set(quotaUsage); + PartitionCounters->GetCounters()[METRIC_WRITE_QUOTA_USAGE].Set(quotaUsage); } } ui64 partSize = BodySize + Head.PackedSize; - if (partSize != PartitionLabeledCounters->GetCounters()[METRIC_TOTAL_PART_SIZE].Get()) { + if (partSize != PartitionCounters->GetCounters()[METRIC_TOTAL_PART_SIZE].Get()) { haveChanges = true; - PartitionLabeledCounters->GetCounters()[METRIC_MAX_PART_SIZE].Set(partSize); - PartitionLabeledCounters->GetCounters()[METRIC_TOTAL_PART_SIZE].Set(partSize); + PartitionCounters->GetCounters()[METRIC_MAX_PART_SIZE].Set(partSize); + PartitionCounters->GetCounters()[METRIC_TOTAL_PART_SIZE].Set(partSize); } - ui64 ts = WriteTimestamp.MilliSeconds(); - if (ts < MIN_TIMESTAMP_MS) ts = Max<i64>(); - if (PartitionLabeledCounters->GetCounters()[METRIC_LAST_WRITE_TIME].Get() != ts) { + ui64 ts = (WriteTimestamp.MilliSeconds() < MIN_TIMESTAMP_MS) ? Max<i64>() : WriteTimestamp.MilliSeconds(); + if (PartitionCounters->GetCounters()[METRIC_LAST_WRITE_TIME].Get() != ts) { haveChanges = true; - PartitionLabeledCounters->GetCounters()[METRIC_LAST_WRITE_TIME].Set(ts); + PartitionCounters->GetCounters()[METRIC_LAST_WRITE_TIME].Set(ts); } ui64 timeLag = WriteLagMs.GetValue(); - if (PartitionLabeledCounters->GetCounters()[METRIC_WRITE_TIME_LAG_MS].Get() != timeLag) { + if (PartitionCounters->GetCounters()[METRIC_WRITE_TIME_LAG_MS].Get() != timeLag) { haveChanges = true; - PartitionLabeledCounters->GetCounters()[METRIC_WRITE_TIME_LAG_MS].Set(timeLag); + PartitionCounters->GetCounters()[METRIC_WRITE_TIME_LAG_MS].Set(timeLag); } if (haveChanges) { - ctx.Send(Tablet, new TEvPQ::TEvPartitionLabeledCounters(Partition, *PartitionLabeledCounters)); + ctx.Send(Tablet, new TEvPQ::TEvPartitionLabeledCounters(Partition, *PartitionCounters)); } } @@ -3603,18 +3558,17 @@ void TPartition::HandleSetOffsetResponse(NKikimrClient::TResponse& response, con userInfo->ActualTimestamps = false; ReadTimestampForOffset(user, *userInfo, ctx); } else { - Counters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_CACHE_HIT].Increment(1); + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_CACHE_HIT].Increment(1); } auto counter = setSession ? COUNTER_PQ_CREATE_SESSION_OK : (dropSession ? COUNTER_PQ_DELETE_SESSION_OK : COUNTER_PQ_SET_CLIENT_OFFSET_OK); - Counters.Cumulative()[counter].Increment(1); + TabletCounters.Cumulative()[counter].Increment(1); } userInfo->WriteInProgress = false; ProcessUserActs(*userInfo, ctx); } -void TPartition::ScheduleUpdateAvailableSize(const TActorContext& ctx) -{ +void TPartition::ScheduleUpdateAvailableSize(const TActorContext& ctx) { ctx.Schedule(UPDATE_AVAIL_SIZE_INTERVAL, new TEvPQ::TEvUpdateAvailableSize()); } @@ -3646,11 +3600,11 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) { SLIBigLatency.Inc(); } - Counters.Percentile()[COUNTER_LATENCY_PQ_WRITE_CYCLE].IncrementFor(totalLatencyMs); - Counters.Cumulative()[COUNTER_PQ_WRITE_CYCLE_BYTES_TOTAL].Increment(WriteCycleSize); - Counters.Cumulative()[COUNTER_PQ_WRITE_BYTES_OK].Increment(WriteNewSize); - Counters.Percentile()[COUNTER_PQ_WRITE_CYCLE_BYTES].IncrementFor(WriteCycleSize); - Counters.Percentile()[COUNTER_PQ_WRITE_NEW_BYTES].IncrementFor(WriteNewSize); + TabletCounters.Percentile()[COUNTER_LATENCY_PQ_WRITE_CYCLE].IncrementFor(totalLatencyMs); + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_CYCLE_BYTES_TOTAL].Increment(WriteCycleSize); + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_OK].Increment(WriteNewSize); + TabletCounters.Percentile()[COUNTER_PQ_WRITE_CYCLE_BYTES].IncrementFor(WriteCycleSize); + TabletCounters.Percentile()[COUNTER_PQ_WRITE_NEW_BYTES].IncrementFor(WriteNewSize); if (BytesWritten) BytesWritten.Inc(WriteNewSizeInternal); if (BytesWrittenUncompressed) @@ -3697,7 +3651,7 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) { ProcessTimestampsForNewData(prevEndOffset, ctx); - ReportLabeledCounters(ctx); + ReportCounters(ctx); HandleWrites(ctx); } @@ -3756,8 +3710,8 @@ void TPartition::HandleOnWrite(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& c TMaybe<ui64> offset = ev->Get()->Offset; if (WriteInflightSize > Config.GetPartitionConfig().GetMaxWriteInflightSize()) { - Counters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(ev->Get()->Msgs.size()); - Counters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(sz); + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(ev->Get()->Msgs.size()); + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(sz); ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::OVERLOAD, TStringBuilder() << "try later. Write inflight limit reached. " @@ -3791,16 +3745,14 @@ void TPartition::HandleOnWrite(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& c } } - if (EndOffset - StartOffset >= static_cast<ui64>(Config.GetPartitionConfig().GetMaxCountInPartition()) - || BodySize + Head.PackedSize >= static_cast<ui64>(Config.GetPartitionConfig().GetMaxSizeInPartition())) { - - Counters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(ev->Get()->Msgs.size()); - Counters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(sz); + const ui64 maxSize = Config.GetPartitionConfig().GetMaxSizeInPartition(); + const ui64 maxCount = Config.GetPartitionConfig().GetMaxCountInPartition(); + if (EndOffset - StartOffset >= maxCount || BodySize + Head.PackedSize >= maxSize) { + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(ev->Get()->Msgs.size()); + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(sz); ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::WRITE_ERROR_PARTITION_IS_FULL, - Sprintf("try later, partition is full - already have %" PRIu64" from %" PRIu64 " count, %" PRIu64 " from %" PRIu64 " size", - EndOffset - StartOffset, static_cast<ui64>(Config.GetPartitionConfig().GetMaxCountInPartition()), - BodySize + Head.PackedSize, static_cast<ui64>(Config.GetPartitionConfig().GetMaxSizeInPartition()))); + Sprintf("try later, partition is full - already have %" PRIu64" from %" PRIu64 " count, %" PRIu64 " from %" PRIu64 " size", EndOffset - StartOffset, maxCount, BodySize + Head.PackedSize, maxSize)); return; } ui64 size = 0; @@ -3815,8 +3767,9 @@ void TPartition::HandleOnWrite(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& c WriteInflightSize += size; ReservedSize -= decReservedSize; - Y_VERIFY(size <= decReservedSize || decReservedSize == 0); //TODO: remove decReservedSize == 0 - Counters.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Set(ReservedSize); + // TODO: remove decReservedSize == 0 + Y_VERIFY(size <= decReservedSize || decReservedSize == 0); + TabletCounters.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Set(ReservedSize); UpdateWriteBufferIsFullState(ctx.Now()); } @@ -3914,8 +3867,7 @@ void TPartition::HandleOnWrite(TEvPQ::TEvSplitMessageGroup::TPtr& ev, const TAct Requests.emplace_back(std::move(msg), WriteQuota.GetQuotedTime(), ctx.Now().MilliSeconds(), 0); } -std::pair<TKey, ui32> TPartition::Compact(const TKey& key, const ui32 size, bool headCleared) -{ +std::pair<TKey, ui32> TPartition::Compact(const TKey& key, const ui32 size, bool headCleared) { std::pair<TKey, ui32> res({key, size}); ui32 x = headCleared ? 0 : Head.PackedSize; Y_VERIFY(std::accumulate(DataKeysHead.begin(), DataKeysHead.end(), 0u, [](ui32 sum, const TKeyLevel& level){return sum + level.Sum();}) == NewHead.PackedSize + x); @@ -3937,8 +3889,7 @@ std::pair<TKey, ui32> TPartition::Compact(const TKey& key, const ui32 size, bool } -void TPartition::ProcessChangeOwnerRequests(const TActorContext& ctx) -{ +void TPartition::ProcessChangeOwnerRequests(const TActorContext& ctx) { while (!WaitToChangeOwner.empty()) { auto &ev = WaitToChangeOwner.front(); if (OwnerPipes.find(ev->PipeClient) != OwnerPipes.end()) { //this is not request from dead pipe @@ -3954,8 +3905,7 @@ void TPartition::ProcessChangeOwnerRequests(const TActorContext& ctx) } -void TPartition::BecomeIdle(const TActorContext&) -{ +void TPartition::BecomeIdle(const TActorContext&) { Become(&TThis::StateIdle); } @@ -4011,7 +3961,7 @@ void TPartition::WriteClientInfo(const ui64 cookie, TUserInfo& userInfo, const T && (ev->Type != TEvPQ::TEvSetClientInfo::ESCI_DROP_SESSION || !userInfo.Session.empty()) //but allow DropSession request when session is already dropped - for idempotence || (ev->Type == TEvPQ::TEvSetClientInfo::ESCI_CREATE_SESSION && !userInfo.Session.empty() && (ev->Generation < userInfo.Generation || ev->Generation == userInfo.Generation && ev->Step <= userInfo.Step))) { //old generation request - Counters.Cumulative()[COUNTER_PQ_SET_CLIENT_OFFSET_ERROR].Increment(1); + TabletCounters.Cumulative()[COUNTER_PQ_SET_CLIENT_OFFSET_ERROR].Increment(1); ReplyError(ctx, ev->Cookie, NPersQueue::NErrorCode::WRONG_COOKIE, TStringBuilder() << "set offset in already dead session " << ev->SessionId << " actual is " << userInfo.Session); userInfo.UserActs.pop_front(); @@ -4052,7 +4002,7 @@ void TPartition::WriteClientInfo(const ui64 cookie, TUserInfo& userInfo, const T ); offset = EndOffset; ev->Offset = offset; -/* Counters.Cumulative()[COUNTER_PQ_SET_CLIENT_OFFSET_ERROR].Increment(1); +/* TabletCounters.Cumulative()[COUNTER_PQ_SET_CLIENT_OFFSET_ERROR].Increment(1); ReplyError(ctx, ev->Cookie, NPersQueue::NErrorCode::SET_OFFSET_ERROR_COMMIT_TO_FUTURE, TStringBuilder() << "can't commit to future. Offset " << offset << " EndOffset " << EndOffset); userInfo.UserActrs.pop_front(); @@ -4112,8 +4062,8 @@ void TPartition::ClearOldHead(const ui64 offset, const ui16 partNo, TEvKeyValue: void TPartition::CancelAllWritesOnWrite(const TActorContext& ctx, TEvKeyValue::TEvRequest* request, const TString& errorStr, const TWriteMsg& p, TSourceIdWriter& sourceIdWriter, NPersQueue::NErrorCode::EErrorCode errorCode = NPersQueue::NErrorCode::BAD_REQUEST) { ReplyError(ctx, p.Cookie, errorCode, errorStr); - Counters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1); - Counters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(p.Msg.Data.size() + p.Msg.SourceId.size()); + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1); + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(p.Msg.Data.size() + p.Msg.SourceId.size()); FailBadClient(ctx); NewHead.Clear(); NewHead.Offset = EndOffset; @@ -4199,7 +4149,7 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const WriteInflightSize -= p.Msg.Data.size(); - Counters.Percentile()[COUNTER_LATENCY_PQ_RECEIVE_QUEUE].IncrementFor(ctx.Now().MilliSeconds() - p.Msg.ReceiveTimestamp); + TabletCounters.Percentile()[COUNTER_LATENCY_PQ_RECEIVE_QUEUE].IncrementFor(ctx.Now().MilliSeconds() - p.Msg.ReceiveTimestamp); //check already written ui64 poffset = p.Offset ? *p.Offset : curOffset; @@ -4221,11 +4171,11 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const << " EndOffset " << EndOffset << " CurOffset " << curOffset << " offset " << poffset ); - Counters.Cumulative()[COUNTER_PQ_WRITE_ALREADY].Increment(1); - Counters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ALREADY].Increment(p.Msg.Data.size()); + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ALREADY].Increment(1); + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ALREADY].Increment(p.Msg.Data.size()); } else { - Counters.Cumulative()[COUNTER_PQ_WRITE_SMALL_OFFSET].Increment(1); - Counters.Cumulative()[COUNTER_PQ_WRITE_BYTES_SMALL_OFFSET].Increment(p.Msg.Data.size()); + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_SMALL_OFFSET].Increment(1); + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_SMALL_OFFSET].Increment(p.Msg.Data.size()); } TString().swap(p.Msg.Data); @@ -4465,8 +4415,7 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const } -std::pair<TKey, ui32> TPartition::GetNewWriteKey(bool headCleared) -{ +std::pair<TKey, ui32> TPartition::GetNewWriteKey(bool headCleared) { bool needCompaction = false; ui32 HeadSize = headCleared ? 0 : Head.PackedSize; if (HeadSize + NewHead.PackedSize > 0 && HeadSize + NewHead.PackedSize @@ -4693,8 +4642,7 @@ bool TPartition::ProcessWrites(TEvKeyValue::TEvRequest* request, const TActorCon return true; } -void TPartition::FilterDeadlinedWrites(const TActorContext& ctx) -{ +void TPartition::FilterDeadlinedWrites(const TActorContext& ctx) { if (QuotaDeadline == TInstant::Zero() || QuotaDeadline > ctx.Now()) return; @@ -4702,8 +4650,8 @@ void TPartition::FilterDeadlinedWrites(const TActorContext& ctx) ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::OVERLOAD, "quota exceeded"); if (w.IsWrite()) { const auto& msg = w.GetWrite().Msg; - Counters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1); - Counters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(msg.Data.size() + msg.SourceId.size()); + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1); + TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(msg.Data.size() + msg.SourceId.size()); WriteInflightSize -= msg.Data.size(); } } @@ -4714,8 +4662,7 @@ void TPartition::FilterDeadlinedWrites(const TActorContext& ctx) } -void TPartition::HandleWrites(const TActorContext& ctx) -{ +void TPartition::HandleWrites(const TActorContext& ctx) { Become(&TThis::StateWrite); THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest); @@ -4753,8 +4700,7 @@ void TPartition::HandleWrites(const TActorContext& ctx) } -void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const ui64 cookie, bool subscription) -{ +void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const ui64 cookie, bool subscription) { ui32 count = 0; ui32 size = 0; @@ -4795,13 +4741,13 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u )); const auto& resp = dynamic_cast<TEvPQ::TEvProxyResponse*>(answer.Event.Get())->Response; if (info.IsSubscription) { - Counters.Cumulative()[COUNTER_PQ_READ_SUBSCRIPTION_OK].Increment(1); + TabletCounters.Cumulative()[COUNTER_PQ_READ_SUBSCRIPTION_OK].Increment(1); } - Counters.Cumulative()[COUNTER_PQ_READ_HEAD_ONLY_OK].Increment(1); - Counters.Percentile()[COUNTER_LATENCY_PQ_READ_HEAD_ONLY].IncrementFor((ctx.Now() - info.Timestamp).MilliSeconds()); - Counters.Cumulative()[COUNTER_PQ_READ_BYTES].Increment(resp.ByteSize()); + TabletCounters.Cumulative()[COUNTER_PQ_READ_HEAD_ONLY_OK].Increment(1); + TabletCounters.Percentile()[COUNTER_LATENCY_PQ_READ_HEAD_ONLY].IncrementFor((ctx.Now() - info.Timestamp).MilliSeconds()); + TabletCounters.Cumulative()[COUNTER_PQ_READ_BYTES].Increment(resp.ByteSize()); ctx.Send(info.Destination != 0 ? Tablet : ctx.SelfID, answer.Event.Release()); - ReportLabeledCounters(ctx); + ReportCounters(ctx); OnReadRequestFinished(std::move(info), answer.Size); return; } @@ -4816,21 +4762,24 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u ctx.Send(BlobCache, request.Release()); } -void TPartition::Handle(TEvQuota::TEvClearance::TPtr& ev, const TActorContext& ctx) -{ +void TPartition::Handle(TEvQuota::TEvClearance::TPtr& ev, const TActorContext& ctx) { const ui64 cookie = ev->Cookie; LOG_DEBUG_S( ctx, NKikimrServices::PERSQUEUE, - "Got quota. Topic: \"" << TopicConverter->GetClientsideName() << "\". Partition: " - << Partition << ": " << ev->Get()->Result << ". Cookie: " << cookie + "Got quota." << + " Topic: \"" << TopicConverter->GetClientsideName() << "\"." << + " Partition: " << Partition << ": " << ev->Get()->Result << "." << + " Cookie: " << cookie ); // Check if (Y_UNLIKELY(ev->Get()->Result != TEvQuota::TEvClearance::EResult::Success)) { - Y_VERIFY(ev->Get()->Result != TEvQuota::TEvClearance::EResult::Deadline); // We set deadline == inf in quota request. + // We set deadline == inf in quota request. + Y_VERIFY(ev->Get()->Result != TEvQuota::TEvClearance::EResult::Deadline); LOG_ERROR_S( ctx, NKikimrServices::PERSQUEUE, - "Got quota error. Topic: \"" << TopicConverter->GetClientsideName() << "\". Partition " << Partition - << ": " << ev->Get()->Result + "Got quota error." << + " Topic: \"" << TopicConverter->GetClientsideName() << "\"." << + " Partition " << Partition << ": " << ev->Get()->Result ); ctx.Send(Tablet, new TEvents::TEvPoisonPill()); return; @@ -4853,25 +4802,24 @@ void TPartition::Handle(TEvQuota::TEvClearance::TPtr& ev, const TActorContext& c HandleWrites(ctx); } -size_t TPartition::GetQuotaRequestSize(const TEvKeyValue::TEvRequest& request) -{ - if (AppData()->PQConfig.GetQuotingConfig().GetTopicWriteQuotaEntityToLimit() == NKikimrPQ::TPQConfig::TQuotingConfig::USER_PAYLOAD_SIZE) { +size_t TPartition::GetQuotaRequestSize(const TEvKeyValue::TEvRequest& request) { + if (AppData()->PQConfig.GetQuotingConfig().GetTopicWriteQuotaEntityToLimit() == + NKikimrPQ::TPQConfig::TQuotingConfig::USER_PAYLOAD_SIZE) { return WriteNewSize; } else { - size_t dataSize = 0; - for (const auto& cmdWrite : request.Record.GetCmdWrite()) { - dataSize += cmdWrite.GetValue().size(); - } - return dataSize; + return std::accumulate(request.Record.GetCmdWrite().begin(), request.Record.GetCmdWrite().end(), 0ul, + [](size_t sum, const auto& el) { return sum + el.GetValue().size(); }); } } -void TPartition::RequestQuotaForWriteBlobRequest(size_t dataSize, ui64 cookie) -{ +void TPartition::RequestQuotaForWriteBlobRequest(size_t dataSize, ui64 cookie) { LOG_DEBUG_S( TActivationContext::AsActorContext(), NKikimrServices::PERSQUEUE, - "Send write quota request. Topic: \"" << TopicConverter->GetClientsideName() << "\". Partition: " << Partition - << ". Amount: " << dataSize << ". Cookie: " << cookie + "Send write quota request." << + " Topic: \"" << TopicConverter->GetClientsideName() << "\"." << + " Partition: " << Partition << "." << + " Amount: " << dataSize << "." << + " Cookie: " << cookie ); Send(MakeQuoterServiceID(), @@ -4887,8 +4835,7 @@ bool TPartition::WaitingForPreviousBlobQuota() const { return TopicQuotaRequestCookie != 0; } -void TPartition::WriteBlobWithQuota(THolder<TEvKeyValue::TEvRequest>&& request) -{ +void TPartition::WriteBlobWithQuota(THolder<TEvKeyValue::TEvRequest>&& request) { // Request quota and write blob. // Mirrored topics are not quoted in local dc. const bool skip = !IsQuotingEnabled() || TopicWriteQuotaResourcePath.empty(); @@ -4912,8 +4859,7 @@ void TPartition::WriteBlobWithQuota(THolder<TEvKeyValue::TEvRequest>&& request) #endif } -void TPartition::CalcTopicWriteQuotaParams() -{ +void TPartition::CalcTopicWriteQuotaParams() { const auto& pqConfig = AppData()->PQConfig; const auto& quotingConfig = pqConfig.GetQuotingConfig(); if (IsQuotingEnabled()) { // Mirrored topics are not quoted in local dc. @@ -4944,8 +4890,8 @@ void TPartition::CalcTopicWriteQuotaParams() void TPartition::CreateMirrorerActor() { Mirrorer = MakeHolder<TMirrorerInfo>( - Register(new TMirrorer(Tablet, SelfId(), TopicConverter, Partition, IsLocalDC, EndOffset, Config.GetPartitionConfig().GetMirrorFrom(), Counters)), - Counters + Register(new TMirrorer(Tablet, SelfId(), TopicConverter, Partition, IsLocalDC, EndOffset, Config.GetPartitionConfig().GetMirrorFrom(), TabletCounters)), + TabletCounters ); } @@ -4955,5 +4901,4 @@ bool TPartition::IsQuotingEnabled() const { return IsLocalDC && !pqConfig.GetTopicsAreFirstClassCitizen() && quotingConfig.GetEnableQuoting(); } -}// NPQ -}// NKikimr +} // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 0cb644ac12..bdd7ed90db 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -1,218 +1,170 @@ #pragma once #include <util/generic/set.h> -#include <util/system/hp_timer.h> -#include <ydb/core/base/quoter.h> -#include <ydb/core/keyvalue/keyvalue_events.h> #include <library/cpp/actors/core/actor.h> #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/actors/core/log.h> #include <library/cpp/sliding_window/sliding_window.h> -#include <ydb/core/protos/pqconfig.pb.h> -#include <ydb/core/persqueue/events/internal.h> +#include <ydb/core/keyvalue/keyvalue_events.h> #include <ydb/library/persqueue/counter_time_keeper/counter_time_keeper.h> -#include <ydb/library/persqueue/topic_parser/topic_parser.h> -#include "key.h" #include "blob.h" -#include "subscriber.h" #include "header.h" -#include "user_info.h" +#include "key.h" +#include "partition_types.h" #include "sourceid.h" -#include "ownerinfo.h" +#include "subscriber.h" +#include "user_info.h" -#include <variant> -namespace NKikimr { -namespace NPQ { +namespace NKikimr::NPQ { -class TKeyLevel; +static const ui32 MAX_BLOB_PART_SIZE = 500_KB; -static const ui32 MAX_BLOB_PART_SIZE = 500 << 10; //500Kb +ui64 GetOffsetEstimate(const std::deque<TDataKey>& container, TInstant timestamp, ui64 headOffset); typedef TProtobufTabletLabeledCounters<EPartitionLabeledCounters_descriptor> TPartitionLabeledCounters; - -struct TDataKey { - TKey Key; - ui32 Size; - TInstant Timestamp; - ui64 CumulativeSize; -}; - -ui64 GetOffsetEstimate(const std::deque<TDataKey>& container, TInstant timestamp, ui64 headOffset); - +class TKeyLevel; struct TMirrorerInfo; class TPartition : public TActorBootstrapped<TPartition> { private: - static constexpr ui32 MAX_ERRORS_COUNT_TO_STORE = 10; + static const ui32 MAX_ERRORS_COUNT_TO_STORE = 10; private: struct THasDataReq; struct THasDataDeadline; - //answer for requests when data arrives and drop deadlined requests - void ProcessHasDataRequests(const TActorContext& ctx); - - void FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config, const TActorContext& ctx); - void ProcessUserActs(TUserInfo& userInfo, const TActorContext& ctx); - void ReplyError(const TActorContext& ctx, const ui64 dst, NPersQueue::NErrorCode::EErrorCode errorCode, const TString& error); void ReplyErrorForStoredWrites(const TActorContext& ctx); - void ReplyOk(const TActorContext& ctx, const ui64 dst); - void ReplyWrite( - const TActorContext& ctx, ui64 dst, const TString& sourceId, ui64 seqNo, ui16 partNo, ui16 totalParts, - ui64 offset, TInstant writeTimestamp, bool already, ui64 maxSeqNo, - ui64 partitionQuotedTime, TDuration topicQuotedTime, ui64 queueTime, ui64 writeTime); - void ReplyGetClientOffsetOk(const TActorContext& ctx, const ui64 dst, const i64 offset, - const TInstant writeTimestamp, const TInstant createTimestamp); + void ReplyGetClientOffsetOk(const TActorContext& ctx, const ui64 dst, const i64 offset, const TInstant writeTimestamp, const TInstant createTimestamp); + void ReplyOk(const TActorContext& ctx, const ui64 dst); void ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& ownerCookie); - void Handle(TEvPersQueue::TEvHasDataInfo::TPtr& ev, const TActorContext& ctx); - - void Handle(TEvPQ::TEvMirrorerCounters::TPtr& ev, const TActorContext& ctx); - void Handle(NReadSpeedLimiterEvents::TEvCounters::TPtr& ev, const TActorContext& ctx); - - //answer for reads for Timestamps - void Handle(TEvPQ::TEvProxyResponse::TPtr& ev, const TActorContext& ctx); - void Handle(TEvPQ::TEvError::TPtr& ev, const TActorContext& ctx); - void ProcessTimestampRead(const TActorContext& ctx); - - void HandleOnInit(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx); - - void HandleGetDiskStatus(const NKikimrClient::TResponse& res, const TActorContext& ctx); - void HandleInfoRangeRead(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx); - void HandleDataRangeRead(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx); - void HandleMetaRead(const NKikimrClient::TKeyValueResponse::TReadResult& response, const TActorContext& ctx); + void ReplyWrite(const TActorContext& ctx, ui64 dst, const TString& sourceId, ui64 seqNo, ui16 partNo, ui16 totalParts, ui64 offset, TInstant writeTimestamp, bool already, ui64 maxSeqNo, ui64 partitionQuotedTime, TDuration topicQuotedTime, ui64 queueTime, ui64 writeTime); - //forms DataKeysBody and other partition's info - //ctx here only for logging + void AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvRequest* request, bool headCleared, const TActorContext& ctx); + void AnswerCurrentWrites(const TActorContext& ctx); + void CancelAllWritesOnIdle(const TActorContext& ctx); + void CancelAllWritesOnWrite(const TActorContext& ctx, TEvKeyValue::TEvRequest* request, const TString& errorStr, const TWriteMsg& p, TSourceIdWriter& sourceIdWriter, NPersQueue::NErrorCode::EErrorCode errorCode); + void ClearOldHead(const ui64 offset, const ui16 partNo, TEvKeyValue::TEvRequest* request); + void CreateMirrorerActor(); + void DoRead(TEvPQ::TEvRead::TPtr ev, TDuration waitQuotaTime, const TActorContext& ctx); + void FailBadClient(const TActorContext& ctx); void FillBlobsMetaData(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx); - //will form head and request data keys from head or finish initialization + void FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config, const TActorContext& ctx); + void FilterDeadlinedWrites(const TActorContext& ctx); void FormHeadAndProceed(const TActorContext& ctx); - void HandleDataRead(const NKikimrClient::TResponse& range, const TActorContext& ctx); - void InitComplete(const TActorContext& ctx); - - - void Handle(TEvPQ::TEvChangeOwner::TPtr& ev, const TActorContext& ctx); - void ProcessChangeOwnerRequests(const TActorContext& ctx); - void ProcessChangeOwnerRequest(TAutoPtr<TEvPQ::TEvChangeOwner> ev, const TActorContext& ctx); + void Handle(NReadSpeedLimiterEvents::TEvCounters::TPtr& ev, const TActorContext& ctx); + void Handle(NReadSpeedLimiterEvents::TEvResponse::TPtr& ev, const TActorContext& ctx); + void Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx); void Handle(TEvPQ::TEvBlobResponse::TPtr& ev, const TActorContext& ctx); - + void Handle(TEvPQ::TEvChangeOwner::TPtr& ev, const TActorContext& ctx); void Handle(TEvPQ::TEvChangePartitionConfig::TPtr& ev, const TActorContext& ctx); - + void Handle(TEvPQ::TEvError::TPtr& ev, const TActorContext& ctx); void Handle(TEvPQ::TEvGetClientOffset::TPtr& ev, const TActorContext& ctx); - - void Handle(TEvPQ::TEvUpdateWriteTimestamp::TPtr& ev, const TActorContext& ctx); - - void Handle(TEvPQ::TEvSetClientInfo::TPtr& ev, const TActorContext& ctx); - void WriteClientInfo(const ui64 cookie, TUserInfo& ui, const TActorContext& ctx); - - - void HandleOnInit(TEvPQ::TEvPartitionOffsets::TPtr& ev, const TActorContext& ctx); - void HandleOnInit(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPQ::TEvGetMaxSeqNoRequest::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPQ::TEvGetPartitionClientInfo::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPQ::TEvHandleWriteResponse::TPtr&, const TActorContext& ctx); + void Handle(TEvPQ::TEvMirrorerCounters::TPtr& ev, const TActorContext& ctx); void Handle(TEvPQ::TEvPartitionOffsets::TPtr& ev, const TActorContext& ctx); void Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext& ctx); - void Handle(TEvPQ::TEvGetPartitionClientInfo::TPtr& ev, const TActorContext& ctx); - - void Handle(TEvPersQueue::TEvReportPartitionError::TPtr& ev, const TActorContext& ctx); - void LogAndCollectError(const NKikimrPQ::TStatusResponse::TErrorMessage& error, const TActorContext& ctx); - void LogAndCollectError(NKikimrServices::EServiceKikimr service, const TString& msg, const TActorContext& ctx); - - void HandleOnIdle(TEvPQ::TEvUpdateAvailableSize::TPtr& ev, const TActorContext& ctx); - void HandleOnWrite(TEvPQ::TEvUpdateAvailableSize::TPtr& ev, const TActorContext& ctx); - + void Handle(TEvPQ::TEvPipeDisconnected::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPQ::TEvProxyResponse::TPtr& ev, const TActorContext& ctx); void Handle(TEvPQ::TEvQuotaDeadlineCheck::TPtr& ev, const TActorContext& ctx); - - void UpdateAvailableSize(const TActorContext& ctx); - void ScheduleUpdateAvailableSize(const TActorContext& ctx); - - void Handle(TEvPQ::TEvGetMaxSeqNoRequest::TPtr& ev, const TActorContext& ctx); - + void Handle(TEvPQ::TEvRead::TPtr& ev, const TActorContext& ctx); void Handle(TEvPQ::TEvReadTimeout::TPtr& ev, const TActorContext& ctx); - void HandleWakeup(const TActorContext& ctx); + void Handle(TEvPQ::TEvReserveBytes::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPQ::TEvSetClientInfo::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPQ::TEvUpdateWriteTimestamp::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPersQueue::TEvHasDataInfo::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPersQueue::TEvReportPartitionError::TPtr& ev, const TActorContext& ctx); + void Handle(TEvQuota::TEvClearance::TPtr& ev, const TActorContext& ctx); void Handle(TEvents::TEvPoisonPill::TPtr& ev, const TActorContext& ctx); - - void Handle(TEvPQ::TEvRead::TPtr& ev, const TActorContext& ctx); - void Handle(NReadSpeedLimiterEvents::TEvResponse::TPtr& ev, const TActorContext& ctx); - void DoRead(TEvPQ::TEvRead::TPtr ev, TDuration waitQuotaTime, const TActorContext& ctx); - void OnReadRequestFinished(TReadInfo&& info, ui64 answerSize); - - // will return rcount and rsize also - TVector<TRequestedBlob> GetReadRequestFromBody(const ui64 startOffset, const ui16 partNo, const ui32 maxCount, const ui32 maxSize, ui32* rcount, ui32* rsize); - TVector<TClientBlob> GetReadRequestFromHead(const ui64 startOffset, const ui16 partNo, const ui32 maxCount, const ui32 maxSize, const ui64 readTimestampMs, ui32* rcount, ui32* rsize, ui64* insideHeadOffset); - void ProcessRead(const TActorContext& ctx, TReadInfo&& info, const ui64 cookie, bool subscription); - - void HandleOnIdle(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& ctx); - void HandleOnWrite(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& ctx); - - void HandleOnIdle(TEvPQ::TEvRegisterMessageGroup::TPtr& ev, const TActorContext& ctx); - void HandleOnWrite(TEvPQ::TEvRegisterMessageGroup::TPtr& ev, const TActorContext& ctx); - + void HandleDataRangeRead(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx); + void HandleDataRead(const NKikimrClient::TResponse& range, const TActorContext& ctx); + void HandleGetDiskStatus(const NKikimrClient::TResponse& res, const TActorContext& ctx); + void HandleInfoRangeRead(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx); + void HandleMetaRead(const NKikimrClient::TKeyValueResponse::TReadResult& response, const TActorContext& ctx); + void HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorContext& ctx); void HandleOnIdle(TEvPQ::TEvDeregisterMessageGroup::TPtr& ev, const TActorContext& ctx); - void HandleOnWrite(TEvPQ::TEvDeregisterMessageGroup::TPtr& ev, const TActorContext& ctx); - + void HandleOnIdle(TEvPQ::TEvRegisterMessageGroup::TPtr& ev, const TActorContext& ctx); void HandleOnIdle(TEvPQ::TEvSplitMessageGroup::TPtr& ev, const TActorContext& ctx); + void HandleOnIdle(TEvPQ::TEvUpdateAvailableSize::TPtr& ev, const TActorContext& ctx); + void HandleOnIdle(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& ctx); + void HandleOnInit(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx); + void HandleOnInit(TEvPQ::TEvPartitionOffsets::TPtr& ev, const TActorContext& ctx); + void HandleOnInit(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext& ctx); + void HandleOnWrite(TEvPQ::TEvDeregisterMessageGroup::TPtr& ev, const TActorContext& ctx); + void HandleOnWrite(TEvPQ::TEvRegisterMessageGroup::TPtr& ev, const TActorContext& ctx); void HandleOnWrite(TEvPQ::TEvSplitMessageGroup::TPtr& ev, const TActorContext& ctx); - - void Handle(TEvQuota::TEvClearance::TPtr& ev, const TActorContext& ctx); - bool CleanUp(TEvKeyValue::TEvRequest* request, bool hasWrites, const TActorContext& ctx); - - //will fill sourceIds, request and NewHead - //returns true if head is compacted - bool AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx, TSourceIdWriter& sourceIdWriter); - std::pair<TKey, ui32> GetNewWriteKey(bool headCleared); - void AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvRequest* request, bool headCleared, const TActorContext& ctx); - - bool ProcessWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx); - void FilterDeadlinedWrites(const TActorContext& ctx); - void SetDeadlinesForWrites(const TActorContext& ctx); - - void ReadTimestampForOffset(const TString& user, TUserInfo& ui, const TActorContext& ctx); - void ProcessTimestampsForNewData(const ui64 prevEndOffset, const TActorContext& ctx); - void ReportLabeledCounters(const TActorContext& ctx); - ui64 GetSizeLag(i64 offset); - - void Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx); + void HandleOnWrite(TEvPQ::TEvUpdateAvailableSize::TPtr& ev, const TActorContext& ctx); + void HandleOnWrite(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& ctx); void HandleSetOffsetResponse(NKikimrClient::TResponse& response, const TActorContext& ctx); + void HandleWakeup(const TActorContext& ctx); void HandleWriteResponse(const TActorContext& ctx); - void Handle(TEvPQ::TEvHandleWriteResponse::TPtr&, const TActorContext& ctx); - - - void AnswerCurrentWrites(const TActorContext& ctx); - void SyncMemoryStateWithKVState(const TActorContext& ctx); - //only Writes container is filled; only DISK_IS_FULL can be here - void CancelAllWritesOnIdle(const TActorContext& ctx); - //additional contaiters are half-filled, need to clear them too - struct TWriteMsg; // forward - void CancelAllWritesOnWrite(const TActorContext& ctx, TEvKeyValue::TEvRequest* request, const TString& errorStr, - const TWriteMsg& p, TSourceIdWriter& sourceIdWriter, NPersQueue::NErrorCode::EErrorCode errorCode); + void InitComplete(const TActorContext& ctx); + void InitUserInfoForImportantClients(const TActorContext& ctx); + void LogAndCollectError(NKikimrServices::EServiceKikimr service, const TString& msg, const TActorContext& ctx); + void LogAndCollectError(const NKikimrPQ::TStatusResponse::TErrorMessage& error, const TActorContext& ctx); - void FailBadClient(const TActorContext& ctx); - void ClearOldHead(const ui64 offset, const ui16 partNo, TEvKeyValue::TEvRequest* request); + void OnReadRequestFinished(TReadInfo&& info, ui64 answerSize); - void HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorContext& ctx); + void ProcessChangeOwnerRequest(TAutoPtr<TEvPQ::TEvChangeOwner> ev, const TActorContext& ctx); + void ProcessChangeOwnerRequests(const TActorContext& ctx); + void ProcessHasDataRequests(const TActorContext& ctx); + void ProcessRead(const TActorContext& ctx, TReadInfo&& info, const ui64 cookie, bool subscription); + void ProcessReserveRequests(const TActorContext& ctx); + void ProcessTimestampRead(const TActorContext& ctx); + void ProcessTimestampsForNewData(const ui64 prevEndOffset, const TActorContext& ctx); + void ProcessUserActs(TUserInfo& userInfo, const TActorContext& ctx); - void InitUserInfoForImportantClients(const TActorContext& ctx); + void ReadTimestampForOffset(const TString& user, TUserInfo& ui, const TActorContext& ctx); + void ReportCounters(const TActorContext& ctx); + void ScheduleUpdateAvailableSize(const TActorContext& ctx); + void SetDeadlinesForWrites(const TActorContext& ctx); + void SetupStreamCounters(const TActorContext& ctx); + void SetupTopicCounters(const TActorContext& ctx); - THashMap<TString, TOwnerInfo>::iterator DropOwner(THashMap<TString, TOwnerInfo>::iterator& it, const TActorContext& ctx); + void SyncMemoryStateWithKVState(const TActorContext& ctx); + void UpdateAvailableSize(const TActorContext& ctx); + void WriteClientInfo(const ui64 cookie, TUserInfo& ui, const TActorContext& ctx); - void Handle(TEvPQ::TEvPipeDisconnected::TPtr& ev, const TActorContext& ctx); + void AddMetaKey(TEvKeyValue::TEvRequest* request); + void BecomeIdle(const TActorContext& ctx); + void CalcTopicWriteQuotaParams(); + void CheckHeadConsistency() const; + void HandleWrites(const TActorContext& ctx); + void RequestQuotaForWriteBlobRequest(size_t dataSize, ui64 cookie); + void WriteBlobWithQuota(THolder<TEvKeyValue::TEvRequest>&& request); - void Handle(TEvPQ::TEvReserveBytes::TPtr& ev, const TActorContext& ctx); - void ProcessReserveRequests(const TActorContext& ctx); + void UpdateUserInfoEndOffset(const TInstant& now); + void UpdateWriteBufferIsFullState(const TInstant& now); - void CreateMirrorerActor(); + TInstant GetWriteTimeEstimate(ui64 offset) const; + bool AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx, TSourceIdWriter& sourceIdWriter); + bool CleanUp(TEvKeyValue::TEvRequest* request, bool hasWrites, const TActorContext& ctx); + bool CleanUpBlobs(TEvKeyValue::TEvRequest *request, bool hasWrites, const TActorContext& ctx); bool IsQuotingEnabled() const; - - void SetupTopicCounters(const TActorContext& ctx); - void SetupStreamCounters(const TActorContext& ctx); + bool ProcessWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx); + bool WaitingForPreviousBlobQuota() const; + size_t GetQuotaRequestSize(const TEvKeyValue::TEvRequest& request); + std::pair<TInstant, TInstant> GetTime(const TUserInfo& userInfo, ui64 offset) const; + std::pair<TKey, ui32> Compact(const TKey& key, const ui32 size, bool headCleared); + ui32 NextChannel(bool isHead, ui32 blobSize); + ui64 GetSizeLag(i64 offset); + std::pair<TKey, ui32> GetNewWriteKey(bool headCleared); + THashMap<TString, TOwnerInfo>::iterator DropOwner(THashMap<TString, TOwnerInfo>::iterator& it, + const TActorContext& ctx); + // will return rcount and rsize also + TVector<TRequestedBlob> GetReadRequestFromBody(const ui64 startOffset, const ui16 partNo, const ui32 maxCount, const ui32 maxSize, ui32* rcount, ui32* rsize); + TVector<TClientBlob> GetReadRequestFromHead(const ui64 startOffset, const ui16 partNo, const ui32 maxCount, const ui32 maxSize, const ui64 readTimestampMs, ui32* rcount, ui32* rsize, ui64* insideHeadOffset); public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { @@ -253,7 +205,7 @@ private: STFUNC(StateInit) { - NPersQueue::TCounterTimeKeeper keeper(Counters.Cumulative()[COUNTER_PQ_TABLET_CPU_USAGE]); + NPersQueue::TCounterTimeKeeper keeper(TabletCounters.Cumulative()[COUNTER_PQ_TABLET_CPU_USAGE]); LOG_TRACE_S(ctx, NKikimrServices::PERSQUEUE, EventStr("StateInit", ev)); @@ -279,7 +231,7 @@ private: STFUNC(StateIdle) { - NPersQueue::TCounterTimeKeeper keeper(Counters.Cumulative()[COUNTER_PQ_TABLET_CPU_USAGE]); + NPersQueue::TCounterTimeKeeper keeper(TabletCounters.Cumulative()[COUNTER_PQ_TABLET_CPU_USAGE]); LOG_TRACE_S(ctx, NKikimrServices::PERSQUEUE, EventStr("StateIdle", ev)); @@ -326,7 +278,7 @@ private: STFUNC(StateWrite) { - NPersQueue::TCounterTimeKeeper keeper(Counters.Cumulative()[COUNTER_PQ_TABLET_CPU_USAGE]); + NPersQueue::TCounterTimeKeeper keeper(TabletCounters.Cumulative()[COUNTER_PQ_TABLET_CPU_USAGE]); LOG_TRACE_S(ctx, NKikimrServices::PERSQUEUE, EventStr("StateWrite", ev)); @@ -371,33 +323,7 @@ private: break; }; } - - bool CleanUpBlobs(TEvKeyValue::TEvRequest *request, bool hasWrites, const TActorContext& ctx); - std::pair<TKey, ui32> Compact(const TKey& key, const ui32 size, bool headCleared); - - void HandleWrites(const TActorContext& ctx); - void BecomeIdle(const TActorContext& ctx); - - void CheckHeadConsistency() const; - - std::pair<TInstant, TInstant> GetTime(const TUserInfo& userInfo, ui64 offset) const; - TInstant GetWriteTimeEstimate(ui64 offset) const; - - ui32 NextChannel(bool isHead, ui32 blobSize); - - void WriteBlobWithQuota(THolder<TEvKeyValue::TEvRequest>&& request); - void AddMetaKey(TEvKeyValue::TEvRequest* request); - - size_t GetQuotaRequestSize(const TEvKeyValue::TEvRequest& request); - void RequestQuotaForWriteBlobRequest(size_t dataSize, ui64 cookie); - void CalcTopicWriteQuotaParams(); - bool WaitingForPreviousBlobQuota() const; - private: - void UpdateUserInfoEndOffset(const TInstant& now); - - void UpdateWriteBufferIsFullState(const TInstant& now); - enum EInitState { WaitDiskStatus, WaitInfoRange, @@ -406,13 +332,6 @@ private: WaitMetaRead }; - - struct TUserCookie { - TString User; - ui64 Cookie; - }; - - ui64 TabletID; ui32 Partition; NKikimrPQ::TPQTabletConfig Config; @@ -440,111 +359,6 @@ private: EInitState InitState; - struct TWriteMsg { - ui64 Cookie; - TMaybe<ui64> Offset; - TEvPQ::TEvWrite::TMsg Msg; - }; - - struct TOwnershipMsg { - ui64 Cookie; - TString OwnerCookie; - }; - - struct TRegisterMessageGroupMsg { - ui64 Cookie; - TEvPQ::TEvRegisterMessageGroup::TBody Body; - - explicit TRegisterMessageGroupMsg(TEvPQ::TEvRegisterMessageGroup& ev) - : Cookie(ev.Cookie) - , Body(std::move(ev.Body)) - { - } - }; - - struct TDeregisterMessageGroupMsg { - ui64 Cookie; - TEvPQ::TEvDeregisterMessageGroup::TBody Body; - - explicit TDeregisterMessageGroupMsg(TEvPQ::TEvDeregisterMessageGroup& ev) - : Cookie(ev.Cookie) - , Body(std::move(ev.Body)) - { - } - }; - - struct TSplitMessageGroupMsg { - ui64 Cookie; - TVector<TEvPQ::TEvDeregisterMessageGroup::TBody> Deregistrations; - TVector<TEvPQ::TEvRegisterMessageGroup::TBody> Registrations; - - explicit TSplitMessageGroupMsg(ui64 cookie) - : Cookie(cookie) - { - } - }; - - struct TMessage { - std::variant< - TWriteMsg, - TOwnershipMsg, - TRegisterMessageGroupMsg, - TDeregisterMessageGroupMsg, - TSplitMessageGroupMsg - > Body; - - ui64 QuotedTime; - ui64 QueueTime; - ui64 WriteTime; - - template <typename T> - explicit TMessage(T&& body, ui64 quotedTime, ui64 queueTime, ui64 writeTime) - : Body(std::forward<T>(body)) - , QuotedTime(quotedTime) - , QueueTime(queueTime) - , WriteTime(writeTime) - { - } - - ui64 GetCookie() const { - switch (Body.index()) { - case 0: - return std::get<0>(Body).Cookie; - case 1: - return std::get<1>(Body).Cookie; - case 2: - return std::get<2>(Body).Cookie; - case 3: - return std::get<3>(Body).Cookie; - case 4: - return std::get<4>(Body).Cookie; - default: - Y_FAIL("unreachable"); - } - } - - #define DEFINE_CHECKER_GETTER(name, i) \ - bool Is##name() const { \ - return Body.index() == i; \ - } \ - const auto& Get##name() const { \ - Y_VERIFY(Is##name()); \ - return std::get<i>(Body); \ - } \ - auto& Get##name() { \ - Y_VERIFY(Is##name()); \ - return std::get<i>(Body); \ - } - - DEFINE_CHECKER_GETTER(Write, 0) - DEFINE_CHECKER_GETTER(Ownership, 1) - DEFINE_CHECKER_GETTER(RegisterMessageGroup, 2) - DEFINE_CHECKER_GETTER(DeregisterMessageGroup, 3) - DEFINE_CHECKER_GETTER(SplitMessageGroup, 4) - - #undef DEFINE_CHECKER_GETTER - }; - std::deque<TMessage> Requests; std::deque<TMessage> Responses; @@ -574,7 +388,7 @@ private: bool ReadingTimestamp; TString ReadingForUser; ui64 ReadingForUserReadRuleGeneration; - ui64 ReadingForOffset; + ui64 ReadingForOffset; // log only THashMap<ui64, TString> CookieToUser; ui64 SetOffsetCookie; @@ -593,8 +407,8 @@ private: std::deque<THolder<TEvPQ::TEvChangeOwner>> WaitToChangeOwner; - TTabletCountersBase Counters; - THolder<TPartitionLabeledCounters> PartitionLabeledCounters; + TTabletCountersBase TabletCounters; + THolder<TPartitionLabeledCounters> PartitionCounters; TSubscriber Subscriber; @@ -621,7 +435,6 @@ private: TVector<NSlidingWindow::TSlidingWindow<NSlidingWindow::TSumOperation<ui64>>> AvgWriteBytes; TVector<NSlidingWindow::TSlidingWindow<NSlidingWindow::TSumOperation<ui64>>> AvgQuotaBytes; - ui64 ReservedSize; std::deque<THolder<TEvPQ::TEvReserveBytes>> ReserveRequests; @@ -637,9 +450,9 @@ private: THolder<TPercentileCounter> InputTimeLag; THolder<TPercentileCounter> MessageSize; TPercentileCounter WriteLatency; + NKikimr::NPQ::TMultiCounter SLIBigLatency; NKikimr::NPQ::TMultiCounter WritesTotal; - NKikimr::NPQ::TMultiCounter BytesWritten; NKikimr::NPQ::TMultiCounter BytesWrittenUncompressed; NKikimr::NPQ::TMultiCounter BytesWrittenComp; @@ -647,11 +460,13 @@ private: // Writing blob with topic quota variables ui64 TopicQuotaRequestCookie = 0; + // Wait topic quota metrics THolder<TPercentileCounter> TopicWriteQuotaWaitCounter; TInstant StartTopicQuotaWaitTimeForCurrentBlob; TInstant WriteStartTime; TDuration TopicQuotaWaitTimeForCurrentBlob; + // Topic quota parameters TString TopicWriteQuoterPath; TString TopicWriteQuotaResourcePath; @@ -662,5 +477,4 @@ private: THolder<TMirrorerInfo> Mirrorer; }; -}// NPQ -}// NKikimr +} // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/partition_types.h b/ydb/core/persqueue/partition_types.h new file mode 100644 index 0000000000..9b709c990c --- /dev/null +++ b/ydb/core/persqueue/partition_types.h @@ -0,0 +1,130 @@ +#pragma once + +#include <ydb/core/persqueue/events/internal.h> + +#include <util/generic/fwd.h> +#include <util/generic/maybe.h> + +#include <variant> + + +namespace NKikimr::NPQ { + +struct TUserCookie { + TString User; + ui64 Cookie; +}; + +struct TWriteMsg { + ui64 Cookie; + TMaybe<ui64> Offset; + TEvPQ::TEvWrite::TMsg Msg; +}; + +struct TOwnershipMsg { + ui64 Cookie; + TString OwnerCookie; +}; + +struct TRegisterMessageGroupMsg { + ui64 Cookie; + TEvPQ::TEvRegisterMessageGroup::TBody Body; + + explicit TRegisterMessageGroupMsg(TEvPQ::TEvRegisterMessageGroup& ev) + : Cookie(ev.Cookie) + , Body(std::move(ev.Body)) + { + } +}; + +struct TDeregisterMessageGroupMsg { + ui64 Cookie; + TEvPQ::TEvDeregisterMessageGroup::TBody Body; + + explicit TDeregisterMessageGroupMsg(TEvPQ::TEvDeregisterMessageGroup& ev) + : Cookie(ev.Cookie) + , Body(std::move(ev.Body)) + { + } +}; + +struct TSplitMessageGroupMsg { + ui64 Cookie; + TVector<TEvPQ::TEvDeregisterMessageGroup::TBody> Deregistrations; + TVector<TEvPQ::TEvRegisterMessageGroup::TBody> Registrations; + + explicit TSplitMessageGroupMsg(ui64 cookie) + : Cookie(cookie) + { + } +}; + +struct TMessage { + std::variant< + TWriteMsg, + TOwnershipMsg, + TRegisterMessageGroupMsg, + TDeregisterMessageGroupMsg, + TSplitMessageGroupMsg + > Body; + + ui64 QuotedTime; + ui64 QueueTime; + ui64 WriteTime; + + template <typename T> + explicit TMessage(T&& body, ui64 quotedTime, ui64 queueTime, ui64 writeTime) + : Body(std::forward<T>(body)) + , QuotedTime(quotedTime) + , QueueTime(queueTime) + , WriteTime(writeTime) + { + } + + ui64 GetCookie() const { + switch (Body.index()) { + case 0: + return std::get<0>(Body).Cookie; + case 1: + return std::get<1>(Body).Cookie; + case 2: + return std::get<2>(Body).Cookie; + case 3: + return std::get<3>(Body).Cookie; + case 4: + return std::get<4>(Body).Cookie; + default: + Y_FAIL("unreachable"); + } + } + + #define DEFINE_CHECKER_GETTER(name, i) \ + bool Is##name() const { \ + return Body.index() == i; \ + } \ + const auto& Get##name() const { \ + Y_VERIFY(Is##name()); \ + return std::get<i>(Body); \ + } \ + auto& Get##name() { \ + Y_VERIFY(Is##name()); \ + return std::get<i>(Body); \ + } + + DEFINE_CHECKER_GETTER(Write, 0) + DEFINE_CHECKER_GETTER(Ownership, 1) + DEFINE_CHECKER_GETTER(RegisterMessageGroup, 2) + DEFINE_CHECKER_GETTER(DeregisterMessageGroup, 3) + DEFINE_CHECKER_GETTER(SplitMessageGroup, 4) + + #undef DEFINE_CHECKER_GETTER +}; + +struct TDataKey { + TKey Key; + ui32 Size; + TInstant Timestamp; + ui64 CumulativeSize; +}; + +} // namespace NKikimr diff --git a/ydb/core/persqueue/pq_ut.cpp b/ydb/core/persqueue/pq_ut.cpp index e54c24a75b..d68710b156 100644 --- a/ydb/core/persqueue/pq_ut.cpp +++ b/ydb/core/persqueue/pq_ut.cpp @@ -21,13 +21,10 @@ namespace NKikimr { -const static TString TOPIC_NAME = "rt3.dc1--topic"; -Y_UNIT_TEST_SUITE(TPQTest) { +const static TString TOPIC_NAME = "rt3.dc1--topic"; -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -// SINGLE COMMAND TEST FUNCTIONS -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +Y_UNIT_TEST_SUITE(TPQTest) { Y_UNIT_TEST(TestGroupsBalancer) { TTestContext tc; @@ -1190,7 +1187,8 @@ Y_UNIT_TEST(TestWritePQ) { tc.Prepare(dispatchName, setup, activeZone); tc.Runtime->SetScheduledLimit(100); - PQTabletPrepare({}, {{"user", true}}, tc); //important client, lifetimeseconds=0 - never delete + // Important client, lifetimeseconds=0 - never delete + PQTabletPrepare({}, {{"user", true}}, tc); TVector<std::pair<ui64, TString>> data, data1, data2; activeZone = PlainOrSoSlow(true, false); @@ -1482,7 +1480,6 @@ Y_UNIT_TEST(TestWriteToFullPartition) { } - Y_UNIT_TEST(TestTimeRetention) { TTestContext tc; RunTestWithReboots(tc.TabletIds, [&]() { @@ -1560,7 +1557,6 @@ Y_UNIT_TEST(TestPQPartialRead) { }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) { TFinalizer finalizer(tc); tc.Prepare(dispatchName, setup, activeZone); - tc.Runtime->SetScheduledLimit(200); PQTabletPrepare({}, {{"aaa", true}}, tc); //important client - never delete diff --git a/ydb/core/persqueue/pq_ut.h b/ydb/core/persqueue/pq_ut.h index de8425f855..753613f528 100644 --- a/ydb/core/persqueue/pq_ut.h +++ b/ydb/core/persqueue/pq_ut.h @@ -2,27 +2,10 @@ #include "pq.h" #include "user_info.h" - +#include <ydb/core/testlib/actors/test_runtime.h> #include <ydb/core/testlib/basics/runtime.h> -#include <ydb/core/tablet_flat/tablet_flat_executed.h> -#include <ydb/core/tx/schemeshard/schemeshard.h> -#include <ydb/public/lib/base/msgbus.h> -#include <ydb/core/keyvalue/keyvalue_events.h> -#include <ydb/core/persqueue/events/global.h> -#include <ydb/core/tablet/tablet_counters_aggregator.h> -#include <ydb/core/persqueue/key.h> -#include <ydb/core/keyvalue/keyvalue_events.h> -#include <ydb/core/persqueue/partition.h> -#include <ydb/core/engine/minikql/flat_local_tx_factory.h> -#include <ydb/core/security/ticket_parser.h> - -#include <ydb/core/testlib/fake_scheme_shard.h> #include <ydb/core/testlib/tablet_helpers.h> -#include <library/cpp/testing/unittest/registar.h> - -#include <util/system/sanitizers.h> -#include <util/system/valgrind.h> const bool ENABLE_DETAILED_PQ_LOG = false; const bool ENABLE_DETAILED_KV_LOG = false; @@ -233,915 +216,183 @@ struct TTabletPreparationParameters { ui32 specVersion{0}; i32 storageLimitBytes{0}; }; -void PQTabletPrepare(const TTabletPreparationParameters& parameters, - const TVector<std::pair<TString, bool>>& users, TTestContext& tc) { - TAutoPtr<IEventHandle> handle; - static int version = 0; - if (parameters.specVersion) { - version = parameters.specVersion; - } else { - ++version; - } - for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) { - try { - tc.Runtime->ResetScheduledCount(); - - THolder<TEvPersQueue::TEvUpdateConfig> request(new TEvPersQueue::TEvUpdateConfig()); - for (ui32 i = 0; i < parameters.partitions; ++i) { - request->Record.MutableTabletConfig()->AddPartitionIds(i); - } - request->Record.MutableTabletConfig()->SetCacheSize(10_MB); - request->Record.SetTxId(12345); - auto tabletConfig = request->Record.MutableTabletConfig(); - if (tc.Runtime->GetAppData().PQConfig.GetTopicsAreFirstClassCitizen()) { - tabletConfig->SetTopicName("topic"); - tabletConfig->SetTopicPath(tc.Runtime->GetAppData().PQConfig.GetDatabase() + "/topic"); - } else { - tabletConfig->SetTopicName("rt3.dc1--topic"); - tabletConfig->SetTopicPath("/Root/PQ/rt3.dc1--topic"); - } - tabletConfig->SetTopic("topic"); - tabletConfig->SetVersion(version); - tabletConfig->SetLocalDC(parameters.localDC); - tabletConfig->AddReadRules("user"); - tabletConfig->AddReadFromTimestampsMs(parameters.readFromTimestampsMs); - auto config = tabletConfig->MutablePartitionConfig(); - config->SetMaxCountInPartition(parameters.maxCountInPartition); - config->SetMaxSizeInPartition(parameters.maxSizeInPartition); - if (parameters.storageLimitBytes > 0) { - config->SetStorageLimitBytes(parameters.storageLimitBytes); - } else { - config->SetLifetimeSeconds(parameters.deleteTime); - } - config->SetSourceIdLifetimeSeconds(TDuration::Hours(1).Seconds()); - if (parameters.sidMaxCount > 0) - config->SetSourceIdMaxCounts(parameters.sidMaxCount); - config->SetMaxWriteInflightSize(90'000'000); - config->SetLowWatermark(parameters.lowWatermark); - - for (auto& u : users) { - if (u.second) - config->AddImportantClientId(u.first); - if (u.first != "user") - tabletConfig->AddReadRules(u.first); - } - tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); - TEvPersQueue::TEvUpdateConfigResponse* result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvUpdateConfigResponse>(handle); - - UNIT_ASSERT(result); - auto& rec = result->Record; - UNIT_ASSERT(rec.HasStatus() && rec.GetStatus() == NKikimrPQ::OK); - UNIT_ASSERT(rec.HasTxId() && rec.GetTxId() == 12345); - UNIT_ASSERT(rec.HasOrigin() && result->GetOrigin() == 1); - retriesLeft = 0; - } catch (NActors::TSchedulingLimitReachedException) { - UNIT_ASSERT(retriesLeft >= 1); - } - } - TEvKeyValue::TEvResponse *result; - THolder<TEvKeyValue::TEvRequest> request; - for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) { - try { - - request.Reset(new TEvKeyValue::TEvRequest); - auto read = request->Record.AddCmdRead(); - read->SetKey("_config"); - - tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); - result = tc.Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle); - - UNIT_ASSERT(result); - UNIT_ASSERT(result->Record.HasStatus()); - UNIT_ASSERT_EQUAL(result->Record.GetStatus(), NMsgBusProxy::MSTATUS_OK); - retriesLeft = 0; - } catch (NActors::TSchedulingLimitReachedException) { - UNIT_ASSERT(retriesLeft >= 1); - } - } -} - - - -void BalancerPrepare(const TString topic, const TVector<std::pair<ui32, std::pair<ui64, ui32>>>& map, const ui64 ssId, TTestContext& tc, const bool requireAuth = false) { - TAutoPtr<IEventHandle> handle; - static int version = 0; - ++version; - - for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) { - try { - tc.Runtime->ResetScheduledCount(); - - THolder<TEvPersQueue::TEvUpdateBalancerConfig> request(new TEvPersQueue::TEvUpdateBalancerConfig()); - for (const auto& p : map) { - auto part = request->Record.AddPartitions(); - part->SetPartition(p.first); - part->SetGroup(p.second.second); - part->SetTabletId(p.second.first); - - auto tablet = request->Record.AddTablets(); - tablet->SetTabletId(p.second.first); - tablet->SetOwner(1); - tablet->SetIdx(p.second.first); - } - request->Record.SetTxId(12345); - request->Record.SetPathId(1); - request->Record.SetVersion(version); - request->Record.SetTopicName(topic); - request->Record.SetPath("path"); - request->Record.SetSchemeShardId(ssId); - request->Record.MutableTabletConfig()->AddReadRules("client"); - request->Record.MutableTabletConfig()->SetRequireAuthWrite(requireAuth); - request->Record.MutableTabletConfig()->SetRequireAuthRead(requireAuth); - - tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); - TEvPersQueue::TEvUpdateConfigResponse* result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvUpdateConfigResponse>(handle); - - UNIT_ASSERT(result); - auto& rec = result->Record; - UNIT_ASSERT(rec.HasStatus() && rec.GetStatus() == NKikimrPQ::OK); - UNIT_ASSERT(rec.HasTxId() && rec.GetTxId() == 12345); - UNIT_ASSERT(rec.HasOrigin() && result->GetOrigin() == tc.BalancerTabletId); - retriesLeft = 0; - } catch (NActors::TSchedulingLimitReachedException) { - UNIT_ASSERT(retriesLeft >= 1); - } - } - //TODO: check state - TTestActorRuntime& runtime = *tc.Runtime; - - ForwardToTablet(runtime, tc.BalancerTabletId, tc.Edge, new TEvents::TEvPoisonPill()); - TDispatchOptions rebootOptions; - rebootOptions.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvRestored, 2)); - runtime.DispatchEvents(rebootOptions); -} - - -void PQGetPartInfo(ui64 startOffset, ui64 endOffset, TTestContext& tc) { - TAutoPtr<IEventHandle> handle; - TEvPersQueue::TEvOffsetsResponse *result; - THolder<TEvPersQueue::TEvOffsets> request; - - for (i32 retriesLeft = 3; retriesLeft > 0; --retriesLeft) { - try { - - tc.Runtime->ResetScheduledCount(); - request.Reset(new TEvPersQueue::TEvOffsets); - - tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); - result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvOffsetsResponse>(handle); - UNIT_ASSERT(result); - - if (result->Record.PartResultSize() == 0 || - result->Record.GetPartResult(0).GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) { - tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress - retriesLeft = 3; - continue; - } - - UNIT_ASSERT(result->Record.PartResultSize()); - UNIT_ASSERT_VALUES_EQUAL((ui64)result->Record.GetPartResult(0).GetStartOffset(), startOffset); - UNIT_ASSERT_VALUES_EQUAL((ui64)result->Record.GetPartResult(0).GetEndOffset(), endOffset); - retriesLeft = 0; - } catch (NActors::TSchedulingLimitReachedException) { - UNIT_ASSERT(retriesLeft > 0); - } - } - -} - -void RestartTablet(TTestContext& tc) { - TTestActorRuntime& runtime = *tc.Runtime; - - ForwardToTablet(runtime, tc.TabletId, tc.Edge, new TEvents::TEvPoisonPill()); - TDispatchOptions rebootOptions; - rebootOptions.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvRestored, 2)); - runtime.DispatchEvents(rebootOptions); -} - - -TActorId SetOwner(const ui32 partition, TTestContext& tc, const TString& owner, bool force) { - TActorId pipeClient = tc.Runtime->ConnectToPipe(tc.TabletId, tc.Edge, 0, GetPipeConfigWithRetries()); - - THolder<TEvPersQueue::TEvRequest> request; - - request.Reset(new TEvPersQueue::TEvRequest); - auto req = request->Record.MutablePartitionRequest(); - req->SetPartition(partition); - req->MutableCmdGetOwnership()->SetOwner(owner); - req->MutableCmdGetOwnership()->SetForce(force); - ActorIdToProto(pipeClient, req->MutablePipeClient()); - - tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries(), pipeClient); - return pipeClient; -} - -TActorId RegisterReadSession(const TString& session, TTestContext& tc, const TVector<ui32>& groups = {}) { - TActorId pipeClient = tc.Runtime->ConnectToPipe(tc.BalancerTabletId, tc.Edge, 0, GetPipeConfigWithRetries()); - - THolder<TEvPersQueue::TEvRegisterReadSession> request; - - request.Reset(new TEvPersQueue::TEvRegisterReadSession); - auto& req = request->Record; - req.SetSession(session); - ActorIdToProto(pipeClient, req.MutablePipeClient()); - req.SetClientId("user"); - for (const auto& g : groups) { - req.AddGroups(g); - } - - tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries(), pipeClient); - return pipeClient; -} - -void WaitSessionKill(TTestContext& tc) { - TAutoPtr<IEventHandle> handle; - - tc.Runtime->ResetScheduledCount(); - - TEvPersQueue::TEvError *result; - result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvError>(handle); - UNIT_ASSERT(result); - Cerr << "ANS: " << result->Record << "\n"; -// UNIT_ASSERT_EQUAL(result->Record.GetSession(), session); -} - - -void WaitPartition(const TString &session, TTestContext& tc, ui32 partition, const TString& sessionToRelease, const TString& topic, const TActorId& pipe, bool ok = true) { - TAutoPtr<IEventHandle> handle; - - tc.Runtime->ResetScheduledCount(); - - for (ui32 i = 0; i < 3; ++i) { - Cerr << "STEP " << i << " ok " << ok << "\n"; - - try { - tc.Runtime->ResetScheduledCount(); - if (i % 2 == 0) { - TEvPersQueue::TEvLockPartition *result; - result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvLockPartition>(handle); - UNIT_ASSERT(result); - Cerr << "ANS: " << result->Record << "\n"; - UNIT_ASSERT(ok); - UNIT_ASSERT_EQUAL(result->Record.GetSession(), session); - break; - } else { - TEvPersQueue::TEvReleasePartition *result; - result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvReleasePartition>(handle); - UNIT_ASSERT(result); - - Cerr << "ANS2: " << result->Record << "\n"; - - UNIT_ASSERT_EQUAL(result->Record.GetSession(), sessionToRelease); - UNIT_ASSERT(ok); - - THolder<TEvPersQueue::TEvPartitionReleased> request; - - request.Reset(new TEvPersQueue::TEvPartitionReleased); - auto& req = request->Record; - req.SetSession(sessionToRelease); - req.SetPartition(partition); - req.SetTopic(topic); - req.SetClientId("user"); - ActorIdToProto(pipe, req.MutablePipeClient()); - - tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries(), pipe); - } - } catch (NActors::TSchedulingLimitReachedException) { - UNIT_ASSERT(i < 2 || !ok); - } catch (NActors::TEmptyEventQueueException) { - UNIT_ASSERT(i < 2 || !ok); - } - } -} - - -std::pair<TString, TActorId> CmdSetOwner(const ui32 partition, TTestContext& tc, const TString& owner = "default", bool force = true) { - TAutoPtr<IEventHandle> handle; - TEvPersQueue::TEvResponse *result; - TString cookie; - TActorId pipeClient; - for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) { - try { - tc.Runtime->ResetScheduledCount(); - - pipeClient = SetOwner(partition, tc, owner, force); - - result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle); - - UNIT_ASSERT(result); - UNIT_ASSERT(result->Record.HasStatus()); - if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) { - tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress - retriesLeft = 3; - continue; - } - - if (result->Record.GetErrorReason().StartsWith("ownership session is killed by another session with id ")) { - result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle); - UNIT_ASSERT(result); - UNIT_ASSERT(result->Record.HasStatus()); - } - - if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) { - tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress - retriesLeft = 3; - continue; - } - - UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::OK); - UNIT_ASSERT(result->Record.HasPartitionResponse()); - UNIT_ASSERT(result->Record.GetPartitionResponse().HasCmdGetOwnershipResult()); - UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdGetOwnershipResult().HasOwnerCookie()); - cookie = result->Record.GetPartitionResponse().GetCmdGetOwnershipResult().GetOwnerCookie(); - UNIT_ASSERT(!cookie.empty()); - retriesLeft = 0; - } catch (NActors::TSchedulingLimitReachedException) { - UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2); - } - } - return std::make_pair(cookie, pipeClient); -} - - -void WritePartData(const ui32 partition, const TString& sourceId, const i64 offset, const ui64 seqNo, const ui16 partNo, const ui16 totalParts, - const ui32 totalSize, const TString& data, TTestContext& tc, const TString& cookie, i32 msgSeqNo) -{ - THolder<TEvPersQueue::TEvRequest> request; - tc.Runtime->ResetScheduledCount(); - request.Reset(new TEvPersQueue::TEvRequest); - auto req = request->Record.MutablePartitionRequest(); - req->SetPartition(partition); - req->SetOwnerCookie(cookie); - req->SetMessageNo(msgSeqNo); - if (offset != -1) - req->SetCmdWriteOffset(offset); - auto write = req->AddCmdWrite(); - write->SetSourceId(sourceId); - write->SetSeqNo(seqNo); - write->SetPartNo(partNo); - write->SetTotalParts(totalParts); - if (partNo == 0) - write->SetTotalSize(totalSize); - write->SetData(data); - - tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); -} - -void WritePartDataWithBigMsg(const ui32 partition, const TString& sourceId, const ui64 seqNo, const ui16 partNo, const ui16 totalParts, - const ui32 totalSize, const TString& data, TTestContext& tc, const TString& cookie, i32 msgSeqNo, ui32 bigMsgSize) -{ - THolder<TEvPersQueue::TEvRequest> request; - tc.Runtime->ResetScheduledCount(); - request.Reset(new TEvPersQueue::TEvRequest); - auto req = request->Record.MutablePartitionRequest(); - req->SetPartition(partition); - req->SetOwnerCookie(cookie); - req->SetMessageNo(msgSeqNo); - - TString bigData(bigMsgSize, 'a'); - - auto write = req->AddCmdWrite(); - write->SetSourceId(sourceId); - write->SetSeqNo(seqNo); - write->SetData(bigData); - - write = req->AddCmdWrite(); - write->SetSourceId(sourceId); - write->SetSeqNo(seqNo + 1); - write->SetPartNo(partNo); - write->SetTotalParts(totalParts); - if (partNo == 0) - write->SetTotalSize(totalSize); - write->SetData(data); - - - tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); -} - - - -void WriteData(const ui32 partition, const TString& sourceId, const TVector<std::pair<ui64, TString>> data, TTestContext& tc, - const TString& cookie, i32 msgSeqNo, i64 offset, bool disableDeduplication = false) -{ - THolder<TEvPersQueue::TEvRequest> request; - tc.Runtime->ResetScheduledCount(); - request.Reset(new TEvPersQueue::TEvRequest); - auto req = request->Record.MutablePartitionRequest(); - req->SetPartition(partition); - req->SetOwnerCookie(cookie); - req->SetMessageNo(msgSeqNo); - if (offset >= 0) - req->SetCmdWriteOffset(offset); - for (auto& p : data) { - auto write = req->AddCmdWrite(); - write->SetSourceId(sourceId); - write->SetSeqNo(p.first); - write->SetData(p.second); - write->SetDisableDeduplication(disableDeduplication); - } - tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); -} - -void CmdWrite(const ui32 partition, const TString& sourceId, const TVector<std::pair<ui64, TString>> data, - TTestContext& tc, bool error = false, const THashSet<ui32>& alreadyWrittenSeqNo = {}, - bool isFirst = false, const TString& ownerCookie = "", i32 msn = -1, i64 offset = -1, - bool treatWrongCookieAsError = false, bool treatBadOffsetAsError = true, - bool disableDeduplication = false) { - TAutoPtr<IEventHandle> handle; - TEvPersQueue::TEvResponse *result; - - ui32& msgSeqNo = tc.MsgSeqNoMap[partition]; - if (msn != -1) msgSeqNo = msn; - TString cookie = ownerCookie; - for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) { - try { - WriteData(partition, sourceId, data, tc, cookie, msgSeqNo, offset, disableDeduplication); - result = tc.Runtime->GrabEdgeEventIf<TEvPersQueue::TEvResponse>(handle, [](const TEvPersQueue::TEvResponse& ev){ - if (ev.Record.HasPartitionResponse() && ev.Record.GetPartitionResponse().CmdWriteResultSize() > 0 || ev.Record.GetErrorCode() != NPersQueue::NErrorCode::OK) - return true; - return false; - }); //there could be outgoing reads in TestReadSubscription test - - UNIT_ASSERT(result); - UNIT_ASSERT(result->Record.HasStatus()); - if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) { - tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress - retriesLeft = 3; - continue; - } - - if (!treatWrongCookieAsError && result->Record.GetErrorCode() == NPersQueue::NErrorCode::WRONG_COOKIE) { - cookie = CmdSetOwner(partition, tc).first; - msgSeqNo = 0; - retriesLeft = 3; - continue; - } - - if (!treatBadOffsetAsError && result->Record.GetErrorCode() == NPersQueue::NErrorCode::WRITE_ERROR_BAD_OFFSET) { - return; - } - - if (error) { - UNIT_ASSERT( - result->Record.GetErrorCode() == NPersQueue::NErrorCode::WRITE_ERROR_PARTITION_IS_FULL || - result->Record.GetErrorCode() == NPersQueue::NErrorCode::BAD_REQUEST || - result->Record.GetErrorCode() == NPersQueue::NErrorCode::WRONG_COOKIE - ); - break; - } else { - Cerr << result->Record.GetErrorReason(); - UNIT_ASSERT_VALUES_EQUAL((ui32)result->Record.GetErrorCode(), (ui32)NPersQueue::NErrorCode::OK); - } - UNIT_ASSERT_VALUES_EQUAL(result->Record.GetPartitionResponse().CmdWriteResultSize(), data.size()); - - for (ui32 i = 0; i < data.size(); ++i) { - UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdWriteResult(i).HasAlreadyWritten()); - UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdWriteResult(i).HasOffset()); - UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdWriteResult(i).HasMaxSeqNo() == - result->Record.GetPartitionResponse().GetCmdWriteResult(i).GetAlreadyWritten()); - if (result->Record.GetPartitionResponse().GetCmdWriteResult(i).HasMaxSeqNo()) { - UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdWriteResult(i).GetMaxSeqNo() >= (i64)data[i].first); - } - if (isFirst || offset != -1) { - UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdWriteResult(i).GetAlreadyWritten() - || result->Record.GetPartitionResponse().GetCmdWriteResult(i).GetOffset() == i + (offset == -1 ? 0 : offset)); - } - } - for (ui32 i = 0; i < data.size(); ++i) { - auto res = result->Record.GetPartitionResponse().GetCmdWriteResult(i); - UNIT_ASSERT(!alreadyWrittenSeqNo.contains(res.GetSeqNo()) || res.GetAlreadyWritten()); - } - retriesLeft = 0; - } catch (NActors::TSchedulingLimitReachedException) { - UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2); - retriesLeft = 3; - } - } - ++msgSeqNo; -} - - -void ReserveBytes(const ui32 partition, TTestContext& tc, - const TString& cookie, i32 msgSeqNo, i64 size, const TActorId& pipeClient, bool lastRequest) -{ - THolder<TEvPersQueue::TEvRequest> request; - tc.Runtime->ResetScheduledCount(); - request.Reset(new TEvPersQueue::TEvRequest); - auto req = request->Record.MutablePartitionRequest(); - req->SetPartition(partition); - req->SetOwnerCookie(cookie); - req->SetMessageNo(msgSeqNo); - ActorIdToProto(pipeClient, req->MutablePipeClient()); - req->MutableCmdReserveBytes()->SetSize(size); - req->MutableCmdReserveBytes()->SetLastRequest(lastRequest); - tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); - - tc.Runtime->DispatchEvents(); -} - - -void CmdReserveBytes(const ui32 partition, TTestContext& tc, const TString& ownerCookie, i32 msn, i64 size, TActorId pipeClient, bool noAnswer = false, bool lastRequest = false) { - TAutoPtr<IEventHandle> handle; - TEvPersQueue::TEvResponse *result; - - ui32& msgSeqNo = tc.MsgSeqNoMap[partition]; - if (msn != -1) msgSeqNo = msn; - TString cookie = ownerCookie; - - for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) { - try { - ReserveBytes(partition, tc, cookie, msgSeqNo, size, pipeClient, lastRequest); - result = tc.Runtime->GrabEdgeEventIf<TEvPersQueue::TEvResponse>(handle, [](const TEvPersQueue::TEvResponse& ev){ - if (!ev.Record.HasPartitionResponse() || !ev.Record.GetPartitionResponse().HasCmdReadResult()) - return true; - return false; - }); //there could be outgoing reads in TestReadSubscription test - - UNIT_ASSERT(result); - UNIT_ASSERT(result->Record.HasStatus()); - - if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) { - retriesLeft = 3; - continue; - } - - if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::WRONG_COOKIE) { - auto p = CmdSetOwner(partition, tc); - pipeClient = p.second; - cookie = p.first; - msgSeqNo = 0; - retriesLeft = 3; - continue; - } - UNIT_ASSERT(!noAnswer); - - UNIT_ASSERT_C(result->Record.GetErrorCode() == NPersQueue::NErrorCode::OK, result->Record); - - retriesLeft = 0; - } catch (NActors::TSchedulingLimitReachedException) { - if (noAnswer) - break; - UNIT_ASSERT(retriesLeft == 2); - } - } - ++msgSeqNo; -} - - -void CmdSetOffset(const ui32 partition, const TString& user, ui64 offset, bool error, TTestContext& tc, const TString& session = "") { - TAutoPtr<IEventHandle> handle; - TEvPersQueue::TEvResponse *result; - THolder<TEvPersQueue::TEvRequest> request; - for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) { - try { - tc.Runtime->ResetScheduledCount(); - request.Reset(new TEvPersQueue::TEvRequest); - auto req = request->Record.MutablePartitionRequest(); - req->SetPartition(partition); - auto off = req->MutableCmdSetClientOffset(); - off->SetClientId(user); - off->SetOffset(offset); - if (!session.empty()) - off->SetSessionId(session); - tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); - result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle); - - UNIT_ASSERT(result); - UNIT_ASSERT(result->Record.HasStatus()); - if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) { - tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress - retriesLeft = 3; - continue; - } - if ((result->Record.GetErrorCode() == NPersQueue::NErrorCode::SET_OFFSET_ERROR_COMMIT_TO_FUTURE || - result->Record.GetErrorCode() == NPersQueue::NErrorCode::WRONG_COOKIE) && error) { - break; - } - UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::OK); - retriesLeft = 0; - } catch (NActors::TSchedulingLimitReachedException) { - UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2); - } - } -} - - -void CmdCreateSession(const ui32 partition, const TString& user, const TString& session, TTestContext& tc, const i64 offset = 0, - const ui32 gen = 0, const ui32 step = 0, bool error = false) { - TAutoPtr<IEventHandle> handle; - TEvPersQueue::TEvResponse *result; - THolder<TEvPersQueue::TEvRequest> request; - for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) { - try { - tc.Runtime->ResetScheduledCount(); - request.Reset(new TEvPersQueue::TEvRequest); - auto req = request->Record.MutablePartitionRequest(); - req->SetPartition(partition); - auto off = req->MutableCmdCreateSession(); - off->SetClientId(user); - off->SetSessionId(session); - off->SetGeneration(gen); - off->SetStep(step); - tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); - result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle); - - UNIT_ASSERT(result); - UNIT_ASSERT(result->Record.HasStatus()); - if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) { - tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress - retriesLeft = 3; - continue; - } - - if (error) { - UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::WRONG_COOKIE); - return; - } - - UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::OK); - - UNIT_ASSERT(result->Record.GetPartitionResponse().HasCmdGetClientOffsetResult()); - auto resp = result->Record.GetPartitionResponse().GetCmdGetClientOffsetResult(); - UNIT_ASSERT(resp.HasOffset() && (i64)resp.GetOffset() == offset); - retriesLeft = 0; - } catch (NActors::TSchedulingLimitReachedException) { - UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2); - } - } -} - -void CmdKillSession(const ui32 partition, const TString& user, const TString& session, TTestContext& tc) { - TAutoPtr<IEventHandle> handle; - TEvPersQueue::TEvResponse *result; - THolder<TEvPersQueue::TEvRequest> request; - for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) { - try { - tc.Runtime->ResetScheduledCount(); - request.Reset(new TEvPersQueue::TEvRequest); - auto req = request->Record.MutablePartitionRequest(); - req->SetPartition(partition); - auto off = req->MutableCmdDeleteSession(); - off->SetClientId(user); - off->SetSessionId(session); - tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); - result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle); - - UNIT_ASSERT(result); - UNIT_ASSERT(result->Record.HasStatus()); - if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) { - tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress - retriesLeft = 3; - continue; - } - UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::OK); - retriesLeft = 0; - } catch (NActors::TSchedulingLimitReachedException) { - UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2); - } - } -} - - - -void CmdGetOffset(const ui32 partition, const TString& user, i64 offset, TTestContext& tc, i64 ctime = -1, ui64 writeTime = 0) { - TAutoPtr<IEventHandle> handle; - TEvPersQueue::TEvResponse *result; - THolder<TEvPersQueue::TEvRequest> request; - for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) { - try { - tc.Runtime->ResetScheduledCount(); - request.Reset(new TEvPersQueue::TEvRequest); - auto req = request->Record.MutablePartitionRequest(); - req->SetPartition(partition); - auto off = req->MutableCmdGetClientOffset(); - off->SetClientId(user); - tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); - result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle); - - UNIT_ASSERT(result); - UNIT_ASSERT(result->Record.HasStatus()); - - if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) { - tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress - retriesLeft = 3; - continue; - } - - UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::OK); - UNIT_ASSERT(result->Record.GetPartitionResponse().HasCmdGetClientOffsetResult()); - auto resp = result->Record.GetPartitionResponse().GetCmdGetClientOffsetResult(); - if (ctime != -1) { - UNIT_ASSERT_EQUAL(resp.HasCreateTimestampMS(), ctime > 0); - if (ctime > 0) { - if (ctime == Max<i64>()) { - UNIT_ASSERT(resp.GetCreateTimestampMS() + 86000000 < TAppData::TimeProvider->Now().MilliSeconds()); - } else { - UNIT_ASSERT_EQUAL((i64)resp.GetCreateTimestampMS(), ctime); - } - } - } - Cerr << "CMDGETOFFSET partition " << partition << " waiting for offset " << offset << ": " << resp << "\n"; - UNIT_ASSERT((offset == -1 && !resp.HasOffset()) || (i64)resp.GetOffset() == offset); - if (writeTime > 0) { - UNIT_ASSERT(resp.HasWriteTimestampEstimateMS()); - UNIT_ASSERT(resp.GetWriteTimestampEstimateMS() >= writeTime); - } - retriesLeft = 0; - } catch (NActors::TSchedulingLimitReachedException) { - UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2); - } - } -} - - -void CmdUpdateWriteTimestamp(const ui32 partition, ui64 timestamp, TTestContext& tc) { - TAutoPtr<IEventHandle> handle; - TEvPersQueue::TEvResponse *result; - THolder<TEvPersQueue::TEvRequest> request; - for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) { - try { - tc.Runtime->ResetScheduledCount(); - request.Reset(new TEvPersQueue::TEvRequest); - auto req = request->Record.MutablePartitionRequest(); - req->SetPartition(partition); - auto off = req->MutableCmdUpdateWriteTimestamp(); - off->SetWriteTimeMS(timestamp); - tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); - result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle); - - UNIT_ASSERT(result); - UNIT_ASSERT(result->Record.HasStatus()); - - if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) { - tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress - retriesLeft = 3; - continue; - } - - UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::OK); - retriesLeft = 0; - } catch (NActors::TSchedulingLimitReachedException) { - UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2); - } - } -} - - -TVector<TString> CmdSourceIdRead(TTestContext& tc) { - TAutoPtr<IEventHandle> handle; - TVector<TString> sourceIds; - THolder<TEvKeyValue::TEvRequest> request; - TEvKeyValue::TEvResponse *result; - - for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) { - try { - request.Reset(new TEvKeyValue::TEvRequest); - sourceIds.clear(); - auto read = request->Record.AddCmdReadRange(); - auto range = read->MutableRange(); - NPQ::TKeyPrefix ikeyFrom(NPQ::TKeyPrefix::TypeInfo, 0, NPQ::TKeyPrefix::MarkSourceId); - range->SetFrom(ikeyFrom.Data(), ikeyFrom.Size()); - range->SetIncludeFrom(true); - NPQ::TKeyPrefix ikeyTo(NPQ::TKeyPrefix::TypeInfo, 0, NPQ::TKeyPrefix::MarkUserDeprecated); - range->SetTo(ikeyTo.Data(), ikeyTo.Size()); - range->SetIncludeTo(false); - Cout << request.Get()->ToString() << Endl; - tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); - result = tc.Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle); - UNIT_ASSERT(result); - UNIT_ASSERT(result->Record.HasStatus()); - UNIT_ASSERT_EQUAL(result->Record.GetStatus(), NMsgBusProxy::MSTATUS_OK); - for (ui64 idx = 0; idx < result->Record.ReadRangeResultSize(); ++idx) { - const auto &readResult = result->Record.GetReadRangeResult(idx); - UNIT_ASSERT(readResult.HasStatus()); - UNIT_ASSERT_EQUAL(readResult.GetStatus(), NKikimrProto::OK); - for (size_t j = 0; j < readResult.PairSize(); ++j) { - const auto& pair = readResult.GetPair(j); - TString s = pair.GetKey().substr(NPQ::TKeyPrefix::MarkedSize()); - sourceIds.push_back(s); - } - } - retriesLeft = 0; - } catch (NActors::TSchedulingLimitReachedException) { - UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2); - } - } - return sourceIds; -} - - -void CmdRead(const ui32 partition, const ui64 offset, const ui32 count, const ui32 size, const ui32 resCount, bool timeouted, TTestContext& tc, TVector<i32> offsets = {}, const ui32 maxTimeLagMs = 0, const ui64 readTimestampMs = 0) { - TAutoPtr<IEventHandle> handle; - TEvPersQueue::TEvResponse *result; - THolder<TEvPersQueue::TEvRequest> request; - - for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) { - try { - tc.Runtime->ResetScheduledCount(); - request.Reset(new TEvPersQueue::TEvRequest); - auto req = request->Record.MutablePartitionRequest(); - req->SetPartition(partition); - auto read = req->MutableCmdRead(); - read->SetOffset(offset); - read->SetClientId("user"); - read->SetCount(count); - read->SetBytes(size); - if (maxTimeLagMs > 0) { - read->SetMaxTimeLagMs(maxTimeLagMs); - } - if (readTimestampMs > 0) { - read->SetReadTimestampMs(readTimestampMs); - } - req->SetCookie(123); - - tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); - result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle); - - - UNIT_ASSERT(result); - UNIT_ASSERT(result->Record.HasStatus()); - - UNIT_ASSERT(result->Record.HasPartitionResponse()); - UNIT_ASSERT_EQUAL(result->Record.GetPartitionResponse().GetCookie(), 123); - if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) { - tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress - retriesLeft = 3; - continue; - } - if (timeouted) { - UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::OK); - UNIT_ASSERT(result->Record.GetPartitionResponse().HasCmdReadResult()); - auto res = result->Record.GetPartitionResponse().GetCmdReadResult(); - UNIT_ASSERT_EQUAL(res.ResultSize(), 0); - break; - } - UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::OK); - - UNIT_ASSERT(result->Record.GetPartitionResponse().HasCmdReadResult()); - auto res = result->Record.GetPartitionResponse().GetCmdReadResult(); - - UNIT_ASSERT_EQUAL(res.ResultSize(), resCount); - ui64 off = offset; - - for (ui32 i = 0; i < resCount; ++i) { - - auto r = res.GetResult(i); - if (offsets.empty()) { - if (readTimestampMs == 0) { - UNIT_ASSERT_EQUAL((ui64)r.GetOffset(), off); - } - UNIT_ASSERT(r.GetSourceId().size() == 9 && r.GetSourceId().StartsWith("sourceid")); - UNIT_ASSERT_EQUAL(ui32(r.GetData()[0]), off); - UNIT_ASSERT_EQUAL(ui32((unsigned char)r.GetData().back()), r.GetSeqNo() % 256); - ++off; - } else { - UNIT_ASSERT(offsets[i] == (i64)r.GetOffset()); - } - } - retriesLeft = 0; - } catch (NActors::TSchedulingLimitReachedException) { - UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2); - } - } -} - - -void FillUserInfo(NKikimrClient::TKeyValueRequest_TCmdWrite* write, const TString& client, ui32 partition, ui64 offset) { - NPQ::TKeyPrefix ikey(NPQ::TKeyPrefix::TypeInfo, partition, NPQ::TKeyPrefix::MarkUser); - ikey.Append(client.c_str(), client.size()); - - NKikimrPQ::TUserInfo userInfo; - userInfo.SetOffset(offset); - userInfo.SetGeneration(1); - userInfo.SetStep(2); - userInfo.SetSession("test-session"); - userInfo.SetOffsetRewindSum(10); - userInfo.SetReadRuleGeneration(1); - TString out; - Y_PROTOBUF_SUPPRESS_NODISCARD userInfo.SerializeToString(&out); - - TBuffer idata; - idata.Append(out.c_str(), out.size()); - - write->SetKey(ikey.Data(), ikey.Size()); - write->SetValue(idata.Data(), idata.Size()); -} - -void FillDeprecatedUserInfo(NKikimrClient::TKeyValueRequest_TCmdWrite* write, const TString& client, ui32 partition, ui64 offset) { - TString session = "test-session"; - ui32 gen = 1; - ui32 step = 2; - NPQ::TKeyPrefix ikeyDeprecated(NPQ::TKeyPrefix::TypeInfo, partition, NPQ::TKeyPrefix::MarkUserDeprecated); - ikeyDeprecated.Append(client.c_str(), client.size()); - - TBuffer idataDeprecated = NPQ::NDeprecatedUserData::Serialize(offset, gen, step, session); - write->SetKey(ikeyDeprecated.Data(), ikeyDeprecated.Size()); - write->SetValue(idataDeprecated.Data(), idataDeprecated.Size()); -} - -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -// TEST CASES -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - - -} // NKikimr +void PQTabletPrepare( + const TTabletPreparationParameters& parameters, + const TVector<std::pair<TString, bool>>& users, + TTestContext& tc); + +TActorId RegisterReadSession( + const TString& session, + TTestContext& tc, + const TVector<ui32>& groups = {}); + +TActorId SetOwner( + const ui32 partition, + TTestContext& tc, + const TString& owner, + bool force); + +TVector<TString> CmdSourceIdRead(TTestContext& tc); + +std::pair<TString, TActorId> CmdSetOwner( + const ui32 partition, + TTestContext& tc, + const TString& owner = "default", + bool force = true); + +void BalancerPrepare( + const TString topic, + const TVector<std::pair<ui32, std::pair<ui64, ui32>>>& map, + const ui64 ssId, + TTestContext& tc, + const bool requireAuth = false); + +void CmdCreateSession( + const ui32 partition, + const TString& user, + const TString& session, + TTestContext& tc, + const i64 offset = 0, + const ui32 gen = 0, + const ui32 step = 0, + bool error = false); + +void CmdGetOffset( + const ui32 partition, + const TString& user, + i64 offset, + TTestContext& tc, + i64 ctime = -1, + ui64 writeTime = 0); + +void CmdKillSession( + const ui32 partition, + const TString& user, + const TString& session, + TTestContext& tc); + +void CmdRead( + const ui32 partition, + const ui64 offset, + const ui32 count, + const ui32 size, + const ui32 resCount, + bool timeouted, + TTestContext& tc, + TVector<i32> offsets = {}, + const ui32 maxTimeLagMs = 0, + const ui64 readTimestampMs = 0); + +void CmdReserveBytes( + const ui32 partition, + TTestContext& tc, + const TString& ownerCookie, + i32 msn, i64 size, + TActorId pipeClient, + bool noAnswer = false, + bool lastRequest = false); + +void CmdSetOffset( + const ui32 partition, + const TString& user, + ui64 offset, + bool error, + TTestContext& tc, + const TString& session = ""); + +void CmdUpdateWriteTimestamp( + const ui32 partition, + ui64 timestamp, + TTestContext& tc); + +void CmdWrite( + const ui32 partition, + const TString& sourceId, + const TVector<std::pair<ui64, TString>> data, + TTestContext& tc, + bool error = false, + const THashSet<ui32>& alreadyWrittenSeqNo = {}, + bool isFirst = false, + const TString& ownerCookie = "", + i32 msn = -1, + i64 offset = -1, + bool treatWrongCookieAsError = false, + bool treatBadOffsetAsError = true, + bool disableDeduplication = false); + +void FillDeprecatedUserInfo( + NKikimrClient::TKeyValueRequest_TCmdWrite* write, + const TString& client, + ui32 partition, + ui64 offset); + +void FillUserInfo( + NKikimrClient::TKeyValueRequest_TCmdWrite* write, + const TString& client, + ui32 partition, + ui64 offset); + +void PQGetPartInfo( + ui64 startOffset, + ui64 endOffset, + TTestContext& tc); + +void ReserveBytes( + const ui32 partition, + TTestContext& tc, + const TString& cookie, + i32 msgSeqNo, + i64 size, + const TActorId& pipeClient, + bool lastRequest); + +void RestartTablet(TTestContext& tc); + +void WaitPartition( + const TString &session, + TTestContext& tc, + ui32 partition, + const TString& sessionToRelease, + const TString& topic, + const TActorId& pipe, + bool ok = true); + +void WaitSessionKill(TTestContext& tc); + +void WriteData( + const ui32 partition, + const TString& sourceId, + const TVector<std::pair<ui64, TString>> data, + TTestContext& tc, + const TString& cookie, + i32 msgSeqNo, + i64 offset, + bool disableDeduplication = false); + +void WritePartData( + const ui32 partition, + const TString& sourceId, + const i64 offset, + const ui64 seqNo, + const ui16 partNo, + const ui16 totalParts, + const ui32 totalSize, + const TString& data, + TTestContext& tc, + const TString& cookie, + i32 msgSeqNo); + +void WritePartDataWithBigMsg( + const ui32 partition, + const TString& sourceId, + const ui64 seqNo, + const ui16 partNo, + const ui16 totalParts, + const ui32 totalSize, + const TString& data, + TTestContext& tc, + const TString& cookie, + i32 msgSeqNo, + ui32 bigMsgSize); + +} // namespace NKikimr diff --git a/ydb/core/persqueue/pq_ut_impl.cpp b/ydb/core/persqueue/pq_ut_impl.cpp new file mode 100644 index 0000000000..b7c39bed47 --- /dev/null +++ b/ydb/core/persqueue/pq_ut_impl.cpp @@ -0,0 +1,908 @@ +#include "pq_ut.h" + +#include <ydb/core/engine/minikql/flat_local_tx_factory.h> +#include <ydb/core/keyvalue/keyvalue_events.h> +#include <ydb/core/keyvalue/keyvalue_events.h> +#include <ydb/core/persqueue/events/global.h> +#include <ydb/core/persqueue/key.h> +#include <ydb/core/persqueue/partition.h> +#include <ydb/core/security/ticket_parser.h> +#include <ydb/core/tablet/tablet_counters_aggregator.h> +#include <ydb/core/tablet_flat/tablet_flat_executed.h> +#include <ydb/core/tx/schemeshard/schemeshard.h> +#include <ydb/public/lib/base/msgbus.h> + +#include <library/cpp/testing/unittest/registar.h> + + +namespace NKikimr { + +void PQTabletPrepare(const TTabletPreparationParameters& parameters, + const TVector<std::pair<TString, bool>>& users, TTestContext& tc) { + TAutoPtr<IEventHandle> handle; + static int version = 0; + if (parameters.specVersion) { + version = parameters.specVersion; + } else { + ++version; + } + for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) { + try { + tc.Runtime->ResetScheduledCount(); + + THolder<TEvPersQueue::TEvUpdateConfig> request(new TEvPersQueue::TEvUpdateConfig()); + for (ui32 i = 0; i < parameters.partitions; ++i) { + request->Record.MutableTabletConfig()->AddPartitionIds(i); + } + request->Record.MutableTabletConfig()->SetCacheSize(10_MB); + request->Record.SetTxId(12345); + auto tabletConfig = request->Record.MutableTabletConfig(); + if (tc.Runtime->GetAppData().PQConfig.GetTopicsAreFirstClassCitizen()) { + tabletConfig->SetTopicName("topic"); + tabletConfig->SetTopicPath(tc.Runtime->GetAppData().PQConfig.GetDatabase() + "/topic"); + } else { + tabletConfig->SetTopicName("rt3.dc1--topic"); + tabletConfig->SetTopicPath("/Root/PQ/rt3.dc1--topic"); + } + tabletConfig->SetTopic("topic"); + tabletConfig->SetVersion(version); + tabletConfig->SetLocalDC(parameters.localDC); + tabletConfig->AddReadRules("user"); + tabletConfig->AddReadFromTimestampsMs(parameters.readFromTimestampsMs); + auto config = tabletConfig->MutablePartitionConfig(); + config->SetMaxCountInPartition(parameters.maxCountInPartition); + config->SetMaxSizeInPartition(parameters.maxSizeInPartition); + if (parameters.storageLimitBytes > 0) { + config->SetStorageLimitBytes(parameters.storageLimitBytes); + } else { + config->SetLifetimeSeconds(parameters.deleteTime); + } + config->SetSourceIdLifetimeSeconds(TDuration::Hours(1).Seconds()); + if (parameters.sidMaxCount > 0) + config->SetSourceIdMaxCounts(parameters.sidMaxCount); + config->SetMaxWriteInflightSize(90'000'000); + config->SetLowWatermark(parameters.lowWatermark); + + for (auto& u : users) { + if (u.second) + config->AddImportantClientId(u.first); + if (u.first != "user") + tabletConfig->AddReadRules(u.first); + } + tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); + TEvPersQueue::TEvUpdateConfigResponse* result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvUpdateConfigResponse>(handle); + + UNIT_ASSERT(result); + auto& rec = result->Record; + UNIT_ASSERT(rec.HasStatus() && rec.GetStatus() == NKikimrPQ::OK); + UNIT_ASSERT(rec.HasTxId() && rec.GetTxId() == 12345); + UNIT_ASSERT(rec.HasOrigin() && result->GetOrigin() == 1); + retriesLeft = 0; + } catch (NActors::TSchedulingLimitReachedException) { + UNIT_ASSERT(retriesLeft >= 1); + } + } + TEvKeyValue::TEvResponse *result; + THolder<TEvKeyValue::TEvRequest> request; + for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) { + try { + + request.Reset(new TEvKeyValue::TEvRequest); + auto read = request->Record.AddCmdRead(); + read->SetKey("_config"); + + tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); + result = tc.Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle); + + UNIT_ASSERT(result); + UNIT_ASSERT(result->Record.HasStatus()); + UNIT_ASSERT_EQUAL(result->Record.GetStatus(), NMsgBusProxy::MSTATUS_OK); + retriesLeft = 0; + } catch (NActors::TSchedulingLimitReachedException) { + UNIT_ASSERT(retriesLeft >= 1); + } + } +} + +void CmdGetOffset(const ui32 partition, const TString& user, i64 offset, TTestContext& tc, i64 ctime, + ui64 writeTime) { + TAutoPtr<IEventHandle> handle; + TEvPersQueue::TEvResponse *result; + THolder<TEvPersQueue::TEvRequest> request; + for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) { + try { + tc.Runtime->ResetScheduledCount(); + request.Reset(new TEvPersQueue::TEvRequest); + auto req = request->Record.MutablePartitionRequest(); + req->SetPartition(partition); + auto off = req->MutableCmdGetClientOffset(); + off->SetClientId(user); + tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); + result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle); + + UNIT_ASSERT(result); + UNIT_ASSERT(result->Record.HasStatus()); + + if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) { + tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress + retriesLeft = 3; + continue; + } + + UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::OK); + UNIT_ASSERT(result->Record.GetPartitionResponse().HasCmdGetClientOffsetResult()); + auto resp = result->Record.GetPartitionResponse().GetCmdGetClientOffsetResult(); + if (ctime != -1) { + UNIT_ASSERT_EQUAL(resp.HasCreateTimestampMS(), ctime > 0); + if (ctime > 0) { + if (ctime == Max<i64>()) { + UNIT_ASSERT(resp.GetCreateTimestampMS() + 86000000 < TAppData::TimeProvider->Now().MilliSeconds()); + } else { + UNIT_ASSERT_EQUAL((i64)resp.GetCreateTimestampMS(), ctime); + } + } + } + Cerr << "CMDGETOFFSET partition " << partition << " waiting for offset " << offset << ": " << resp << "\n"; + UNIT_ASSERT((offset == -1 && !resp.HasOffset()) || (i64)resp.GetOffset() == offset); + if (writeTime > 0) { + UNIT_ASSERT(resp.HasWriteTimestampEstimateMS()); + UNIT_ASSERT(resp.GetWriteTimestampEstimateMS() >= writeTime); + } + retriesLeft = 0; + } catch (NActors::TSchedulingLimitReachedException) { + UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2); + } + } +} + +void BalancerPrepare(const TString topic, const TVector<std::pair<ui32, std::pair<ui64, ui32>>>& map, const ui64 ssId, TTestContext& tc, const bool requireAuth) { + TAutoPtr<IEventHandle> handle; + static int version = 0; + ++version; + + for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) { + try { + tc.Runtime->ResetScheduledCount(); + + THolder<TEvPersQueue::TEvUpdateBalancerConfig> request(new TEvPersQueue::TEvUpdateBalancerConfig()); + for (const auto& p : map) { + auto part = request->Record.AddPartitions(); + part->SetPartition(p.first); + part->SetGroup(p.second.second); + part->SetTabletId(p.second.first); + + auto tablet = request->Record.AddTablets(); + tablet->SetTabletId(p.second.first); + tablet->SetOwner(1); + tablet->SetIdx(p.second.first); + } + request->Record.SetTxId(12345); + request->Record.SetPathId(1); + request->Record.SetVersion(version); + request->Record.SetTopicName(topic); + request->Record.SetPath("path"); + request->Record.SetSchemeShardId(ssId); + request->Record.MutableTabletConfig()->AddReadRules("client"); + request->Record.MutableTabletConfig()->SetRequireAuthWrite(requireAuth); + request->Record.MutableTabletConfig()->SetRequireAuthRead(requireAuth); + + tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); + TEvPersQueue::TEvUpdateConfigResponse* result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvUpdateConfigResponse>(handle); + + UNIT_ASSERT(result); + auto& rec = result->Record; + UNIT_ASSERT(rec.HasStatus() && rec.GetStatus() == NKikimrPQ::OK); + UNIT_ASSERT(rec.HasTxId() && rec.GetTxId() == 12345); + UNIT_ASSERT(rec.HasOrigin() && result->GetOrigin() == tc.BalancerTabletId); + retriesLeft = 0; + } catch (NActors::TSchedulingLimitReachedException) { + UNIT_ASSERT(retriesLeft >= 1); + } + } + //TODO: check state + TTestActorRuntime& runtime = *tc.Runtime; + + ForwardToTablet(runtime, tc.BalancerTabletId, tc.Edge, new TEvents::TEvPoisonPill()); + TDispatchOptions rebootOptions; + rebootOptions.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvRestored, 2)); + runtime.DispatchEvents(rebootOptions); +} + +void PQGetPartInfo(ui64 startOffset, ui64 endOffset, TTestContext& tc) { + TAutoPtr<IEventHandle> handle; + TEvPersQueue::TEvOffsetsResponse *result; + THolder<TEvPersQueue::TEvOffsets> request; + + for (i32 retriesLeft = 3; retriesLeft > 0; --retriesLeft) { + try { + + tc.Runtime->ResetScheduledCount(); + request.Reset(new TEvPersQueue::TEvOffsets); + + tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); + result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvOffsetsResponse>(handle); + UNIT_ASSERT(result); + + if (result->Record.PartResultSize() == 0 || + result->Record.GetPartResult(0).GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) { + tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress + retriesLeft = 3; + continue; + } + + UNIT_ASSERT(result->Record.PartResultSize()); + UNIT_ASSERT_VALUES_EQUAL((ui64)result->Record.GetPartResult(0).GetStartOffset(), startOffset); + UNIT_ASSERT_VALUES_EQUAL((ui64)result->Record.GetPartResult(0).GetEndOffset(), endOffset); + retriesLeft = 0; + } catch (NActors::TSchedulingLimitReachedException) { + UNIT_ASSERT(retriesLeft > 0); + } + } +} + +void RestartTablet(TTestContext& tc) { + TTestActorRuntime& runtime = *tc.Runtime; + + ForwardToTablet(runtime, tc.TabletId, tc.Edge, new TEvents::TEvPoisonPill()); + TDispatchOptions rebootOptions; + rebootOptions.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvRestored, 2)); + runtime.DispatchEvents(rebootOptions); +} + +TActorId SetOwner(const ui32 partition, TTestContext& tc, const TString& owner, bool force) { + TActorId pipeClient = tc.Runtime->ConnectToPipe(tc.TabletId, tc.Edge, 0, GetPipeConfigWithRetries()); + + THolder<TEvPersQueue::TEvRequest> request; + + request.Reset(new TEvPersQueue::TEvRequest); + auto req = request->Record.MutablePartitionRequest(); + req->SetPartition(partition); + req->MutableCmdGetOwnership()->SetOwner(owner); + req->MutableCmdGetOwnership()->SetForce(force); + ActorIdToProto(pipeClient, req->MutablePipeClient()); + + tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries(), pipeClient); + return pipeClient; +} + +TActorId RegisterReadSession(const TString& session, TTestContext& tc, const TVector<ui32>& groups) { + TActorId pipeClient = tc.Runtime->ConnectToPipe(tc.BalancerTabletId, tc.Edge, 0, GetPipeConfigWithRetries()); + + THolder<TEvPersQueue::TEvRegisterReadSession> request; + + request.Reset(new TEvPersQueue::TEvRegisterReadSession); + auto& req = request->Record; + req.SetSession(session); + ActorIdToProto(pipeClient, req.MutablePipeClient()); + req.SetClientId("user"); + for (const auto& g : groups) { + req.AddGroups(g); + } + + tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries(), pipeClient); + return pipeClient; +} + +void WaitSessionKill(TTestContext& tc) { + TAutoPtr<IEventHandle> handle; + + tc.Runtime->ResetScheduledCount(); + + TEvPersQueue::TEvError *result; + result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvError>(handle); + UNIT_ASSERT(result); + Cerr << "ANS: " << result->Record << "\n"; +// UNIT_ASSERT_EQUAL(result->Record.GetSession(), session); +} + +void WaitPartition(const TString &session, TTestContext& tc, ui32 partition, const TString& sessionToRelease, const TString& topic, const TActorId& pipe, bool ok) { + TAutoPtr<IEventHandle> handle; + + tc.Runtime->ResetScheduledCount(); + + for (ui32 i = 0; i < 3; ++i) { + Cerr << "STEP " << i << " ok " << ok << "\n"; + + try { + tc.Runtime->ResetScheduledCount(); + if (i % 2 == 0) { + TEvPersQueue::TEvLockPartition *result; + result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvLockPartition>(handle); + UNIT_ASSERT(result); + Cerr << "ANS: " << result->Record << "\n"; + UNIT_ASSERT(ok); + UNIT_ASSERT_EQUAL(result->Record.GetSession(), session); + break; + } else { + TEvPersQueue::TEvReleasePartition *result; + result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvReleasePartition>(handle); + UNIT_ASSERT(result); + + Cerr << "ANS2: " << result->Record << "\n"; + + UNIT_ASSERT_EQUAL(result->Record.GetSession(), sessionToRelease); + UNIT_ASSERT(ok); + + THolder<TEvPersQueue::TEvPartitionReleased> request; + + request.Reset(new TEvPersQueue::TEvPartitionReleased); + auto& req = request->Record; + req.SetSession(sessionToRelease); + req.SetPartition(partition); + req.SetTopic(topic); + req.SetClientId("user"); + ActorIdToProto(pipe, req.MutablePipeClient()); + + tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries(), pipe); + } + } catch (NActors::TSchedulingLimitReachedException) { + UNIT_ASSERT(i < 2 || !ok); + } catch (NActors::TEmptyEventQueueException) { + UNIT_ASSERT(i < 2 || !ok); + } + } +} + +std::pair<TString, TActorId> CmdSetOwner(const ui32 partition, TTestContext& tc, const TString& owner, bool force) { + TAutoPtr<IEventHandle> handle; + TEvPersQueue::TEvResponse *result; + TString cookie; + TActorId pipeClient; + for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) { + try { + tc.Runtime->ResetScheduledCount(); + + pipeClient = SetOwner(partition, tc, owner, force); + + result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle); + + UNIT_ASSERT(result); + UNIT_ASSERT(result->Record.HasStatus()); + if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) { + tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress + retriesLeft = 3; + continue; + } + + if (result->Record.GetErrorReason().StartsWith("ownership session is killed by another session with id ")) { + result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle); + UNIT_ASSERT(result); + UNIT_ASSERT(result->Record.HasStatus()); + } + + if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) { + tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress + retriesLeft = 3; + continue; + } + + UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::OK); + UNIT_ASSERT(result->Record.HasPartitionResponse()); + UNIT_ASSERT(result->Record.GetPartitionResponse().HasCmdGetOwnershipResult()); + UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdGetOwnershipResult().HasOwnerCookie()); + cookie = result->Record.GetPartitionResponse().GetCmdGetOwnershipResult().GetOwnerCookie(); + UNIT_ASSERT(!cookie.empty()); + retriesLeft = 0; + } catch (NActors::TSchedulingLimitReachedException) { + UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2); + } + } + return std::make_pair(cookie, pipeClient); +} + +void WritePartData(const ui32 partition, const TString& sourceId, const i64 offset, const ui64 seqNo, const ui16 partNo, const ui16 totalParts, + const ui32 totalSize, const TString& data, TTestContext& tc, const TString& cookie, i32 msgSeqNo) { + THolder<TEvPersQueue::TEvRequest> request; + tc.Runtime->ResetScheduledCount(); + request.Reset(new TEvPersQueue::TEvRequest); + auto req = request->Record.MutablePartitionRequest(); + req->SetPartition(partition); + req->SetOwnerCookie(cookie); + req->SetMessageNo(msgSeqNo); + if (offset != -1) + req->SetCmdWriteOffset(offset); + auto write = req->AddCmdWrite(); + write->SetSourceId(sourceId); + write->SetSeqNo(seqNo); + write->SetPartNo(partNo); + write->SetTotalParts(totalParts); + if (partNo == 0) + write->SetTotalSize(totalSize); + write->SetData(data); + + tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); +} + +void WritePartDataWithBigMsg(const ui32 partition, const TString& sourceId, const ui64 seqNo, const ui16 partNo, const ui16 totalParts, + const ui32 totalSize, const TString& data, TTestContext& tc, const TString& cookie, i32 msgSeqNo, ui32 bigMsgSize) { + THolder<TEvPersQueue::TEvRequest> request; + tc.Runtime->ResetScheduledCount(); + request.Reset(new TEvPersQueue::TEvRequest); + auto req = request->Record.MutablePartitionRequest(); + req->SetPartition(partition); + req->SetOwnerCookie(cookie); + req->SetMessageNo(msgSeqNo); + + TString bigData(bigMsgSize, 'a'); + + auto write = req->AddCmdWrite(); + write->SetSourceId(sourceId); + write->SetSeqNo(seqNo); + write->SetData(bigData); + + write = req->AddCmdWrite(); + write->SetSourceId(sourceId); + write->SetSeqNo(seqNo + 1); + write->SetPartNo(partNo); + write->SetTotalParts(totalParts); + if (partNo == 0) + write->SetTotalSize(totalSize); + write->SetData(data); + + + tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); +} + +void WriteData(const ui32 partition, const TString& sourceId, const TVector<std::pair<ui64, TString>> data, TTestContext& tc, + const TString& cookie, i32 msgSeqNo, i64 offset, bool disableDeduplication) { + THolder<TEvPersQueue::TEvRequest> request; + tc.Runtime->ResetScheduledCount(); + request.Reset(new TEvPersQueue::TEvRequest); + auto req = request->Record.MutablePartitionRequest(); + req->SetPartition(partition); + req->SetOwnerCookie(cookie); + req->SetMessageNo(msgSeqNo); + if (offset >= 0) + req->SetCmdWriteOffset(offset); + for (auto& p : data) { + auto write = req->AddCmdWrite(); + write->SetSourceId(sourceId); + write->SetSeqNo(p.first); + write->SetData(p.second); + write->SetDisableDeduplication(disableDeduplication); + } + tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); +} + +void CmdWrite(const ui32 partition, const TString& sourceId, const TVector<std::pair<ui64, TString>> data, + TTestContext& tc, bool error, const THashSet<ui32>& alreadyWrittenSeqNo, + bool isFirst, const TString& ownerCookie, i32 msn, i64 offset, + bool treatWrongCookieAsError, bool treatBadOffsetAsError, + bool disableDeduplication) { + TAutoPtr<IEventHandle> handle; + TEvPersQueue::TEvResponse *result; + + ui32& msgSeqNo = tc.MsgSeqNoMap[partition]; + if (msn != -1) msgSeqNo = msn; + TString cookie = ownerCookie; + for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) { + try { + WriteData(partition, sourceId, data, tc, cookie, msgSeqNo, offset, disableDeduplication); + result = tc.Runtime->GrabEdgeEventIf<TEvPersQueue::TEvResponse>(handle, [](const TEvPersQueue::TEvResponse& ev){ + if (ev.Record.HasPartitionResponse() && ev.Record.GetPartitionResponse().CmdWriteResultSize() > 0 || ev.Record.GetErrorCode() != NPersQueue::NErrorCode::OK) + return true; + return false; + }); //there could be outgoing reads in TestReadSubscription test + + UNIT_ASSERT(result); + UNIT_ASSERT(result->Record.HasStatus()); + if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) { + tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress + retriesLeft = 3; + continue; + } + + if (!treatWrongCookieAsError && result->Record.GetErrorCode() == NPersQueue::NErrorCode::WRONG_COOKIE) { + cookie = CmdSetOwner(partition, tc).first; + msgSeqNo = 0; + retriesLeft = 3; + continue; + } + + if (!treatBadOffsetAsError && result->Record.GetErrorCode() == NPersQueue::NErrorCode::WRITE_ERROR_BAD_OFFSET) { + return; + } + + if (error) { + UNIT_ASSERT( + result->Record.GetErrorCode() == NPersQueue::NErrorCode::WRITE_ERROR_PARTITION_IS_FULL || + result->Record.GetErrorCode() == NPersQueue::NErrorCode::BAD_REQUEST || + result->Record.GetErrorCode() == NPersQueue::NErrorCode::WRONG_COOKIE + ); + break; + } else { + Cerr << result->Record.GetErrorReason(); + UNIT_ASSERT_VALUES_EQUAL((ui32)result->Record.GetErrorCode(), (ui32)NPersQueue::NErrorCode::OK); + } + UNIT_ASSERT_VALUES_EQUAL(result->Record.GetPartitionResponse().CmdWriteResultSize(), data.size()); + + for (ui32 i = 0; i < data.size(); ++i) { + UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdWriteResult(i).HasAlreadyWritten()); + UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdWriteResult(i).HasOffset()); + UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdWriteResult(i).HasMaxSeqNo() == + result->Record.GetPartitionResponse().GetCmdWriteResult(i).GetAlreadyWritten()); + if (result->Record.GetPartitionResponse().GetCmdWriteResult(i).HasMaxSeqNo()) { + UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdWriteResult(i).GetMaxSeqNo() >= (i64)data[i].first); + } + if (isFirst || offset != -1) { + UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdWriteResult(i).GetAlreadyWritten() + || result->Record.GetPartitionResponse().GetCmdWriteResult(i).GetOffset() == i + (offset == -1 ? 0 : offset)); + } + } + for (ui32 i = 0; i < data.size(); ++i) { + auto res = result->Record.GetPartitionResponse().GetCmdWriteResult(i); + UNIT_ASSERT(!alreadyWrittenSeqNo.contains(res.GetSeqNo()) || res.GetAlreadyWritten()); + } + retriesLeft = 0; + } catch (NActors::TSchedulingLimitReachedException) { + UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2); + retriesLeft = 3; + } + } + ++msgSeqNo; +} + +void ReserveBytes(const ui32 partition, TTestContext& tc, + const TString& cookie, i32 msgSeqNo, i64 size, const TActorId& pipeClient, bool lastRequest) { + THolder<TEvPersQueue::TEvRequest> request; + tc.Runtime->ResetScheduledCount(); + request.Reset(new TEvPersQueue::TEvRequest); + auto req = request->Record.MutablePartitionRequest(); + req->SetPartition(partition); + req->SetOwnerCookie(cookie); + req->SetMessageNo(msgSeqNo); + ActorIdToProto(pipeClient, req->MutablePipeClient()); + req->MutableCmdReserveBytes()->SetSize(size); + req->MutableCmdReserveBytes()->SetLastRequest(lastRequest); + tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); + + tc.Runtime->DispatchEvents(); +} + +void CmdReserveBytes(const ui32 partition, TTestContext& tc, const TString& ownerCookie, i32 msn, i64 size, TActorId pipeClient, bool noAnswer, bool lastRequest) { + TAutoPtr<IEventHandle> handle; + TEvPersQueue::TEvResponse *result; + + ui32& msgSeqNo = tc.MsgSeqNoMap[partition]; + if (msn != -1) msgSeqNo = msn; + TString cookie = ownerCookie; + + for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) { + try { + ReserveBytes(partition, tc, cookie, msgSeqNo, size, pipeClient, lastRequest); + result = tc.Runtime->GrabEdgeEventIf<TEvPersQueue::TEvResponse>(handle, [](const TEvPersQueue::TEvResponse& ev){ + if (!ev.Record.HasPartitionResponse() || !ev.Record.GetPartitionResponse().HasCmdReadResult()) + return true; + return false; + }); //there could be outgoing reads in TestReadSubscription test + + UNIT_ASSERT(result); + UNIT_ASSERT(result->Record.HasStatus()); + + if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) { + retriesLeft = 3; + continue; + } + + if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::WRONG_COOKIE) { + auto p = CmdSetOwner(partition, tc); + pipeClient = p.second; + cookie = p.first; + msgSeqNo = 0; + retriesLeft = 3; + continue; + } + UNIT_ASSERT(!noAnswer); + + UNIT_ASSERT_C(result->Record.GetErrorCode() == NPersQueue::NErrorCode::OK, result->Record); + + retriesLeft = 0; + } catch (NActors::TSchedulingLimitReachedException) { + if (noAnswer) + break; + UNIT_ASSERT(retriesLeft == 2); + } + } + ++msgSeqNo; +} + + +void CmdSetOffset(const ui32 partition, const TString& user, ui64 offset, bool error, TTestContext& tc, const TString& session) { + TAutoPtr<IEventHandle> handle; + TEvPersQueue::TEvResponse *result; + THolder<TEvPersQueue::TEvRequest> request; + for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) { + try { + tc.Runtime->ResetScheduledCount(); + request.Reset(new TEvPersQueue::TEvRequest); + auto req = request->Record.MutablePartitionRequest(); + req->SetPartition(partition); + auto off = req->MutableCmdSetClientOffset(); + off->SetClientId(user); + off->SetOffset(offset); + if (!session.empty()) + off->SetSessionId(session); + tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); + result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle); + + UNIT_ASSERT(result); + UNIT_ASSERT(result->Record.HasStatus()); + if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) { + tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress + retriesLeft = 3; + continue; + } + if ((result->Record.GetErrorCode() == NPersQueue::NErrorCode::SET_OFFSET_ERROR_COMMIT_TO_FUTURE || + result->Record.GetErrorCode() == NPersQueue::NErrorCode::WRONG_COOKIE) && error) { + break; + } + UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::OK); + retriesLeft = 0; + } catch (NActors::TSchedulingLimitReachedException) { + UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2); + } + } +} + + +void CmdCreateSession(const ui32 partition, const TString& user, const TString& session, TTestContext& tc, const i64 offset, const ui32 gen, const ui32 step, bool error) { + TAutoPtr<IEventHandle> handle; + TEvPersQueue::TEvResponse *result; + THolder<TEvPersQueue::TEvRequest> request; + for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) { + try { + tc.Runtime->ResetScheduledCount(); + request.Reset(new TEvPersQueue::TEvRequest); + auto req = request->Record.MutablePartitionRequest(); + req->SetPartition(partition); + auto off = req->MutableCmdCreateSession(); + off->SetClientId(user); + off->SetSessionId(session); + off->SetGeneration(gen); + off->SetStep(step); + tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); + result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle); + + UNIT_ASSERT(result); + UNIT_ASSERT(result->Record.HasStatus()); + if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) { + tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress + retriesLeft = 3; + continue; + } + + if (error) { + UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::WRONG_COOKIE); + return; + } + + UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::OK); + + UNIT_ASSERT(result->Record.GetPartitionResponse().HasCmdGetClientOffsetResult()); + auto resp = result->Record.GetPartitionResponse().GetCmdGetClientOffsetResult(); + UNIT_ASSERT(resp.HasOffset() && (i64)resp.GetOffset() == offset); + retriesLeft = 0; + } catch (NActors::TSchedulingLimitReachedException) { + UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2); + } + } +} + +void CmdKillSession(const ui32 partition, const TString& user, const TString& session, TTestContext& tc) { + TAutoPtr<IEventHandle> handle; + TEvPersQueue::TEvResponse *result; + THolder<TEvPersQueue::TEvRequest> request; + for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) { + try { + tc.Runtime->ResetScheduledCount(); + request.Reset(new TEvPersQueue::TEvRequest); + auto req = request->Record.MutablePartitionRequest(); + req->SetPartition(partition); + auto off = req->MutableCmdDeleteSession(); + off->SetClientId(user); + off->SetSessionId(session); + tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); + result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle); + + UNIT_ASSERT(result); + UNIT_ASSERT(result->Record.HasStatus()); + if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) { + tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress + retriesLeft = 3; + continue; + } + UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::OK); + retriesLeft = 0; + } catch (NActors::TSchedulingLimitReachedException) { + UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2); + } + } +} + + +void CmdUpdateWriteTimestamp(const ui32 partition, ui64 timestamp, TTestContext& tc) { + TAutoPtr<IEventHandle> handle; + TEvPersQueue::TEvResponse *result; + THolder<TEvPersQueue::TEvRequest> request; + for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) { + try { + tc.Runtime->ResetScheduledCount(); + request.Reset(new TEvPersQueue::TEvRequest); + auto req = request->Record.MutablePartitionRequest(); + req->SetPartition(partition); + auto off = req->MutableCmdUpdateWriteTimestamp(); + off->SetWriteTimeMS(timestamp); + tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); + result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle); + + UNIT_ASSERT(result); + UNIT_ASSERT(result->Record.HasStatus()); + + if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) { + tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress + retriesLeft = 3; + continue; + } + + UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::OK); + retriesLeft = 0; + } catch (NActors::TSchedulingLimitReachedException) { + UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2); + } + } +} + + +TVector<TString> CmdSourceIdRead(TTestContext& tc) { + TAutoPtr<IEventHandle> handle; + TVector<TString> sourceIds; + THolder<TEvKeyValue::TEvRequest> request; + TEvKeyValue::TEvResponse *result; + + for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) { + try { + request.Reset(new TEvKeyValue::TEvRequest); + sourceIds.clear(); + auto read = request->Record.AddCmdReadRange(); + auto range = read->MutableRange(); + NPQ::TKeyPrefix ikeyFrom(NPQ::TKeyPrefix::TypeInfo, 0, NPQ::TKeyPrefix::MarkSourceId); + range->SetFrom(ikeyFrom.Data(), ikeyFrom.Size()); + range->SetIncludeFrom(true); + NPQ::TKeyPrefix ikeyTo(NPQ::TKeyPrefix::TypeInfo, 0, NPQ::TKeyPrefix::MarkUserDeprecated); + range->SetTo(ikeyTo.Data(), ikeyTo.Size()); + range->SetIncludeTo(false); + Cout << request.Get()->ToString() << Endl; + tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); + result = tc.Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle); + UNIT_ASSERT(result); + UNIT_ASSERT(result->Record.HasStatus()); + UNIT_ASSERT_EQUAL(result->Record.GetStatus(), NMsgBusProxy::MSTATUS_OK); + for (ui64 idx = 0; idx < result->Record.ReadRangeResultSize(); ++idx) { + const auto &readResult = result->Record.GetReadRangeResult(idx); + UNIT_ASSERT(readResult.HasStatus()); + UNIT_ASSERT_EQUAL(readResult.GetStatus(), NKikimrProto::OK); + for (size_t j = 0; j < readResult.PairSize(); ++j) { + const auto& pair = readResult.GetPair(j); + TString s = pair.GetKey().substr(NPQ::TKeyPrefix::MarkedSize()); + sourceIds.push_back(s); + } + } + retriesLeft = 0; + } catch (NActors::TSchedulingLimitReachedException) { + UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2); + } + } + return sourceIds; +} + + +void CmdRead(const ui32 partition, const ui64 offset, const ui32 count, const ui32 size, const ui32 resCount, bool timeouted, TTestContext& tc, TVector<i32> offsets, const ui32 maxTimeLagMs, const ui64 readTimestampMs) { + TAutoPtr<IEventHandle> handle; + TEvPersQueue::TEvResponse *result; + THolder<TEvPersQueue::TEvRequest> request; + + for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) { + try { + tc.Runtime->ResetScheduledCount(); + request.Reset(new TEvPersQueue::TEvRequest); + auto req = request->Record.MutablePartitionRequest(); + req->SetPartition(partition); + auto read = req->MutableCmdRead(); + read->SetOffset(offset); + read->SetClientId("user"); + read->SetCount(count); + read->SetBytes(size); + if (maxTimeLagMs > 0) { + read->SetMaxTimeLagMs(maxTimeLagMs); + } + if (readTimestampMs > 0) { + read->SetReadTimestampMs(readTimestampMs); + } + req->SetCookie(123); + + tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); + result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle); + + + UNIT_ASSERT(result); + UNIT_ASSERT(result->Record.HasStatus()); + + UNIT_ASSERT(result->Record.HasPartitionResponse()); + UNIT_ASSERT_EQUAL(result->Record.GetPartitionResponse().GetCookie(), 123); + if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) { + tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress + retriesLeft = 3; + continue; + } + if (timeouted) { + UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::OK); + UNIT_ASSERT(result->Record.GetPartitionResponse().HasCmdReadResult()); + auto res = result->Record.GetPartitionResponse().GetCmdReadResult(); + UNIT_ASSERT_EQUAL(res.ResultSize(), 0); + break; + } + UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::OK); + + UNIT_ASSERT(result->Record.GetPartitionResponse().HasCmdReadResult()); + auto res = result->Record.GetPartitionResponse().GetCmdReadResult(); + + UNIT_ASSERT_EQUAL(res.ResultSize(), resCount); + ui64 off = offset; + + for (ui32 i = 0; i < resCount; ++i) { + + auto r = res.GetResult(i); + if (offsets.empty()) { + if (readTimestampMs == 0) { + UNIT_ASSERT_EQUAL((ui64)r.GetOffset(), off); + } + UNIT_ASSERT(r.GetSourceId().size() == 9 && r.GetSourceId().StartsWith("sourceid")); + UNIT_ASSERT_EQUAL(ui32(r.GetData()[0]), off); + UNIT_ASSERT_EQUAL(ui32((unsigned char)r.GetData().back()), r.GetSeqNo() % 256); + ++off; + } else { + UNIT_ASSERT(offsets[i] == (i64)r.GetOffset()); + } + } + retriesLeft = 0; + } catch (NActors::TSchedulingLimitReachedException) { + UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2); + } + } +} + + +void FillUserInfo(NKikimrClient::TKeyValueRequest_TCmdWrite* write, const TString& client, ui32 partition, ui64 offset) { + NPQ::TKeyPrefix ikey(NPQ::TKeyPrefix::TypeInfo, partition, NPQ::TKeyPrefix::MarkUser); + ikey.Append(client.c_str(), client.size()); + + NKikimrPQ::TUserInfo userInfo; + userInfo.SetOffset(offset); + userInfo.SetGeneration(1); + userInfo.SetStep(2); + userInfo.SetSession("test-session"); + userInfo.SetOffsetRewindSum(10); + userInfo.SetReadRuleGeneration(1); + TString out; + Y_PROTOBUF_SUPPRESS_NODISCARD userInfo.SerializeToString(&out); + + TBuffer idata; + idata.Append(out.c_str(), out.size()); + + write->SetKey(ikey.Data(), ikey.Size()); + write->SetValue(idata.Data(), idata.Size()); +} + +void FillDeprecatedUserInfo(NKikimrClient::TKeyValueRequest_TCmdWrite* write, const TString& client, ui32 partition, ui64 offset) { + TString session = "test-session"; + ui32 gen = 1; + ui32 step = 2; + NPQ::TKeyPrefix ikeyDeprecated(NPQ::TKeyPrefix::TypeInfo, partition, NPQ::TKeyPrefix::MarkUserDeprecated); + ikeyDeprecated.Append(client.c_str(), client.size()); + + TBuffer idataDeprecated = NPQ::NDeprecatedUserData::Serialize(offset, gen, step, session); + write->SetKey(ikeyDeprecated.Data(), ikeyDeprecated.Size()); + write->SetValue(idataDeprecated.Data(), idataDeprecated.Size()); +} + +} // namespace NKikimr diff --git a/ydb/core/persqueue/ut/CMakeLists.darwin.txt b/ydb/core/persqueue/ut/CMakeLists.darwin.txt index 4928770dfe..2ab8a0cbeb 100644 --- a/ydb/core/persqueue/ut/CMakeLists.darwin.txt +++ b/ydb/core/persqueue/ut/CMakeLists.darwin.txt @@ -39,6 +39,7 @@ target_sources(ydb-core-persqueue-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/metering_sink_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/mirrorer_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_ut_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/sourceid_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/type_codecs_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/user_info_ut.cpp diff --git a/ydb/core/persqueue/ut/CMakeLists.linux.txt b/ydb/core/persqueue/ut/CMakeLists.linux.txt index 79d902da8f..bd73cffc77 100644 --- a/ydb/core/persqueue/ut/CMakeLists.linux.txt +++ b/ydb/core/persqueue/ut/CMakeLists.linux.txt @@ -43,6 +43,7 @@ target_sources(ydb-core-persqueue-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/metering_sink_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/mirrorer_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_ut_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/sourceid_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/type_codecs_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/user_info_ut.cpp diff --git a/ydb/core/persqueue/ut_slow/CMakeLists.darwin.txt b/ydb/core/persqueue/ut_slow/CMakeLists.darwin.txt index 5d657b27d5..b4a546fecf 100644 --- a/ydb/core/persqueue/ut_slow/CMakeLists.darwin.txt +++ b/ydb/core/persqueue/ut_slow/CMakeLists.darwin.txt @@ -34,6 +34,7 @@ target_link_options(ydb-core-persqueue-ut_slow PRIVATE CoreFoundation ) target_sources(ydb-core-persqueue-ut_slow PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_ut_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_ut_slow.cpp ) add_test( diff --git a/ydb/core/persqueue/ut_slow/CMakeLists.linux.txt b/ydb/core/persqueue/ut_slow/CMakeLists.linux.txt index ca0df6ce16..038edb3538 100644 --- a/ydb/core/persqueue/ut_slow/CMakeLists.linux.txt +++ b/ydb/core/persqueue/ut_slow/CMakeLists.linux.txt @@ -38,6 +38,7 @@ target_link_options(ydb-core-persqueue-ut_slow PRIVATE -ldl ) target_sources(ydb-core-persqueue-ut_slow PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_ut_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_ut_slow.cpp ) add_test( |