aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormokhotskii <mokhotskii@ydb.tech>2022-07-28 18:33:00 +0300
committermokhotskii <mokhotskii@ydb.tech>2022-07-28 18:33:00 +0300
commitb9a10e543a5624c857f7c598b2605e161e01d688 (patch)
treebf40bf6efe67f9e2a5f0b5517ed639efc04cb9bf
parentb4e0b9fbb7463d8633199df9ae0fe44f8712c770 (diff)
downloadydb-b9a10e543a5624c857f7c598b2605e161e01d688.tar.gz
Clean up pq partition.h
Clean up pq partition.h/cpp: - move all types from partition.h to partition_types.h - move and sort TPartition methods within a class definition - rename TPartition::Counters -> TabletCounters - rename TPartition::PartitionLabeledCounters -> PartitionCounters - move every { to the previous line - move helper test functions to pq_ut_impl.cpp - clean up pq_ut.h
-rw-r--r--ydb/core/persqueue/partition.cpp579
-rw-r--r--ydb/core/persqueue/partition.h418
-rw-r--r--ydb/core/persqueue/partition_types.h130
-rw-r--r--ydb/core/persqueue/pq_ut.cpp12
-rw-r--r--ydb/core/persqueue/pq_ut.h1111
-rw-r--r--ydb/core/persqueue/pq_ut_impl.cpp908
-rw-r--r--ydb/core/persqueue/ut/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/persqueue/ut/CMakeLists.linux.txt1
-rw-r--r--ydb/core/persqueue/ut_slow/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/persqueue/ut_slow/CMakeLists.linux.txt1
10 files changed, 1605 insertions, 1557 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 197c154968..a4151cb566 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -1,9 +1,7 @@
-#include "partition.h"
#include "event_helpers.h"
-#include "read.h"
-#include "sourceid.h"
-#include "ownerinfo.h"
#include "mirrorer.h"
+#include "partition.h"
+#include "read.h"
#include <ydb/core/base/appdata.h>
#include <ydb/core/base/blobstorage.h>
@@ -25,23 +23,15 @@
Y_VERIFY(!blob.Data.empty(), "Empty data. SourceId: %s, SeqNo: %" PRIu64, blob.SourceId.data(), blob.SeqNo); \
Y_VERIFY(blob.SeqNo <= (ui64)Max<i64>(), "SeqNo is too big: %" PRIu64, blob.SeqNo);
-namespace NKikimr {
-namespace NPQ {
-
-static const ui32 BATCH_UNPACK_SIZE_BORDER = 500_KB; //500kb
-
-static const ui32 MAX_WRITE_CYCLE_SIZE = 16_MB; //16MB
+namespace NKikimr::NPQ {
+static const ui32 BATCH_UNPACK_SIZE_BORDER = 500_KB;
+static const ui32 MAX_WRITE_CYCLE_SIZE = 16_MB;
static const ui32 MAX_USER_ACTS = 1000;
-
static const TDuration WAKE_TIMEOUT = TDuration::Seconds(5);
-
static const ui32 MAX_INLINE_SIZE = 1000;
-
static const ui32 LEVEL0 = 32;
-
static const TDuration UPDATE_AVAIL_SIZE_INTERVAL = TDuration::MilliSeconds(100);
-
static const TString WRITE_QUOTA_ROOT_PATH = "write-quota";
struct TPartition::THasDataReq {
@@ -67,6 +57,15 @@ struct TPartition::THasDataDeadline {
}
};
+struct TMirrorerInfo {
+ TMirrorerInfo(const TActorId& actor, const TTabletCountersBase& baseline)
+ : Actor(actor) {
+ Baseline.Populate(baseline);
+ }
+
+ TActorId Actor;
+ TTabletCountersBase Baseline;
+};
class TKeyLevel {
public:
@@ -76,8 +75,7 @@ public:
: Border_(border)
, Sum_(0)
, RecsCount_(0)
- , InternalPartsCount_(0)
- {}
+ , InternalPartsCount_(0) {}
void Clear() {
Keys_.clear();
@@ -132,7 +130,6 @@ public:
return res;
}
-
ui32 Sum() const {
return Sum_;
}
@@ -146,6 +143,7 @@ public:
Y_VERIFY(pos < Keys_.size());
return Keys_[pos].second;
}
+
void PushKeyToFront(const TKey& key, ui32 size) {
Sum_ += size;
RecsCount_ += key.GetCount();
@@ -172,9 +170,7 @@ private:
ui16 InternalPartsCount_;
};
-
-void HtmlOutput(IOutputStream& out, const TString& line, const std::deque<std::pair<TKey, ui32>>& keys)
-{
+void HtmlOutput(IOutputStream& out, const TString& line, const std::deque<std::pair<TKey, ui32>>& keys) {
HTML(out) {
TABLE() {
TABLEHEAD() {
@@ -206,7 +202,6 @@ void HtmlOutput(IOutputStream& out, const TString& line, const std::deque<std::p
}
}
-
IOutputStream& operator <<(IOutputStream& out, const TKeyLevel& value) {
TStringStream str;
str << "count=" << value.Keys_.size() << " sum=" << value.Sum_ << " border=" << value.Border_ << " recs= " << value.RecsCount_ << ":";
@@ -228,27 +223,14 @@ ui64 GetOffsetEstimate(const std::deque<TDataKey>& container, TInstant timestamp
}
}
-struct TMirrorerInfo {
- TMirrorerInfo(const TActorId& actor, const TTabletCountersBase& baseline)
- : Actor(actor)
- {
- Baseline.Populate(baseline);
- }
-
- TActorId Actor;
- TTabletCountersBase Baseline;
-};
-
-void TPartition::ReplyError(const TActorContext& ctx, const ui64 dst, NPersQueue::NErrorCode::EErrorCode errorCode, const TString& error)
-{
+void TPartition::ReplyError(const TActorContext& ctx, const ui64 dst, NPersQueue::NErrorCode::EErrorCode errorCode, const TString& error) {
ReplyPersQueueError(
- dst == 0 ? ctx.SelfID : Tablet, ctx, TabletID, TopicConverter->GetClientsideName(), Partition, Counters,
- NKikimrServices::PERSQUEUE, dst, errorCode, error, true
+ dst == 0 ? ctx.SelfID : Tablet, ctx, TabletID, TopicConverter->GetClientsideName(), Partition,
+ TabletCounters, NKikimrServices::PERSQUEUE, dst, errorCode, error, true
);
}
-void TPartition::ReplyOk(const TActorContext& ctx, const ui64 dst)
-{
+void TPartition::ReplyOk(const TActorContext& ctx, const ui64 dst) {
THolder<TEvPQ::TEvProxyResponse> response = MakeHolder<TEvPQ::TEvProxyResponse>(dst);
NKikimrClient::TResponse& resp = response->Response;
resp.SetStatus(NMsgBusProxy::MSTATUS_OK);
@@ -256,8 +238,7 @@ void TPartition::ReplyOk(const TActorContext& ctx, const ui64 dst)
ctx.Send(Tablet, response.Release());
}
-void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& cookie)
-{
+void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& cookie) {
THolder<TEvPQ::TEvProxyResponse> response = MakeHolder<TEvPQ::TEvProxyResponse>(dst);
NKikimrClient::TResponse& resp = response->Response;
resp.SetStatus(NMsgBusProxy::MSTATUS_OK);
@@ -269,8 +250,7 @@ void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TS
void TPartition::ReplyWrite(
const TActorContext& ctx, const ui64 dst, const TString& sourceId, const ui64 seqNo, const ui16 partNo, const ui16 totalParts,
const ui64 offset, const TInstant writeTimestamp, bool already, const ui64 maxSeqNo,
- const ui64 partitionQuotedTime, const TDuration topicQuotedTime, const ui64 queueTime, const ui64 writeTime)
-{
+ const ui64 partitionQuotedTime, const TDuration topicQuotedTime, const ui64 queueTime, const ui64 writeTime) {
Y_VERIFY(offset <= (ui64)Max<i64>(), "Offset is too big: %" PRIu64, offset);
Y_VERIFY(seqNo <= (ui64)Max<i64>(), "SeqNo is too big: %" PRIu64, seqNo);
@@ -299,8 +279,7 @@ void TPartition::ReplyWrite(
void TPartition::ReplyGetClientOffsetOk(const TActorContext& ctx, const ui64 dst, const i64 offset,
- const TInstant writeTimestamp, const TInstant createTimestamp)
-{
+ const TInstant writeTimestamp, const TInstant createTimestamp) {
THolder<TEvPQ::TEvProxyResponse> response = MakeHolder<TEvPQ::TEvProxyResponse>(dst);
NKikimrClient::TResponse& resp = response->Response;
resp.SetStatus(NMsgBusProxy::MSTATUS_OK);
@@ -323,8 +302,7 @@ void TPartition::ReplyGetClientOffsetOk(const TActorContext& ctx, const ui64 dst
static void RequestRange(const TActorContext& ctx, const TActorId& dst, ui32 partition,
- TKeyPrefix::EType c, bool includeData = false, const TString& key = "", bool dropTmp = false)
-{
+ TKeyPrefix::EType c, bool includeData = false, const TString& key = "", bool dropTmp = false) {
THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
auto read = request->Record.AddCmdReadRange();
auto range = read->MutableRange();
@@ -356,8 +334,7 @@ static void RequestRange(const TActorContext& ctx, const TActorId& dst, ui32 par
}
-NKikimrClient::TKeyValueRequest::EStorageChannel GetChannel(ui32 i)
-{
+NKikimrClient::TKeyValueRequest::EStorageChannel GetChannel(ui32 i) {
return NKikimrClient::TKeyValueRequest::EStorageChannel(NKikimrClient::TKeyValueRequest::MAIN + i);
}
@@ -369,8 +346,7 @@ void AddCheckDiskRequest(TEvKeyValue::TEvRequest *request, ui32 numChannels) {
}
-static void RequestDiskStatus(const TActorContext& ctx, const TActorId& dst, ui32 numChannels)
-{
+static void RequestDiskStatus(const TActorContext& ctx, const TActorId& dst, ui32 numChannels) {
THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
AddCheckDiskRequest(request.Get(), numChannels);
@@ -379,13 +355,11 @@ static void RequestDiskStatus(const TActorContext& ctx, const TActorId& dst, ui3
}
-void RequestInfoRange(const TActorContext& ctx, const TActorId& dst, ui32 partition, const TString& key)
-{
+void RequestInfoRange(const TActorContext& ctx, const TActorId& dst, ui32 partition, const TString& key) {
RequestRange(ctx, dst, partition, TKeyPrefix::TypeInfo, true, key, key == "");
}
-void RequestMetaRead(const TActorContext& ctx, const TActorId& dst, ui32 partition)
-{
+void RequestMetaRead(const TActorContext& ctx, const TActorId& dst, ui32 partition) {
THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
auto read = request->Record.AddCmdRead();
TKeyPrefix key{TKeyPrefix::TypeMeta, partition};
@@ -393,8 +367,7 @@ void RequestMetaRead(const TActorContext& ctx, const TActorId& dst, ui32 partiti
ctx.Send(dst, request.Release());
}
-void RequestData(const TActorContext& ctx, const TActorId& dst, const TVector<TString>& keys)
-{
+void RequestData(const TActorContext& ctx, const TActorId& dst, const TVector<TString>& keys) {
THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
for (auto& key: keys) {
auto read = request->Record.AddCmdRead();
@@ -403,8 +376,7 @@ void RequestData(const TActorContext& ctx, const TActorId& dst, const TVector<TS
ctx.Send(dst, request.Release());
}
-void RequestDataRange(const TActorContext& ctx, const TActorId& dst, ui32 partition, const TString& key)
-{
+void RequestDataRange(const TActorContext& ctx, const TActorId& dst, ui32 partition, const TString& key) {
RequestRange(ctx, dst, partition, TKeyPrefix::TypeData, false, key);
}
@@ -488,11 +460,11 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co
, InitDuration(TDuration::Zero())
, InitDone(false)
, NewPartition(newPartition)
- // ToDo ToReview - Which name to use here? It verifies in tablet_counters_protobuf.h:633 on proper path
- , PartitionLabeledCounters(
+ // TODO: ToReview - Which name to use here? It verifies in tablet_counters_protobuf.h:633 on proper path
+ , PartitionCounters(
topicConverter->IsFirstClass() ? nullptr
: new TPartitionLabeledCounters(topicConverter->GetClientsideName(), partition))
- , Subscriber(partition, Counters, Tablet)
+ , Subscriber(partition, TabletCounters, Tablet)
, WriteCycleStartTime(ctx.Now())
, WriteCycleSize(0)
, WriteNewSize(0)
@@ -510,8 +482,7 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co
, TotalChannelWritesByHead(Config.GetPartitionConfig().GetNumChannels(), 0)
, WriteBufferIsFullCounter(nullptr)
, WriteTimestamp(ctx.Now())
- , WriteLagMs(TDuration::Minutes(1), 100)
-{
+ , WriteLagMs(TDuration::Minutes(1), 100) {
if (Config.GetPartitionConfig().HasMirrorFrom()) {
ManageWriteTimestampEstimate = !Config.GetPartitionConfig().GetMirrorFrom().GetSyncWriteTime();
} else {
@@ -522,11 +493,10 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co
CalcTopicWriteQuotaParams();
- Counters.Populate(counters);
+ TabletCounters.Populate(counters);
}
-void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorContext& ctx)
-{
+void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorContext& ctx) {
TVector<TString> res;
TString str;
if (CurrentStateFunc() == &TThis::StateInit) {
@@ -570,8 +540,7 @@ void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorCo
res.push_back(out.Str()); out.Clear();
}
out << Config.DebugString(); res.push_back(out.Str()); out.Clear();
- HTML(out)
- {
+ HTML(out) {
DIV_CLASS_ID("tab-pane fade", Sprintf("partition_%u", ui32(Partition))) {
TABLE_SORTABLE_CLASS("table") {
TABLEHEAD() {
@@ -721,8 +690,7 @@ void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorCo
}
-void TPartition::Bootstrap(const TActorContext& ctx)
-{
+void TPartition::Bootstrap(const TActorContext& ctx) {
UsersInfoStorage.Init(Tablet, SelfId());
Y_VERIFY(AppData(ctx)->PQConfig.GetMaxBlobsPerLevel() > 0);
@@ -741,7 +709,6 @@ void TPartition::Bootstrap(const TActorContext& ctx)
std::reverse(CompactLevelBorder.begin(), CompactLevelBorder.end());
-
for (ui32 i = 0; i < TotalLevels; ++i) {
DataKeysHead.push_back(TKeyLevel(CompactLevelBorder[i]));
}
@@ -762,7 +729,6 @@ void TPartition::Bootstrap(const TActorContext& ctx)
}
if (AppData(ctx)->Counters) {
- TVector<NPersQueue::TPQLabelsInfo> labels;
if (AppData()->PQConfig.GetTopicsAreFirstClassCitizen()) {
SetupStreamCounters(ctx);
} else {
@@ -966,7 +932,6 @@ void TPartition::ProcessHasDataRequests(const TActorContext& ctx) {
void TPartition::UpdateAvailableSize(const TActorContext& ctx) {
-
FilterDeadlinedWrites(ctx);
auto now = ctx.Now();
@@ -987,7 +952,7 @@ void TPartition::UpdateAvailableSize(const TActorContext& ctx) {
}
}
ScheduleUpdateAvailableSize(ctx);
- ReportLabeledCounters(ctx);
+ ReportCounters(ctx);
}
void TPartition::HandleOnIdle(TEvPQ::TEvUpdateAvailableSize::TPtr&, const TActorContext& ctx) {
@@ -1004,9 +969,9 @@ void TPartition::HandleWakeup(const TActorContext& ctx) {
FilterDeadlinedWrites(ctx);
ctx.Schedule(WAKE_TIMEOUT, new TEvents::TEvWakeup());
- ctx.Send(Tablet, new TEvPQ::TEvPartitionCounters(Partition, Counters));
+ ctx.Send(Tablet, new TEvPQ::TEvPartitionCounters(Partition, TabletCounters));
- ReportLabeledCounters(ctx);
+ ReportCounters(ctx);
ProcessHasDataRequests(ctx);
@@ -1231,7 +1196,7 @@ void TPartition::Handle(TEvPersQueue::TEvHasDataInfo::TPtr& ev, const TActorCont
void TPartition::Handle(TEvPQ::TEvMirrorerCounters::TPtr& ev, const TActorContext& /*ctx*/) {
if (Mirrorer) {
auto diff = ev->Get()->Counters.MakeDiffForAggr(Mirrorer->Baseline);
- Counters.Populate(*diff.Get());
+ TabletCounters.Populate(*diff.Get());
ev->Get()->Counters.RememberCurrentStateAsBaseline(Mirrorer->Baseline);
}
}
@@ -1240,7 +1205,7 @@ void TPartition::Handle(NReadSpeedLimiterEvents::TEvCounters::TPtr& ev, const TA
auto userInfo = UsersInfoStorage.GetIfExists(ev->Get()->User);
if (userInfo && userInfo->ReadSpeedLimiter) {
auto diff = ev->Get()->Counters.MakeDiffForAggr(userInfo->ReadSpeedLimiter->Baseline);
- Counters.Populate(*diff.Get());
+ TabletCounters.Populate(*diff.Get());
ev->Get()->Counters.RememberCurrentStateAsBaseline(userInfo->ReadSpeedLimiter->Baseline);
}
}
@@ -1276,14 +1241,13 @@ void TPartition::Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx)
Die(ctx);
}
-void TPartition::CancelAllWritesOnIdle(const TActorContext& ctx)
-{
+void TPartition::CancelAllWritesOnIdle(const TActorContext& ctx) {
for (const auto& w : Requests) {
ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::WRITE_ERROR_DISK_IS_FULL, "Disk is full");
if (w.IsWrite()) {
const auto& msg = w.GetWrite().Msg;
- Counters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1);
- Counters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(msg.Data.size() + msg.SourceId.size());
+ TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1);
+ TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(msg.Data.size() + msg.SourceId.size());
WriteInflightSize -= msg.Data.size();
}
}
@@ -1298,8 +1262,7 @@ void TPartition::CancelAllWritesOnIdle(const TActorContext& ctx)
}
-void TPartition::FailBadClient(const TActorContext& ctx)
-{
+void TPartition::FailBadClient(const TActorContext& ctx) {
for (auto it = Owners.begin(); it != Owners.end();) {
it = DropOwner(it, ctx);
}
@@ -1310,8 +1273,8 @@ void TPartition::FailBadClient(const TActorContext& ctx)
ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::BAD_REQUEST, "previous write request failed");
if (w.IsWrite()) {
const auto& msg = w.GetWrite().Msg;
- Counters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1);
- Counters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(msg.Data.size() + msg.SourceId.size());
+ TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1);
+ TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(msg.Data.size() + msg.SourceId.size());
WriteInflightSize -= msg.Data.size();
}
}
@@ -1321,9 +1284,9 @@ void TPartition::FailBadClient(const TActorContext& ctx)
for (const auto& w : Responses) {
ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::BAD_REQUEST, "previous write request failed");
if (w.IsWrite())
- Counters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1);
+ TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1);
}
- Counters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(WriteNewSize);
+ TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(WriteNewSize);
Responses.clear();
ProcessChangeOwnerRequests(ctx);
@@ -1331,13 +1294,11 @@ void TPartition::FailBadClient(const TActorContext& ctx)
}
-bool CheckDiskStatus(const TStorageStatusFlags status)
-{
+bool CheckDiskStatus(const TStorageStatusFlags status) {
return !status.Check(NKikimrBlobStorage::StatusDiskSpaceLightYellowMove);
}
-void TPartition::HandleGetDiskStatus(const NKikimrClient::TResponse& response, const TActorContext& ctx)
-{
+void TPartition::HandleGetDiskStatus(const NKikimrClient::TResponse& response, const TActorContext& ctx) {
bool diskIsOk = true;
for (ui32 i = 0; i < response.GetStatusResultSize(); ++i) {
auto& res = response.GetGetStatusResult(i);
@@ -1362,8 +1323,7 @@ void TPartition::HandleGetDiskStatus(const NKikimrClient::TResponse& response, c
RequestMetaRead(ctx, Tablet, Partition);
}
-void TPartition::HandleMetaRead(const NKikimrClient::TKeyValueResponse::TReadResult& response, const TActorContext& ctx)
-{
+void TPartition::HandleMetaRead(const NKikimrClient::TKeyValueResponse::TReadResult& response, const TActorContext& ctx) {
NKikimrPQ::TPartitionMeta meta;
switch (response.GetStatus()) {
case NKikimrProto::OK: {
@@ -1398,8 +1358,7 @@ void TPartition::HandleMetaRead(const NKikimrClient::TKeyValueResponse::TReadRes
-void TPartition::HandleInfoRangeRead(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx)
-{
+void TPartition::HandleInfoRangeRead(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx) {
//megaqc check here all results
Y_VERIFY(range.HasStatus());
const TString *key = nullptr;
@@ -1459,8 +1418,7 @@ void TPartition::HandleInfoRangeRead(const NKikimrClient::TKeyValueResponse::TRe
};
}
-void TPartition::FillBlobsMetaData(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx)
-{
+void TPartition::FillBlobsMetaData(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx) {
for (ui32 i = 0; i < range.PairSize(); ++i) {
auto pair = range.GetPair(i);
Y_VERIFY(pair.GetStatus() == NKikimrProto::OK); //this is readrange without keys, only OK could be here
@@ -1495,8 +1453,7 @@ void TPartition::FillBlobsMetaData(const NKikimrClient::TKeyValueResponse::TRead
Y_VERIFY(EndOffset >= StartOffset);
}
-void TPartition::FormHeadAndProceed(const TActorContext& ctx)
-{
+void TPartition::FormHeadAndProceed(const TActorContext& ctx) {
Head.Offset = EndOffset;
Head.PartNo = 0;
TVector<TString> keys;
@@ -1528,8 +1485,7 @@ void TPartition::FormHeadAndProceed(const TActorContext& ctx)
RequestData(ctx, Tablet, keys);
}
-void TPartition::HandleDataRangeRead(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx)
-{
+void TPartition::HandleDataRangeRead(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx) {
Y_VERIFY(range.HasStatus());
switch(range.GetStatus()) {
case NKikimrProto::OK:
@@ -1553,8 +1509,7 @@ void TPartition::HandleDataRangeRead(const NKikimrClient::TKeyValueResponse::TRe
};
}
-void TPartition::HandleDataRead(const NKikimrClient::TResponse& response, const TActorContext& ctx)
-{
+void TPartition::HandleDataRead(const NKikimrClient::TResponse& response, const TActorContext& ctx) {
Y_VERIFY(InitState == WaitDataRead);
ui32 currentLevel = 0;
Y_VERIFY(HeadKeys.size() == response.ReadResultSize());
@@ -1716,7 +1671,7 @@ void TPartition::InitComplete(const TActorContext& ctx) {
Become(&TThis::StateIdle);
InitDuration = ctx.Now() - CreationTime;
InitDone = true;
- Counters.Percentile()[COUNTER_LATENCY_PQ_INIT].IncrementFor(InitDuration.MilliSeconds());
+ TabletCounters.Percentile()[COUNTER_LATENCY_PQ_INIT].IncrementFor(InitDuration.MilliSeconds());
FillReadFromTimestamps(Config, ctx);
@@ -1744,11 +1699,11 @@ void TPartition::InitComplete(const TActorContext& ctx) {
Y_VERIFY(userInfoPair.second.Offset >= 0);
ReadTimestampForOffset(userInfoPair.first, userInfoPair.second, ctx);
}
- if (PartitionLabeledCounters) {
- PartitionLabeledCounters->GetCounters()[METRIC_INIT_TIME] = InitDuration.MilliSeconds();
- PartitionLabeledCounters->GetCounters()[METRIC_LIFE_TIME] = CreationTime.MilliSeconds();
- PartitionLabeledCounters->GetCounters()[METRIC_PARTITIONS] = 1;
- ctx.Send(Tablet, new TEvPQ::TEvPartitionLabeledCounters(Partition, *PartitionLabeledCounters));
+ if (PartitionCounters) {
+ PartitionCounters->GetCounters()[METRIC_INIT_TIME] = InitDuration.MilliSeconds();
+ PartitionCounters->GetCounters()[METRIC_LIFE_TIME] = CreationTime.MilliSeconds();
+ PartitionCounters->GetCounters()[METRIC_PARTITIONS] = 1;
+ ctx.Send(Tablet, new TEvPQ::TEvPartitionLabeledCounters(Partition, *PartitionCounters));
}
UpdateUserInfoEndOffset(ctx.Now());
@@ -1785,7 +1740,7 @@ void TPartition::ProcessChangeOwnerRequest(TAutoPtr<TEvPQ::TEvChangeOwner> ev, c
//cookie is generated. but answer will be sent when all inflight writes will be done - they in the same queue 'Requests'
Requests.emplace_back(TOwnershipMsg{ev->Cookie, it->second.OwnerCookie}, WriteQuota.GetQuotedTime(), ctx.Now().MilliSeconds(), 0);
- Counters.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Set(ReservedSize);
+ TabletCounters.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Set(ReservedSize);
UpdateWriteBufferIsFullState(ctx.Now());
ProcessReserveRequests(ctx);
@@ -1799,7 +1754,7 @@ THashMap<TString, NKikimr::NPQ::TOwnerInfo>::iterator TPartition::DropOwner(THas
Y_VERIFY(ReservedSize >= it->second.ReservedSize);
ReservedSize -= it->second.ReservedSize;
UpdateWriteBufferIsFullState(ctx.Now());
- Counters.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Set(ReservedSize);
+ TabletCounters.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Set(ReservedSize);
for (auto& ev : it->second.WaitToChangeOwner) { //this request maybe could be done right now
WaitToChangeOwner.push_back(THolder<TEvPQ::TEvChangeOwner>(ev.Release()));
}
@@ -1939,7 +1894,7 @@ void TPartition::ProcessReserveRequests(const TActorContext& ctx) {
break;
}
UpdateWriteBufferIsFullState(ctx.Now());
- Counters.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Set(ReservedSize);
+ TabletCounters.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Set(ReservedSize);
}
void TPartition::UpdateWriteBufferIsFullState(const TInstant& now) {
@@ -2155,25 +2110,32 @@ void TPartition::LogAndCollectError(NKikimrServices::EServiceKikimr service, con
LogAndCollectError(error, ctx);
}
-std::pair<TInstant, TInstant> TPartition::GetTime(const TUserInfo& userInfo, ui64 offset) const
-{
+std::pair<TInstant, TInstant> TPartition::GetTime(const TUserInfo& userInfo, ui64 offset) const {
TInstant wtime = userInfo.WriteTimestamp > TInstant::Zero() ? userInfo.WriteTimestamp : GetWriteTimeEstimate(offset);
return std::make_pair(wtime, userInfo.CreateTimestamp);
}
//zero means no such record
-TInstant TPartition::GetWriteTimeEstimate(ui64 offset) const
-{
+TInstant TPartition::GetWriteTimeEstimate(ui64 offset) const {
if (offset < StartOffset) offset = StartOffset;
if (offset >= EndOffset)
return TInstant::Zero();
- const std::deque<TDataKey>& container = (offset < Head.Offset || offset == Head.Offset && Head.PartNo > 0) ? DataKeysBody : HeadKeys;
+ const std::deque<TDataKey>& container =
+ (offset < Head.Offset || offset == Head.Offset && Head.PartNo > 0) ? DataKeysBody : HeadKeys;
Y_VERIFY(!container.empty());
auto it = std::upper_bound(container.begin(), container.end(), offset,
- [](const ui64 offset, const TDataKey& p) { return offset < p.Key.GetOffset() || offset == p.Key.GetOffset() && p.Key.GetPartNo() > 0;});
- Y_VERIFY(it != container.begin(),"Tablet %lu StartOffset %lu, HeadOffset %lu, offset %lu, containter size %lu, first-elem: %s",
- TabletID, StartOffset, Head.Offset, offset, container.size(), container.front().Key.ToString().c_str()); //always greater
- Y_VERIFY(it == container.end() || it->Key.GetOffset() > offset || it->Key.GetOffset() == offset && it->Key.GetPartNo() > 0);
+ [](const ui64 offset, const TDataKey& p) {
+ return offset < p.Key.GetOffset() ||
+ offset == p.Key.GetOffset() && p.Key.GetPartNo() > 0;
+ });
+ // Always greater
+ Y_VERIFY(it != container.begin(),
+ "Tablet %lu StartOffset %lu, HeadOffset %lu, offset %lu, containter size %lu, first-elem: %s",
+ TabletID, StartOffset, Head.Offset, offset, container.size(),
+ container.front().Key.ToString().c_str());
+ Y_VERIFY(it == container.end() ||
+ it->Key.GetOffset() > offset ||
+ it->Key.GetOffset() == offset && it->Key.GetPartNo() > 0);
--it;
if (it != container.begin())
--it;
@@ -2186,7 +2148,7 @@ void TPartition::Handle(TEvPQ::TEvGetClientOffset::TPtr& ev, const TActorContext
Y_VERIFY(userInfo.Offset >= -1, "Unexpected Offset: %" PRIi64, userInfo.Offset);
ui64 offset = Max<i64>(userInfo.Offset, 0);
auto ts = GetTime(userInfo, offset);
- Counters.Cumulative()[COUNTER_PQ_GET_CLIENT_OFFSET_OK].Increment(1);
+ TabletCounters.Cumulative()[COUNTER_PQ_GET_CLIENT_OFFSET_OK].Increment(1);
ReplyGetClientOffsetOk(ctx, ev->Get()->Cookie, userInfo.Offset, ts.first, ts.second);
}
@@ -2207,7 +2169,7 @@ void TPartition::Handle(TEvPQ::TEvSetClientInfo::TPtr& ev, const TActorContext&
auto& userInfo = UsersInfoStorage.GetOrCreate(user, ctx);
if (userInfo.UserActs.size() > MAX_USER_ACTS) {
- Counters.Cumulative()[COUNTER_PQ_SET_CLIENT_OFFSET_ERROR].Increment(1);
+ TabletCounters.Cumulative()[COUNTER_PQ_SET_CLIENT_OFFSET_ERROR].Increment(1);
ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::OVERLOAD,
TStringBuilder() << "too big inflight: " << userInfo.UserActs.size());
return;
@@ -2222,8 +2184,7 @@ void TPartition::Handle(TEvPQ::TEvSetClientInfo::TPtr& ev, const TActorContext&
ProcessUserActs(userInfo, ctx);
}
-void TPartition::ProcessUserActs(TUserInfo& userInfo, const TActorContext& ctx)
-{
+void TPartition::ProcessUserActs(TUserInfo& userInfo, const TActorContext& ctx) {
if (userInfo.WriteInProgress || userInfo.UserActs.empty())
return;
ui64 cookie = ++SetOffsetCookie;
@@ -2280,28 +2241,27 @@ void TPartition::Handle(TEvPQ::TEvBlobResponse::TPtr& ev, const TActorContext& c
if (HasError(*ev->Get())) {
if (info.IsSubscription) {
- Counters.Cumulative()[COUNTER_PQ_READ_SUBSCRIPTION_ERROR].Increment(1);
+ TabletCounters.Cumulative()[COUNTER_PQ_READ_SUBSCRIPTION_ERROR].Increment(1);
}
- Counters.Cumulative()[COUNTER_PQ_READ_ERROR].Increment(1);
- Counters.Percentile()[COUNTER_LATENCY_PQ_READ_ERROR].IncrementFor((ctx.Now() - info.Timestamp).MilliSeconds());
+ TabletCounters.Cumulative()[COUNTER_PQ_READ_ERROR].Increment(1);
+ TabletCounters.Percentile()[COUNTER_LATENCY_PQ_READ_ERROR].IncrementFor((ctx.Now() - info.Timestamp).MilliSeconds());
} else {
if (info.IsSubscription) {
- Counters.Cumulative()[COUNTER_PQ_READ_SUBSCRIPTION_OK].Increment(1);
+ TabletCounters.Cumulative()[COUNTER_PQ_READ_SUBSCRIPTION_OK].Increment(1);
}
const auto& resp = dynamic_cast<TEvPQ::TEvProxyResponse*>(answer.Event.Get())->Response;
- Counters.Cumulative()[COUNTER_PQ_READ_OK].Increment(1);
- Counters.Percentile()[COUNTER_LATENCY_PQ_READ_OK].IncrementFor((ctx.Now() - info.Timestamp).MilliSeconds());
- Counters.Cumulative()[COUNTER_PQ_READ_BYTES].Increment(resp.ByteSize());
+ TabletCounters.Cumulative()[COUNTER_PQ_READ_OK].Increment(1);
+ TabletCounters.Percentile()[COUNTER_LATENCY_PQ_READ_OK].IncrementFor((ctx.Now() - info.Timestamp).MilliSeconds());
+ TabletCounters.Cumulative()[COUNTER_PQ_READ_BYTES].Increment(resp.ByteSize());
}
ctx.Send(info.Destination != 0 ? Tablet : ctx.SelfID, answer.Event.Release());
- ReportLabeledCounters(ctx);
+ ReportCounters(ctx);
OnReadRequestFinished(std::move(info), answer.Size);
}
template <typename T> // TCmdReadResult
-static void AddResultBlob(T* read, const TClientBlob& blob, ui64 offset)
-{
+static void AddResultBlob(T* read, const TClientBlob& blob, ui64 offset) {
auto cc = read->AddResult();
cc->SetOffset(offset);
cc->SetData(blob.Data);
@@ -2381,7 +2341,7 @@ TReadAnswer TReadInfo::FormAnswer(
ui32 lastBlobSize = 0;
const TVector<TRequestedBlob>& blobs = response->GetBlobs();
- auto updateUsage = [&](const TClientBlob& blob){
+ auto updateUsage = [&](const TClientBlob& blob) {
size += blob.GetBlobSize();
lastBlobSize += blob.GetBlobSize();
if (blob.IsLastPart()) {
@@ -2542,8 +2502,7 @@ void TPartition::Handle(TEvPQ::TEvReadTimeout::TPtr& ev, const TActorContext& ct
}
-TVector<TRequestedBlob> TPartition::GetReadRequestFromBody(const ui64 startOffset, const ui16 partNo, const ui32 maxCount, const ui32 maxSize, ui32* rcount, ui32* rsize)
-{
+TVector<TRequestedBlob> TPartition::GetReadRequestFromBody(const ui64 startOffset, const ui16 partNo, const ui32 maxCount, const ui32 maxSize, ui32* rcount, ui32* rsize) {
Y_VERIFY(rcount && rsize);
ui32& count = *rcount;
ui32& size = *rsize;
@@ -2590,8 +2549,7 @@ TVector<TRequestedBlob> TPartition::GetReadRequestFromBody(const ui64 startOffse
-TVector<TClientBlob> TPartition::GetReadRequestFromHead(const ui64 startOffset, const ui16 partNo, const ui32 maxCount, const ui32 maxSize, const ui64 readTimestampMs, ui32* rcount, ui32* rsize, ui64* insideHeadOffset)
-{
+TVector<TClientBlob> TPartition::GetReadRequestFromHead(const ui64 startOffset, const ui16 partNo, const ui32 maxCount, const ui32 maxSize, const ui64 readTimestampMs, ui32* rcount, ui32* rsize, ui64* insideHeadOffset) {
Y_UNUSED(readTimestampMs);
ui32& count = *rcount;
ui32& size = *rsize;
@@ -2603,8 +2561,7 @@ TVector<TClientBlob> TPartition::GetReadRequestFromHead(const ui64 startOffset,
Y_VERIFY(pos != Max<ui32>());
}
ui32 lastBlobSize = 0;
- for (;pos < Head.Batches.size(); ++pos)
- {
+ for (;pos < Head.Batches.size(); ++pos) {
TVector<TClientBlob> blobs;
Head.Batches[pos].UnpackTo(&blobs);
@@ -2659,13 +2616,13 @@ void TPartition::Handle(TEvPQ::TEvRead::TPtr& ev, const TActorContext& ctx) {
auto read = ev->Get();
if (read->Count == 0) {
- Counters.Cumulative()[COUNTER_PQ_READ_ERROR].Increment(1);
- Counters.Percentile()[COUNTER_LATENCY_PQ_READ_ERROR].IncrementFor(0);
+ TabletCounters.Cumulative()[COUNTER_PQ_READ_ERROR].Increment(1);
+ TabletCounters.Percentile()[COUNTER_LATENCY_PQ_READ_ERROR].IncrementFor(0);
ReplyError(ctx, read->Cookie, NPersQueue::NErrorCode::BAD_REQUEST, "no infinite flows allowed - count is not set or 0");
return;
}
if (read->Offset < StartOffset) {
- Counters.Cumulative()[COUNTER_PQ_READ_ERROR_SMALL_OFFSET].Increment(1);
+ TabletCounters.Cumulative()[COUNTER_PQ_READ_ERROR_SMALL_OFFSET].Increment(1);
read->Offset = StartOffset;
if (read->PartNo > 0) {
LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE,
@@ -2680,8 +2637,8 @@ void TPartition::Handle(TEvPQ::TEvRead::TPtr& ev, const TActorContext& ctx) {
}
}
if (read->Offset > EndOffset || read->Offset == EndOffset && read->PartNo > 0) {
- Counters.Cumulative()[COUNTER_PQ_READ_ERROR_BIG_OFFSET].Increment(1);
- Counters.Percentile()[COUNTER_LATENCY_PQ_READ_ERROR].IncrementFor(0);
+ TabletCounters.Cumulative()[COUNTER_PQ_READ_ERROR_BIG_OFFSET].Increment(1);
+ TabletCounters.Percentile()[COUNTER_LATENCY_PQ_READ_ERROR].IncrementFor(0);
LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE,
"reading from too big offset - topic " << TopicConverter->GetClientsideName() <<
" partition " << Partition <<
@@ -2702,8 +2659,8 @@ void TPartition::Handle(TEvPQ::TEvRead::TPtr& ev, const TActorContext& ctx) {
if (!read->SessionId.empty()) {
if (userInfo.Session != read->SessionId) {
- Counters.Cumulative()[COUNTER_PQ_READ_ERROR_NO_SESSION].Increment(1);
- Counters.Percentile()[COUNTER_LATENCY_PQ_READ_ERROR].IncrementFor(0);
+ TabletCounters.Cumulative()[COUNTER_PQ_READ_ERROR_NO_SESSION].Increment(1);
+ TabletCounters.Percentile()[COUNTER_LATENCY_PQ_READ_ERROR].IncrementFor(0);
ReplyError(ctx, read->Cookie, NPersQueue::NErrorCode::READ_ERROR_NO_SESSION,
TStringBuilder() << "no such session '" << read->SessionId << "'");
return;
@@ -2783,8 +2740,7 @@ void TPartition::OnReadRequestFinished(TReadInfo&& info, ui64 answerSize) {
}
}
-void TPartition::AnswerCurrentWrites(const TActorContext& ctx)
-{
+void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
ui64 offset = EndOffset;
while (!Responses.empty()) {
const ui64 quotedTime = Responses.front().QuotedTime;
@@ -2822,13 +2778,13 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx)
}
if (!already && partNo + 1 == totalParts) {
if (it == SourceIdStorage.GetInMemorySourceIds().end()) {
- Counters.Cumulative()[COUNTER_PQ_SID_CREATED].Increment(1);
+ TabletCounters.Cumulative()[COUNTER_PQ_SID_CREATED].Increment(1);
SourceIdStorage.RegisterSourceId(s, writeResponse.Msg.SeqNo, offset, CurrentTimestamp);
} else {
SourceIdStorage.RegisterSourceId(s, it->second.Updated(writeResponse.Msg.SeqNo, offset, CurrentTimestamp));
}
- Counters.Cumulative()[COUNTER_PQ_WRITE_OK].Increment(1);
+ TabletCounters.Cumulative()[COUNTER_PQ_WRITE_OK].Increment(1);
}
ReplyWrite(
ctx, writeResponse.Cookie, s, seqNo, partNo, totalParts,
@@ -2838,10 +2794,11 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx)
LOG_DEBUG_S(
ctx,
NKikimrServices::PERSQUEUE,
- "Answering for message sourceid: '" << EscapeC(s) << "', Topic: '" << TopicConverter->GetClientsideName()
- << "', Partition: " << Partition
- << ", SeqNo: " << seqNo << ", partNo: " << partNo << ", Offset: " << offset << " is "
- << (already ? "already written" : "stored on disk")
+ "Answering for message sourceid: '" << EscapeC(s) <<
+ "', Topic: '" << TopicConverter->GetClientsideName() <<
+ "', Partition: " << Partition <<
+ ", SeqNo: " << seqNo << ", partNo: " << partNo <<
+ ", Offset: " << offset << " is " << (already ? "already written" : "stored on disk")
);
if (PartitionWriteQuotaWaitCounter) {
PartitionWriteQuotaWaitCounter->IncFor(quotedTime);
@@ -2900,17 +2857,16 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx)
}
-void TPartition::ReadTimestampForOffset(const TString& user, TUserInfo& userInfo, const TActorContext& ctx)
-{
+void TPartition::ReadTimestampForOffset(const TString& user, TUserInfo& userInfo, const TActorContext& ctx) {
if (userInfo.ReadScheduled)
return;
userInfo.ReadScheduled = true;
LOG_DEBUG_S(
ctx, NKikimrServices::PERSQUEUE,
- "Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition
- << " user " << user << " readTimeStamp for offset " << userInfo.Offset << " initiated "
- << " queuesize " << UpdateUserInfoTimestamp.size() << " startOffset " << StartOffset
- << " ReadingTimestamp " << ReadingTimestamp << " rrg " << userInfo.ReadRuleGeneration
+ "Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition <<
+ " user " << user << " readTimeStamp for offset " << userInfo.Offset << " initiated " <<
+ " queuesize " << UpdateUserInfoTimestamp.size() << " startOffset " << StartOffset <<
+ " ReadingTimestamp " << ReadingTimestamp << " rrg " << userInfo.ReadRuleGeneration
);
if (ReadingTimestamp) {
@@ -2929,14 +2885,14 @@ void TPartition::ReadTimestampForOffset(const TString& user, TUserInfo& userInfo
userInfo.ReadWriteTimestamp = userInfo.WriteTimestamp;
}
- Counters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_OFFSET_IS_LOST].Increment(1);
- ReportLabeledCounters(ctx);
+ TabletCounters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_OFFSET_IS_LOST].Increment(1);
+ ReportCounters(ctx);
return;
}
if (userInfo.Offset >= (i64)EndOffset || StartOffset == EndOffset) {
userInfo.ReadScheduled = false;
- ReportLabeledCounters(ctx);
+ ReportCounters(ctx);
return;
}
@@ -2965,10 +2921,9 @@ void TPartition::ReadTimestampForOffset(const TString& user, TUserInfo& userInfo
false);
ctx.Send(ctx.SelfID, event.Release());
- Counters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_CACHE_MISS].Increment(1);
+ TabletCounters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_CACHE_MISS].Increment(1);
}
-
void TPartition::ProcessTimestampsForNewData(const ui64 prevEndOffset, const TActorContext& ctx) {
for (auto& userInfoPair : UsersInfoStorage.GetAll()) {
if (userInfoPair.second.Offset >= (i64)prevEndOffset && userInfoPair.second.Offset < (i64)EndOffset) {
@@ -2977,16 +2932,16 @@ void TPartition::ProcessTimestampsForNewData(const ui64 prevEndOffset, const TAc
}
}
-
-void TPartition::Handle(TEvPQ::TEvProxyResponse::TPtr& ev, const TActorContext& ctx)
-{
+void TPartition::Handle(TEvPQ::TEvProxyResponse::TPtr& ev, const TActorContext& ctx) {
ReadingTimestamp = false;
auto userInfo = UsersInfoStorage.GetIfExists(ReadingForUser);
if (!userInfo || userInfo->ReadRuleGeneration != ReadingForUserReadRuleGeneration) {
LOG_INFO_S(
ctx, NKikimrServices::PERSQUEUE,
- "Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition
- << " user " << ReadingForUser << " readTimeStamp for other generation or no client info at all"
+ "Topic '" << TopicConverter->GetClientsideName() << "'" <<
+ " partition " << Partition <<
+ " user " << ReadingForUser <<
+ " readTimeStamp for other generation or no client info at all"
);
ProcessTimestampRead(ctx);
@@ -2995,9 +2950,12 @@ void TPartition::Handle(TEvPQ::TEvProxyResponse::TPtr& ev, const TActorContext&
LOG_DEBUG_S(
ctx, NKikimrServices::PERSQUEUE,
- "Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition
- << " user " << ReadingForUser << " readTimeStamp done, result " << userInfo->WriteTimestamp.MilliSeconds()
- << " queuesize " << UpdateUserInfoTimestamp.size() << " startOffset " << StartOffset
+ "Topic '" << TopicConverter->GetClientsideName() << "'" <<
+ " partition " << Partition <<
+ " user " << ReadingForUser <<
+ " readTimeStamp done, result " << userInfo->WriteTimestamp.MilliSeconds() <<
+ " queuesize " << UpdateUserInfoTimestamp.size() <<
+ " startOffset " << StartOffset
);
Y_VERIFY(userInfo->ReadScheduled);
userInfo->ReadScheduled = false;
@@ -3009,10 +2967,11 @@ void TPartition::Handle(TEvPQ::TEvProxyResponse::TPtr& ev, const TActorContext&
NKikimrServices::PERSQUEUE,
"Reading Timestamp failed for offset " << ReadingForOffset << " ( "<< userInfo->Offset << " ) " << ev->Get()->Response.DebugString()
);
- if (ev->Get()->Response.GetStatus() == NMsgBusProxy::MSTATUS_OK && ev->Get()->Response.GetErrorCode() == NPersQueue::NErrorCode::OK
- && ev->Get()->Response.GetPartitionResponse().HasCmdReadResult()
- && ev->Get()->Response.GetPartitionResponse().GetCmdReadResult().ResultSize() > 0
- && (i64)ev->Get()->Response.GetPartitionResponse().GetCmdReadResult().GetResult(0).GetOffset() >= userInfo->Offset) {
+ if (ev->Get()->Response.GetStatus() == NMsgBusProxy::MSTATUS_OK &&
+ ev->Get()->Response.GetErrorCode() == NPersQueue::NErrorCode::OK &&
+ ev->Get()->Response.GetPartitionResponse().HasCmdReadResult() &&
+ ev->Get()->Response.GetPartitionResponse().GetCmdReadResult().ResultSize() > 0 &&
+ (i64)ev->Get()->Response.GetPartitionResponse().GetCmdReadResult().GetResult(0).GetOffset() >= userInfo->Offset) {
//offsets is inside gap - return timestamp of first record after gap
const auto& res = ev->Get()->Response.GetPartitionResponse().GetCmdReadResult().GetResult(0);
userInfo->WriteTimestamp = TInstant::MilliSeconds(res.GetWriteTimestampMS());
@@ -3027,7 +2986,7 @@ void TPartition::Handle(TEvPQ::TEvProxyResponse::TPtr& ev, const TActorContext&
UpdateUserInfoTimestamp.push_back(std::make_pair(ReadingForUser, ReadingForUserReadRuleGeneration));
userInfo->ReadScheduled = true;
}
- Counters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_ERROR].Increment(1);
+ TabletCounters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_ERROR].Increment(1);
}
ProcessTimestampRead(ctx);
}
@@ -3050,12 +3009,11 @@ void TPartition::ProcessTimestampRead(const TActorContext& ctx) {
ReadTimestampForOffset(user, *userInfo, ctx);
}
Y_VERIFY(ReadingTimestamp || UpdateUserInfoTimestamp.empty());
- ReportLabeledCounters(ctx);
+ ReportCounters(ctx);
}
-void TPartition::Handle(TEvPQ::TEvError::TPtr& ev, const TActorContext& ctx)
-{
+void TPartition::Handle(TEvPQ::TEvError::TPtr& ev, const TActorContext& ctx) {
ReadingTimestamp = false;
auto userInfo = UsersInfoStorage.GetIfExists(ReadingForUser);
if (!userInfo || userInfo->ReadRuleGeneration != ReadingForUserReadRuleGeneration) {
@@ -3077,8 +3035,7 @@ void TPartition::Handle(TEvPQ::TEvError::TPtr& ev, const TActorContext& ctx)
}
-void TPartition::CheckHeadConsistency() const
-{
+void TPartition::CheckHeadConsistency() const {
ui32 p = 0;
for (ui32 j = 0; j < DataKeysHead.size(); ++j) {
ui32 s = 0;
@@ -3092,7 +3049,8 @@ void TPartition::CheckHeadConsistency() const
}
Y_VERIFY(s < DataKeysHead[j].Border());
}
- Y_VERIFY(DataKeysBody.empty() || Head.Offset >= DataKeysBody.back().Key.GetOffset() + DataKeysBody.back().Key.GetCount());
+ Y_VERIFY(DataKeysBody.empty() ||
+ Head.Offset >= DataKeysBody.back().Key.GetOffset() + DataKeysBody.back().Key.GetCount());
Y_VERIFY(p == HeadKeys.size());
if (!HeadKeys.empty()) {
Y_VERIFY(HeadKeys.size() <= TotalMaxCount);
@@ -3106,8 +3064,7 @@ void TPartition::CheckHeadConsistency() const
}
-void TPartition::SyncMemoryStateWithKVState(const TActorContext& ctx)
-{
+void TPartition::SyncMemoryStateWithKVState(const TActorContext& ctx) {
if (!CompactedKeys.empty())
HeadKeys.clear();
@@ -3174,8 +3131,7 @@ void TPartition::SyncMemoryStateWithKVState(const TActorContext& ctx)
}
-ui64 TPartition::GetSizeLag(i64 offset)
-{
+ui64 TPartition::GetSizeLag(i64 offset) {
ui64 sizeLag = 0;
if (!DataKeysBody.empty() && (offset < (i64)Head.Offset || offset == (i64)Head.Offset && Head.PartNo > 0)) { //there will be something in body
auto it = std::upper_bound(DataKeysBody.begin(), DataKeysBody.end(), std::make_pair(offset, 0),
@@ -3194,9 +3150,8 @@ ui64 TPartition::GetSizeLag(i64 offset)
}
-void TPartition::ReportLabeledCounters(const TActorContext& ctx)
-{
- if (!PartitionLabeledCounters) {
+void TPartition::ReportCounters(const TActorContext& ctx) {
+ if (!PartitionCounters) {
return;
}
//per client counters
@@ -3261,23 +3216,25 @@ void TPartition::ReportLabeledCounters(const TActorContext& ctx)
userInfo.LabeledCounters->GetCounters()[METRIC_COMMIT_MESSAGE_LAG].Set(EndOffset - userInfo.Offset);
}
-
if (userInfo.LabeledCounters->GetCounters()[METRIC_READ_MESSAGE_LAG].Get() != EndOffset - off) {
haveChanges = true;
userInfo.LabeledCounters->GetCounters()[METRIC_READ_MESSAGE_LAG].Set(EndOffset - off);
userInfo.LabeledCounters->GetCounters()[METRIC_READ_TOTAL_MESSAGE_LAG].Set(EndOffset - off);
}
+
ui64 sizeLag = GetSizeLag(userInfo.Offset);
- ui64 sizeLagRead = GetSizeLag(userInfo.ReadOffset);
if (userInfo.LabeledCounters->GetCounters()[METRIC_COMMIT_SIZE_LAG].Get() != sizeLag) {
haveChanges = true;
userInfo.LabeledCounters->GetCounters()[METRIC_COMMIT_SIZE_LAG].Set(sizeLag);
}
+
+ ui64 sizeLagRead = GetSizeLag(userInfo.ReadOffset);
if (userInfo.LabeledCounters->GetCounters()[METRIC_READ_SIZE_LAG].Get() != sizeLagRead) {
haveChanges = true;
userInfo.LabeledCounters->GetCounters()[METRIC_READ_SIZE_LAG].Set(sizeLagRead);
userInfo.LabeledCounters->GetCounters()[METRIC_READ_TOTAL_SIZE_LAG].Set(sizeLag);
}
+
if (userInfo.LabeledCounters->GetCounters()[METRIC_USER_PARTITIONS].Get() == 0) {
haveChanges = true;
userInfo.LabeledCounters->GetCounters()[METRIC_USER_PARTITIONS].Set(1);
@@ -3329,60 +3286,59 @@ void TPartition::ReportLabeledCounters(const TActorContext& ctx)
ctx.Send(Tablet, new TEvPQ::TEvPartitionLabeledCounters(Partition, *userInfo.LabeledCounters));
}
}
- //Partition counters
bool haveChanges = false;
- if (SourceIdStorage.GetInMemorySourceIds().size() != PartitionLabeledCounters->GetCounters()[METRIC_MAX_NUM_SIDS].Get()) {
+ if (SourceIdStorage.GetInMemorySourceIds().size() != PartitionCounters->GetCounters()[METRIC_MAX_NUM_SIDS].Get()) {
haveChanges = true;
- PartitionLabeledCounters->GetCounters()[METRIC_MAX_NUM_SIDS].Set(SourceIdStorage.GetInMemorySourceIds().size());
- PartitionLabeledCounters->GetCounters()[METRIC_NUM_SIDS].Set(SourceIdStorage.GetInMemorySourceIds().size());
+ PartitionCounters->GetCounters()[METRIC_MAX_NUM_SIDS].Set(SourceIdStorage.GetInMemorySourceIds().size());
+ PartitionCounters->GetCounters()[METRIC_NUM_SIDS].Set(SourceIdStorage.GetInMemorySourceIds().size());
}
TDuration lifetimeNow = ctx.Now() - SourceIdStorage.MinAvailableTimestamp(ctx.Now());
- if (lifetimeNow.MilliSeconds() != PartitionLabeledCounters->GetCounters()[METRIC_MIN_SID_LIFETIME].Get()) {
+ if (lifetimeNow.MilliSeconds() != PartitionCounters->GetCounters()[METRIC_MIN_SID_LIFETIME].Get()) {
haveChanges = true;
- PartitionLabeledCounters->GetCounters()[METRIC_MIN_SID_LIFETIME].Set(lifetimeNow.MilliSeconds());
+ PartitionCounters->GetCounters()[METRIC_MIN_SID_LIFETIME].Set(lifetimeNow.MilliSeconds());
}
- ui64 headGapSize = DataKeysBody.empty() ? 0 : (Head.Offset - (DataKeysBody.back().Key.GetOffset() + DataKeysBody.back().Key.GetCount()));
- ui64 gapSize = GapSize + headGapSize;
- ui32 gapsCount = GapOffsets.size() + (headGapSize ? 1 : 0);
-
- if (gapSize != PartitionLabeledCounters->GetCounters()[METRIC_GAPS_SIZE].Get()) {
+ const ui64 headGapSize = DataKeysBody.empty() ? 0 : (Head.Offset - (DataKeysBody.back().Key.GetOffset() + DataKeysBody.back().Key.GetCount()));
+ const ui64 gapSize = GapSize + headGapSize;
+ if (gapSize != PartitionCounters->GetCounters()[METRIC_GAPS_SIZE].Get()) {
haveChanges = true;
- PartitionLabeledCounters->GetCounters()[METRIC_MAX_GAPS_SIZE].Set(gapSize);
- PartitionLabeledCounters->GetCounters()[METRIC_GAPS_SIZE].Set(gapSize);
+ PartitionCounters->GetCounters()[METRIC_MAX_GAPS_SIZE].Set(gapSize);
+ PartitionCounters->GetCounters()[METRIC_GAPS_SIZE].Set(gapSize);
}
- if (gapsCount != PartitionLabeledCounters->GetCounters()[METRIC_GAPS_COUNT].Get()) {
+
+ const ui32 gapsCount = GapOffsets.size() + (headGapSize ? 1 : 0);
+ if (gapsCount != PartitionCounters->GetCounters()[METRIC_GAPS_COUNT].Get()) {
haveChanges = true;
- PartitionLabeledCounters->GetCounters()[METRIC_MAX_GAPS_COUNT].Set(gapsCount);
- PartitionLabeledCounters->GetCounters()[METRIC_GAPS_COUNT].Set(gapsCount);
+ PartitionCounters->GetCounters()[METRIC_MAX_GAPS_COUNT].Set(gapsCount);
+ PartitionCounters->GetCounters()[METRIC_GAPS_COUNT].Set(gapsCount);
}
ui64 speed = WriteQuota.GetTotalSpeed();
- if (speed != PartitionLabeledCounters->GetCounters()[METRIC_WRITE_QUOTA_BYTES].Get()) {
+ if (speed != PartitionCounters->GetCounters()[METRIC_WRITE_QUOTA_BYTES].Get()) {
haveChanges = true;
- PartitionLabeledCounters->GetCounters()[METRIC_WRITE_QUOTA_BYTES].Set(speed);
+ PartitionCounters->GetCounters()[METRIC_WRITE_QUOTA_BYTES].Set(speed);
}
ui64 availSec = WriteQuota.GetAvailableAvgSec(ctx.Now());
- if (availSec != PartitionLabeledCounters->GetCounters()[METRIC_MIN_WRITE_QUOTA_BYTES_AVAIL_SEC].Get()) {
+ if (availSec != PartitionCounters->GetCounters()[METRIC_MIN_WRITE_QUOTA_BYTES_AVAIL_SEC].Get()) {
haveChanges = true;
- PartitionLabeledCounters->GetCounters()[METRIC_MIN_WRITE_QUOTA_BYTES_AVAIL_SEC].Set(availSec);
+ PartitionCounters->GetCounters()[METRIC_MIN_WRITE_QUOTA_BYTES_AVAIL_SEC].Set(availSec);
}
ui64 availMin = WriteQuota.GetAvailableAvgMin(ctx.Now());
- if (availMin != PartitionLabeledCounters->GetCounters()[METRIC_MIN_WRITE_QUOTA_BYTES_AVAIL_MIN].Get()) {
+ if (availMin != PartitionCounters->GetCounters()[METRIC_MIN_WRITE_QUOTA_BYTES_AVAIL_MIN].Get()) {
haveChanges = true;
- PartitionLabeledCounters->GetCounters()[METRIC_MIN_WRITE_QUOTA_BYTES_AVAIL_MIN].Set(availMin);
+ PartitionCounters->GetCounters()[METRIC_MIN_WRITE_QUOTA_BYTES_AVAIL_MIN].Set(availMin);
}
ui32 id = METRIC_TOTAL_WRITE_SPEED_1;
for (ui32 i = 0; i < AvgWriteBytes.size(); ++i) {
ui64 avg = AvgWriteBytes[i].GetValue();
- if (avg != PartitionLabeledCounters->GetCounters()[id].Get()) {
+ if (avg != PartitionCounters->GetCounters()[id].Get()) {
haveChanges = true;
- PartitionLabeledCounters->GetCounters()[id].Set(avg); //total
- PartitionLabeledCounters->GetCounters()[id + 1].Set(avg); //max
+ PartitionCounters->GetCounters()[id].Set(avg); //total
+ PartitionCounters->GetCounters()[id + 1].Set(avg); //max
}
id += 2;
}
@@ -3392,10 +3348,10 @@ void TPartition::ReportLabeledCounters(const TActorContext& ctx)
id = METRIC_TOTAL_QUOTA_SPEED_1;
for (ui32 i = 0; i < AvgQuotaBytes.size(); ++i) {
ui64 avg = AvgQuotaBytes[i].GetValue();
- if (avg != PartitionLabeledCounters->GetCounters()[id].Get()) {
+ if (avg != PartitionCounters->GetCounters()[id].Get()) {
haveChanges = true;
- PartitionLabeledCounters->GetCounters()[id].Set(avg); //total
- PartitionLabeledCounters->GetCounters()[id + 1].Set(avg); //max
+ PartitionCounters->GetCounters()[id].Set(avg); //total
+ PartitionCounters->GetCounters()[id + 1].Set(avg); //max
}
id += 2;
}
@@ -3403,34 +3359,33 @@ void TPartition::ReportLabeledCounters(const TActorContext& ctx)
if (WriteQuota.GetTotalSpeed()) {
ui64 quotaUsage = ui64(AvgQuotaBytes[1].GetValue()) * 1000000 / WriteQuota.GetTotalSpeed() / 60;
- if (quotaUsage != PartitionLabeledCounters->GetCounters()[METRIC_WRITE_QUOTA_USAGE].Get()) {
+ if (quotaUsage != PartitionCounters->GetCounters()[METRIC_WRITE_QUOTA_USAGE].Get()) {
haveChanges = true;
- PartitionLabeledCounters->GetCounters()[METRIC_WRITE_QUOTA_USAGE].Set(quotaUsage);
+ PartitionCounters->GetCounters()[METRIC_WRITE_QUOTA_USAGE].Set(quotaUsage);
}
}
ui64 partSize = BodySize + Head.PackedSize;
- if (partSize != PartitionLabeledCounters->GetCounters()[METRIC_TOTAL_PART_SIZE].Get()) {
+ if (partSize != PartitionCounters->GetCounters()[METRIC_TOTAL_PART_SIZE].Get()) {
haveChanges = true;
- PartitionLabeledCounters->GetCounters()[METRIC_MAX_PART_SIZE].Set(partSize);
- PartitionLabeledCounters->GetCounters()[METRIC_TOTAL_PART_SIZE].Set(partSize);
+ PartitionCounters->GetCounters()[METRIC_MAX_PART_SIZE].Set(partSize);
+ PartitionCounters->GetCounters()[METRIC_TOTAL_PART_SIZE].Set(partSize);
}
- ui64 ts = WriteTimestamp.MilliSeconds();
- if (ts < MIN_TIMESTAMP_MS) ts = Max<i64>();
- if (PartitionLabeledCounters->GetCounters()[METRIC_LAST_WRITE_TIME].Get() != ts) {
+ ui64 ts = (WriteTimestamp.MilliSeconds() < MIN_TIMESTAMP_MS) ? Max<i64>() : WriteTimestamp.MilliSeconds();
+ if (PartitionCounters->GetCounters()[METRIC_LAST_WRITE_TIME].Get() != ts) {
haveChanges = true;
- PartitionLabeledCounters->GetCounters()[METRIC_LAST_WRITE_TIME].Set(ts);
+ PartitionCounters->GetCounters()[METRIC_LAST_WRITE_TIME].Set(ts);
}
ui64 timeLag = WriteLagMs.GetValue();
- if (PartitionLabeledCounters->GetCounters()[METRIC_WRITE_TIME_LAG_MS].Get() != timeLag) {
+ if (PartitionCounters->GetCounters()[METRIC_WRITE_TIME_LAG_MS].Get() != timeLag) {
haveChanges = true;
- PartitionLabeledCounters->GetCounters()[METRIC_WRITE_TIME_LAG_MS].Set(timeLag);
+ PartitionCounters->GetCounters()[METRIC_WRITE_TIME_LAG_MS].Set(timeLag);
}
if (haveChanges) {
- ctx.Send(Tablet, new TEvPQ::TEvPartitionLabeledCounters(Partition, *PartitionLabeledCounters));
+ ctx.Send(Tablet, new TEvPQ::TEvPartitionLabeledCounters(Partition, *PartitionCounters));
}
}
@@ -3603,18 +3558,17 @@ void TPartition::HandleSetOffsetResponse(NKikimrClient::TResponse& response, con
userInfo->ActualTimestamps = false;
ReadTimestampForOffset(user, *userInfo, ctx);
} else {
- Counters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_CACHE_HIT].Increment(1);
+ TabletCounters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_CACHE_HIT].Increment(1);
}
auto counter = setSession ? COUNTER_PQ_CREATE_SESSION_OK : (dropSession ? COUNTER_PQ_DELETE_SESSION_OK : COUNTER_PQ_SET_CLIENT_OFFSET_OK);
- Counters.Cumulative()[counter].Increment(1);
+ TabletCounters.Cumulative()[counter].Increment(1);
}
userInfo->WriteInProgress = false;
ProcessUserActs(*userInfo, ctx);
}
-void TPartition::ScheduleUpdateAvailableSize(const TActorContext& ctx)
-{
+void TPartition::ScheduleUpdateAvailableSize(const TActorContext& ctx) {
ctx.Schedule(UPDATE_AVAIL_SIZE_INTERVAL, new TEvPQ::TEvUpdateAvailableSize());
}
@@ -3646,11 +3600,11 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) {
SLIBigLatency.Inc();
}
- Counters.Percentile()[COUNTER_LATENCY_PQ_WRITE_CYCLE].IncrementFor(totalLatencyMs);
- Counters.Cumulative()[COUNTER_PQ_WRITE_CYCLE_BYTES_TOTAL].Increment(WriteCycleSize);
- Counters.Cumulative()[COUNTER_PQ_WRITE_BYTES_OK].Increment(WriteNewSize);
- Counters.Percentile()[COUNTER_PQ_WRITE_CYCLE_BYTES].IncrementFor(WriteCycleSize);
- Counters.Percentile()[COUNTER_PQ_WRITE_NEW_BYTES].IncrementFor(WriteNewSize);
+ TabletCounters.Percentile()[COUNTER_LATENCY_PQ_WRITE_CYCLE].IncrementFor(totalLatencyMs);
+ TabletCounters.Cumulative()[COUNTER_PQ_WRITE_CYCLE_BYTES_TOTAL].Increment(WriteCycleSize);
+ TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_OK].Increment(WriteNewSize);
+ TabletCounters.Percentile()[COUNTER_PQ_WRITE_CYCLE_BYTES].IncrementFor(WriteCycleSize);
+ TabletCounters.Percentile()[COUNTER_PQ_WRITE_NEW_BYTES].IncrementFor(WriteNewSize);
if (BytesWritten)
BytesWritten.Inc(WriteNewSizeInternal);
if (BytesWrittenUncompressed)
@@ -3697,7 +3651,7 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) {
ProcessTimestampsForNewData(prevEndOffset, ctx);
- ReportLabeledCounters(ctx);
+ ReportCounters(ctx);
HandleWrites(ctx);
}
@@ -3756,8 +3710,8 @@ void TPartition::HandleOnWrite(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& c
TMaybe<ui64> offset = ev->Get()->Offset;
if (WriteInflightSize > Config.GetPartitionConfig().GetMaxWriteInflightSize()) {
- Counters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(ev->Get()->Msgs.size());
- Counters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(sz);
+ TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(ev->Get()->Msgs.size());
+ TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(sz);
ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::OVERLOAD,
TStringBuilder() << "try later. Write inflight limit reached. "
@@ -3791,16 +3745,14 @@ void TPartition::HandleOnWrite(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& c
}
}
- if (EndOffset - StartOffset >= static_cast<ui64>(Config.GetPartitionConfig().GetMaxCountInPartition())
- || BodySize + Head.PackedSize >= static_cast<ui64>(Config.GetPartitionConfig().GetMaxSizeInPartition())) {
-
- Counters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(ev->Get()->Msgs.size());
- Counters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(sz);
+ const ui64 maxSize = Config.GetPartitionConfig().GetMaxSizeInPartition();
+ const ui64 maxCount = Config.GetPartitionConfig().GetMaxCountInPartition();
+ if (EndOffset - StartOffset >= maxCount || BodySize + Head.PackedSize >= maxSize) {
+ TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(ev->Get()->Msgs.size());
+ TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(sz);
ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::WRITE_ERROR_PARTITION_IS_FULL,
- Sprintf("try later, partition is full - already have %" PRIu64" from %" PRIu64 " count, %" PRIu64 " from %" PRIu64 " size",
- EndOffset - StartOffset, static_cast<ui64>(Config.GetPartitionConfig().GetMaxCountInPartition()),
- BodySize + Head.PackedSize, static_cast<ui64>(Config.GetPartitionConfig().GetMaxSizeInPartition())));
+ Sprintf("try later, partition is full - already have %" PRIu64" from %" PRIu64 " count, %" PRIu64 " from %" PRIu64 " size", EndOffset - StartOffset, maxCount, BodySize + Head.PackedSize, maxSize));
return;
}
ui64 size = 0;
@@ -3815,8 +3767,9 @@ void TPartition::HandleOnWrite(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& c
WriteInflightSize += size;
ReservedSize -= decReservedSize;
- Y_VERIFY(size <= decReservedSize || decReservedSize == 0); //TODO: remove decReservedSize == 0
- Counters.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Set(ReservedSize);
+ // TODO: remove decReservedSize == 0
+ Y_VERIFY(size <= decReservedSize || decReservedSize == 0);
+ TabletCounters.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Set(ReservedSize);
UpdateWriteBufferIsFullState(ctx.Now());
}
@@ -3914,8 +3867,7 @@ void TPartition::HandleOnWrite(TEvPQ::TEvSplitMessageGroup::TPtr& ev, const TAct
Requests.emplace_back(std::move(msg), WriteQuota.GetQuotedTime(), ctx.Now().MilliSeconds(), 0);
}
-std::pair<TKey, ui32> TPartition::Compact(const TKey& key, const ui32 size, bool headCleared)
-{
+std::pair<TKey, ui32> TPartition::Compact(const TKey& key, const ui32 size, bool headCleared) {
std::pair<TKey, ui32> res({key, size});
ui32 x = headCleared ? 0 : Head.PackedSize;
Y_VERIFY(std::accumulate(DataKeysHead.begin(), DataKeysHead.end(), 0u, [](ui32 sum, const TKeyLevel& level){return sum + level.Sum();}) == NewHead.PackedSize + x);
@@ -3937,8 +3889,7 @@ std::pair<TKey, ui32> TPartition::Compact(const TKey& key, const ui32 size, bool
}
-void TPartition::ProcessChangeOwnerRequests(const TActorContext& ctx)
-{
+void TPartition::ProcessChangeOwnerRequests(const TActorContext& ctx) {
while (!WaitToChangeOwner.empty()) {
auto &ev = WaitToChangeOwner.front();
if (OwnerPipes.find(ev->PipeClient) != OwnerPipes.end()) { //this is not request from dead pipe
@@ -3954,8 +3905,7 @@ void TPartition::ProcessChangeOwnerRequests(const TActorContext& ctx)
}
-void TPartition::BecomeIdle(const TActorContext&)
-{
+void TPartition::BecomeIdle(const TActorContext&) {
Become(&TThis::StateIdle);
}
@@ -4011,7 +3961,7 @@ void TPartition::WriteClientInfo(const ui64 cookie, TUserInfo& userInfo, const T
&& (ev->Type != TEvPQ::TEvSetClientInfo::ESCI_DROP_SESSION || !userInfo.Session.empty()) //but allow DropSession request when session is already dropped - for idempotence
|| (ev->Type == TEvPQ::TEvSetClientInfo::ESCI_CREATE_SESSION && !userInfo.Session.empty()
&& (ev->Generation < userInfo.Generation || ev->Generation == userInfo.Generation && ev->Step <= userInfo.Step))) { //old generation request
- Counters.Cumulative()[COUNTER_PQ_SET_CLIENT_OFFSET_ERROR].Increment(1);
+ TabletCounters.Cumulative()[COUNTER_PQ_SET_CLIENT_OFFSET_ERROR].Increment(1);
ReplyError(ctx, ev->Cookie, NPersQueue::NErrorCode::WRONG_COOKIE,
TStringBuilder() << "set offset in already dead session " << ev->SessionId << " actual is " << userInfo.Session);
userInfo.UserActs.pop_front();
@@ -4052,7 +4002,7 @@ void TPartition::WriteClientInfo(const ui64 cookie, TUserInfo& userInfo, const T
);
offset = EndOffset;
ev->Offset = offset;
-/* Counters.Cumulative()[COUNTER_PQ_SET_CLIENT_OFFSET_ERROR].Increment(1);
+/* TabletCounters.Cumulative()[COUNTER_PQ_SET_CLIENT_OFFSET_ERROR].Increment(1);
ReplyError(ctx, ev->Cookie, NPersQueue::NErrorCode::SET_OFFSET_ERROR_COMMIT_TO_FUTURE,
TStringBuilder() << "can't commit to future. Offset " << offset << " EndOffset " << EndOffset);
userInfo.UserActrs.pop_front();
@@ -4112,8 +4062,8 @@ void TPartition::ClearOldHead(const ui64 offset, const ui16 partNo, TEvKeyValue:
void TPartition::CancelAllWritesOnWrite(const TActorContext& ctx, TEvKeyValue::TEvRequest* request, const TString& errorStr, const TWriteMsg& p, TSourceIdWriter& sourceIdWriter, NPersQueue::NErrorCode::EErrorCode errorCode = NPersQueue::NErrorCode::BAD_REQUEST) {
ReplyError(ctx, p.Cookie, errorCode, errorStr);
- Counters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1);
- Counters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(p.Msg.Data.size() + p.Msg.SourceId.size());
+ TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1);
+ TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(p.Msg.Data.size() + p.Msg.SourceId.size());
FailBadClient(ctx);
NewHead.Clear();
NewHead.Offset = EndOffset;
@@ -4199,7 +4149,7 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const
WriteInflightSize -= p.Msg.Data.size();
- Counters.Percentile()[COUNTER_LATENCY_PQ_RECEIVE_QUEUE].IncrementFor(ctx.Now().MilliSeconds() - p.Msg.ReceiveTimestamp);
+ TabletCounters.Percentile()[COUNTER_LATENCY_PQ_RECEIVE_QUEUE].IncrementFor(ctx.Now().MilliSeconds() - p.Msg.ReceiveTimestamp);
//check already written
ui64 poffset = p.Offset ? *p.Offset : curOffset;
@@ -4221,11 +4171,11 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const
<< " EndOffset " << EndOffset << " CurOffset " << curOffset << " offset " << poffset
);
- Counters.Cumulative()[COUNTER_PQ_WRITE_ALREADY].Increment(1);
- Counters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ALREADY].Increment(p.Msg.Data.size());
+ TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ALREADY].Increment(1);
+ TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ALREADY].Increment(p.Msg.Data.size());
} else {
- Counters.Cumulative()[COUNTER_PQ_WRITE_SMALL_OFFSET].Increment(1);
- Counters.Cumulative()[COUNTER_PQ_WRITE_BYTES_SMALL_OFFSET].Increment(p.Msg.Data.size());
+ TabletCounters.Cumulative()[COUNTER_PQ_WRITE_SMALL_OFFSET].Increment(1);
+ TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_SMALL_OFFSET].Increment(p.Msg.Data.size());
}
TString().swap(p.Msg.Data);
@@ -4465,8 +4415,7 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const
}
-std::pair<TKey, ui32> TPartition::GetNewWriteKey(bool headCleared)
-{
+std::pair<TKey, ui32> TPartition::GetNewWriteKey(bool headCleared) {
bool needCompaction = false;
ui32 HeadSize = headCleared ? 0 : Head.PackedSize;
if (HeadSize + NewHead.PackedSize > 0 && HeadSize + NewHead.PackedSize
@@ -4693,8 +4642,7 @@ bool TPartition::ProcessWrites(TEvKeyValue::TEvRequest* request, const TActorCon
return true;
}
-void TPartition::FilterDeadlinedWrites(const TActorContext& ctx)
-{
+void TPartition::FilterDeadlinedWrites(const TActorContext& ctx) {
if (QuotaDeadline == TInstant::Zero() || QuotaDeadline > ctx.Now())
return;
@@ -4702,8 +4650,8 @@ void TPartition::FilterDeadlinedWrites(const TActorContext& ctx)
ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::OVERLOAD, "quota exceeded");
if (w.IsWrite()) {
const auto& msg = w.GetWrite().Msg;
- Counters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1);
- Counters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(msg.Data.size() + msg.SourceId.size());
+ TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1);
+ TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(msg.Data.size() + msg.SourceId.size());
WriteInflightSize -= msg.Data.size();
}
}
@@ -4714,8 +4662,7 @@ void TPartition::FilterDeadlinedWrites(const TActorContext& ctx)
}
-void TPartition::HandleWrites(const TActorContext& ctx)
-{
+void TPartition::HandleWrites(const TActorContext& ctx) {
Become(&TThis::StateWrite);
THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
@@ -4753,8 +4700,7 @@ void TPartition::HandleWrites(const TActorContext& ctx)
}
-void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const ui64 cookie, bool subscription)
-{
+void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const ui64 cookie, bool subscription) {
ui32 count = 0;
ui32 size = 0;
@@ -4795,13 +4741,13 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u
));
const auto& resp = dynamic_cast<TEvPQ::TEvProxyResponse*>(answer.Event.Get())->Response;
if (info.IsSubscription) {
- Counters.Cumulative()[COUNTER_PQ_READ_SUBSCRIPTION_OK].Increment(1);
+ TabletCounters.Cumulative()[COUNTER_PQ_READ_SUBSCRIPTION_OK].Increment(1);
}
- Counters.Cumulative()[COUNTER_PQ_READ_HEAD_ONLY_OK].Increment(1);
- Counters.Percentile()[COUNTER_LATENCY_PQ_READ_HEAD_ONLY].IncrementFor((ctx.Now() - info.Timestamp).MilliSeconds());
- Counters.Cumulative()[COUNTER_PQ_READ_BYTES].Increment(resp.ByteSize());
+ TabletCounters.Cumulative()[COUNTER_PQ_READ_HEAD_ONLY_OK].Increment(1);
+ TabletCounters.Percentile()[COUNTER_LATENCY_PQ_READ_HEAD_ONLY].IncrementFor((ctx.Now() - info.Timestamp).MilliSeconds());
+ TabletCounters.Cumulative()[COUNTER_PQ_READ_BYTES].Increment(resp.ByteSize());
ctx.Send(info.Destination != 0 ? Tablet : ctx.SelfID, answer.Event.Release());
- ReportLabeledCounters(ctx);
+ ReportCounters(ctx);
OnReadRequestFinished(std::move(info), answer.Size);
return;
}
@@ -4816,21 +4762,24 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u
ctx.Send(BlobCache, request.Release());
}
-void TPartition::Handle(TEvQuota::TEvClearance::TPtr& ev, const TActorContext& ctx)
-{
+void TPartition::Handle(TEvQuota::TEvClearance::TPtr& ev, const TActorContext& ctx) {
const ui64 cookie = ev->Cookie;
LOG_DEBUG_S(
ctx, NKikimrServices::PERSQUEUE,
- "Got quota. Topic: \"" << TopicConverter->GetClientsideName() << "\". Partition: "
- << Partition << ": " << ev->Get()->Result << ". Cookie: " << cookie
+ "Got quota." <<
+ " Topic: \"" << TopicConverter->GetClientsideName() << "\"." <<
+ " Partition: " << Partition << ": " << ev->Get()->Result << "." <<
+ " Cookie: " << cookie
);
// Check
if (Y_UNLIKELY(ev->Get()->Result != TEvQuota::TEvClearance::EResult::Success)) {
- Y_VERIFY(ev->Get()->Result != TEvQuota::TEvClearance::EResult::Deadline); // We set deadline == inf in quota request.
+ // We set deadline == inf in quota request.
+ Y_VERIFY(ev->Get()->Result != TEvQuota::TEvClearance::EResult::Deadline);
LOG_ERROR_S(
ctx, NKikimrServices::PERSQUEUE,
- "Got quota error. Topic: \"" << TopicConverter->GetClientsideName() << "\". Partition " << Partition
- << ": " << ev->Get()->Result
+ "Got quota error." <<
+ " Topic: \"" << TopicConverter->GetClientsideName() << "\"." <<
+ " Partition " << Partition << ": " << ev->Get()->Result
);
ctx.Send(Tablet, new TEvents::TEvPoisonPill());
return;
@@ -4853,25 +4802,24 @@ void TPartition::Handle(TEvQuota::TEvClearance::TPtr& ev, const TActorContext& c
HandleWrites(ctx);
}
-size_t TPartition::GetQuotaRequestSize(const TEvKeyValue::TEvRequest& request)
-{
- if (AppData()->PQConfig.GetQuotingConfig().GetTopicWriteQuotaEntityToLimit() == NKikimrPQ::TPQConfig::TQuotingConfig::USER_PAYLOAD_SIZE) {
+size_t TPartition::GetQuotaRequestSize(const TEvKeyValue::TEvRequest& request) {
+ if (AppData()->PQConfig.GetQuotingConfig().GetTopicWriteQuotaEntityToLimit() ==
+ NKikimrPQ::TPQConfig::TQuotingConfig::USER_PAYLOAD_SIZE) {
return WriteNewSize;
} else {
- size_t dataSize = 0;
- for (const auto& cmdWrite : request.Record.GetCmdWrite()) {
- dataSize += cmdWrite.GetValue().size();
- }
- return dataSize;
+ return std::accumulate(request.Record.GetCmdWrite().begin(), request.Record.GetCmdWrite().end(), 0ul,
+ [](size_t sum, const auto& el) { return sum + el.GetValue().size(); });
}
}
-void TPartition::RequestQuotaForWriteBlobRequest(size_t dataSize, ui64 cookie)
-{
+void TPartition::RequestQuotaForWriteBlobRequest(size_t dataSize, ui64 cookie) {
LOG_DEBUG_S(
TActivationContext::AsActorContext(), NKikimrServices::PERSQUEUE,
- "Send write quota request. Topic: \"" << TopicConverter->GetClientsideName() << "\". Partition: " << Partition
- << ". Amount: " << dataSize << ". Cookie: " << cookie
+ "Send write quota request." <<
+ " Topic: \"" << TopicConverter->GetClientsideName() << "\"." <<
+ " Partition: " << Partition << "." <<
+ " Amount: " << dataSize << "." <<
+ " Cookie: " << cookie
);
Send(MakeQuoterServiceID(),
@@ -4887,8 +4835,7 @@ bool TPartition::WaitingForPreviousBlobQuota() const {
return TopicQuotaRequestCookie != 0;
}
-void TPartition::WriteBlobWithQuota(THolder<TEvKeyValue::TEvRequest>&& request)
-{
+void TPartition::WriteBlobWithQuota(THolder<TEvKeyValue::TEvRequest>&& request) {
// Request quota and write blob.
// Mirrored topics are not quoted in local dc.
const bool skip = !IsQuotingEnabled() || TopicWriteQuotaResourcePath.empty();
@@ -4912,8 +4859,7 @@ void TPartition::WriteBlobWithQuota(THolder<TEvKeyValue::TEvRequest>&& request)
#endif
}
-void TPartition::CalcTopicWriteQuotaParams()
-{
+void TPartition::CalcTopicWriteQuotaParams() {
const auto& pqConfig = AppData()->PQConfig;
const auto& quotingConfig = pqConfig.GetQuotingConfig();
if (IsQuotingEnabled()) { // Mirrored topics are not quoted in local dc.
@@ -4944,8 +4890,8 @@ void TPartition::CalcTopicWriteQuotaParams()
void TPartition::CreateMirrorerActor() {
Mirrorer = MakeHolder<TMirrorerInfo>(
- Register(new TMirrorer(Tablet, SelfId(), TopicConverter, Partition, IsLocalDC, EndOffset, Config.GetPartitionConfig().GetMirrorFrom(), Counters)),
- Counters
+ Register(new TMirrorer(Tablet, SelfId(), TopicConverter, Partition, IsLocalDC, EndOffset, Config.GetPartitionConfig().GetMirrorFrom(), TabletCounters)),
+ TabletCounters
);
}
@@ -4955,5 +4901,4 @@ bool TPartition::IsQuotingEnabled() const {
return IsLocalDC && !pqConfig.GetTopicsAreFirstClassCitizen() && quotingConfig.GetEnableQuoting();
}
-}// NPQ
-}// NKikimr
+} // namespace NKikimr::NPQ
diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h
index 0cb644ac12..bdd7ed90db 100644
--- a/ydb/core/persqueue/partition.h
+++ b/ydb/core/persqueue/partition.h
@@ -1,218 +1,170 @@
#pragma once
#include <util/generic/set.h>
-#include <util/system/hp_timer.h>
-#include <ydb/core/base/quoter.h>
-#include <ydb/core/keyvalue/keyvalue_events.h>
#include <library/cpp/actors/core/actor.h>
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/core/log.h>
#include <library/cpp/sliding_window/sliding_window.h>
-#include <ydb/core/protos/pqconfig.pb.h>
-#include <ydb/core/persqueue/events/internal.h>
+#include <ydb/core/keyvalue/keyvalue_events.h>
#include <ydb/library/persqueue/counter_time_keeper/counter_time_keeper.h>
-#include <ydb/library/persqueue/topic_parser/topic_parser.h>
-#include "key.h"
#include "blob.h"
-#include "subscriber.h"
#include "header.h"
-#include "user_info.h"
+#include "key.h"
+#include "partition_types.h"
#include "sourceid.h"
-#include "ownerinfo.h"
+#include "subscriber.h"
+#include "user_info.h"
-#include <variant>
-namespace NKikimr {
-namespace NPQ {
+namespace NKikimr::NPQ {
-class TKeyLevel;
+static const ui32 MAX_BLOB_PART_SIZE = 500_KB;
-static const ui32 MAX_BLOB_PART_SIZE = 500 << 10; //500Kb
+ui64 GetOffsetEstimate(const std::deque<TDataKey>& container, TInstant timestamp, ui64 headOffset);
typedef TProtobufTabletLabeledCounters<EPartitionLabeledCounters_descriptor> TPartitionLabeledCounters;
-
-struct TDataKey {
- TKey Key;
- ui32 Size;
- TInstant Timestamp;
- ui64 CumulativeSize;
-};
-
-ui64 GetOffsetEstimate(const std::deque<TDataKey>& container, TInstant timestamp, ui64 headOffset);
-
+class TKeyLevel;
struct TMirrorerInfo;
class TPartition : public TActorBootstrapped<TPartition> {
private:
- static constexpr ui32 MAX_ERRORS_COUNT_TO_STORE = 10;
+ static const ui32 MAX_ERRORS_COUNT_TO_STORE = 10;
private:
struct THasDataReq;
struct THasDataDeadline;
- //answer for requests when data arrives and drop deadlined requests
- void ProcessHasDataRequests(const TActorContext& ctx);
-
- void FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config, const TActorContext& ctx);
- void ProcessUserActs(TUserInfo& userInfo, const TActorContext& ctx);
-
void ReplyError(const TActorContext& ctx, const ui64 dst, NPersQueue::NErrorCode::EErrorCode errorCode, const TString& error);
void ReplyErrorForStoredWrites(const TActorContext& ctx);
- void ReplyOk(const TActorContext& ctx, const ui64 dst);
- void ReplyWrite(
- const TActorContext& ctx, ui64 dst, const TString& sourceId, ui64 seqNo, ui16 partNo, ui16 totalParts,
- ui64 offset, TInstant writeTimestamp, bool already, ui64 maxSeqNo,
- ui64 partitionQuotedTime, TDuration topicQuotedTime, ui64 queueTime, ui64 writeTime);
- void ReplyGetClientOffsetOk(const TActorContext& ctx, const ui64 dst, const i64 offset,
- const TInstant writeTimestamp, const TInstant createTimestamp);
+ void ReplyGetClientOffsetOk(const TActorContext& ctx, const ui64 dst, const i64 offset, const TInstant writeTimestamp, const TInstant createTimestamp);
+ void ReplyOk(const TActorContext& ctx, const ui64 dst);
void ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& ownerCookie);
- void Handle(TEvPersQueue::TEvHasDataInfo::TPtr& ev, const TActorContext& ctx);
-
- void Handle(TEvPQ::TEvMirrorerCounters::TPtr& ev, const TActorContext& ctx);
- void Handle(NReadSpeedLimiterEvents::TEvCounters::TPtr& ev, const TActorContext& ctx);
-
- //answer for reads for Timestamps
- void Handle(TEvPQ::TEvProxyResponse::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvPQ::TEvError::TPtr& ev, const TActorContext& ctx);
- void ProcessTimestampRead(const TActorContext& ctx);
-
- void HandleOnInit(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx);
-
- void HandleGetDiskStatus(const NKikimrClient::TResponse& res, const TActorContext& ctx);
- void HandleInfoRangeRead(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx);
- void HandleDataRangeRead(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx);
- void HandleMetaRead(const NKikimrClient::TKeyValueResponse::TReadResult& response, const TActorContext& ctx);
+ void ReplyWrite(const TActorContext& ctx, ui64 dst, const TString& sourceId, ui64 seqNo, ui16 partNo, ui16 totalParts, ui64 offset, TInstant writeTimestamp, bool already, ui64 maxSeqNo, ui64 partitionQuotedTime, TDuration topicQuotedTime, ui64 queueTime, ui64 writeTime);
- //forms DataKeysBody and other partition's info
- //ctx here only for logging
+ void AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvRequest* request, bool headCleared, const TActorContext& ctx);
+ void AnswerCurrentWrites(const TActorContext& ctx);
+ void CancelAllWritesOnIdle(const TActorContext& ctx);
+ void CancelAllWritesOnWrite(const TActorContext& ctx, TEvKeyValue::TEvRequest* request, const TString& errorStr, const TWriteMsg& p, TSourceIdWriter& sourceIdWriter, NPersQueue::NErrorCode::EErrorCode errorCode);
+ void ClearOldHead(const ui64 offset, const ui16 partNo, TEvKeyValue::TEvRequest* request);
+ void CreateMirrorerActor();
+ void DoRead(TEvPQ::TEvRead::TPtr ev, TDuration waitQuotaTime, const TActorContext& ctx);
+ void FailBadClient(const TActorContext& ctx);
void FillBlobsMetaData(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx);
- //will form head and request data keys from head or finish initialization
+ void FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config, const TActorContext& ctx);
+ void FilterDeadlinedWrites(const TActorContext& ctx);
void FormHeadAndProceed(const TActorContext& ctx);
- void HandleDataRead(const NKikimrClient::TResponse& range, const TActorContext& ctx);
- void InitComplete(const TActorContext& ctx);
-
-
- void Handle(TEvPQ::TEvChangeOwner::TPtr& ev, const TActorContext& ctx);
- void ProcessChangeOwnerRequests(const TActorContext& ctx);
- void ProcessChangeOwnerRequest(TAutoPtr<TEvPQ::TEvChangeOwner> ev, const TActorContext& ctx);
+ void Handle(NReadSpeedLimiterEvents::TEvCounters::TPtr& ev, const TActorContext& ctx);
+ void Handle(NReadSpeedLimiterEvents::TEvResponse::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPQ::TEvBlobResponse::TPtr& ev, const TActorContext& ctx);
-
+ void Handle(TEvPQ::TEvChangeOwner::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPQ::TEvChangePartitionConfig::TPtr& ev, const TActorContext& ctx);
-
+ void Handle(TEvPQ::TEvError::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPQ::TEvGetClientOffset::TPtr& ev, const TActorContext& ctx);
-
- void Handle(TEvPQ::TEvUpdateWriteTimestamp::TPtr& ev, const TActorContext& ctx);
-
- void Handle(TEvPQ::TEvSetClientInfo::TPtr& ev, const TActorContext& ctx);
- void WriteClientInfo(const ui64 cookie, TUserInfo& ui, const TActorContext& ctx);
-
-
- void HandleOnInit(TEvPQ::TEvPartitionOffsets::TPtr& ev, const TActorContext& ctx);
- void HandleOnInit(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPQ::TEvGetMaxSeqNoRequest::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPQ::TEvGetPartitionClientInfo::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPQ::TEvHandleWriteResponse::TPtr&, const TActorContext& ctx);
+ void Handle(TEvPQ::TEvMirrorerCounters::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPQ::TEvPartitionOffsets::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvPQ::TEvGetPartitionClientInfo::TPtr& ev, const TActorContext& ctx);
-
- void Handle(TEvPersQueue::TEvReportPartitionError::TPtr& ev, const TActorContext& ctx);
- void LogAndCollectError(const NKikimrPQ::TStatusResponse::TErrorMessage& error, const TActorContext& ctx);
- void LogAndCollectError(NKikimrServices::EServiceKikimr service, const TString& msg, const TActorContext& ctx);
-
- void HandleOnIdle(TEvPQ::TEvUpdateAvailableSize::TPtr& ev, const TActorContext& ctx);
- void HandleOnWrite(TEvPQ::TEvUpdateAvailableSize::TPtr& ev, const TActorContext& ctx);
-
+ void Handle(TEvPQ::TEvPipeDisconnected::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPQ::TEvProxyResponse::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPQ::TEvQuotaDeadlineCheck::TPtr& ev, const TActorContext& ctx);
-
- void UpdateAvailableSize(const TActorContext& ctx);
- void ScheduleUpdateAvailableSize(const TActorContext& ctx);
-
- void Handle(TEvPQ::TEvGetMaxSeqNoRequest::TPtr& ev, const TActorContext& ctx);
-
+ void Handle(TEvPQ::TEvRead::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPQ::TEvReadTimeout::TPtr& ev, const TActorContext& ctx);
- void HandleWakeup(const TActorContext& ctx);
+ void Handle(TEvPQ::TEvReserveBytes::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPQ::TEvSetClientInfo::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPQ::TEvUpdateWriteTimestamp::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPersQueue::TEvHasDataInfo::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPersQueue::TEvReportPartitionError::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvQuota::TEvClearance::TPtr& ev, const TActorContext& ctx);
void Handle(TEvents::TEvPoisonPill::TPtr& ev, const TActorContext& ctx);
-
- void Handle(TEvPQ::TEvRead::TPtr& ev, const TActorContext& ctx);
- void Handle(NReadSpeedLimiterEvents::TEvResponse::TPtr& ev, const TActorContext& ctx);
- void DoRead(TEvPQ::TEvRead::TPtr ev, TDuration waitQuotaTime, const TActorContext& ctx);
- void OnReadRequestFinished(TReadInfo&& info, ui64 answerSize);
-
- // will return rcount and rsize also
- TVector<TRequestedBlob> GetReadRequestFromBody(const ui64 startOffset, const ui16 partNo, const ui32 maxCount, const ui32 maxSize, ui32* rcount, ui32* rsize);
- TVector<TClientBlob> GetReadRequestFromHead(const ui64 startOffset, const ui16 partNo, const ui32 maxCount, const ui32 maxSize, const ui64 readTimestampMs, ui32* rcount, ui32* rsize, ui64* insideHeadOffset);
- void ProcessRead(const TActorContext& ctx, TReadInfo&& info, const ui64 cookie, bool subscription);
-
- void HandleOnIdle(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& ctx);
- void HandleOnWrite(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& ctx);
-
- void HandleOnIdle(TEvPQ::TEvRegisterMessageGroup::TPtr& ev, const TActorContext& ctx);
- void HandleOnWrite(TEvPQ::TEvRegisterMessageGroup::TPtr& ev, const TActorContext& ctx);
-
+ void HandleDataRangeRead(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx);
+ void HandleDataRead(const NKikimrClient::TResponse& range, const TActorContext& ctx);
+ void HandleGetDiskStatus(const NKikimrClient::TResponse& res, const TActorContext& ctx);
+ void HandleInfoRangeRead(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx);
+ void HandleMetaRead(const NKikimrClient::TKeyValueResponse::TReadResult& response, const TActorContext& ctx);
+ void HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorContext& ctx);
void HandleOnIdle(TEvPQ::TEvDeregisterMessageGroup::TPtr& ev, const TActorContext& ctx);
- void HandleOnWrite(TEvPQ::TEvDeregisterMessageGroup::TPtr& ev, const TActorContext& ctx);
-
+ void HandleOnIdle(TEvPQ::TEvRegisterMessageGroup::TPtr& ev, const TActorContext& ctx);
void HandleOnIdle(TEvPQ::TEvSplitMessageGroup::TPtr& ev, const TActorContext& ctx);
+ void HandleOnIdle(TEvPQ::TEvUpdateAvailableSize::TPtr& ev, const TActorContext& ctx);
+ void HandleOnIdle(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& ctx);
+ void HandleOnInit(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx);
+ void HandleOnInit(TEvPQ::TEvPartitionOffsets::TPtr& ev, const TActorContext& ctx);
+ void HandleOnInit(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext& ctx);
+ void HandleOnWrite(TEvPQ::TEvDeregisterMessageGroup::TPtr& ev, const TActorContext& ctx);
+ void HandleOnWrite(TEvPQ::TEvRegisterMessageGroup::TPtr& ev, const TActorContext& ctx);
void HandleOnWrite(TEvPQ::TEvSplitMessageGroup::TPtr& ev, const TActorContext& ctx);
-
- void Handle(TEvQuota::TEvClearance::TPtr& ev, const TActorContext& ctx);
- bool CleanUp(TEvKeyValue::TEvRequest* request, bool hasWrites, const TActorContext& ctx);
-
- //will fill sourceIds, request and NewHead
- //returns true if head is compacted
- bool AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx, TSourceIdWriter& sourceIdWriter);
- std::pair<TKey, ui32> GetNewWriteKey(bool headCleared);
- void AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvRequest* request, bool headCleared, const TActorContext& ctx);
-
- bool ProcessWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx);
- void FilterDeadlinedWrites(const TActorContext& ctx);
- void SetDeadlinesForWrites(const TActorContext& ctx);
-
- void ReadTimestampForOffset(const TString& user, TUserInfo& ui, const TActorContext& ctx);
- void ProcessTimestampsForNewData(const ui64 prevEndOffset, const TActorContext& ctx);
- void ReportLabeledCounters(const TActorContext& ctx);
- ui64 GetSizeLag(i64 offset);
-
- void Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx);
+ void HandleOnWrite(TEvPQ::TEvUpdateAvailableSize::TPtr& ev, const TActorContext& ctx);
+ void HandleOnWrite(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& ctx);
void HandleSetOffsetResponse(NKikimrClient::TResponse& response, const TActorContext& ctx);
+ void HandleWakeup(const TActorContext& ctx);
void HandleWriteResponse(const TActorContext& ctx);
- void Handle(TEvPQ::TEvHandleWriteResponse::TPtr&, const TActorContext& ctx);
-
-
- void AnswerCurrentWrites(const TActorContext& ctx);
- void SyncMemoryStateWithKVState(const TActorContext& ctx);
- //only Writes container is filled; only DISK_IS_FULL can be here
- void CancelAllWritesOnIdle(const TActorContext& ctx);
- //additional contaiters are half-filled, need to clear them too
- struct TWriteMsg; // forward
- void CancelAllWritesOnWrite(const TActorContext& ctx, TEvKeyValue::TEvRequest* request, const TString& errorStr,
- const TWriteMsg& p, TSourceIdWriter& sourceIdWriter, NPersQueue::NErrorCode::EErrorCode errorCode);
+ void InitComplete(const TActorContext& ctx);
+ void InitUserInfoForImportantClients(const TActorContext& ctx);
+ void LogAndCollectError(NKikimrServices::EServiceKikimr service, const TString& msg, const TActorContext& ctx);
+ void LogAndCollectError(const NKikimrPQ::TStatusResponse::TErrorMessage& error, const TActorContext& ctx);
- void FailBadClient(const TActorContext& ctx);
- void ClearOldHead(const ui64 offset, const ui16 partNo, TEvKeyValue::TEvRequest* request);
+ void OnReadRequestFinished(TReadInfo&& info, ui64 answerSize);
- void HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorContext& ctx);
+ void ProcessChangeOwnerRequest(TAutoPtr<TEvPQ::TEvChangeOwner> ev, const TActorContext& ctx);
+ void ProcessChangeOwnerRequests(const TActorContext& ctx);
+ void ProcessHasDataRequests(const TActorContext& ctx);
+ void ProcessRead(const TActorContext& ctx, TReadInfo&& info, const ui64 cookie, bool subscription);
+ void ProcessReserveRequests(const TActorContext& ctx);
+ void ProcessTimestampRead(const TActorContext& ctx);
+ void ProcessTimestampsForNewData(const ui64 prevEndOffset, const TActorContext& ctx);
+ void ProcessUserActs(TUserInfo& userInfo, const TActorContext& ctx);
- void InitUserInfoForImportantClients(const TActorContext& ctx);
+ void ReadTimestampForOffset(const TString& user, TUserInfo& ui, const TActorContext& ctx);
+ void ReportCounters(const TActorContext& ctx);
+ void ScheduleUpdateAvailableSize(const TActorContext& ctx);
+ void SetDeadlinesForWrites(const TActorContext& ctx);
+ void SetupStreamCounters(const TActorContext& ctx);
+ void SetupTopicCounters(const TActorContext& ctx);
- THashMap<TString, TOwnerInfo>::iterator DropOwner(THashMap<TString, TOwnerInfo>::iterator& it, const TActorContext& ctx);
+ void SyncMemoryStateWithKVState(const TActorContext& ctx);
+ void UpdateAvailableSize(const TActorContext& ctx);
+ void WriteClientInfo(const ui64 cookie, TUserInfo& ui, const TActorContext& ctx);
- void Handle(TEvPQ::TEvPipeDisconnected::TPtr& ev, const TActorContext& ctx);
+ void AddMetaKey(TEvKeyValue::TEvRequest* request);
+ void BecomeIdle(const TActorContext& ctx);
+ void CalcTopicWriteQuotaParams();
+ void CheckHeadConsistency() const;
+ void HandleWrites(const TActorContext& ctx);
+ void RequestQuotaForWriteBlobRequest(size_t dataSize, ui64 cookie);
+ void WriteBlobWithQuota(THolder<TEvKeyValue::TEvRequest>&& request);
- void Handle(TEvPQ::TEvReserveBytes::TPtr& ev, const TActorContext& ctx);
- void ProcessReserveRequests(const TActorContext& ctx);
+ void UpdateUserInfoEndOffset(const TInstant& now);
+ void UpdateWriteBufferIsFullState(const TInstant& now);
- void CreateMirrorerActor();
+ TInstant GetWriteTimeEstimate(ui64 offset) const;
+ bool AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx, TSourceIdWriter& sourceIdWriter);
+ bool CleanUp(TEvKeyValue::TEvRequest* request, bool hasWrites, const TActorContext& ctx);
+ bool CleanUpBlobs(TEvKeyValue::TEvRequest *request, bool hasWrites, const TActorContext& ctx);
bool IsQuotingEnabled() const;
-
- void SetupTopicCounters(const TActorContext& ctx);
- void SetupStreamCounters(const TActorContext& ctx);
+ bool ProcessWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx);
+ bool WaitingForPreviousBlobQuota() const;
+ size_t GetQuotaRequestSize(const TEvKeyValue::TEvRequest& request);
+ std::pair<TInstant, TInstant> GetTime(const TUserInfo& userInfo, ui64 offset) const;
+ std::pair<TKey, ui32> Compact(const TKey& key, const ui32 size, bool headCleared);
+ ui32 NextChannel(bool isHead, ui32 blobSize);
+ ui64 GetSizeLag(i64 offset);
+ std::pair<TKey, ui32> GetNewWriteKey(bool headCleared);
+ THashMap<TString, TOwnerInfo>::iterator DropOwner(THashMap<TString, TOwnerInfo>::iterator& it,
+ const TActorContext& ctx);
+ // will return rcount and rsize also
+ TVector<TRequestedBlob> GetReadRequestFromBody(const ui64 startOffset, const ui16 partNo, const ui32 maxCount, const ui32 maxSize, ui32* rcount, ui32* rsize);
+ TVector<TClientBlob> GetReadRequestFromHead(const ui64 startOffset, const ui16 partNo, const ui32 maxCount, const ui32 maxSize, const ui64 readTimestampMs, ui32* rcount, ui32* rsize, ui64* insideHeadOffset);
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
@@ -253,7 +205,7 @@ private:
STFUNC(StateInit)
{
- NPersQueue::TCounterTimeKeeper keeper(Counters.Cumulative()[COUNTER_PQ_TABLET_CPU_USAGE]);
+ NPersQueue::TCounterTimeKeeper keeper(TabletCounters.Cumulative()[COUNTER_PQ_TABLET_CPU_USAGE]);
LOG_TRACE_S(ctx, NKikimrServices::PERSQUEUE, EventStr("StateInit", ev));
@@ -279,7 +231,7 @@ private:
STFUNC(StateIdle)
{
- NPersQueue::TCounterTimeKeeper keeper(Counters.Cumulative()[COUNTER_PQ_TABLET_CPU_USAGE]);
+ NPersQueue::TCounterTimeKeeper keeper(TabletCounters.Cumulative()[COUNTER_PQ_TABLET_CPU_USAGE]);
LOG_TRACE_S(ctx, NKikimrServices::PERSQUEUE, EventStr("StateIdle", ev));
@@ -326,7 +278,7 @@ private:
STFUNC(StateWrite)
{
- NPersQueue::TCounterTimeKeeper keeper(Counters.Cumulative()[COUNTER_PQ_TABLET_CPU_USAGE]);
+ NPersQueue::TCounterTimeKeeper keeper(TabletCounters.Cumulative()[COUNTER_PQ_TABLET_CPU_USAGE]);
LOG_TRACE_S(ctx, NKikimrServices::PERSQUEUE, EventStr("StateWrite", ev));
@@ -371,33 +323,7 @@ private:
break;
};
}
-
- bool CleanUpBlobs(TEvKeyValue::TEvRequest *request, bool hasWrites, const TActorContext& ctx);
- std::pair<TKey, ui32> Compact(const TKey& key, const ui32 size, bool headCleared);
-
- void HandleWrites(const TActorContext& ctx);
- void BecomeIdle(const TActorContext& ctx);
-
- void CheckHeadConsistency() const;
-
- std::pair<TInstant, TInstant> GetTime(const TUserInfo& userInfo, ui64 offset) const;
- TInstant GetWriteTimeEstimate(ui64 offset) const;
-
- ui32 NextChannel(bool isHead, ui32 blobSize);
-
- void WriteBlobWithQuota(THolder<TEvKeyValue::TEvRequest>&& request);
- void AddMetaKey(TEvKeyValue::TEvRequest* request);
-
- size_t GetQuotaRequestSize(const TEvKeyValue::TEvRequest& request);
- void RequestQuotaForWriteBlobRequest(size_t dataSize, ui64 cookie);
- void CalcTopicWriteQuotaParams();
- bool WaitingForPreviousBlobQuota() const;
-
private:
- void UpdateUserInfoEndOffset(const TInstant& now);
-
- void UpdateWriteBufferIsFullState(const TInstant& now);
-
enum EInitState {
WaitDiskStatus,
WaitInfoRange,
@@ -406,13 +332,6 @@ private:
WaitMetaRead
};
-
- struct TUserCookie {
- TString User;
- ui64 Cookie;
- };
-
-
ui64 TabletID;
ui32 Partition;
NKikimrPQ::TPQTabletConfig Config;
@@ -440,111 +359,6 @@ private:
EInitState InitState;
- struct TWriteMsg {
- ui64 Cookie;
- TMaybe<ui64> Offset;
- TEvPQ::TEvWrite::TMsg Msg;
- };
-
- struct TOwnershipMsg {
- ui64 Cookie;
- TString OwnerCookie;
- };
-
- struct TRegisterMessageGroupMsg {
- ui64 Cookie;
- TEvPQ::TEvRegisterMessageGroup::TBody Body;
-
- explicit TRegisterMessageGroupMsg(TEvPQ::TEvRegisterMessageGroup& ev)
- : Cookie(ev.Cookie)
- , Body(std::move(ev.Body))
- {
- }
- };
-
- struct TDeregisterMessageGroupMsg {
- ui64 Cookie;
- TEvPQ::TEvDeregisterMessageGroup::TBody Body;
-
- explicit TDeregisterMessageGroupMsg(TEvPQ::TEvDeregisterMessageGroup& ev)
- : Cookie(ev.Cookie)
- , Body(std::move(ev.Body))
- {
- }
- };
-
- struct TSplitMessageGroupMsg {
- ui64 Cookie;
- TVector<TEvPQ::TEvDeregisterMessageGroup::TBody> Deregistrations;
- TVector<TEvPQ::TEvRegisterMessageGroup::TBody> Registrations;
-
- explicit TSplitMessageGroupMsg(ui64 cookie)
- : Cookie(cookie)
- {
- }
- };
-
- struct TMessage {
- std::variant<
- TWriteMsg,
- TOwnershipMsg,
- TRegisterMessageGroupMsg,
- TDeregisterMessageGroupMsg,
- TSplitMessageGroupMsg
- > Body;
-
- ui64 QuotedTime;
- ui64 QueueTime;
- ui64 WriteTime;
-
- template <typename T>
- explicit TMessage(T&& body, ui64 quotedTime, ui64 queueTime, ui64 writeTime)
- : Body(std::forward<T>(body))
- , QuotedTime(quotedTime)
- , QueueTime(queueTime)
- , WriteTime(writeTime)
- {
- }
-
- ui64 GetCookie() const {
- switch (Body.index()) {
- case 0:
- return std::get<0>(Body).Cookie;
- case 1:
- return std::get<1>(Body).Cookie;
- case 2:
- return std::get<2>(Body).Cookie;
- case 3:
- return std::get<3>(Body).Cookie;
- case 4:
- return std::get<4>(Body).Cookie;
- default:
- Y_FAIL("unreachable");
- }
- }
-
- #define DEFINE_CHECKER_GETTER(name, i) \
- bool Is##name() const { \
- return Body.index() == i; \
- } \
- const auto& Get##name() const { \
- Y_VERIFY(Is##name()); \
- return std::get<i>(Body); \
- } \
- auto& Get##name() { \
- Y_VERIFY(Is##name()); \
- return std::get<i>(Body); \
- }
-
- DEFINE_CHECKER_GETTER(Write, 0)
- DEFINE_CHECKER_GETTER(Ownership, 1)
- DEFINE_CHECKER_GETTER(RegisterMessageGroup, 2)
- DEFINE_CHECKER_GETTER(DeregisterMessageGroup, 3)
- DEFINE_CHECKER_GETTER(SplitMessageGroup, 4)
-
- #undef DEFINE_CHECKER_GETTER
- };
-
std::deque<TMessage> Requests;
std::deque<TMessage> Responses;
@@ -574,7 +388,7 @@ private:
bool ReadingTimestamp;
TString ReadingForUser;
ui64 ReadingForUserReadRuleGeneration;
- ui64 ReadingForOffset;
+ ui64 ReadingForOffset; // log only
THashMap<ui64, TString> CookieToUser;
ui64 SetOffsetCookie;
@@ -593,8 +407,8 @@ private:
std::deque<THolder<TEvPQ::TEvChangeOwner>> WaitToChangeOwner;
- TTabletCountersBase Counters;
- THolder<TPartitionLabeledCounters> PartitionLabeledCounters;
+ TTabletCountersBase TabletCounters;
+ THolder<TPartitionLabeledCounters> PartitionCounters;
TSubscriber Subscriber;
@@ -621,7 +435,6 @@ private:
TVector<NSlidingWindow::TSlidingWindow<NSlidingWindow::TSumOperation<ui64>>> AvgWriteBytes;
TVector<NSlidingWindow::TSlidingWindow<NSlidingWindow::TSumOperation<ui64>>> AvgQuotaBytes;
-
ui64 ReservedSize;
std::deque<THolder<TEvPQ::TEvReserveBytes>> ReserveRequests;
@@ -637,9 +450,9 @@ private:
THolder<TPercentileCounter> InputTimeLag;
THolder<TPercentileCounter> MessageSize;
TPercentileCounter WriteLatency;
+
NKikimr::NPQ::TMultiCounter SLIBigLatency;
NKikimr::NPQ::TMultiCounter WritesTotal;
-
NKikimr::NPQ::TMultiCounter BytesWritten;
NKikimr::NPQ::TMultiCounter BytesWrittenUncompressed;
NKikimr::NPQ::TMultiCounter BytesWrittenComp;
@@ -647,11 +460,13 @@ private:
// Writing blob with topic quota variables
ui64 TopicQuotaRequestCookie = 0;
+
// Wait topic quota metrics
THolder<TPercentileCounter> TopicWriteQuotaWaitCounter;
TInstant StartTopicQuotaWaitTimeForCurrentBlob;
TInstant WriteStartTime;
TDuration TopicQuotaWaitTimeForCurrentBlob;
+
// Topic quota parameters
TString TopicWriteQuoterPath;
TString TopicWriteQuotaResourcePath;
@@ -662,5 +477,4 @@ private:
THolder<TMirrorerInfo> Mirrorer;
};
-}// NPQ
-}// NKikimr
+} // namespace NKikimr::NPQ
diff --git a/ydb/core/persqueue/partition_types.h b/ydb/core/persqueue/partition_types.h
new file mode 100644
index 0000000000..9b709c990c
--- /dev/null
+++ b/ydb/core/persqueue/partition_types.h
@@ -0,0 +1,130 @@
+#pragma once
+
+#include <ydb/core/persqueue/events/internal.h>
+
+#include <util/generic/fwd.h>
+#include <util/generic/maybe.h>
+
+#include <variant>
+
+
+namespace NKikimr::NPQ {
+
+struct TUserCookie {
+ TString User;
+ ui64 Cookie;
+};
+
+struct TWriteMsg {
+ ui64 Cookie;
+ TMaybe<ui64> Offset;
+ TEvPQ::TEvWrite::TMsg Msg;
+};
+
+struct TOwnershipMsg {
+ ui64 Cookie;
+ TString OwnerCookie;
+};
+
+struct TRegisterMessageGroupMsg {
+ ui64 Cookie;
+ TEvPQ::TEvRegisterMessageGroup::TBody Body;
+
+ explicit TRegisterMessageGroupMsg(TEvPQ::TEvRegisterMessageGroup& ev)
+ : Cookie(ev.Cookie)
+ , Body(std::move(ev.Body))
+ {
+ }
+};
+
+struct TDeregisterMessageGroupMsg {
+ ui64 Cookie;
+ TEvPQ::TEvDeregisterMessageGroup::TBody Body;
+
+ explicit TDeregisterMessageGroupMsg(TEvPQ::TEvDeregisterMessageGroup& ev)
+ : Cookie(ev.Cookie)
+ , Body(std::move(ev.Body))
+ {
+ }
+};
+
+struct TSplitMessageGroupMsg {
+ ui64 Cookie;
+ TVector<TEvPQ::TEvDeregisterMessageGroup::TBody> Deregistrations;
+ TVector<TEvPQ::TEvRegisterMessageGroup::TBody> Registrations;
+
+ explicit TSplitMessageGroupMsg(ui64 cookie)
+ : Cookie(cookie)
+ {
+ }
+};
+
+struct TMessage {
+ std::variant<
+ TWriteMsg,
+ TOwnershipMsg,
+ TRegisterMessageGroupMsg,
+ TDeregisterMessageGroupMsg,
+ TSplitMessageGroupMsg
+ > Body;
+
+ ui64 QuotedTime;
+ ui64 QueueTime;
+ ui64 WriteTime;
+
+ template <typename T>
+ explicit TMessage(T&& body, ui64 quotedTime, ui64 queueTime, ui64 writeTime)
+ : Body(std::forward<T>(body))
+ , QuotedTime(quotedTime)
+ , QueueTime(queueTime)
+ , WriteTime(writeTime)
+ {
+ }
+
+ ui64 GetCookie() const {
+ switch (Body.index()) {
+ case 0:
+ return std::get<0>(Body).Cookie;
+ case 1:
+ return std::get<1>(Body).Cookie;
+ case 2:
+ return std::get<2>(Body).Cookie;
+ case 3:
+ return std::get<3>(Body).Cookie;
+ case 4:
+ return std::get<4>(Body).Cookie;
+ default:
+ Y_FAIL("unreachable");
+ }
+ }
+
+ #define DEFINE_CHECKER_GETTER(name, i) \
+ bool Is##name() const { \
+ return Body.index() == i; \
+ } \
+ const auto& Get##name() const { \
+ Y_VERIFY(Is##name()); \
+ return std::get<i>(Body); \
+ } \
+ auto& Get##name() { \
+ Y_VERIFY(Is##name()); \
+ return std::get<i>(Body); \
+ }
+
+ DEFINE_CHECKER_GETTER(Write, 0)
+ DEFINE_CHECKER_GETTER(Ownership, 1)
+ DEFINE_CHECKER_GETTER(RegisterMessageGroup, 2)
+ DEFINE_CHECKER_GETTER(DeregisterMessageGroup, 3)
+ DEFINE_CHECKER_GETTER(SplitMessageGroup, 4)
+
+ #undef DEFINE_CHECKER_GETTER
+};
+
+struct TDataKey {
+ TKey Key;
+ ui32 Size;
+ TInstant Timestamp;
+ ui64 CumulativeSize;
+};
+
+} // namespace NKikimr
diff --git a/ydb/core/persqueue/pq_ut.cpp b/ydb/core/persqueue/pq_ut.cpp
index e54c24a75b..d68710b156 100644
--- a/ydb/core/persqueue/pq_ut.cpp
+++ b/ydb/core/persqueue/pq_ut.cpp
@@ -21,13 +21,10 @@
namespace NKikimr {
-const static TString TOPIC_NAME = "rt3.dc1--topic";
-Y_UNIT_TEST_SUITE(TPQTest) {
+const static TString TOPIC_NAME = "rt3.dc1--topic";
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-// SINGLE COMMAND TEST FUNCTIONS
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+Y_UNIT_TEST_SUITE(TPQTest) {
Y_UNIT_TEST(TestGroupsBalancer) {
TTestContext tc;
@@ -1190,7 +1187,8 @@ Y_UNIT_TEST(TestWritePQ) {
tc.Prepare(dispatchName, setup, activeZone);
tc.Runtime->SetScheduledLimit(100);
- PQTabletPrepare({}, {{"user", true}}, tc); //important client, lifetimeseconds=0 - never delete
+ // Important client, lifetimeseconds=0 - never delete
+ PQTabletPrepare({}, {{"user", true}}, tc);
TVector<std::pair<ui64, TString>> data, data1, data2;
activeZone = PlainOrSoSlow(true, false);
@@ -1482,7 +1480,6 @@ Y_UNIT_TEST(TestWriteToFullPartition) {
}
-
Y_UNIT_TEST(TestTimeRetention) {
TTestContext tc;
RunTestWithReboots(tc.TabletIds, [&]() {
@@ -1560,7 +1557,6 @@ Y_UNIT_TEST(TestPQPartialRead) {
}, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) {
TFinalizer finalizer(tc);
tc.Prepare(dispatchName, setup, activeZone);
-
tc.Runtime->SetScheduledLimit(200);
PQTabletPrepare({}, {{"aaa", true}}, tc); //important client - never delete
diff --git a/ydb/core/persqueue/pq_ut.h b/ydb/core/persqueue/pq_ut.h
index de8425f855..753613f528 100644
--- a/ydb/core/persqueue/pq_ut.h
+++ b/ydb/core/persqueue/pq_ut.h
@@ -2,27 +2,10 @@
#include "pq.h"
#include "user_info.h"
-
+#include <ydb/core/testlib/actors/test_runtime.h>
#include <ydb/core/testlib/basics/runtime.h>
-#include <ydb/core/tablet_flat/tablet_flat_executed.h>
-#include <ydb/core/tx/schemeshard/schemeshard.h>
-#include <ydb/public/lib/base/msgbus.h>
-#include <ydb/core/keyvalue/keyvalue_events.h>
-#include <ydb/core/persqueue/events/global.h>
-#include <ydb/core/tablet/tablet_counters_aggregator.h>
-#include <ydb/core/persqueue/key.h>
-#include <ydb/core/keyvalue/keyvalue_events.h>
-#include <ydb/core/persqueue/partition.h>
-#include <ydb/core/engine/minikql/flat_local_tx_factory.h>
-#include <ydb/core/security/ticket_parser.h>
-
-#include <ydb/core/testlib/fake_scheme_shard.h>
#include <ydb/core/testlib/tablet_helpers.h>
-#include <library/cpp/testing/unittest/registar.h>
-
-#include <util/system/sanitizers.h>
-#include <util/system/valgrind.h>
const bool ENABLE_DETAILED_PQ_LOG = false;
const bool ENABLE_DETAILED_KV_LOG = false;
@@ -233,915 +216,183 @@ struct TTabletPreparationParameters {
ui32 specVersion{0};
i32 storageLimitBytes{0};
};
-void PQTabletPrepare(const TTabletPreparationParameters& parameters,
- const TVector<std::pair<TString, bool>>& users, TTestContext& tc) {
- TAutoPtr<IEventHandle> handle;
- static int version = 0;
- if (parameters.specVersion) {
- version = parameters.specVersion;
- } else {
- ++version;
- }
- for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) {
- try {
- tc.Runtime->ResetScheduledCount();
-
- THolder<TEvPersQueue::TEvUpdateConfig> request(new TEvPersQueue::TEvUpdateConfig());
- for (ui32 i = 0; i < parameters.partitions; ++i) {
- request->Record.MutableTabletConfig()->AddPartitionIds(i);
- }
- request->Record.MutableTabletConfig()->SetCacheSize(10_MB);
- request->Record.SetTxId(12345);
- auto tabletConfig = request->Record.MutableTabletConfig();
- if (tc.Runtime->GetAppData().PQConfig.GetTopicsAreFirstClassCitizen()) {
- tabletConfig->SetTopicName("topic");
- tabletConfig->SetTopicPath(tc.Runtime->GetAppData().PQConfig.GetDatabase() + "/topic");
- } else {
- tabletConfig->SetTopicName("rt3.dc1--topic");
- tabletConfig->SetTopicPath("/Root/PQ/rt3.dc1--topic");
- }
- tabletConfig->SetTopic("topic");
- tabletConfig->SetVersion(version);
- tabletConfig->SetLocalDC(parameters.localDC);
- tabletConfig->AddReadRules("user");
- tabletConfig->AddReadFromTimestampsMs(parameters.readFromTimestampsMs);
- auto config = tabletConfig->MutablePartitionConfig();
- config->SetMaxCountInPartition(parameters.maxCountInPartition);
- config->SetMaxSizeInPartition(parameters.maxSizeInPartition);
- if (parameters.storageLimitBytes > 0) {
- config->SetStorageLimitBytes(parameters.storageLimitBytes);
- } else {
- config->SetLifetimeSeconds(parameters.deleteTime);
- }
- config->SetSourceIdLifetimeSeconds(TDuration::Hours(1).Seconds());
- if (parameters.sidMaxCount > 0)
- config->SetSourceIdMaxCounts(parameters.sidMaxCount);
- config->SetMaxWriteInflightSize(90'000'000);
- config->SetLowWatermark(parameters.lowWatermark);
-
- for (auto& u : users) {
- if (u.second)
- config->AddImportantClientId(u.first);
- if (u.first != "user")
- tabletConfig->AddReadRules(u.first);
- }
- tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
- TEvPersQueue::TEvUpdateConfigResponse* result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvUpdateConfigResponse>(handle);
-
- UNIT_ASSERT(result);
- auto& rec = result->Record;
- UNIT_ASSERT(rec.HasStatus() && rec.GetStatus() == NKikimrPQ::OK);
- UNIT_ASSERT(rec.HasTxId() && rec.GetTxId() == 12345);
- UNIT_ASSERT(rec.HasOrigin() && result->GetOrigin() == 1);
- retriesLeft = 0;
- } catch (NActors::TSchedulingLimitReachedException) {
- UNIT_ASSERT(retriesLeft >= 1);
- }
- }
- TEvKeyValue::TEvResponse *result;
- THolder<TEvKeyValue::TEvRequest> request;
- for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) {
- try {
-
- request.Reset(new TEvKeyValue::TEvRequest);
- auto read = request->Record.AddCmdRead();
- read->SetKey("_config");
-
- tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
- result = tc.Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle);
-
- UNIT_ASSERT(result);
- UNIT_ASSERT(result->Record.HasStatus());
- UNIT_ASSERT_EQUAL(result->Record.GetStatus(), NMsgBusProxy::MSTATUS_OK);
- retriesLeft = 0;
- } catch (NActors::TSchedulingLimitReachedException) {
- UNIT_ASSERT(retriesLeft >= 1);
- }
- }
-}
-
-
-
-void BalancerPrepare(const TString topic, const TVector<std::pair<ui32, std::pair<ui64, ui32>>>& map, const ui64 ssId, TTestContext& tc, const bool requireAuth = false) {
- TAutoPtr<IEventHandle> handle;
- static int version = 0;
- ++version;
-
- for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) {
- try {
- tc.Runtime->ResetScheduledCount();
-
- THolder<TEvPersQueue::TEvUpdateBalancerConfig> request(new TEvPersQueue::TEvUpdateBalancerConfig());
- for (const auto& p : map) {
- auto part = request->Record.AddPartitions();
- part->SetPartition(p.first);
- part->SetGroup(p.second.second);
- part->SetTabletId(p.second.first);
-
- auto tablet = request->Record.AddTablets();
- tablet->SetTabletId(p.second.first);
- tablet->SetOwner(1);
- tablet->SetIdx(p.second.first);
- }
- request->Record.SetTxId(12345);
- request->Record.SetPathId(1);
- request->Record.SetVersion(version);
- request->Record.SetTopicName(topic);
- request->Record.SetPath("path");
- request->Record.SetSchemeShardId(ssId);
- request->Record.MutableTabletConfig()->AddReadRules("client");
- request->Record.MutableTabletConfig()->SetRequireAuthWrite(requireAuth);
- request->Record.MutableTabletConfig()->SetRequireAuthRead(requireAuth);
-
- tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
- TEvPersQueue::TEvUpdateConfigResponse* result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvUpdateConfigResponse>(handle);
-
- UNIT_ASSERT(result);
- auto& rec = result->Record;
- UNIT_ASSERT(rec.HasStatus() && rec.GetStatus() == NKikimrPQ::OK);
- UNIT_ASSERT(rec.HasTxId() && rec.GetTxId() == 12345);
- UNIT_ASSERT(rec.HasOrigin() && result->GetOrigin() == tc.BalancerTabletId);
- retriesLeft = 0;
- } catch (NActors::TSchedulingLimitReachedException) {
- UNIT_ASSERT(retriesLeft >= 1);
- }
- }
- //TODO: check state
- TTestActorRuntime& runtime = *tc.Runtime;
-
- ForwardToTablet(runtime, tc.BalancerTabletId, tc.Edge, new TEvents::TEvPoisonPill());
- TDispatchOptions rebootOptions;
- rebootOptions.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvRestored, 2));
- runtime.DispatchEvents(rebootOptions);
-}
-
-
-void PQGetPartInfo(ui64 startOffset, ui64 endOffset, TTestContext& tc) {
- TAutoPtr<IEventHandle> handle;
- TEvPersQueue::TEvOffsetsResponse *result;
- THolder<TEvPersQueue::TEvOffsets> request;
-
- for (i32 retriesLeft = 3; retriesLeft > 0; --retriesLeft) {
- try {
-
- tc.Runtime->ResetScheduledCount();
- request.Reset(new TEvPersQueue::TEvOffsets);
-
- tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
- result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvOffsetsResponse>(handle);
- UNIT_ASSERT(result);
-
- if (result->Record.PartResultSize() == 0 ||
- result->Record.GetPartResult(0).GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) {
- tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress
- retriesLeft = 3;
- continue;
- }
-
- UNIT_ASSERT(result->Record.PartResultSize());
- UNIT_ASSERT_VALUES_EQUAL((ui64)result->Record.GetPartResult(0).GetStartOffset(), startOffset);
- UNIT_ASSERT_VALUES_EQUAL((ui64)result->Record.GetPartResult(0).GetEndOffset(), endOffset);
- retriesLeft = 0;
- } catch (NActors::TSchedulingLimitReachedException) {
- UNIT_ASSERT(retriesLeft > 0);
- }
- }
-
-}
-
-void RestartTablet(TTestContext& tc) {
- TTestActorRuntime& runtime = *tc.Runtime;
-
- ForwardToTablet(runtime, tc.TabletId, tc.Edge, new TEvents::TEvPoisonPill());
- TDispatchOptions rebootOptions;
- rebootOptions.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvRestored, 2));
- runtime.DispatchEvents(rebootOptions);
-}
-
-
-TActorId SetOwner(const ui32 partition, TTestContext& tc, const TString& owner, bool force) {
- TActorId pipeClient = tc.Runtime->ConnectToPipe(tc.TabletId, tc.Edge, 0, GetPipeConfigWithRetries());
-
- THolder<TEvPersQueue::TEvRequest> request;
-
- request.Reset(new TEvPersQueue::TEvRequest);
- auto req = request->Record.MutablePartitionRequest();
- req->SetPartition(partition);
- req->MutableCmdGetOwnership()->SetOwner(owner);
- req->MutableCmdGetOwnership()->SetForce(force);
- ActorIdToProto(pipeClient, req->MutablePipeClient());
-
- tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries(), pipeClient);
- return pipeClient;
-}
-
-TActorId RegisterReadSession(const TString& session, TTestContext& tc, const TVector<ui32>& groups = {}) {
- TActorId pipeClient = tc.Runtime->ConnectToPipe(tc.BalancerTabletId, tc.Edge, 0, GetPipeConfigWithRetries());
-
- THolder<TEvPersQueue::TEvRegisterReadSession> request;
-
- request.Reset(new TEvPersQueue::TEvRegisterReadSession);
- auto& req = request->Record;
- req.SetSession(session);
- ActorIdToProto(pipeClient, req.MutablePipeClient());
- req.SetClientId("user");
- for (const auto& g : groups) {
- req.AddGroups(g);
- }
-
- tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries(), pipeClient);
- return pipeClient;
-}
-
-void WaitSessionKill(TTestContext& tc) {
- TAutoPtr<IEventHandle> handle;
-
- tc.Runtime->ResetScheduledCount();
-
- TEvPersQueue::TEvError *result;
- result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvError>(handle);
- UNIT_ASSERT(result);
- Cerr << "ANS: " << result->Record << "\n";
-// UNIT_ASSERT_EQUAL(result->Record.GetSession(), session);
-}
-
-
-void WaitPartition(const TString &session, TTestContext& tc, ui32 partition, const TString& sessionToRelease, const TString& topic, const TActorId& pipe, bool ok = true) {
- TAutoPtr<IEventHandle> handle;
-
- tc.Runtime->ResetScheduledCount();
-
- for (ui32 i = 0; i < 3; ++i) {
- Cerr << "STEP " << i << " ok " << ok << "\n";
-
- try {
- tc.Runtime->ResetScheduledCount();
- if (i % 2 == 0) {
- TEvPersQueue::TEvLockPartition *result;
- result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvLockPartition>(handle);
- UNIT_ASSERT(result);
- Cerr << "ANS: " << result->Record << "\n";
- UNIT_ASSERT(ok);
- UNIT_ASSERT_EQUAL(result->Record.GetSession(), session);
- break;
- } else {
- TEvPersQueue::TEvReleasePartition *result;
- result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvReleasePartition>(handle);
- UNIT_ASSERT(result);
-
- Cerr << "ANS2: " << result->Record << "\n";
-
- UNIT_ASSERT_EQUAL(result->Record.GetSession(), sessionToRelease);
- UNIT_ASSERT(ok);
-
- THolder<TEvPersQueue::TEvPartitionReleased> request;
-
- request.Reset(new TEvPersQueue::TEvPartitionReleased);
- auto& req = request->Record;
- req.SetSession(sessionToRelease);
- req.SetPartition(partition);
- req.SetTopic(topic);
- req.SetClientId("user");
- ActorIdToProto(pipe, req.MutablePipeClient());
-
- tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries(), pipe);
- }
- } catch (NActors::TSchedulingLimitReachedException) {
- UNIT_ASSERT(i < 2 || !ok);
- } catch (NActors::TEmptyEventQueueException) {
- UNIT_ASSERT(i < 2 || !ok);
- }
- }
-}
-
-
-std::pair<TString, TActorId> CmdSetOwner(const ui32 partition, TTestContext& tc, const TString& owner = "default", bool force = true) {
- TAutoPtr<IEventHandle> handle;
- TEvPersQueue::TEvResponse *result;
- TString cookie;
- TActorId pipeClient;
- for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) {
- try {
- tc.Runtime->ResetScheduledCount();
-
- pipeClient = SetOwner(partition, tc, owner, force);
-
- result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle);
-
- UNIT_ASSERT(result);
- UNIT_ASSERT(result->Record.HasStatus());
- if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) {
- tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress
- retriesLeft = 3;
- continue;
- }
-
- if (result->Record.GetErrorReason().StartsWith("ownership session is killed by another session with id ")) {
- result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle);
- UNIT_ASSERT(result);
- UNIT_ASSERT(result->Record.HasStatus());
- }
-
- if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) {
- tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress
- retriesLeft = 3;
- continue;
- }
-
- UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::OK);
- UNIT_ASSERT(result->Record.HasPartitionResponse());
- UNIT_ASSERT(result->Record.GetPartitionResponse().HasCmdGetOwnershipResult());
- UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdGetOwnershipResult().HasOwnerCookie());
- cookie = result->Record.GetPartitionResponse().GetCmdGetOwnershipResult().GetOwnerCookie();
- UNIT_ASSERT(!cookie.empty());
- retriesLeft = 0;
- } catch (NActors::TSchedulingLimitReachedException) {
- UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2);
- }
- }
- return std::make_pair(cookie, pipeClient);
-}
-
-
-void WritePartData(const ui32 partition, const TString& sourceId, const i64 offset, const ui64 seqNo, const ui16 partNo, const ui16 totalParts,
- const ui32 totalSize, const TString& data, TTestContext& tc, const TString& cookie, i32 msgSeqNo)
-{
- THolder<TEvPersQueue::TEvRequest> request;
- tc.Runtime->ResetScheduledCount();
- request.Reset(new TEvPersQueue::TEvRequest);
- auto req = request->Record.MutablePartitionRequest();
- req->SetPartition(partition);
- req->SetOwnerCookie(cookie);
- req->SetMessageNo(msgSeqNo);
- if (offset != -1)
- req->SetCmdWriteOffset(offset);
- auto write = req->AddCmdWrite();
- write->SetSourceId(sourceId);
- write->SetSeqNo(seqNo);
- write->SetPartNo(partNo);
- write->SetTotalParts(totalParts);
- if (partNo == 0)
- write->SetTotalSize(totalSize);
- write->SetData(data);
-
- tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
-}
-
-void WritePartDataWithBigMsg(const ui32 partition, const TString& sourceId, const ui64 seqNo, const ui16 partNo, const ui16 totalParts,
- const ui32 totalSize, const TString& data, TTestContext& tc, const TString& cookie, i32 msgSeqNo, ui32 bigMsgSize)
-{
- THolder<TEvPersQueue::TEvRequest> request;
- tc.Runtime->ResetScheduledCount();
- request.Reset(new TEvPersQueue::TEvRequest);
- auto req = request->Record.MutablePartitionRequest();
- req->SetPartition(partition);
- req->SetOwnerCookie(cookie);
- req->SetMessageNo(msgSeqNo);
-
- TString bigData(bigMsgSize, 'a');
-
- auto write = req->AddCmdWrite();
- write->SetSourceId(sourceId);
- write->SetSeqNo(seqNo);
- write->SetData(bigData);
-
- write = req->AddCmdWrite();
- write->SetSourceId(sourceId);
- write->SetSeqNo(seqNo + 1);
- write->SetPartNo(partNo);
- write->SetTotalParts(totalParts);
- if (partNo == 0)
- write->SetTotalSize(totalSize);
- write->SetData(data);
-
-
- tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
-}
-
-
-
-void WriteData(const ui32 partition, const TString& sourceId, const TVector<std::pair<ui64, TString>> data, TTestContext& tc,
- const TString& cookie, i32 msgSeqNo, i64 offset, bool disableDeduplication = false)
-{
- THolder<TEvPersQueue::TEvRequest> request;
- tc.Runtime->ResetScheduledCount();
- request.Reset(new TEvPersQueue::TEvRequest);
- auto req = request->Record.MutablePartitionRequest();
- req->SetPartition(partition);
- req->SetOwnerCookie(cookie);
- req->SetMessageNo(msgSeqNo);
- if (offset >= 0)
- req->SetCmdWriteOffset(offset);
- for (auto& p : data) {
- auto write = req->AddCmdWrite();
- write->SetSourceId(sourceId);
- write->SetSeqNo(p.first);
- write->SetData(p.second);
- write->SetDisableDeduplication(disableDeduplication);
- }
- tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
-}
-
-void CmdWrite(const ui32 partition, const TString& sourceId, const TVector<std::pair<ui64, TString>> data,
- TTestContext& tc, bool error = false, const THashSet<ui32>& alreadyWrittenSeqNo = {},
- bool isFirst = false, const TString& ownerCookie = "", i32 msn = -1, i64 offset = -1,
- bool treatWrongCookieAsError = false, bool treatBadOffsetAsError = true,
- bool disableDeduplication = false) {
- TAutoPtr<IEventHandle> handle;
- TEvPersQueue::TEvResponse *result;
-
- ui32& msgSeqNo = tc.MsgSeqNoMap[partition];
- if (msn != -1) msgSeqNo = msn;
- TString cookie = ownerCookie;
- for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) {
- try {
- WriteData(partition, sourceId, data, tc, cookie, msgSeqNo, offset, disableDeduplication);
- result = tc.Runtime->GrabEdgeEventIf<TEvPersQueue::TEvResponse>(handle, [](const TEvPersQueue::TEvResponse& ev){
- if (ev.Record.HasPartitionResponse() && ev.Record.GetPartitionResponse().CmdWriteResultSize() > 0 || ev.Record.GetErrorCode() != NPersQueue::NErrorCode::OK)
- return true;
- return false;
- }); //there could be outgoing reads in TestReadSubscription test
-
- UNIT_ASSERT(result);
- UNIT_ASSERT(result->Record.HasStatus());
- if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) {
- tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress
- retriesLeft = 3;
- continue;
- }
-
- if (!treatWrongCookieAsError && result->Record.GetErrorCode() == NPersQueue::NErrorCode::WRONG_COOKIE) {
- cookie = CmdSetOwner(partition, tc).first;
- msgSeqNo = 0;
- retriesLeft = 3;
- continue;
- }
-
- if (!treatBadOffsetAsError && result->Record.GetErrorCode() == NPersQueue::NErrorCode::WRITE_ERROR_BAD_OFFSET) {
- return;
- }
-
- if (error) {
- UNIT_ASSERT(
- result->Record.GetErrorCode() == NPersQueue::NErrorCode::WRITE_ERROR_PARTITION_IS_FULL ||
- result->Record.GetErrorCode() == NPersQueue::NErrorCode::BAD_REQUEST ||
- result->Record.GetErrorCode() == NPersQueue::NErrorCode::WRONG_COOKIE
- );
- break;
- } else {
- Cerr << result->Record.GetErrorReason();
- UNIT_ASSERT_VALUES_EQUAL((ui32)result->Record.GetErrorCode(), (ui32)NPersQueue::NErrorCode::OK);
- }
- UNIT_ASSERT_VALUES_EQUAL(result->Record.GetPartitionResponse().CmdWriteResultSize(), data.size());
-
- for (ui32 i = 0; i < data.size(); ++i) {
- UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdWriteResult(i).HasAlreadyWritten());
- UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdWriteResult(i).HasOffset());
- UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdWriteResult(i).HasMaxSeqNo() ==
- result->Record.GetPartitionResponse().GetCmdWriteResult(i).GetAlreadyWritten());
- if (result->Record.GetPartitionResponse().GetCmdWriteResult(i).HasMaxSeqNo()) {
- UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdWriteResult(i).GetMaxSeqNo() >= (i64)data[i].first);
- }
- if (isFirst || offset != -1) {
- UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdWriteResult(i).GetAlreadyWritten()
- || result->Record.GetPartitionResponse().GetCmdWriteResult(i).GetOffset() == i + (offset == -1 ? 0 : offset));
- }
- }
- for (ui32 i = 0; i < data.size(); ++i) {
- auto res = result->Record.GetPartitionResponse().GetCmdWriteResult(i);
- UNIT_ASSERT(!alreadyWrittenSeqNo.contains(res.GetSeqNo()) || res.GetAlreadyWritten());
- }
- retriesLeft = 0;
- } catch (NActors::TSchedulingLimitReachedException) {
- UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2);
- retriesLeft = 3;
- }
- }
- ++msgSeqNo;
-}
-
-
-void ReserveBytes(const ui32 partition, TTestContext& tc,
- const TString& cookie, i32 msgSeqNo, i64 size, const TActorId& pipeClient, bool lastRequest)
-{
- THolder<TEvPersQueue::TEvRequest> request;
- tc.Runtime->ResetScheduledCount();
- request.Reset(new TEvPersQueue::TEvRequest);
- auto req = request->Record.MutablePartitionRequest();
- req->SetPartition(partition);
- req->SetOwnerCookie(cookie);
- req->SetMessageNo(msgSeqNo);
- ActorIdToProto(pipeClient, req->MutablePipeClient());
- req->MutableCmdReserveBytes()->SetSize(size);
- req->MutableCmdReserveBytes()->SetLastRequest(lastRequest);
- tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
-
- tc.Runtime->DispatchEvents();
-}
-
-
-void CmdReserveBytes(const ui32 partition, TTestContext& tc, const TString& ownerCookie, i32 msn, i64 size, TActorId pipeClient, bool noAnswer = false, bool lastRequest = false) {
- TAutoPtr<IEventHandle> handle;
- TEvPersQueue::TEvResponse *result;
-
- ui32& msgSeqNo = tc.MsgSeqNoMap[partition];
- if (msn != -1) msgSeqNo = msn;
- TString cookie = ownerCookie;
-
- for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) {
- try {
- ReserveBytes(partition, tc, cookie, msgSeqNo, size, pipeClient, lastRequest);
- result = tc.Runtime->GrabEdgeEventIf<TEvPersQueue::TEvResponse>(handle, [](const TEvPersQueue::TEvResponse& ev){
- if (!ev.Record.HasPartitionResponse() || !ev.Record.GetPartitionResponse().HasCmdReadResult())
- return true;
- return false;
- }); //there could be outgoing reads in TestReadSubscription test
-
- UNIT_ASSERT(result);
- UNIT_ASSERT(result->Record.HasStatus());
-
- if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) {
- retriesLeft = 3;
- continue;
- }
-
- if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::WRONG_COOKIE) {
- auto p = CmdSetOwner(partition, tc);
- pipeClient = p.second;
- cookie = p.first;
- msgSeqNo = 0;
- retriesLeft = 3;
- continue;
- }
- UNIT_ASSERT(!noAnswer);
-
- UNIT_ASSERT_C(result->Record.GetErrorCode() == NPersQueue::NErrorCode::OK, result->Record);
-
- retriesLeft = 0;
- } catch (NActors::TSchedulingLimitReachedException) {
- if (noAnswer)
- break;
- UNIT_ASSERT(retriesLeft == 2);
- }
- }
- ++msgSeqNo;
-}
-
-
-void CmdSetOffset(const ui32 partition, const TString& user, ui64 offset, bool error, TTestContext& tc, const TString& session = "") {
- TAutoPtr<IEventHandle> handle;
- TEvPersQueue::TEvResponse *result;
- THolder<TEvPersQueue::TEvRequest> request;
- for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) {
- try {
- tc.Runtime->ResetScheduledCount();
- request.Reset(new TEvPersQueue::TEvRequest);
- auto req = request->Record.MutablePartitionRequest();
- req->SetPartition(partition);
- auto off = req->MutableCmdSetClientOffset();
- off->SetClientId(user);
- off->SetOffset(offset);
- if (!session.empty())
- off->SetSessionId(session);
- tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
- result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle);
-
- UNIT_ASSERT(result);
- UNIT_ASSERT(result->Record.HasStatus());
- if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) {
- tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress
- retriesLeft = 3;
- continue;
- }
- if ((result->Record.GetErrorCode() == NPersQueue::NErrorCode::SET_OFFSET_ERROR_COMMIT_TO_FUTURE ||
- result->Record.GetErrorCode() == NPersQueue::NErrorCode::WRONG_COOKIE) && error) {
- break;
- }
- UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::OK);
- retriesLeft = 0;
- } catch (NActors::TSchedulingLimitReachedException) {
- UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2);
- }
- }
-}
-
-
-void CmdCreateSession(const ui32 partition, const TString& user, const TString& session, TTestContext& tc, const i64 offset = 0,
- const ui32 gen = 0, const ui32 step = 0, bool error = false) {
- TAutoPtr<IEventHandle> handle;
- TEvPersQueue::TEvResponse *result;
- THolder<TEvPersQueue::TEvRequest> request;
- for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) {
- try {
- tc.Runtime->ResetScheduledCount();
- request.Reset(new TEvPersQueue::TEvRequest);
- auto req = request->Record.MutablePartitionRequest();
- req->SetPartition(partition);
- auto off = req->MutableCmdCreateSession();
- off->SetClientId(user);
- off->SetSessionId(session);
- off->SetGeneration(gen);
- off->SetStep(step);
- tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
- result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle);
-
- UNIT_ASSERT(result);
- UNIT_ASSERT(result->Record.HasStatus());
- if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) {
- tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress
- retriesLeft = 3;
- continue;
- }
-
- if (error) {
- UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::WRONG_COOKIE);
- return;
- }
-
- UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::OK);
-
- UNIT_ASSERT(result->Record.GetPartitionResponse().HasCmdGetClientOffsetResult());
- auto resp = result->Record.GetPartitionResponse().GetCmdGetClientOffsetResult();
- UNIT_ASSERT(resp.HasOffset() && (i64)resp.GetOffset() == offset);
- retriesLeft = 0;
- } catch (NActors::TSchedulingLimitReachedException) {
- UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2);
- }
- }
-}
-
-void CmdKillSession(const ui32 partition, const TString& user, const TString& session, TTestContext& tc) {
- TAutoPtr<IEventHandle> handle;
- TEvPersQueue::TEvResponse *result;
- THolder<TEvPersQueue::TEvRequest> request;
- for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) {
- try {
- tc.Runtime->ResetScheduledCount();
- request.Reset(new TEvPersQueue::TEvRequest);
- auto req = request->Record.MutablePartitionRequest();
- req->SetPartition(partition);
- auto off = req->MutableCmdDeleteSession();
- off->SetClientId(user);
- off->SetSessionId(session);
- tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
- result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle);
-
- UNIT_ASSERT(result);
- UNIT_ASSERT(result->Record.HasStatus());
- if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) {
- tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress
- retriesLeft = 3;
- continue;
- }
- UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::OK);
- retriesLeft = 0;
- } catch (NActors::TSchedulingLimitReachedException) {
- UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2);
- }
- }
-}
-
-
-
-void CmdGetOffset(const ui32 partition, const TString& user, i64 offset, TTestContext& tc, i64 ctime = -1, ui64 writeTime = 0) {
- TAutoPtr<IEventHandle> handle;
- TEvPersQueue::TEvResponse *result;
- THolder<TEvPersQueue::TEvRequest> request;
- for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) {
- try {
- tc.Runtime->ResetScheduledCount();
- request.Reset(new TEvPersQueue::TEvRequest);
- auto req = request->Record.MutablePartitionRequest();
- req->SetPartition(partition);
- auto off = req->MutableCmdGetClientOffset();
- off->SetClientId(user);
- tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
- result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle);
-
- UNIT_ASSERT(result);
- UNIT_ASSERT(result->Record.HasStatus());
-
- if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) {
- tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress
- retriesLeft = 3;
- continue;
- }
-
- UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::OK);
- UNIT_ASSERT(result->Record.GetPartitionResponse().HasCmdGetClientOffsetResult());
- auto resp = result->Record.GetPartitionResponse().GetCmdGetClientOffsetResult();
- if (ctime != -1) {
- UNIT_ASSERT_EQUAL(resp.HasCreateTimestampMS(), ctime > 0);
- if (ctime > 0) {
- if (ctime == Max<i64>()) {
- UNIT_ASSERT(resp.GetCreateTimestampMS() + 86000000 < TAppData::TimeProvider->Now().MilliSeconds());
- } else {
- UNIT_ASSERT_EQUAL((i64)resp.GetCreateTimestampMS(), ctime);
- }
- }
- }
- Cerr << "CMDGETOFFSET partition " << partition << " waiting for offset " << offset << ": " << resp << "\n";
- UNIT_ASSERT((offset == -1 && !resp.HasOffset()) || (i64)resp.GetOffset() == offset);
- if (writeTime > 0) {
- UNIT_ASSERT(resp.HasWriteTimestampEstimateMS());
- UNIT_ASSERT(resp.GetWriteTimestampEstimateMS() >= writeTime);
- }
- retriesLeft = 0;
- } catch (NActors::TSchedulingLimitReachedException) {
- UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2);
- }
- }
-}
-
-
-void CmdUpdateWriteTimestamp(const ui32 partition, ui64 timestamp, TTestContext& tc) {
- TAutoPtr<IEventHandle> handle;
- TEvPersQueue::TEvResponse *result;
- THolder<TEvPersQueue::TEvRequest> request;
- for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) {
- try {
- tc.Runtime->ResetScheduledCount();
- request.Reset(new TEvPersQueue::TEvRequest);
- auto req = request->Record.MutablePartitionRequest();
- req->SetPartition(partition);
- auto off = req->MutableCmdUpdateWriteTimestamp();
- off->SetWriteTimeMS(timestamp);
- tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
- result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle);
-
- UNIT_ASSERT(result);
- UNIT_ASSERT(result->Record.HasStatus());
-
- if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) {
- tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress
- retriesLeft = 3;
- continue;
- }
-
- UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::OK);
- retriesLeft = 0;
- } catch (NActors::TSchedulingLimitReachedException) {
- UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2);
- }
- }
-}
-
-
-TVector<TString> CmdSourceIdRead(TTestContext& tc) {
- TAutoPtr<IEventHandle> handle;
- TVector<TString> sourceIds;
- THolder<TEvKeyValue::TEvRequest> request;
- TEvKeyValue::TEvResponse *result;
-
- for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) {
- try {
- request.Reset(new TEvKeyValue::TEvRequest);
- sourceIds.clear();
- auto read = request->Record.AddCmdReadRange();
- auto range = read->MutableRange();
- NPQ::TKeyPrefix ikeyFrom(NPQ::TKeyPrefix::TypeInfo, 0, NPQ::TKeyPrefix::MarkSourceId);
- range->SetFrom(ikeyFrom.Data(), ikeyFrom.Size());
- range->SetIncludeFrom(true);
- NPQ::TKeyPrefix ikeyTo(NPQ::TKeyPrefix::TypeInfo, 0, NPQ::TKeyPrefix::MarkUserDeprecated);
- range->SetTo(ikeyTo.Data(), ikeyTo.Size());
- range->SetIncludeTo(false);
- Cout << request.Get()->ToString() << Endl;
- tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
- result = tc.Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle);
- UNIT_ASSERT(result);
- UNIT_ASSERT(result->Record.HasStatus());
- UNIT_ASSERT_EQUAL(result->Record.GetStatus(), NMsgBusProxy::MSTATUS_OK);
- for (ui64 idx = 0; idx < result->Record.ReadRangeResultSize(); ++idx) {
- const auto &readResult = result->Record.GetReadRangeResult(idx);
- UNIT_ASSERT(readResult.HasStatus());
- UNIT_ASSERT_EQUAL(readResult.GetStatus(), NKikimrProto::OK);
- for (size_t j = 0; j < readResult.PairSize(); ++j) {
- const auto& pair = readResult.GetPair(j);
- TString s = pair.GetKey().substr(NPQ::TKeyPrefix::MarkedSize());
- sourceIds.push_back(s);
- }
- }
- retriesLeft = 0;
- } catch (NActors::TSchedulingLimitReachedException) {
- UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2);
- }
- }
- return sourceIds;
-}
-
-
-void CmdRead(const ui32 partition, const ui64 offset, const ui32 count, const ui32 size, const ui32 resCount, bool timeouted, TTestContext& tc, TVector<i32> offsets = {}, const ui32 maxTimeLagMs = 0, const ui64 readTimestampMs = 0) {
- TAutoPtr<IEventHandle> handle;
- TEvPersQueue::TEvResponse *result;
- THolder<TEvPersQueue::TEvRequest> request;
-
- for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) {
- try {
- tc.Runtime->ResetScheduledCount();
- request.Reset(new TEvPersQueue::TEvRequest);
- auto req = request->Record.MutablePartitionRequest();
- req->SetPartition(partition);
- auto read = req->MutableCmdRead();
- read->SetOffset(offset);
- read->SetClientId("user");
- read->SetCount(count);
- read->SetBytes(size);
- if (maxTimeLagMs > 0) {
- read->SetMaxTimeLagMs(maxTimeLagMs);
- }
- if (readTimestampMs > 0) {
- read->SetReadTimestampMs(readTimestampMs);
- }
- req->SetCookie(123);
-
- tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
- result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle);
-
-
- UNIT_ASSERT(result);
- UNIT_ASSERT(result->Record.HasStatus());
-
- UNIT_ASSERT(result->Record.HasPartitionResponse());
- UNIT_ASSERT_EQUAL(result->Record.GetPartitionResponse().GetCookie(), 123);
- if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) {
- tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress
- retriesLeft = 3;
- continue;
- }
- if (timeouted) {
- UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::OK);
- UNIT_ASSERT(result->Record.GetPartitionResponse().HasCmdReadResult());
- auto res = result->Record.GetPartitionResponse().GetCmdReadResult();
- UNIT_ASSERT_EQUAL(res.ResultSize(), 0);
- break;
- }
- UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::OK);
-
- UNIT_ASSERT(result->Record.GetPartitionResponse().HasCmdReadResult());
- auto res = result->Record.GetPartitionResponse().GetCmdReadResult();
-
- UNIT_ASSERT_EQUAL(res.ResultSize(), resCount);
- ui64 off = offset;
-
- for (ui32 i = 0; i < resCount; ++i) {
-
- auto r = res.GetResult(i);
- if (offsets.empty()) {
- if (readTimestampMs == 0) {
- UNIT_ASSERT_EQUAL((ui64)r.GetOffset(), off);
- }
- UNIT_ASSERT(r.GetSourceId().size() == 9 && r.GetSourceId().StartsWith("sourceid"));
- UNIT_ASSERT_EQUAL(ui32(r.GetData()[0]), off);
- UNIT_ASSERT_EQUAL(ui32((unsigned char)r.GetData().back()), r.GetSeqNo() % 256);
- ++off;
- } else {
- UNIT_ASSERT(offsets[i] == (i64)r.GetOffset());
- }
- }
- retriesLeft = 0;
- } catch (NActors::TSchedulingLimitReachedException) {
- UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2);
- }
- }
-}
-
-
-void FillUserInfo(NKikimrClient::TKeyValueRequest_TCmdWrite* write, const TString& client, ui32 partition, ui64 offset) {
- NPQ::TKeyPrefix ikey(NPQ::TKeyPrefix::TypeInfo, partition, NPQ::TKeyPrefix::MarkUser);
- ikey.Append(client.c_str(), client.size());
-
- NKikimrPQ::TUserInfo userInfo;
- userInfo.SetOffset(offset);
- userInfo.SetGeneration(1);
- userInfo.SetStep(2);
- userInfo.SetSession("test-session");
- userInfo.SetOffsetRewindSum(10);
- userInfo.SetReadRuleGeneration(1);
- TString out;
- Y_PROTOBUF_SUPPRESS_NODISCARD userInfo.SerializeToString(&out);
-
- TBuffer idata;
- idata.Append(out.c_str(), out.size());
-
- write->SetKey(ikey.Data(), ikey.Size());
- write->SetValue(idata.Data(), idata.Size());
-}
-
-void FillDeprecatedUserInfo(NKikimrClient::TKeyValueRequest_TCmdWrite* write, const TString& client, ui32 partition, ui64 offset) {
- TString session = "test-session";
- ui32 gen = 1;
- ui32 step = 2;
- NPQ::TKeyPrefix ikeyDeprecated(NPQ::TKeyPrefix::TypeInfo, partition, NPQ::TKeyPrefix::MarkUserDeprecated);
- ikeyDeprecated.Append(client.c_str(), client.size());
-
- TBuffer idataDeprecated = NPQ::NDeprecatedUserData::Serialize(offset, gen, step, session);
- write->SetKey(ikeyDeprecated.Data(), ikeyDeprecated.Size());
- write->SetValue(idataDeprecated.Data(), idataDeprecated.Size());
-}
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-// TEST CASES
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-
-} // NKikimr
+void PQTabletPrepare(
+ const TTabletPreparationParameters& parameters,
+ const TVector<std::pair<TString, bool>>& users,
+ TTestContext& tc);
+
+TActorId RegisterReadSession(
+ const TString& session,
+ TTestContext& tc,
+ const TVector<ui32>& groups = {});
+
+TActorId SetOwner(
+ const ui32 partition,
+ TTestContext& tc,
+ const TString& owner,
+ bool force);
+
+TVector<TString> CmdSourceIdRead(TTestContext& tc);
+
+std::pair<TString, TActorId> CmdSetOwner(
+ const ui32 partition,
+ TTestContext& tc,
+ const TString& owner = "default",
+ bool force = true);
+
+void BalancerPrepare(
+ const TString topic,
+ const TVector<std::pair<ui32, std::pair<ui64, ui32>>>& map,
+ const ui64 ssId,
+ TTestContext& tc,
+ const bool requireAuth = false);
+
+void CmdCreateSession(
+ const ui32 partition,
+ const TString& user,
+ const TString& session,
+ TTestContext& tc,
+ const i64 offset = 0,
+ const ui32 gen = 0,
+ const ui32 step = 0,
+ bool error = false);
+
+void CmdGetOffset(
+ const ui32 partition,
+ const TString& user,
+ i64 offset,
+ TTestContext& tc,
+ i64 ctime = -1,
+ ui64 writeTime = 0);
+
+void CmdKillSession(
+ const ui32 partition,
+ const TString& user,
+ const TString& session,
+ TTestContext& tc);
+
+void CmdRead(
+ const ui32 partition,
+ const ui64 offset,
+ const ui32 count,
+ const ui32 size,
+ const ui32 resCount,
+ bool timeouted,
+ TTestContext& tc,
+ TVector<i32> offsets = {},
+ const ui32 maxTimeLagMs = 0,
+ const ui64 readTimestampMs = 0);
+
+void CmdReserveBytes(
+ const ui32 partition,
+ TTestContext& tc,
+ const TString& ownerCookie,
+ i32 msn, i64 size,
+ TActorId pipeClient,
+ bool noAnswer = false,
+ bool lastRequest = false);
+
+void CmdSetOffset(
+ const ui32 partition,
+ const TString& user,
+ ui64 offset,
+ bool error,
+ TTestContext& tc,
+ const TString& session = "");
+
+void CmdUpdateWriteTimestamp(
+ const ui32 partition,
+ ui64 timestamp,
+ TTestContext& tc);
+
+void CmdWrite(
+ const ui32 partition,
+ const TString& sourceId,
+ const TVector<std::pair<ui64, TString>> data,
+ TTestContext& tc,
+ bool error = false,
+ const THashSet<ui32>& alreadyWrittenSeqNo = {},
+ bool isFirst = false,
+ const TString& ownerCookie = "",
+ i32 msn = -1,
+ i64 offset = -1,
+ bool treatWrongCookieAsError = false,
+ bool treatBadOffsetAsError = true,
+ bool disableDeduplication = false);
+
+void FillDeprecatedUserInfo(
+ NKikimrClient::TKeyValueRequest_TCmdWrite* write,
+ const TString& client,
+ ui32 partition,
+ ui64 offset);
+
+void FillUserInfo(
+ NKikimrClient::TKeyValueRequest_TCmdWrite* write,
+ const TString& client,
+ ui32 partition,
+ ui64 offset);
+
+void PQGetPartInfo(
+ ui64 startOffset,
+ ui64 endOffset,
+ TTestContext& tc);
+
+void ReserveBytes(
+ const ui32 partition,
+ TTestContext& tc,
+ const TString& cookie,
+ i32 msgSeqNo,
+ i64 size,
+ const TActorId& pipeClient,
+ bool lastRequest);
+
+void RestartTablet(TTestContext& tc);
+
+void WaitPartition(
+ const TString &session,
+ TTestContext& tc,
+ ui32 partition,
+ const TString& sessionToRelease,
+ const TString& topic,
+ const TActorId& pipe,
+ bool ok = true);
+
+void WaitSessionKill(TTestContext& tc);
+
+void WriteData(
+ const ui32 partition,
+ const TString& sourceId,
+ const TVector<std::pair<ui64, TString>> data,
+ TTestContext& tc,
+ const TString& cookie,
+ i32 msgSeqNo,
+ i64 offset,
+ bool disableDeduplication = false);
+
+void WritePartData(
+ const ui32 partition,
+ const TString& sourceId,
+ const i64 offset,
+ const ui64 seqNo,
+ const ui16 partNo,
+ const ui16 totalParts,
+ const ui32 totalSize,
+ const TString& data,
+ TTestContext& tc,
+ const TString& cookie,
+ i32 msgSeqNo);
+
+void WritePartDataWithBigMsg(
+ const ui32 partition,
+ const TString& sourceId,
+ const ui64 seqNo,
+ const ui16 partNo,
+ const ui16 totalParts,
+ const ui32 totalSize,
+ const TString& data,
+ TTestContext& tc,
+ const TString& cookie,
+ i32 msgSeqNo,
+ ui32 bigMsgSize);
+
+} // namespace NKikimr
diff --git a/ydb/core/persqueue/pq_ut_impl.cpp b/ydb/core/persqueue/pq_ut_impl.cpp
new file mode 100644
index 0000000000..b7c39bed47
--- /dev/null
+++ b/ydb/core/persqueue/pq_ut_impl.cpp
@@ -0,0 +1,908 @@
+#include "pq_ut.h"
+
+#include <ydb/core/engine/minikql/flat_local_tx_factory.h>
+#include <ydb/core/keyvalue/keyvalue_events.h>
+#include <ydb/core/keyvalue/keyvalue_events.h>
+#include <ydb/core/persqueue/events/global.h>
+#include <ydb/core/persqueue/key.h>
+#include <ydb/core/persqueue/partition.h>
+#include <ydb/core/security/ticket_parser.h>
+#include <ydb/core/tablet/tablet_counters_aggregator.h>
+#include <ydb/core/tablet_flat/tablet_flat_executed.h>
+#include <ydb/core/tx/schemeshard/schemeshard.h>
+#include <ydb/public/lib/base/msgbus.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+
+namespace NKikimr {
+
+void PQTabletPrepare(const TTabletPreparationParameters& parameters,
+ const TVector<std::pair<TString, bool>>& users, TTestContext& tc) {
+ TAutoPtr<IEventHandle> handle;
+ static int version = 0;
+ if (parameters.specVersion) {
+ version = parameters.specVersion;
+ } else {
+ ++version;
+ }
+ for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) {
+ try {
+ tc.Runtime->ResetScheduledCount();
+
+ THolder<TEvPersQueue::TEvUpdateConfig> request(new TEvPersQueue::TEvUpdateConfig());
+ for (ui32 i = 0; i < parameters.partitions; ++i) {
+ request->Record.MutableTabletConfig()->AddPartitionIds(i);
+ }
+ request->Record.MutableTabletConfig()->SetCacheSize(10_MB);
+ request->Record.SetTxId(12345);
+ auto tabletConfig = request->Record.MutableTabletConfig();
+ if (tc.Runtime->GetAppData().PQConfig.GetTopicsAreFirstClassCitizen()) {
+ tabletConfig->SetTopicName("topic");
+ tabletConfig->SetTopicPath(tc.Runtime->GetAppData().PQConfig.GetDatabase() + "/topic");
+ } else {
+ tabletConfig->SetTopicName("rt3.dc1--topic");
+ tabletConfig->SetTopicPath("/Root/PQ/rt3.dc1--topic");
+ }
+ tabletConfig->SetTopic("topic");
+ tabletConfig->SetVersion(version);
+ tabletConfig->SetLocalDC(parameters.localDC);
+ tabletConfig->AddReadRules("user");
+ tabletConfig->AddReadFromTimestampsMs(parameters.readFromTimestampsMs);
+ auto config = tabletConfig->MutablePartitionConfig();
+ config->SetMaxCountInPartition(parameters.maxCountInPartition);
+ config->SetMaxSizeInPartition(parameters.maxSizeInPartition);
+ if (parameters.storageLimitBytes > 0) {
+ config->SetStorageLimitBytes(parameters.storageLimitBytes);
+ } else {
+ config->SetLifetimeSeconds(parameters.deleteTime);
+ }
+ config->SetSourceIdLifetimeSeconds(TDuration::Hours(1).Seconds());
+ if (parameters.sidMaxCount > 0)
+ config->SetSourceIdMaxCounts(parameters.sidMaxCount);
+ config->SetMaxWriteInflightSize(90'000'000);
+ config->SetLowWatermark(parameters.lowWatermark);
+
+ for (auto& u : users) {
+ if (u.second)
+ config->AddImportantClientId(u.first);
+ if (u.first != "user")
+ tabletConfig->AddReadRules(u.first);
+ }
+ tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
+ TEvPersQueue::TEvUpdateConfigResponse* result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvUpdateConfigResponse>(handle);
+
+ UNIT_ASSERT(result);
+ auto& rec = result->Record;
+ UNIT_ASSERT(rec.HasStatus() && rec.GetStatus() == NKikimrPQ::OK);
+ UNIT_ASSERT(rec.HasTxId() && rec.GetTxId() == 12345);
+ UNIT_ASSERT(rec.HasOrigin() && result->GetOrigin() == 1);
+ retriesLeft = 0;
+ } catch (NActors::TSchedulingLimitReachedException) {
+ UNIT_ASSERT(retriesLeft >= 1);
+ }
+ }
+ TEvKeyValue::TEvResponse *result;
+ THolder<TEvKeyValue::TEvRequest> request;
+ for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) {
+ try {
+
+ request.Reset(new TEvKeyValue::TEvRequest);
+ auto read = request->Record.AddCmdRead();
+ read->SetKey("_config");
+
+ tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
+ result = tc.Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle);
+
+ UNIT_ASSERT(result);
+ UNIT_ASSERT(result->Record.HasStatus());
+ UNIT_ASSERT_EQUAL(result->Record.GetStatus(), NMsgBusProxy::MSTATUS_OK);
+ retriesLeft = 0;
+ } catch (NActors::TSchedulingLimitReachedException) {
+ UNIT_ASSERT(retriesLeft >= 1);
+ }
+ }
+}
+
+void CmdGetOffset(const ui32 partition, const TString& user, i64 offset, TTestContext& tc, i64 ctime,
+ ui64 writeTime) {
+ TAutoPtr<IEventHandle> handle;
+ TEvPersQueue::TEvResponse *result;
+ THolder<TEvPersQueue::TEvRequest> request;
+ for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) {
+ try {
+ tc.Runtime->ResetScheduledCount();
+ request.Reset(new TEvPersQueue::TEvRequest);
+ auto req = request->Record.MutablePartitionRequest();
+ req->SetPartition(partition);
+ auto off = req->MutableCmdGetClientOffset();
+ off->SetClientId(user);
+ tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
+ result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle);
+
+ UNIT_ASSERT(result);
+ UNIT_ASSERT(result->Record.HasStatus());
+
+ if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) {
+ tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress
+ retriesLeft = 3;
+ continue;
+ }
+
+ UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::OK);
+ UNIT_ASSERT(result->Record.GetPartitionResponse().HasCmdGetClientOffsetResult());
+ auto resp = result->Record.GetPartitionResponse().GetCmdGetClientOffsetResult();
+ if (ctime != -1) {
+ UNIT_ASSERT_EQUAL(resp.HasCreateTimestampMS(), ctime > 0);
+ if (ctime > 0) {
+ if (ctime == Max<i64>()) {
+ UNIT_ASSERT(resp.GetCreateTimestampMS() + 86000000 < TAppData::TimeProvider->Now().MilliSeconds());
+ } else {
+ UNIT_ASSERT_EQUAL((i64)resp.GetCreateTimestampMS(), ctime);
+ }
+ }
+ }
+ Cerr << "CMDGETOFFSET partition " << partition << " waiting for offset " << offset << ": " << resp << "\n";
+ UNIT_ASSERT((offset == -1 && !resp.HasOffset()) || (i64)resp.GetOffset() == offset);
+ if (writeTime > 0) {
+ UNIT_ASSERT(resp.HasWriteTimestampEstimateMS());
+ UNIT_ASSERT(resp.GetWriteTimestampEstimateMS() >= writeTime);
+ }
+ retriesLeft = 0;
+ } catch (NActors::TSchedulingLimitReachedException) {
+ UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2);
+ }
+ }
+}
+
+void BalancerPrepare(const TString topic, const TVector<std::pair<ui32, std::pair<ui64, ui32>>>& map, const ui64 ssId, TTestContext& tc, const bool requireAuth) {
+ TAutoPtr<IEventHandle> handle;
+ static int version = 0;
+ ++version;
+
+ for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) {
+ try {
+ tc.Runtime->ResetScheduledCount();
+
+ THolder<TEvPersQueue::TEvUpdateBalancerConfig> request(new TEvPersQueue::TEvUpdateBalancerConfig());
+ for (const auto& p : map) {
+ auto part = request->Record.AddPartitions();
+ part->SetPartition(p.first);
+ part->SetGroup(p.second.second);
+ part->SetTabletId(p.second.first);
+
+ auto tablet = request->Record.AddTablets();
+ tablet->SetTabletId(p.second.first);
+ tablet->SetOwner(1);
+ tablet->SetIdx(p.second.first);
+ }
+ request->Record.SetTxId(12345);
+ request->Record.SetPathId(1);
+ request->Record.SetVersion(version);
+ request->Record.SetTopicName(topic);
+ request->Record.SetPath("path");
+ request->Record.SetSchemeShardId(ssId);
+ request->Record.MutableTabletConfig()->AddReadRules("client");
+ request->Record.MutableTabletConfig()->SetRequireAuthWrite(requireAuth);
+ request->Record.MutableTabletConfig()->SetRequireAuthRead(requireAuth);
+
+ tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
+ TEvPersQueue::TEvUpdateConfigResponse* result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvUpdateConfigResponse>(handle);
+
+ UNIT_ASSERT(result);
+ auto& rec = result->Record;
+ UNIT_ASSERT(rec.HasStatus() && rec.GetStatus() == NKikimrPQ::OK);
+ UNIT_ASSERT(rec.HasTxId() && rec.GetTxId() == 12345);
+ UNIT_ASSERT(rec.HasOrigin() && result->GetOrigin() == tc.BalancerTabletId);
+ retriesLeft = 0;
+ } catch (NActors::TSchedulingLimitReachedException) {
+ UNIT_ASSERT(retriesLeft >= 1);
+ }
+ }
+ //TODO: check state
+ TTestActorRuntime& runtime = *tc.Runtime;
+
+ ForwardToTablet(runtime, tc.BalancerTabletId, tc.Edge, new TEvents::TEvPoisonPill());
+ TDispatchOptions rebootOptions;
+ rebootOptions.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvRestored, 2));
+ runtime.DispatchEvents(rebootOptions);
+}
+
+void PQGetPartInfo(ui64 startOffset, ui64 endOffset, TTestContext& tc) {
+ TAutoPtr<IEventHandle> handle;
+ TEvPersQueue::TEvOffsetsResponse *result;
+ THolder<TEvPersQueue::TEvOffsets> request;
+
+ for (i32 retriesLeft = 3; retriesLeft > 0; --retriesLeft) {
+ try {
+
+ tc.Runtime->ResetScheduledCount();
+ request.Reset(new TEvPersQueue::TEvOffsets);
+
+ tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
+ result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvOffsetsResponse>(handle);
+ UNIT_ASSERT(result);
+
+ if (result->Record.PartResultSize() == 0 ||
+ result->Record.GetPartResult(0).GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) {
+ tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress
+ retriesLeft = 3;
+ continue;
+ }
+
+ UNIT_ASSERT(result->Record.PartResultSize());
+ UNIT_ASSERT_VALUES_EQUAL((ui64)result->Record.GetPartResult(0).GetStartOffset(), startOffset);
+ UNIT_ASSERT_VALUES_EQUAL((ui64)result->Record.GetPartResult(0).GetEndOffset(), endOffset);
+ retriesLeft = 0;
+ } catch (NActors::TSchedulingLimitReachedException) {
+ UNIT_ASSERT(retriesLeft > 0);
+ }
+ }
+}
+
+void RestartTablet(TTestContext& tc) {
+ TTestActorRuntime& runtime = *tc.Runtime;
+
+ ForwardToTablet(runtime, tc.TabletId, tc.Edge, new TEvents::TEvPoisonPill());
+ TDispatchOptions rebootOptions;
+ rebootOptions.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvRestored, 2));
+ runtime.DispatchEvents(rebootOptions);
+}
+
+TActorId SetOwner(const ui32 partition, TTestContext& tc, const TString& owner, bool force) {
+ TActorId pipeClient = tc.Runtime->ConnectToPipe(tc.TabletId, tc.Edge, 0, GetPipeConfigWithRetries());
+
+ THolder<TEvPersQueue::TEvRequest> request;
+
+ request.Reset(new TEvPersQueue::TEvRequest);
+ auto req = request->Record.MutablePartitionRequest();
+ req->SetPartition(partition);
+ req->MutableCmdGetOwnership()->SetOwner(owner);
+ req->MutableCmdGetOwnership()->SetForce(force);
+ ActorIdToProto(pipeClient, req->MutablePipeClient());
+
+ tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries(), pipeClient);
+ return pipeClient;
+}
+
+TActorId RegisterReadSession(const TString& session, TTestContext& tc, const TVector<ui32>& groups) {
+ TActorId pipeClient = tc.Runtime->ConnectToPipe(tc.BalancerTabletId, tc.Edge, 0, GetPipeConfigWithRetries());
+
+ THolder<TEvPersQueue::TEvRegisterReadSession> request;
+
+ request.Reset(new TEvPersQueue::TEvRegisterReadSession);
+ auto& req = request->Record;
+ req.SetSession(session);
+ ActorIdToProto(pipeClient, req.MutablePipeClient());
+ req.SetClientId("user");
+ for (const auto& g : groups) {
+ req.AddGroups(g);
+ }
+
+ tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries(), pipeClient);
+ return pipeClient;
+}
+
+void WaitSessionKill(TTestContext& tc) {
+ TAutoPtr<IEventHandle> handle;
+
+ tc.Runtime->ResetScheduledCount();
+
+ TEvPersQueue::TEvError *result;
+ result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvError>(handle);
+ UNIT_ASSERT(result);
+ Cerr << "ANS: " << result->Record << "\n";
+// UNIT_ASSERT_EQUAL(result->Record.GetSession(), session);
+}
+
+void WaitPartition(const TString &session, TTestContext& tc, ui32 partition, const TString& sessionToRelease, const TString& topic, const TActorId& pipe, bool ok) {
+ TAutoPtr<IEventHandle> handle;
+
+ tc.Runtime->ResetScheduledCount();
+
+ for (ui32 i = 0; i < 3; ++i) {
+ Cerr << "STEP " << i << " ok " << ok << "\n";
+
+ try {
+ tc.Runtime->ResetScheduledCount();
+ if (i % 2 == 0) {
+ TEvPersQueue::TEvLockPartition *result;
+ result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvLockPartition>(handle);
+ UNIT_ASSERT(result);
+ Cerr << "ANS: " << result->Record << "\n";
+ UNIT_ASSERT(ok);
+ UNIT_ASSERT_EQUAL(result->Record.GetSession(), session);
+ break;
+ } else {
+ TEvPersQueue::TEvReleasePartition *result;
+ result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvReleasePartition>(handle);
+ UNIT_ASSERT(result);
+
+ Cerr << "ANS2: " << result->Record << "\n";
+
+ UNIT_ASSERT_EQUAL(result->Record.GetSession(), sessionToRelease);
+ UNIT_ASSERT(ok);
+
+ THolder<TEvPersQueue::TEvPartitionReleased> request;
+
+ request.Reset(new TEvPersQueue::TEvPartitionReleased);
+ auto& req = request->Record;
+ req.SetSession(sessionToRelease);
+ req.SetPartition(partition);
+ req.SetTopic(topic);
+ req.SetClientId("user");
+ ActorIdToProto(pipe, req.MutablePipeClient());
+
+ tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries(), pipe);
+ }
+ } catch (NActors::TSchedulingLimitReachedException) {
+ UNIT_ASSERT(i < 2 || !ok);
+ } catch (NActors::TEmptyEventQueueException) {
+ UNIT_ASSERT(i < 2 || !ok);
+ }
+ }
+}
+
+std::pair<TString, TActorId> CmdSetOwner(const ui32 partition, TTestContext& tc, const TString& owner, bool force) {
+ TAutoPtr<IEventHandle> handle;
+ TEvPersQueue::TEvResponse *result;
+ TString cookie;
+ TActorId pipeClient;
+ for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) {
+ try {
+ tc.Runtime->ResetScheduledCount();
+
+ pipeClient = SetOwner(partition, tc, owner, force);
+
+ result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle);
+
+ UNIT_ASSERT(result);
+ UNIT_ASSERT(result->Record.HasStatus());
+ if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) {
+ tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress
+ retriesLeft = 3;
+ continue;
+ }
+
+ if (result->Record.GetErrorReason().StartsWith("ownership session is killed by another session with id ")) {
+ result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle);
+ UNIT_ASSERT(result);
+ UNIT_ASSERT(result->Record.HasStatus());
+ }
+
+ if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) {
+ tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress
+ retriesLeft = 3;
+ continue;
+ }
+
+ UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::OK);
+ UNIT_ASSERT(result->Record.HasPartitionResponse());
+ UNIT_ASSERT(result->Record.GetPartitionResponse().HasCmdGetOwnershipResult());
+ UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdGetOwnershipResult().HasOwnerCookie());
+ cookie = result->Record.GetPartitionResponse().GetCmdGetOwnershipResult().GetOwnerCookie();
+ UNIT_ASSERT(!cookie.empty());
+ retriesLeft = 0;
+ } catch (NActors::TSchedulingLimitReachedException) {
+ UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2);
+ }
+ }
+ return std::make_pair(cookie, pipeClient);
+}
+
+void WritePartData(const ui32 partition, const TString& sourceId, const i64 offset, const ui64 seqNo, const ui16 partNo, const ui16 totalParts,
+ const ui32 totalSize, const TString& data, TTestContext& tc, const TString& cookie, i32 msgSeqNo) {
+ THolder<TEvPersQueue::TEvRequest> request;
+ tc.Runtime->ResetScheduledCount();
+ request.Reset(new TEvPersQueue::TEvRequest);
+ auto req = request->Record.MutablePartitionRequest();
+ req->SetPartition(partition);
+ req->SetOwnerCookie(cookie);
+ req->SetMessageNo(msgSeqNo);
+ if (offset != -1)
+ req->SetCmdWriteOffset(offset);
+ auto write = req->AddCmdWrite();
+ write->SetSourceId(sourceId);
+ write->SetSeqNo(seqNo);
+ write->SetPartNo(partNo);
+ write->SetTotalParts(totalParts);
+ if (partNo == 0)
+ write->SetTotalSize(totalSize);
+ write->SetData(data);
+
+ tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
+}
+
+void WritePartDataWithBigMsg(const ui32 partition, const TString& sourceId, const ui64 seqNo, const ui16 partNo, const ui16 totalParts,
+ const ui32 totalSize, const TString& data, TTestContext& tc, const TString& cookie, i32 msgSeqNo, ui32 bigMsgSize) {
+ THolder<TEvPersQueue::TEvRequest> request;
+ tc.Runtime->ResetScheduledCount();
+ request.Reset(new TEvPersQueue::TEvRequest);
+ auto req = request->Record.MutablePartitionRequest();
+ req->SetPartition(partition);
+ req->SetOwnerCookie(cookie);
+ req->SetMessageNo(msgSeqNo);
+
+ TString bigData(bigMsgSize, 'a');
+
+ auto write = req->AddCmdWrite();
+ write->SetSourceId(sourceId);
+ write->SetSeqNo(seqNo);
+ write->SetData(bigData);
+
+ write = req->AddCmdWrite();
+ write->SetSourceId(sourceId);
+ write->SetSeqNo(seqNo + 1);
+ write->SetPartNo(partNo);
+ write->SetTotalParts(totalParts);
+ if (partNo == 0)
+ write->SetTotalSize(totalSize);
+ write->SetData(data);
+
+
+ tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
+}
+
+void WriteData(const ui32 partition, const TString& sourceId, const TVector<std::pair<ui64, TString>> data, TTestContext& tc,
+ const TString& cookie, i32 msgSeqNo, i64 offset, bool disableDeduplication) {
+ THolder<TEvPersQueue::TEvRequest> request;
+ tc.Runtime->ResetScheduledCount();
+ request.Reset(new TEvPersQueue::TEvRequest);
+ auto req = request->Record.MutablePartitionRequest();
+ req->SetPartition(partition);
+ req->SetOwnerCookie(cookie);
+ req->SetMessageNo(msgSeqNo);
+ if (offset >= 0)
+ req->SetCmdWriteOffset(offset);
+ for (auto& p : data) {
+ auto write = req->AddCmdWrite();
+ write->SetSourceId(sourceId);
+ write->SetSeqNo(p.first);
+ write->SetData(p.second);
+ write->SetDisableDeduplication(disableDeduplication);
+ }
+ tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
+}
+
+void CmdWrite(const ui32 partition, const TString& sourceId, const TVector<std::pair<ui64, TString>> data,
+ TTestContext& tc, bool error, const THashSet<ui32>& alreadyWrittenSeqNo,
+ bool isFirst, const TString& ownerCookie, i32 msn, i64 offset,
+ bool treatWrongCookieAsError, bool treatBadOffsetAsError,
+ bool disableDeduplication) {
+ TAutoPtr<IEventHandle> handle;
+ TEvPersQueue::TEvResponse *result;
+
+ ui32& msgSeqNo = tc.MsgSeqNoMap[partition];
+ if (msn != -1) msgSeqNo = msn;
+ TString cookie = ownerCookie;
+ for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) {
+ try {
+ WriteData(partition, sourceId, data, tc, cookie, msgSeqNo, offset, disableDeduplication);
+ result = tc.Runtime->GrabEdgeEventIf<TEvPersQueue::TEvResponse>(handle, [](const TEvPersQueue::TEvResponse& ev){
+ if (ev.Record.HasPartitionResponse() && ev.Record.GetPartitionResponse().CmdWriteResultSize() > 0 || ev.Record.GetErrorCode() != NPersQueue::NErrorCode::OK)
+ return true;
+ return false;
+ }); //there could be outgoing reads in TestReadSubscription test
+
+ UNIT_ASSERT(result);
+ UNIT_ASSERT(result->Record.HasStatus());
+ if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) {
+ tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress
+ retriesLeft = 3;
+ continue;
+ }
+
+ if (!treatWrongCookieAsError && result->Record.GetErrorCode() == NPersQueue::NErrorCode::WRONG_COOKIE) {
+ cookie = CmdSetOwner(partition, tc).first;
+ msgSeqNo = 0;
+ retriesLeft = 3;
+ continue;
+ }
+
+ if (!treatBadOffsetAsError && result->Record.GetErrorCode() == NPersQueue::NErrorCode::WRITE_ERROR_BAD_OFFSET) {
+ return;
+ }
+
+ if (error) {
+ UNIT_ASSERT(
+ result->Record.GetErrorCode() == NPersQueue::NErrorCode::WRITE_ERROR_PARTITION_IS_FULL ||
+ result->Record.GetErrorCode() == NPersQueue::NErrorCode::BAD_REQUEST ||
+ result->Record.GetErrorCode() == NPersQueue::NErrorCode::WRONG_COOKIE
+ );
+ break;
+ } else {
+ Cerr << result->Record.GetErrorReason();
+ UNIT_ASSERT_VALUES_EQUAL((ui32)result->Record.GetErrorCode(), (ui32)NPersQueue::NErrorCode::OK);
+ }
+ UNIT_ASSERT_VALUES_EQUAL(result->Record.GetPartitionResponse().CmdWriteResultSize(), data.size());
+
+ for (ui32 i = 0; i < data.size(); ++i) {
+ UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdWriteResult(i).HasAlreadyWritten());
+ UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdWriteResult(i).HasOffset());
+ UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdWriteResult(i).HasMaxSeqNo() ==
+ result->Record.GetPartitionResponse().GetCmdWriteResult(i).GetAlreadyWritten());
+ if (result->Record.GetPartitionResponse().GetCmdWriteResult(i).HasMaxSeqNo()) {
+ UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdWriteResult(i).GetMaxSeqNo() >= (i64)data[i].first);
+ }
+ if (isFirst || offset != -1) {
+ UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdWriteResult(i).GetAlreadyWritten()
+ || result->Record.GetPartitionResponse().GetCmdWriteResult(i).GetOffset() == i + (offset == -1 ? 0 : offset));
+ }
+ }
+ for (ui32 i = 0; i < data.size(); ++i) {
+ auto res = result->Record.GetPartitionResponse().GetCmdWriteResult(i);
+ UNIT_ASSERT(!alreadyWrittenSeqNo.contains(res.GetSeqNo()) || res.GetAlreadyWritten());
+ }
+ retriesLeft = 0;
+ } catch (NActors::TSchedulingLimitReachedException) {
+ UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2);
+ retriesLeft = 3;
+ }
+ }
+ ++msgSeqNo;
+}
+
+void ReserveBytes(const ui32 partition, TTestContext& tc,
+ const TString& cookie, i32 msgSeqNo, i64 size, const TActorId& pipeClient, bool lastRequest) {
+ THolder<TEvPersQueue::TEvRequest> request;
+ tc.Runtime->ResetScheduledCount();
+ request.Reset(new TEvPersQueue::TEvRequest);
+ auto req = request->Record.MutablePartitionRequest();
+ req->SetPartition(partition);
+ req->SetOwnerCookie(cookie);
+ req->SetMessageNo(msgSeqNo);
+ ActorIdToProto(pipeClient, req->MutablePipeClient());
+ req->MutableCmdReserveBytes()->SetSize(size);
+ req->MutableCmdReserveBytes()->SetLastRequest(lastRequest);
+ tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
+
+ tc.Runtime->DispatchEvents();
+}
+
+void CmdReserveBytes(const ui32 partition, TTestContext& tc, const TString& ownerCookie, i32 msn, i64 size, TActorId pipeClient, bool noAnswer, bool lastRequest) {
+ TAutoPtr<IEventHandle> handle;
+ TEvPersQueue::TEvResponse *result;
+
+ ui32& msgSeqNo = tc.MsgSeqNoMap[partition];
+ if (msn != -1) msgSeqNo = msn;
+ TString cookie = ownerCookie;
+
+ for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) {
+ try {
+ ReserveBytes(partition, tc, cookie, msgSeqNo, size, pipeClient, lastRequest);
+ result = tc.Runtime->GrabEdgeEventIf<TEvPersQueue::TEvResponse>(handle, [](const TEvPersQueue::TEvResponse& ev){
+ if (!ev.Record.HasPartitionResponse() || !ev.Record.GetPartitionResponse().HasCmdReadResult())
+ return true;
+ return false;
+ }); //there could be outgoing reads in TestReadSubscription test
+
+ UNIT_ASSERT(result);
+ UNIT_ASSERT(result->Record.HasStatus());
+
+ if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) {
+ retriesLeft = 3;
+ continue;
+ }
+
+ if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::WRONG_COOKIE) {
+ auto p = CmdSetOwner(partition, tc);
+ pipeClient = p.second;
+ cookie = p.first;
+ msgSeqNo = 0;
+ retriesLeft = 3;
+ continue;
+ }
+ UNIT_ASSERT(!noAnswer);
+
+ UNIT_ASSERT_C(result->Record.GetErrorCode() == NPersQueue::NErrorCode::OK, result->Record);
+
+ retriesLeft = 0;
+ } catch (NActors::TSchedulingLimitReachedException) {
+ if (noAnswer)
+ break;
+ UNIT_ASSERT(retriesLeft == 2);
+ }
+ }
+ ++msgSeqNo;
+}
+
+
+void CmdSetOffset(const ui32 partition, const TString& user, ui64 offset, bool error, TTestContext& tc, const TString& session) {
+ TAutoPtr<IEventHandle> handle;
+ TEvPersQueue::TEvResponse *result;
+ THolder<TEvPersQueue::TEvRequest> request;
+ for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) {
+ try {
+ tc.Runtime->ResetScheduledCount();
+ request.Reset(new TEvPersQueue::TEvRequest);
+ auto req = request->Record.MutablePartitionRequest();
+ req->SetPartition(partition);
+ auto off = req->MutableCmdSetClientOffset();
+ off->SetClientId(user);
+ off->SetOffset(offset);
+ if (!session.empty())
+ off->SetSessionId(session);
+ tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
+ result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle);
+
+ UNIT_ASSERT(result);
+ UNIT_ASSERT(result->Record.HasStatus());
+ if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) {
+ tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress
+ retriesLeft = 3;
+ continue;
+ }
+ if ((result->Record.GetErrorCode() == NPersQueue::NErrorCode::SET_OFFSET_ERROR_COMMIT_TO_FUTURE ||
+ result->Record.GetErrorCode() == NPersQueue::NErrorCode::WRONG_COOKIE) && error) {
+ break;
+ }
+ UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::OK);
+ retriesLeft = 0;
+ } catch (NActors::TSchedulingLimitReachedException) {
+ UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2);
+ }
+ }
+}
+
+
+void CmdCreateSession(const ui32 partition, const TString& user, const TString& session, TTestContext& tc, const i64 offset, const ui32 gen, const ui32 step, bool error) {
+ TAutoPtr<IEventHandle> handle;
+ TEvPersQueue::TEvResponse *result;
+ THolder<TEvPersQueue::TEvRequest> request;
+ for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) {
+ try {
+ tc.Runtime->ResetScheduledCount();
+ request.Reset(new TEvPersQueue::TEvRequest);
+ auto req = request->Record.MutablePartitionRequest();
+ req->SetPartition(partition);
+ auto off = req->MutableCmdCreateSession();
+ off->SetClientId(user);
+ off->SetSessionId(session);
+ off->SetGeneration(gen);
+ off->SetStep(step);
+ tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
+ result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle);
+
+ UNIT_ASSERT(result);
+ UNIT_ASSERT(result->Record.HasStatus());
+ if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) {
+ tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress
+ retriesLeft = 3;
+ continue;
+ }
+
+ if (error) {
+ UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::WRONG_COOKIE);
+ return;
+ }
+
+ UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::OK);
+
+ UNIT_ASSERT(result->Record.GetPartitionResponse().HasCmdGetClientOffsetResult());
+ auto resp = result->Record.GetPartitionResponse().GetCmdGetClientOffsetResult();
+ UNIT_ASSERT(resp.HasOffset() && (i64)resp.GetOffset() == offset);
+ retriesLeft = 0;
+ } catch (NActors::TSchedulingLimitReachedException) {
+ UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2);
+ }
+ }
+}
+
+void CmdKillSession(const ui32 partition, const TString& user, const TString& session, TTestContext& tc) {
+ TAutoPtr<IEventHandle> handle;
+ TEvPersQueue::TEvResponse *result;
+ THolder<TEvPersQueue::TEvRequest> request;
+ for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) {
+ try {
+ tc.Runtime->ResetScheduledCount();
+ request.Reset(new TEvPersQueue::TEvRequest);
+ auto req = request->Record.MutablePartitionRequest();
+ req->SetPartition(partition);
+ auto off = req->MutableCmdDeleteSession();
+ off->SetClientId(user);
+ off->SetSessionId(session);
+ tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
+ result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle);
+
+ UNIT_ASSERT(result);
+ UNIT_ASSERT(result->Record.HasStatus());
+ if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) {
+ tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress
+ retriesLeft = 3;
+ continue;
+ }
+ UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::OK);
+ retriesLeft = 0;
+ } catch (NActors::TSchedulingLimitReachedException) {
+ UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2);
+ }
+ }
+}
+
+
+void CmdUpdateWriteTimestamp(const ui32 partition, ui64 timestamp, TTestContext& tc) {
+ TAutoPtr<IEventHandle> handle;
+ TEvPersQueue::TEvResponse *result;
+ THolder<TEvPersQueue::TEvRequest> request;
+ for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) {
+ try {
+ tc.Runtime->ResetScheduledCount();
+ request.Reset(new TEvPersQueue::TEvRequest);
+ auto req = request->Record.MutablePartitionRequest();
+ req->SetPartition(partition);
+ auto off = req->MutableCmdUpdateWriteTimestamp();
+ off->SetWriteTimeMS(timestamp);
+ tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
+ result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle);
+
+ UNIT_ASSERT(result);
+ UNIT_ASSERT(result->Record.HasStatus());
+
+ if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) {
+ tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress
+ retriesLeft = 3;
+ continue;
+ }
+
+ UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::OK);
+ retriesLeft = 0;
+ } catch (NActors::TSchedulingLimitReachedException) {
+ UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2);
+ }
+ }
+}
+
+
+TVector<TString> CmdSourceIdRead(TTestContext& tc) {
+ TAutoPtr<IEventHandle> handle;
+ TVector<TString> sourceIds;
+ THolder<TEvKeyValue::TEvRequest> request;
+ TEvKeyValue::TEvResponse *result;
+
+ for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) {
+ try {
+ request.Reset(new TEvKeyValue::TEvRequest);
+ sourceIds.clear();
+ auto read = request->Record.AddCmdReadRange();
+ auto range = read->MutableRange();
+ NPQ::TKeyPrefix ikeyFrom(NPQ::TKeyPrefix::TypeInfo, 0, NPQ::TKeyPrefix::MarkSourceId);
+ range->SetFrom(ikeyFrom.Data(), ikeyFrom.Size());
+ range->SetIncludeFrom(true);
+ NPQ::TKeyPrefix ikeyTo(NPQ::TKeyPrefix::TypeInfo, 0, NPQ::TKeyPrefix::MarkUserDeprecated);
+ range->SetTo(ikeyTo.Data(), ikeyTo.Size());
+ range->SetIncludeTo(false);
+ Cout << request.Get()->ToString() << Endl;
+ tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
+ result = tc.Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle);
+ UNIT_ASSERT(result);
+ UNIT_ASSERT(result->Record.HasStatus());
+ UNIT_ASSERT_EQUAL(result->Record.GetStatus(), NMsgBusProxy::MSTATUS_OK);
+ for (ui64 idx = 0; idx < result->Record.ReadRangeResultSize(); ++idx) {
+ const auto &readResult = result->Record.GetReadRangeResult(idx);
+ UNIT_ASSERT(readResult.HasStatus());
+ UNIT_ASSERT_EQUAL(readResult.GetStatus(), NKikimrProto::OK);
+ for (size_t j = 0; j < readResult.PairSize(); ++j) {
+ const auto& pair = readResult.GetPair(j);
+ TString s = pair.GetKey().substr(NPQ::TKeyPrefix::MarkedSize());
+ sourceIds.push_back(s);
+ }
+ }
+ retriesLeft = 0;
+ } catch (NActors::TSchedulingLimitReachedException) {
+ UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2);
+ }
+ }
+ return sourceIds;
+}
+
+
+void CmdRead(const ui32 partition, const ui64 offset, const ui32 count, const ui32 size, const ui32 resCount, bool timeouted, TTestContext& tc, TVector<i32> offsets, const ui32 maxTimeLagMs, const ui64 readTimestampMs) {
+ TAutoPtr<IEventHandle> handle;
+ TEvPersQueue::TEvResponse *result;
+ THolder<TEvPersQueue::TEvRequest> request;
+
+ for (i32 retriesLeft = 2; retriesLeft > 0; --retriesLeft) {
+ try {
+ tc.Runtime->ResetScheduledCount();
+ request.Reset(new TEvPersQueue::TEvRequest);
+ auto req = request->Record.MutablePartitionRequest();
+ req->SetPartition(partition);
+ auto read = req->MutableCmdRead();
+ read->SetOffset(offset);
+ read->SetClientId("user");
+ read->SetCount(count);
+ read->SetBytes(size);
+ if (maxTimeLagMs > 0) {
+ read->SetMaxTimeLagMs(maxTimeLagMs);
+ }
+ if (readTimestampMs > 0) {
+ read->SetReadTimestampMs(readTimestampMs);
+ }
+ req->SetCookie(123);
+
+ tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
+ result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle);
+
+
+ UNIT_ASSERT(result);
+ UNIT_ASSERT(result->Record.HasStatus());
+
+ UNIT_ASSERT(result->Record.HasPartitionResponse());
+ UNIT_ASSERT_EQUAL(result->Record.GetPartitionResponse().GetCookie(), 123);
+ if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) {
+ tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress
+ retriesLeft = 3;
+ continue;
+ }
+ if (timeouted) {
+ UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::OK);
+ UNIT_ASSERT(result->Record.GetPartitionResponse().HasCmdReadResult());
+ auto res = result->Record.GetPartitionResponse().GetCmdReadResult();
+ UNIT_ASSERT_EQUAL(res.ResultSize(), 0);
+ break;
+ }
+ UNIT_ASSERT_EQUAL(result->Record.GetErrorCode(), NPersQueue::NErrorCode::OK);
+
+ UNIT_ASSERT(result->Record.GetPartitionResponse().HasCmdReadResult());
+ auto res = result->Record.GetPartitionResponse().GetCmdReadResult();
+
+ UNIT_ASSERT_EQUAL(res.ResultSize(), resCount);
+ ui64 off = offset;
+
+ for (ui32 i = 0; i < resCount; ++i) {
+
+ auto r = res.GetResult(i);
+ if (offsets.empty()) {
+ if (readTimestampMs == 0) {
+ UNIT_ASSERT_EQUAL((ui64)r.GetOffset(), off);
+ }
+ UNIT_ASSERT(r.GetSourceId().size() == 9 && r.GetSourceId().StartsWith("sourceid"));
+ UNIT_ASSERT_EQUAL(ui32(r.GetData()[0]), off);
+ UNIT_ASSERT_EQUAL(ui32((unsigned char)r.GetData().back()), r.GetSeqNo() % 256);
+ ++off;
+ } else {
+ UNIT_ASSERT(offsets[i] == (i64)r.GetOffset());
+ }
+ }
+ retriesLeft = 0;
+ } catch (NActors::TSchedulingLimitReachedException) {
+ UNIT_ASSERT_VALUES_EQUAL(retriesLeft, 2);
+ }
+ }
+}
+
+
+void FillUserInfo(NKikimrClient::TKeyValueRequest_TCmdWrite* write, const TString& client, ui32 partition, ui64 offset) {
+ NPQ::TKeyPrefix ikey(NPQ::TKeyPrefix::TypeInfo, partition, NPQ::TKeyPrefix::MarkUser);
+ ikey.Append(client.c_str(), client.size());
+
+ NKikimrPQ::TUserInfo userInfo;
+ userInfo.SetOffset(offset);
+ userInfo.SetGeneration(1);
+ userInfo.SetStep(2);
+ userInfo.SetSession("test-session");
+ userInfo.SetOffsetRewindSum(10);
+ userInfo.SetReadRuleGeneration(1);
+ TString out;
+ Y_PROTOBUF_SUPPRESS_NODISCARD userInfo.SerializeToString(&out);
+
+ TBuffer idata;
+ idata.Append(out.c_str(), out.size());
+
+ write->SetKey(ikey.Data(), ikey.Size());
+ write->SetValue(idata.Data(), idata.Size());
+}
+
+void FillDeprecatedUserInfo(NKikimrClient::TKeyValueRequest_TCmdWrite* write, const TString& client, ui32 partition, ui64 offset) {
+ TString session = "test-session";
+ ui32 gen = 1;
+ ui32 step = 2;
+ NPQ::TKeyPrefix ikeyDeprecated(NPQ::TKeyPrefix::TypeInfo, partition, NPQ::TKeyPrefix::MarkUserDeprecated);
+ ikeyDeprecated.Append(client.c_str(), client.size());
+
+ TBuffer idataDeprecated = NPQ::NDeprecatedUserData::Serialize(offset, gen, step, session);
+ write->SetKey(ikeyDeprecated.Data(), ikeyDeprecated.Size());
+ write->SetValue(idataDeprecated.Data(), idataDeprecated.Size());
+}
+
+} // namespace NKikimr
diff --git a/ydb/core/persqueue/ut/CMakeLists.darwin.txt b/ydb/core/persqueue/ut/CMakeLists.darwin.txt
index 4928770dfe..2ab8a0cbeb 100644
--- a/ydb/core/persqueue/ut/CMakeLists.darwin.txt
+++ b/ydb/core/persqueue/ut/CMakeLists.darwin.txt
@@ -39,6 +39,7 @@ target_sources(ydb-core-persqueue-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/metering_sink_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/mirrorer_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_ut_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/sourceid_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/type_codecs_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/user_info_ut.cpp
diff --git a/ydb/core/persqueue/ut/CMakeLists.linux.txt b/ydb/core/persqueue/ut/CMakeLists.linux.txt
index 79d902da8f..bd73cffc77 100644
--- a/ydb/core/persqueue/ut/CMakeLists.linux.txt
+++ b/ydb/core/persqueue/ut/CMakeLists.linux.txt
@@ -43,6 +43,7 @@ target_sources(ydb-core-persqueue-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/metering_sink_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/mirrorer_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_ut_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/sourceid_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/type_codecs_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/user_info_ut.cpp
diff --git a/ydb/core/persqueue/ut_slow/CMakeLists.darwin.txt b/ydb/core/persqueue/ut_slow/CMakeLists.darwin.txt
index 5d657b27d5..b4a546fecf 100644
--- a/ydb/core/persqueue/ut_slow/CMakeLists.darwin.txt
+++ b/ydb/core/persqueue/ut_slow/CMakeLists.darwin.txt
@@ -34,6 +34,7 @@ target_link_options(ydb-core-persqueue-ut_slow PRIVATE
CoreFoundation
)
target_sources(ydb-core-persqueue-ut_slow PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_ut_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_ut_slow.cpp
)
add_test(
diff --git a/ydb/core/persqueue/ut_slow/CMakeLists.linux.txt b/ydb/core/persqueue/ut_slow/CMakeLists.linux.txt
index ca0df6ce16..038edb3538 100644
--- a/ydb/core/persqueue/ut_slow/CMakeLists.linux.txt
+++ b/ydb/core/persqueue/ut_slow/CMakeLists.linux.txt
@@ -38,6 +38,7 @@ target_link_options(ydb-core-persqueue-ut_slow PRIVATE
-ldl
)
target_sources(ydb-core-persqueue-ut_slow PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_ut_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_ut_slow.cpp
)
add_test(