aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-04-24 14:10:41 +0300
committertesseract <tesseract@yandex-team.com>2023-04-24 14:10:41 +0300
commit84dc1e66400391c20f174febd166767c15799732 (patch)
treec18256f641e2790dd24dc12392cf31eaa7c6ab4c
parent66e4cab5936a4f6c6fdab9a96c9d49b560256a00 (diff)
downloadydb-84dc1e66400391c20f174febd166767c15799732.tar.gz
Extract TopicName method
-rw-r--r--ydb/core/persqueue/partition.cpp44
-rw-r--r--ydb/core/persqueue/partition.h3
-rw-r--r--ydb/core/persqueue/partition_init.cpp10
-rw-r--r--ydb/core/persqueue/partition_read.cpp34
-rw-r--r--ydb/core/persqueue/partition_write.cpp18
5 files changed, 58 insertions, 51 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index e83a9f1e55..5aa9dc3275 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -48,6 +48,10 @@ struct TMirrorerInfo {
TTabletCountersBase Baseline;
};
+const TString& TPartition::TopicName() const {
+ return TopicConverter->GetClientsideName();
+}
+
ui64 GetOffsetEstimate(const std::deque<TDataKey>& container, TInstant timestamp, ui64 offset) {
if (container.empty()) {
return offset;
@@ -63,7 +67,7 @@ ui64 GetOffsetEstimate(const std::deque<TDataKey>& container, TInstant timestamp
void TPartition::ReplyError(const TActorContext& ctx, const ui64 dst, NPersQueue::NErrorCode::EErrorCode errorCode, const TString& error) {
ReplyPersQueueError(
- dst == 0 ? ctx.SelfID : Tablet, ctx, TabletID, TopicConverter->GetClientsideName(), Partition,
+ dst == 0 ? ctx.SelfID : Tablet, ctx, TabletID, TopicName(), Partition,
TabletCounters, NKikimrServices::PERSQUEUE, dst, errorCode, error, true
);
}
@@ -398,7 +402,7 @@ void TPartition::Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx)
// Reply to all outstanding requests in order to destroy corresponding actors
TStringBuilder ss;
- ss << "Tablet is restarting, topic '" << TopicConverter->GetClientsideName() << "'";
+ ss << "Tablet is restarting, topic '" << TopicName() << "'";
for (const auto& ev : WaitToChangeOwner) {
ReplyError(ctx, ev->Cookie, NPersQueue::NErrorCode::INITIALIZING, ss);
@@ -444,11 +448,11 @@ void TPartition::InitComplete(const TActorContext& ctx) {
LOG_INFO_S(
ctx, NKikimrServices::PERSQUEUE,
- "init complete for topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition << " " << ctx.SelfID
+ "init complete for topic '" << TopicName() << "' partition " << Partition << " " << ctx.SelfID
);
TStringBuilder ss;
- ss << "SYNC INIT topic " << TopicConverter->GetClientsideName() << " partitition " << Partition
+ ss << "SYNC INIT topic " << TopicName() << " partitition " << Partition
<< " so " << StartOffset << " endOffset " << EndOffset << " Head " << Head << "\n";
for (const auto& s : SourceIdStorage.GetInMemorySourceIds()) {
ss << "SYNC INIT sourceId " << s.first << " seqNo " << s.second.SeqNo << " offset " << s.second.Offset << "\n";
@@ -479,7 +483,7 @@ void TPartition::InitComplete(const TActorContext& ctx) {
for (const auto& s : SourceIdStorage.GetInMemorySourceIds()) {
LOG_DEBUG_S(
ctx, NKikimrServices::PERSQUEUE,
- "Init complete for topic '" << TopicConverter->GetClientsideName() << "' Partition: " << Partition
+ "Init complete for topic '" << TopicName() << "' Partition: " << Partition
<< " SourceId: " << s.first << " SeqNo: " << s.second.SeqNo << " offset: " << s.second.Offset
<< " MaxOffset: " << EndOffset
);
@@ -729,7 +733,7 @@ void TPartition::LogAndCollectError(NKikimrServices::EServiceKikimr service, con
NKikimrPQ::TStatusResponse::TErrorMessage error;
error.SetTimestamp(ctx.Now().Seconds());
error.SetService(service);
- error.SetMessage(TStringBuilder() << "topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition << " got error: " << msg);
+ error.SetMessage(TStringBuilder() << "topic '" << TopicName() << "' partition " << Partition << " got error: " << msg);
LogAndCollectError(error, ctx);
}
@@ -927,7 +931,7 @@ void TPartition::Handle(TEvPQ::TEvError::TPtr& ev, const TActorContext& ctx) {
LOG_ERROR_S(
ctx, NKikimrServices::PERSQUEUE,
- "Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition
+ "Topic '" << TopicName() << "' partition " << Partition
<< " user " << ReadingForUser << " readTimeStamp error: " << ev->Get()->Error
);
@@ -1208,7 +1212,7 @@ void TPartition::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext&
if (response.GetStatus() != NMsgBusProxy::MSTATUS_OK) {
LOG_ERROR_S(
ctx, NKikimrServices::PERSQUEUE,
- "OnWrite topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition
+ "OnWrite topic '" << TopicName() << "' partition " << Partition
<< " commands are not processed at all, reason: " << response.DebugString()
);
ctx.Send(Tablet, new TEvents::TEvPoisonPill());
@@ -1220,7 +1224,7 @@ void TPartition::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext&
if (response.GetDeleteRangeResult(i).GetStatus() != NKikimrProto::OK) {
LOG_ERROR_S(
ctx, NKikimrServices::PERSQUEUE,
- "OnWrite topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition
+ "OnWrite topic '" << TopicName() << "' partition " << Partition
<< " delete range error"
);
//TODO: if disk is full, could this be ok? delete must be ok, of course
@@ -1236,7 +1240,7 @@ void TPartition::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext&
if (response.GetWriteResult(i).GetStatus() != NKikimrProto::OK) {
LOG_ERROR_S(
ctx, NKikimrServices::PERSQUEUE,
- "OnWrite topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition
+ "OnWrite topic '" << TopicName() << "' partition " << Partition
<< " write error"
);
ctx.Send(Tablet, new TEvents::TEvPoisonPill());
@@ -1252,7 +1256,7 @@ void TPartition::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext&
if (res.GetStatus() != NKikimrProto::OK) {
LOG_ERROR_S(
ctx, NKikimrServices::PERSQUEUE,
- "OnWrite topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition
+ "OnWrite topic '" << TopicName() << "' partition " << Partition
<< " are not processed at all, got KV error in CmdGetStatus " << res.GetStatus()
);
ctx.Send(Tablet, new TEvents::TEvPoisonPill());
@@ -1768,7 +1772,7 @@ void TPartition::ProcessUserAct(TEvPQ::TEvSetClientInfo& act,
if (act.Type == TEvPQ::TEvSetClientInfo::ESCI_DROP_READ_RULE) {
LOG_DEBUG_S(
ctx, NKikimrServices::PERSQUEUE,
- "Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition
+ "Topic '" << TopicName() << "' partition " << Partition
<< " user " << user << " drop request"
);
@@ -1828,7 +1832,7 @@ void TPartition::ProcessUserAct(TEvPQ::TEvSetClientInfo& act,
offset = 0;
LOG_DEBUG_S(
ctx, NKikimrServices::PERSQUEUE,
- "Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition
+ "Topic '" << TopicName() << "' partition " << Partition
<< " user " << act.ClientId << " reinit request with generation " << readRuleGeneration
);
}
@@ -1846,7 +1850,7 @@ void TPartition::ProcessUserAct(TEvPQ::TEvSetClientInfo& act,
}
LOG_WARN_S(
ctx, NKikimrServices::PERSQUEUE,
- "commit to future - topic " << TopicConverter->GetClientsideName() << " partition " << Partition
+ "commit to future - topic " << TopicName() << " partition " << Partition
<< " client " << act.ClientId << " EndOffset " << EndOffset << " offset " << offset
);
act.Offset = EndOffset;
@@ -1886,14 +1890,14 @@ void TPartition::EmulatePostProcessUserAct(const TEvPQ::TEvSetClientInfo& act,
LOG_DEBUG_S(
ctx, NKikimrServices::PERSQUEUE,
- "Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition << " user " << user
+ "Topic '" << TopicName() << "' partition " << Partition << " user " << user
<< " drop done"
);
PendingUsersInfo.erase(user);
} else if (act.Type == TEvPQ::TEvSetClientInfo::ESCI_INIT_READ_RULE) {
LOG_DEBUG_S(
ctx, NKikimrServices::PERSQUEUE,
- "Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition << " user " << user
+ "Topic '" << TopicName() << "' partition " << Partition << " user " << user
<< " reinit with generation " << readRuleGeneration << " done"
);
@@ -1931,7 +1935,7 @@ void TPartition::EmulatePostProcessUserAct(const TEvPQ::TEvSetClientInfo& act,
Y_VERIFY(offset <= (ui64)Max<i64>(), "Unexpected Offset: %" PRIu64, offset);
LOG_DEBUG_S(
ctx, NKikimrServices::PERSQUEUE,
- "Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition << " user " << user
+ "Topic '" << TopicName() << "' partition " << Partition << " user " << user
<< (setSession || dropSession ? " session" : " offset")
<< " is set to " << offset << " (startOffset " << StartOffset << ") session " << session
);
@@ -2257,7 +2261,7 @@ void TPartition::Handle(TEvQuota::TEvClearance::TPtr& ev, const TActorContext& c
LOG_DEBUG_S(
ctx, NKikimrServices::PERSQUEUE,
"Got quota." <<
- " Topic: \"" << TopicConverter->GetClientsideName() << "\"." <<
+ " Topic: \"" << TopicName() << "\"." <<
" Partition: " << Partition << ": " << ev->Get()->Result << "." <<
" Cookie: " << cookie
);
@@ -2268,7 +2272,7 @@ void TPartition::Handle(TEvQuota::TEvClearance::TPtr& ev, const TActorContext& c
LOG_ERROR_S(
ctx, NKikimrServices::PERSQUEUE,
"Got quota error." <<
- " Topic: \"" << TopicConverter->GetClientsideName() << "\"." <<
+ " Topic: \"" << TopicName() << "\"." <<
" Partition " << Partition << ": " << ev->Get()->Result
);
ctx.Send(Tablet, new TEvents::TEvPoisonPill());
@@ -2336,7 +2340,7 @@ void TPartition::Handle(TEvPQ::TEvSubDomainStatus::TPtr& ev, const TActorContext
LOG_INFO_S(
ctx, NKikimrServices::PERSQUEUE,
"SubDomainOutOfSpace was changed." <<
- " Topic: \"" << TopicConverter->GetClientsideName() << "\"." <<
+ " Topic: \"" << TopicName() << "\"." <<
" Partition: " << Partition << "." <<
" SubDomainOutOfSpace: " << SubDomainOutOfSpace
);
diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h
index a028dbbac2..c954659227 100644
--- a/ydb/core/persqueue/partition.h
+++ b/ydb/core/persqueue/partition.h
@@ -77,6 +77,9 @@ class TPartition : public TActorBootstrapped<TPartition> {
friend TInitDataRangeStep;
friend TInitDataStep;
+public:
+ const TString& TopicName() const;
+
private:
static const ui32 MAX_ERRORS_COUNT_TO_STORE = 10;
diff --git a/ydb/core/persqueue/partition_init.cpp b/ydb/core/persqueue/partition_init.cpp
index 1c2ea6051c..cac950b082 100644
--- a/ydb/core/persqueue/partition_init.cpp
+++ b/ydb/core/persqueue/partition_init.cpp
@@ -55,7 +55,7 @@ void TInitializer::Next(const TActorContext& ctx) {
}
void TInitializer::Done(const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Initializing topic '" << Partition->TopicConverter->GetClientsideName()
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Initializing topic '" << Partition->TopicName()
<< "' partition " << Partition->Partition
<< ". Completed.");
InProgress = false;
@@ -77,7 +77,7 @@ void TInitializer::DoNext(const TActorContext& ctx) {
}
}
- LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Initializing topic '" << Partition->TopicConverter->GetClientsideName()
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Initializing topic '" << Partition->TopicName()
<< "' partition " << Partition->Partition
<< ". Step " << CurrentStep->Get()->Name);
CurrentStep->Get()->Execute(ctx);
@@ -117,7 +117,7 @@ void TInitializerStep::PoisonPill(const TActorContext& ctx) {
}
TString TInitializerStep::TopicName() const {
- return Partition()->TopicConverter->GetClientsideName();
+ return Partition()->TopicName();
}
@@ -700,11 +700,11 @@ void TPartition::Initialize(const TActorContext& ctx) {
}
if (AppData()->PQConfig.GetTopicsAreFirstClassCitizen()) {
- PartitionCountersLabeled.Reset(new TPartitionLabeledCounters(EscapeBadChars(TopicConverter->GetClientsideName()),
+ PartitionCountersLabeled.Reset(new TPartitionLabeledCounters(EscapeBadChars(TopicName()),
Partition,
Config.GetYdbDatabasePath()));
} else {
- PartitionCountersLabeled.Reset(new TPartitionLabeledCounters(TopicConverter->GetClientsideName(),
+ PartitionCountersLabeled.Reset(new TPartitionLabeledCounters(TopicName(),
Partition));
}
diff --git a/ydb/core/persqueue/partition_read.cpp b/ydb/core/persqueue/partition_read.cpp
index fa4bb0f708..2f70e6f3c7 100644
--- a/ydb/core/persqueue/partition_read.cpp
+++ b/ydb/core/persqueue/partition_read.cpp
@@ -31,10 +31,10 @@ static const ui32 MAX_USER_ACTS = 1000;
void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config, const TActorContext& ctx) {
TSet<TString> hasReadRule;
- for (auto& userInfo : UsersInfoStorage->GetAll()) {
- userInfo.second.ReadFromTimestamp = TInstant::Zero();
- userInfo.second.HasReadRule = false;
- hasReadRule.insert(userInfo.first);
+ for (auto& [consumer, userInfo] : UsersInfoStorage->GetAll()) {
+ userInfo.ReadFromTimestamp = TInstant::Zero();
+ userInfo.HasReadRule = false;
+ hasReadRule.insert(consumer);
}
for (ui32 i = 0; i < config.ReadRulesSize(); ++i) {
const auto& consumer = config.GetReadRules(i);
@@ -128,15 +128,15 @@ void TPartition::UpdateAvailableSize(const TActorContext& ctx) {
auto now = ctx.Now();
WriteQuota->Update(now);
- for (auto &c : UsersInfoStorage->GetAll()) {
+ for (auto& [consumer, userInfo] : UsersInfoStorage->GetAll()) {
while (true) {
- if (!c.second.ReadQuota.CanExaust(now) && !c.second.ReadRequests.empty()) {
+ if (!userInfo.ReadQuota.CanExaust(now) && !userInfo.ReadRequests.empty()) {
break;
}
- if (!c.second.ReadRequests.empty()) {
- auto ri(std::move(c.second.ReadRequests.front().first));
- auto cookie = c.second.ReadRequests.front().second;
- c.second.ReadRequests.pop_front();
+ if (!userInfo.ReadRequests.empty()) {
+ auto ri(std::move(userInfo.ReadRequests.front().first));
+ auto cookie = userInfo.ReadRequests.front().second;
+ userInfo.ReadRequests.pop_front();
ProcessRead(ctx, std::move(ri), cookie, false);
} else
break;
@@ -202,13 +202,13 @@ void TPartition::InitUserInfoForImportantClients(const TActorContext& ctx) {
userInfo->Offset = StartOffset;
ReadTimestampForOffset(importantUser, *userInfo, ctx);
}
- for (auto& userInfoPair : UsersInfoStorage->GetAll()) {
- if (!important.contains(userInfoPair.first) && userInfoPair.second.Important && userInfoPair.second.LabeledCounters) {
+ for (auto& [consumer, userInfo] : UsersInfoStorage->GetAll()) {
+ if (!important.contains(consumer) && userInfo.Important && userInfo.LabeledCounters) {
ctx.Send(
Tablet,
- new TEvPQ::TEvPartitionLabeledCountersDrop(Partition, userInfoPair.second.LabeledCounters->GetGroup())
+ new TEvPQ::TEvPartitionLabeledCountersDrop(Partition, userInfo.LabeledCounters->GetGroup())
);
- userInfoPair.second.SetImportant(false);
+ userInfo.SetImportant(false);
}
}
}
@@ -823,9 +823,9 @@ void TPartition::ReadTimestampForOffset(const TString& user, TUserInfo& userInfo
}
void TPartition::ProcessTimestampsForNewData(const ui64 prevEndOffset, const TActorContext& ctx) {
- for (auto& userInfoPair : UsersInfoStorage->GetAll()) {
- if (userInfoPair.second.Offset >= (i64)prevEndOffset && userInfoPair.second.Offset < (i64)EndOffset) {
- ReadTimestampForOffset(userInfoPair.first, userInfoPair.second, ctx);
+ for (auto& [consumer, userInfo] : UsersInfoStorage->GetAll()) {
+ if (userInfo.Offset >= (i64)prevEndOffset && userInfo.Offset < (i64)EndOffset) {
+ ReadTimestampForOffset(consumer, userInfo, ctx);
}
}
}
diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp
index b74d1a6132..12ed674b7d 100644
--- a/ydb/core/persqueue/partition_write.cpp
+++ b/ydb/core/persqueue/partition_write.cpp
@@ -136,7 +136,7 @@ void TPartition::ProcessChangeOwnerRequest(TAutoPtr<TEvPQ::TEvChangeOwner> ev, c
Y_VERIFY(ReservedSize >= it->second.ReservedSize);
ReservedSize -= it->second.ReservedSize;
- it->second.GenerateCookie(owner, ev->PipeClient, ev->Sender, TopicConverter->GetClientsideName(), Partition, ctx);//will change OwnerCookie
+ it->second.GenerateCookie(owner, ev->PipeClient, ev->Sender, TopicName(), Partition, ctx);//will change OwnerCookie
//cookie is generated. but answer will be sent when all inflight writes will be done - they in the same queue 'Requests'
EmplaceRequest(TOwnershipMsg{ev->Cookie, it->second.OwnerCookie}, ctx);
TabletCounters.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Set(ReservedSize);
@@ -301,7 +301,7 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
ctx,
NKikimrServices::PERSQUEUE,
"Answering for message sourceid: '" << EscapeC(s) <<
- "', Topic: '" << TopicConverter->GetClientsideName() <<
+ "', Topic: '" << TopicName() <<
"', Partition: " << Partition <<
", SeqNo: " << seqNo << ", partNo: " << partNo <<
", Offset: " << offset << " is " << (already ? "already written" : "stored on disk")
@@ -849,7 +849,7 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const
if (poffset >= curOffset) {
LOG_DEBUG_S(
ctx, NKikimrServices::PERSQUEUE,
- "Already written message. Topic: '" << TopicConverter->GetClientsideName()
+ "Already written message. Topic: '" << TopicName()
<< "' Partition: " << Partition << " SourceId: '" << EscapeC(p.Msg.SourceId)
<< "'. Message seqNo = " << p.Msg.SeqNo
<< ". Committed seqNo = " << (isCommitted ? it_inMemory->second.SeqNo : 0)
@@ -922,7 +922,7 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const
LOG_DEBUG_S(
ctx, NKikimrServices::PERSQUEUE,
- "Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition
+ "Topic '" << TopicName() << "' partition " << Partition
<< " part blob processing sourceId '" << EscapeC(p.Msg.SourceId) <<
"' seqNo " << p.Msg.SeqNo << " partNo " << p.Msg.PartNo
);
@@ -984,7 +984,7 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const
LOG_DEBUG_S(
ctx, NKikimrServices::PERSQUEUE,
- "Topic '" << TopicConverter->GetClientsideName() <<
+ "Topic '" << TopicName() <<
"' partition " << Partition <<
" part blob sourceId '" << EscapeC(p.Msg.SourceId) <<
"' seqNo " << p.Msg.SeqNo << " partNo " << p.Msg.PartNo <<
@@ -1021,7 +1021,7 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const
}
LOG_DEBUG_S(
ctx, NKikimrServices::PERSQUEUE,
- "writing blob: topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition
+ "writing blob: topic '" << TopicName() << "' partition " << Partition
<< " " << x.first.ToString() << " size " << x.second << " WTime " << ctx.Now().MilliSeconds()
);
@@ -1060,7 +1060,7 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const
LOG_DEBUG_S(
ctx, NKikimrServices::PERSQUEUE,
- "Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition
+ "Topic '" << TopicName() << "' partition " << Partition
<< " part blob complete sourceId '" << EscapeC(p.Msg.SourceId) << "' seqNo " << p.Msg.SeqNo
<< " partNo " << p.Msg.PartNo << " FormedBlobsCount " << PartitionedBlob.GetFormedBlobs().size()
<< " NewHead: " << NewHead
@@ -1292,7 +1292,7 @@ bool TPartition::ProcessWrites(TEvKeyValue::TEvRequest* request, TInstant now, c
LOG_DEBUG_S(
ctx, NKikimrServices::PERSQUEUE,
- "writing blob: topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition
+ "writing blob: topic '" << TopicName() << "' partition " << Partition
<< " compactOffset " << key.GetOffset() << "," << key.GetCount()
<< " HeadOffset " << Head.Offset << " endOffset " << EndOffset << " curOffset "
<< NewHead.GetNextOffset() << " " << key.ToString()
@@ -1372,7 +1372,7 @@ void TPartition::RequestQuotaForWriteBlobRequest(size_t dataSize, ui64 cookie) {
LOG_DEBUG_S(
TActivationContext::AsActorContext(), NKikimrServices::PERSQUEUE,
"Send write quota request." <<
- " Topic: \"" << TopicConverter->GetClientsideName() << "\"." <<
+ " Topic: \"" << TopicName() << "\"." <<
" Partition: " << Partition << "." <<
" Amount: " << dataSize << "." <<
" Cookie: " << cookie