aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorazevaykin <azevaykin@yandex-team.com>2023-06-29 09:57:03 +0300
committerazevaykin <azevaykin@yandex-team.com>2023-06-29 09:57:03 +0300
commit726057070f9c5a91fc10fde0d5024913d10f1ab9 (patch)
treead15023e87cedf48412ceb4050a6fccf02db7397
parentd85eacbce6dcac02fe1492192939f19678a6f42f (diff)
downloadydb-726057070f9c5a91fc10fde0d5024913d10f1ab9.tar.gz
Additional logs
-rw-r--r--ydb/core/persqueue/partition.cpp10
-rw-r--r--ydb/core/persqueue/partition.h2
-rw-r--r--ydb/core/persqueue/partition_write.cpp70
3 files changed, 67 insertions, 15 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 8164720707..4cacb876e8 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -282,17 +282,17 @@ void TPartition::AddMetaKey(TEvKeyValue::TEvRequest* request) {
bool TPartition::CleanUp(TEvKeyValue::TEvRequest* request, const TActorContext& ctx) {
bool haveChanges = CleanUpBlobs(request, ctx);
- LOG_DEBUG(ctx, NKikimrServices::PERSQUEUE, TStringBuilder() << "Have " <<
- request->Record.CmdDeleteRangeSize() << " items to delete old stuff");
+
+ LOG_TRACE_S(ctx, NKikimrServices::PERSQUEUE, "Have " << request->Record.CmdDeleteRangeSize() << " items to delete old stuff");
haveChanges |= SourceIdStorage.DropOldSourceIds(request, ctx.Now(), StartOffset, Partition,
Config.GetPartitionConfig());
if (haveChanges) {
SourceIdStorage.MarkOwnersForDeletedSourceId(Owners);
}
- LOG_DEBUG(ctx, NKikimrServices::PERSQUEUE, TStringBuilder() << "Have " <<
- request->Record.CmdDeleteRangeSize() << " items to delete all stuff");
- LOG_TRACE(ctx, NKikimrServices::PERSQUEUE, TStringBuilder() << "Delete command " << request->ToString());
+
+ LOG_TRACE_S(ctx, NKikimrServices::PERSQUEUE, "Have " << request->Record.CmdDeleteRangeSize() << " items to delete all stuff. "
+ << "Delete command " << request->ToString());
return haveChanges;
}
diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h
index 4aab43956a..1dc8a8cb01 100644
--- a/ydb/core/persqueue/partition.h
+++ b/ydb/core/persqueue/partition.h
@@ -189,7 +189,7 @@ private:
void CheckHeadConsistency() const;
void HandleWrites(const TActorContext& ctx);
void RequestQuotaForWriteBlobRequest(size_t dataSize, ui64 cookie);
- void WriteBlobWithQuota(THolder<TEvKeyValue::TEvRequest>&& request);
+ void WriteBlobWithQuota(const TActorContext& ctx, THolder<TEvKeyValue::TEvRequest>&& request);
void UpdateUserInfoEndOffset(const TInstant& now);
void UpdateWriteBufferIsFullState(const TInstant& now);
diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp
index a8b540a88a..232c50230a 100644
--- a/ydb/core/persqueue/partition_write.cpp
+++ b/ydb/core/persqueue/partition_write.cpp
@@ -27,6 +27,8 @@ static const ui32 MAX_WRITE_CYCLE_SIZE = 16_MB;
static const ui32 MAX_INLINE_SIZE = 1000;
void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& cookie) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::ReplyOwnerOk. Partition: " << Partition);
+
THolder<TEvPQ::TEvProxyResponse> response = MakeHolder<TEvPQ::TEvProxyResponse>(dst);
NKikimrClient::TResponse& resp = response->Response;
resp.SetStatus(NMsgBusProxy::MSTATUS_OK);
@@ -37,8 +39,11 @@ void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TS
void TPartition::ReplyWrite(
const TActorContext& ctx, const ui64 dst, const TString& sourceId, const ui64 seqNo, const ui16 partNo, const ui16 totalParts,
- const ui64 offset, const TInstant writeTimestamp, bool already, const ui64 maxSeqNo,
+ const ui64 offset, const TInstant writeTimestamp, bool already, const ui64 maxSeqNo,
const TDuration partitionQuotedTime, const TDuration topicQuotedTime, const TDuration queueTime, const TDuration writeTime) {
+
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::ReplyWrite. Partition: " << Partition);
+
Y_VERIFY(offset <= (ui64)Max<i64>(), "Offset is too big: %" PRIu64, offset);
Y_VERIFY(seqNo <= (ui64)Max<i64>(), "SeqNo is too big: %" PRIu64, seqNo);
@@ -71,10 +76,14 @@ void TPartition::HandleOnIdle(TEvPQ::TEvUpdateAvailableSize::TPtr&, const TActor
}
void TPartition::HandleOnWrite(TEvPQ::TEvUpdateAvailableSize::TPtr&, const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleOnWrite TEvUpdateAvailableSize. Partition: " << Partition);
+
UpdateAvailableSize(ctx);
}
void TPartition::CancelAllWritesOnIdle(const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::CancelAllWritesOnIdle. Partition: " << Partition);
+
for (const auto& w : Requests) {
ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::WRITE_ERROR_DISK_IS_FULL, "Disk is full");
if (w.IsWrite()) {
@@ -95,6 +104,8 @@ void TPartition::CancelAllWritesOnIdle(const TActorContext& ctx) {
}
void TPartition::FailBadClient(const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::FailBadClient. Partition: " << Partition);
+
for (auto it = Owners.begin(); it != Owners.end();) {
it = DropOwner(it, ctx);
}
@@ -125,6 +136,7 @@ void TPartition::FailBadClient(const TActorContext& ctx) {
}
void TPartition::ProcessChangeOwnerRequest(TAutoPtr<TEvPQ::TEvChangeOwner> ev, const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::ProcessChangeOwnerRequest. Partition: " << Partition);
auto &owner = ev->Owner;
auto it = Owners.find(owner);
@@ -149,6 +161,8 @@ void TPartition::ProcessChangeOwnerRequest(TAutoPtr<TEvPQ::TEvChangeOwner> ev, c
THashMap<TString, NKikimr::NPQ::TOwnerInfo>::iterator TPartition::DropOwner(THashMap<TString, NKikimr::NPQ::TOwnerInfo>::iterator& it, const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::DropOwner. Partition: " << Partition);
+
Y_VERIFY(ReservedSize >= it->second.ReservedSize);
ReservedSize -= it->second.ReservedSize;
UpdateWriteBufferIsFullState(ctx.Now());
@@ -163,6 +177,8 @@ THashMap<TString, NKikimr::NPQ::TOwnerInfo>::iterator TPartition::DropOwner(THas
}
void TPartition::Handle(TEvPQ::TEvChangeOwner::TPtr& ev, const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleOnWrite TEvChangeOwner. Partition: " << Partition);
+
bool res = OwnerPipes.insert(ev->Get()->PipeClient).second;
Y_VERIFY(res);
WaitToChangeOwner.push_back(ev->Release());
@@ -170,6 +186,8 @@ void TPartition::Handle(TEvPQ::TEvChangeOwner::TPtr& ev, const TActorContext& ct
}
void TPartition::ProcessReserveRequests(const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::ProcessReserveRequests. Partition: " << Partition);
+
const ui64 maxWriteInflightSize = Config.GetPartitionConfig().GetMaxWriteInflightSize();
while (!ReserveRequests.empty()) {
@@ -188,12 +206,12 @@ void TPartition::ProcessReserveRequests(const TActorContext& ctx) {
const ui64 currentSize = ReservedSize + WriteInflightSize + WriteCycleSize;
if (currentSize != 0 && currentSize + size > maxWriteInflightSize) {
- LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Reserve processing: maxWriteInflightSize riched");
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Reserve processing: maxWriteInflightSize riched. Partition: " << Partition);
break;
}
if (WaitingForSubDomainQuota(ctx, currentSize)) {
- LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Reserve processing: SubDomainOutOfSpace");
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Reserve processing: SubDomainOutOfSpace. Partition: " << Partition);
break;
}
@@ -216,6 +234,8 @@ void TPartition::UpdateWriteBufferIsFullState(const TInstant& now) {
void TPartition::Handle(TEvPQ::TEvReserveBytes::TPtr& ev, const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleOnWrite TEvReserveBytes. Partition: " << Partition);
+
const TString& ownerCookie = ev->Get()->OwnerCookie;
TStringBuf owner = TOwnerInfo::GetOwnerFromOwnerCookie(ownerCookie);
const ui64& messageNo = ev->Get()->MessageNo;
@@ -245,6 +265,8 @@ void TPartition::HandleOnIdle(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& ct
}
void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::AnswerCurrentWrites. Partition: " << Partition);
+
ui64 offset = EndOffset;
while (!Responses.empty()) {
const auto& response = Responses.front();
@@ -363,6 +385,8 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
}
void TPartition::SyncMemoryStateWithKVState(const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::SyncMemoryStateWithKVState. Partition: " << Partition);
+
if (!CompactedKeys.empty())
HeadKeys.clear();
@@ -429,10 +453,13 @@ void TPartition::SyncMemoryStateWithKVState(const TActorContext& ctx) {
}
void TPartition::Handle(TEvPQ::TEvHandleWriteResponse::TPtr&, const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleOnWrite TEvHandleWriteResponse. Partition: " << Partition);
+
HandleWriteResponse(ctx);
}
void TPartition::HandleWriteResponse(const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleWriteResponse. Partition: " << Partition);
Y_VERIFY(CurrentStateFunc() == &TThis::StateWrite);
ui64 prevEndOffset = EndOffset;
@@ -505,9 +532,11 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) {
}
void TPartition::HandleOnWrite(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& ctx) {
- ui32 sz = std::accumulate(ev->Get()->Msgs.begin(), ev->Get()->Msgs.end(), 0u, [](ui32 sum, const TEvPQ::TEvWrite::TMsg& msg){
- return sum + msg.Data.size();
- });
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleOnWrite TEvWrite. Partition: " << Partition);
+
+ ui32 sz = std::accumulate(ev->Get()->Msgs.begin(), ev->Get()->Msgs.end(), 0u, [](ui32 sum, const TEvPQ::TEvWrite::TMsg& msg) {
+ return sum + msg.Data.size();
+ });
bool mirroredPartition = Config.GetPartitionConfig().HasMirrorFrom();
@@ -628,6 +657,8 @@ void TPartition::HandleOnIdle(TEvPQ::TEvRegisterMessageGroup::TPtr& ev, const TA
}
void TPartition::HandleOnWrite(TEvPQ::TEvRegisterMessageGroup::TPtr& ev, const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleOnWrite TEvRegisterMessageGroup. Partition: " << Partition);
+
const auto& body = ev->Get()->Body;
auto it = SourceIdStorage.GetInMemorySourceIds().find(body.SourceId);
@@ -664,6 +695,8 @@ void TPartition::HandleOnIdle(TEvPQ::TEvDeregisterMessageGroup::TPtr& ev, const
}
void TPartition::HandleOnWrite(TEvPQ::TEvDeregisterMessageGroup::TPtr& ev, const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleOnWrite TEvDeregisterMessageGroup. Partition: " << Partition);
+
const auto& body = ev->Get()->Body;
auto it = SourceIdStorage.GetInMemorySourceIds().find(body.SourceId);
@@ -681,6 +714,8 @@ void TPartition::HandleOnIdle(TEvPQ::TEvSplitMessageGroup::TPtr& ev, const TActo
}
void TPartition::HandleOnWrite(TEvPQ::TEvSplitMessageGroup::TPtr& ev, const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleOnWrite TEvSplitMessageGroup. Partition: " << Partition);
+
if (ev->Get()->Deregistrations.size() > 1) {
return ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::BAD_REQUEST,
TStringBuilder() << "Currently, single deregistrations are supported");
@@ -736,6 +771,8 @@ std::pair<TKey, ui32> TPartition::Compact(const TKey& key, const ui32 size, bool
void TPartition::ProcessChangeOwnerRequests(const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::ProcessChangeOwnerRequests. Partition: " << Partition);
+
while (!WaitToChangeOwner.empty()) {
auto &ev = WaitToChangeOwner.front();
if (OwnerPipes.find(ev->PipeClient) != OwnerPipes.end()) { //this is not request from dead pipe
@@ -751,6 +788,8 @@ void TPartition::ProcessChangeOwnerRequests(const TActorContext& ctx) {
}
void TPartition::CancelAllWritesOnWrite(const TActorContext& ctx, TEvKeyValue::TEvRequest* request, const TString& errorStr, const TWriteMsg& p, TSourceIdWriter& sourceIdWriter, NPersQueue::NErrorCode::EErrorCode errorCode) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::CancelAllWritesOnWrite. Partition: " << Partition);
+
ReplyError(ctx, p.Cookie, errorCode, errorStr);
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1);
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(p.Msg.Data.size() + p.Msg.SourceId.size());
@@ -767,6 +806,7 @@ void TPartition::CancelAllWritesOnWrite(const TActorContext& ctx, TEvKeyValue::T
bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx,
TSourceIdWriter& sourceIdWriter) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::AppendHeadWithNewWrites. Partition: " << Partition);
ui64 curOffset = PartitionedBlob.IsInited() ? PartitionedBlob.GetOffset() : EndOffset;
@@ -1136,6 +1176,8 @@ std::pair<TKey, ui32> TPartition::GetNewWriteKey(bool headCleared) {
}
void TPartition::AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvRequest* request, bool headCleared, const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::AddNewWriteBlob. Partition: " << Partition);
+
const auto& key = res.first;
TString valueD;
@@ -1222,8 +1264,9 @@ void TPartition::AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvReq
}
void TPartition::SetDeadlinesForWrites(const TActorContext& ctx) {
- if (AppData(ctx)->PQConfig.GetQuotingConfig().GetQuotaWaitDurationMs() > 0 && QuotaDeadline == TInstant::Zero()) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::SetDeadlinesForWrites. Partition: " << Partition);
+ if (AppData(ctx)->PQConfig.GetQuotingConfig().GetQuotaWaitDurationMs() > 0 && QuotaDeadline == TInstant::Zero()) {
QuotaDeadline = ctx.Now() + TDuration::MilliSeconds(AppData(ctx)->PQConfig.GetQuotingConfig().GetQuotaWaitDurationMs());
ctx.Schedule(QuotaDeadline, new TEvPQ::TEvQuotaDeadlineCheck());
@@ -1231,10 +1274,13 @@ void TPartition::SetDeadlinesForWrites(const TActorContext& ctx) {
}
void TPartition::Handle(TEvPQ::TEvQuotaDeadlineCheck::TPtr&, const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::Handle TEvQuotaDeadlineCheck. Partition: " << Partition);
+
FilterDeadlinedWrites(ctx);
}
bool TPartition::ProcessWrites(TEvKeyValue::TEvRequest* request, TInstant now, const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::ProcessWrites. Partition: " << Partition);
FilterDeadlinedWrites(ctx);
@@ -1307,6 +1353,8 @@ void TPartition::FilterDeadlinedWrites(const TActorContext& ctx) {
if (QuotaDeadline == TInstant::Zero() || QuotaDeadline > ctx.Now())
return;
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::FilterDeadlinedWrites. Partition: " << Partition);
+
std::deque<TMessage> newRequests;
for (auto& w : Requests) {
if (!w.IsWrite() || w.GetWrite().Msg.IgnoreQuotaDeadline) {
@@ -1331,6 +1379,8 @@ void TPartition::FilterDeadlinedWrites(const TActorContext& ctx) {
void TPartition::HandleWrites(const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleWrites. Partition: " << Partition);
+
Become(&TThis::StateWrite);
THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
@@ -1365,7 +1415,7 @@ void TPartition::HandleWrites(const TActorContext& ctx) {
}
WritesTotal.Inc();
- WriteBlobWithQuota(std::move(request));
+ WriteBlobWithQuota(ctx, std::move(request));
}
void TPartition::RequestQuotaForWriteBlobRequest(size_t dataSize, ui64 cookie) {
@@ -1395,7 +1445,9 @@ bool TPartition::WaitingForSubDomainQuota(const TActorContext& ctx, const ui64 w
return SubDomainOutOfSpace && AppData()->FeatureFlags.GetEnableTopicDiskSubDomainQuota() && MeteringDataSize(ctx) + withSize > ReserveSize();
}
-void TPartition::WriteBlobWithQuota(THolder<TEvKeyValue::TEvRequest>&& request) {
+void TPartition::WriteBlobWithQuota(const TActorContext& ctx, THolder<TEvKeyValue::TEvRequest>&& request) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::WriteBlobWithQuota. Partition: " << Partition);
+
// Request quota and write blob.
// Mirrored topics are not quoted in local dc.
const bool skip = !IsQuotingEnabled() || TopicWriteQuotaResourcePath.empty();