diff options
author | azevaykin <azevaykin@yandex-team.com> | 2023-06-29 09:57:03 +0300 |
---|---|---|
committer | azevaykin <azevaykin@yandex-team.com> | 2023-06-29 09:57:03 +0300 |
commit | 726057070f9c5a91fc10fde0d5024913d10f1ab9 (patch) | |
tree | ad15023e87cedf48412ceb4050a6fccf02db7397 | |
parent | d85eacbce6dcac02fe1492192939f19678a6f42f (diff) | |
download | ydb-726057070f9c5a91fc10fde0d5024913d10f1ab9.tar.gz |
Additional logs
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 10 | ||||
-rw-r--r-- | ydb/core/persqueue/partition.h | 2 | ||||
-rw-r--r-- | ydb/core/persqueue/partition_write.cpp | 70 |
3 files changed, 67 insertions, 15 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 8164720707..4cacb876e8 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -282,17 +282,17 @@ void TPartition::AddMetaKey(TEvKeyValue::TEvRequest* request) { bool TPartition::CleanUp(TEvKeyValue::TEvRequest* request, const TActorContext& ctx) { bool haveChanges = CleanUpBlobs(request, ctx); - LOG_DEBUG(ctx, NKikimrServices::PERSQUEUE, TStringBuilder() << "Have " << - request->Record.CmdDeleteRangeSize() << " items to delete old stuff"); + + LOG_TRACE_S(ctx, NKikimrServices::PERSQUEUE, "Have " << request->Record.CmdDeleteRangeSize() << " items to delete old stuff"); haveChanges |= SourceIdStorage.DropOldSourceIds(request, ctx.Now(), StartOffset, Partition, Config.GetPartitionConfig()); if (haveChanges) { SourceIdStorage.MarkOwnersForDeletedSourceId(Owners); } - LOG_DEBUG(ctx, NKikimrServices::PERSQUEUE, TStringBuilder() << "Have " << - request->Record.CmdDeleteRangeSize() << " items to delete all stuff"); - LOG_TRACE(ctx, NKikimrServices::PERSQUEUE, TStringBuilder() << "Delete command " << request->ToString()); + + LOG_TRACE_S(ctx, NKikimrServices::PERSQUEUE, "Have " << request->Record.CmdDeleteRangeSize() << " items to delete all stuff. " + << "Delete command " << request->ToString()); return haveChanges; } diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 4aab43956a..1dc8a8cb01 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -189,7 +189,7 @@ private: void CheckHeadConsistency() const; void HandleWrites(const TActorContext& ctx); void RequestQuotaForWriteBlobRequest(size_t dataSize, ui64 cookie); - void WriteBlobWithQuota(THolder<TEvKeyValue::TEvRequest>&& request); + void WriteBlobWithQuota(const TActorContext& ctx, THolder<TEvKeyValue::TEvRequest>&& request); void UpdateUserInfoEndOffset(const TInstant& now); void UpdateWriteBufferIsFullState(const TInstant& now); diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index a8b540a88a..232c50230a 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -27,6 +27,8 @@ static const ui32 MAX_WRITE_CYCLE_SIZE = 16_MB; static const ui32 MAX_INLINE_SIZE = 1000; void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& cookie) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::ReplyOwnerOk. Partition: " << Partition); + THolder<TEvPQ::TEvProxyResponse> response = MakeHolder<TEvPQ::TEvProxyResponse>(dst); NKikimrClient::TResponse& resp = response->Response; resp.SetStatus(NMsgBusProxy::MSTATUS_OK); @@ -37,8 +39,11 @@ void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TS void TPartition::ReplyWrite( const TActorContext& ctx, const ui64 dst, const TString& sourceId, const ui64 seqNo, const ui16 partNo, const ui16 totalParts, - const ui64 offset, const TInstant writeTimestamp, bool already, const ui64 maxSeqNo, + const ui64 offset, const TInstant writeTimestamp, bool already, const ui64 maxSeqNo, const TDuration partitionQuotedTime, const TDuration topicQuotedTime, const TDuration queueTime, const TDuration writeTime) { + + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::ReplyWrite. Partition: " << Partition); + Y_VERIFY(offset <= (ui64)Max<i64>(), "Offset is too big: %" PRIu64, offset); Y_VERIFY(seqNo <= (ui64)Max<i64>(), "SeqNo is too big: %" PRIu64, seqNo); @@ -71,10 +76,14 @@ void TPartition::HandleOnIdle(TEvPQ::TEvUpdateAvailableSize::TPtr&, const TActor } void TPartition::HandleOnWrite(TEvPQ::TEvUpdateAvailableSize::TPtr&, const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleOnWrite TEvUpdateAvailableSize. Partition: " << Partition); + UpdateAvailableSize(ctx); } void TPartition::CancelAllWritesOnIdle(const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::CancelAllWritesOnIdle. Partition: " << Partition); + for (const auto& w : Requests) { ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::WRITE_ERROR_DISK_IS_FULL, "Disk is full"); if (w.IsWrite()) { @@ -95,6 +104,8 @@ void TPartition::CancelAllWritesOnIdle(const TActorContext& ctx) { } void TPartition::FailBadClient(const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::FailBadClient. Partition: " << Partition); + for (auto it = Owners.begin(); it != Owners.end();) { it = DropOwner(it, ctx); } @@ -125,6 +136,7 @@ void TPartition::FailBadClient(const TActorContext& ctx) { } void TPartition::ProcessChangeOwnerRequest(TAutoPtr<TEvPQ::TEvChangeOwner> ev, const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::ProcessChangeOwnerRequest. Partition: " << Partition); auto &owner = ev->Owner; auto it = Owners.find(owner); @@ -149,6 +161,8 @@ void TPartition::ProcessChangeOwnerRequest(TAutoPtr<TEvPQ::TEvChangeOwner> ev, c THashMap<TString, NKikimr::NPQ::TOwnerInfo>::iterator TPartition::DropOwner(THashMap<TString, NKikimr::NPQ::TOwnerInfo>::iterator& it, const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::DropOwner. Partition: " << Partition); + Y_VERIFY(ReservedSize >= it->second.ReservedSize); ReservedSize -= it->second.ReservedSize; UpdateWriteBufferIsFullState(ctx.Now()); @@ -163,6 +177,8 @@ THashMap<TString, NKikimr::NPQ::TOwnerInfo>::iterator TPartition::DropOwner(THas } void TPartition::Handle(TEvPQ::TEvChangeOwner::TPtr& ev, const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleOnWrite TEvChangeOwner. Partition: " << Partition); + bool res = OwnerPipes.insert(ev->Get()->PipeClient).second; Y_VERIFY(res); WaitToChangeOwner.push_back(ev->Release()); @@ -170,6 +186,8 @@ void TPartition::Handle(TEvPQ::TEvChangeOwner::TPtr& ev, const TActorContext& ct } void TPartition::ProcessReserveRequests(const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::ProcessReserveRequests. Partition: " << Partition); + const ui64 maxWriteInflightSize = Config.GetPartitionConfig().GetMaxWriteInflightSize(); while (!ReserveRequests.empty()) { @@ -188,12 +206,12 @@ void TPartition::ProcessReserveRequests(const TActorContext& ctx) { const ui64 currentSize = ReservedSize + WriteInflightSize + WriteCycleSize; if (currentSize != 0 && currentSize + size > maxWriteInflightSize) { - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Reserve processing: maxWriteInflightSize riched"); + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Reserve processing: maxWriteInflightSize riched. Partition: " << Partition); break; } if (WaitingForSubDomainQuota(ctx, currentSize)) { - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Reserve processing: SubDomainOutOfSpace"); + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Reserve processing: SubDomainOutOfSpace. Partition: " << Partition); break; } @@ -216,6 +234,8 @@ void TPartition::UpdateWriteBufferIsFullState(const TInstant& now) { void TPartition::Handle(TEvPQ::TEvReserveBytes::TPtr& ev, const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleOnWrite TEvReserveBytes. Partition: " << Partition); + const TString& ownerCookie = ev->Get()->OwnerCookie; TStringBuf owner = TOwnerInfo::GetOwnerFromOwnerCookie(ownerCookie); const ui64& messageNo = ev->Get()->MessageNo; @@ -245,6 +265,8 @@ void TPartition::HandleOnIdle(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& ct } void TPartition::AnswerCurrentWrites(const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::AnswerCurrentWrites. Partition: " << Partition); + ui64 offset = EndOffset; while (!Responses.empty()) { const auto& response = Responses.front(); @@ -363,6 +385,8 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) { } void TPartition::SyncMemoryStateWithKVState(const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::SyncMemoryStateWithKVState. Partition: " << Partition); + if (!CompactedKeys.empty()) HeadKeys.clear(); @@ -429,10 +453,13 @@ void TPartition::SyncMemoryStateWithKVState(const TActorContext& ctx) { } void TPartition::Handle(TEvPQ::TEvHandleWriteResponse::TPtr&, const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleOnWrite TEvHandleWriteResponse. Partition: " << Partition); + HandleWriteResponse(ctx); } void TPartition::HandleWriteResponse(const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleWriteResponse. Partition: " << Partition); Y_VERIFY(CurrentStateFunc() == &TThis::StateWrite); ui64 prevEndOffset = EndOffset; @@ -505,9 +532,11 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) { } void TPartition::HandleOnWrite(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& ctx) { - ui32 sz = std::accumulate(ev->Get()->Msgs.begin(), ev->Get()->Msgs.end(), 0u, [](ui32 sum, const TEvPQ::TEvWrite::TMsg& msg){ - return sum + msg.Data.size(); - }); + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleOnWrite TEvWrite. Partition: " << Partition); + + ui32 sz = std::accumulate(ev->Get()->Msgs.begin(), ev->Get()->Msgs.end(), 0u, [](ui32 sum, const TEvPQ::TEvWrite::TMsg& msg) { + return sum + msg.Data.size(); + }); bool mirroredPartition = Config.GetPartitionConfig().HasMirrorFrom(); @@ -628,6 +657,8 @@ void TPartition::HandleOnIdle(TEvPQ::TEvRegisterMessageGroup::TPtr& ev, const TA } void TPartition::HandleOnWrite(TEvPQ::TEvRegisterMessageGroup::TPtr& ev, const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleOnWrite TEvRegisterMessageGroup. Partition: " << Partition); + const auto& body = ev->Get()->Body; auto it = SourceIdStorage.GetInMemorySourceIds().find(body.SourceId); @@ -664,6 +695,8 @@ void TPartition::HandleOnIdle(TEvPQ::TEvDeregisterMessageGroup::TPtr& ev, const } void TPartition::HandleOnWrite(TEvPQ::TEvDeregisterMessageGroup::TPtr& ev, const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleOnWrite TEvDeregisterMessageGroup. Partition: " << Partition); + const auto& body = ev->Get()->Body; auto it = SourceIdStorage.GetInMemorySourceIds().find(body.SourceId); @@ -681,6 +714,8 @@ void TPartition::HandleOnIdle(TEvPQ::TEvSplitMessageGroup::TPtr& ev, const TActo } void TPartition::HandleOnWrite(TEvPQ::TEvSplitMessageGroup::TPtr& ev, const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleOnWrite TEvSplitMessageGroup. Partition: " << Partition); + if (ev->Get()->Deregistrations.size() > 1) { return ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::BAD_REQUEST, TStringBuilder() << "Currently, single deregistrations are supported"); @@ -736,6 +771,8 @@ std::pair<TKey, ui32> TPartition::Compact(const TKey& key, const ui32 size, bool void TPartition::ProcessChangeOwnerRequests(const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::ProcessChangeOwnerRequests. Partition: " << Partition); + while (!WaitToChangeOwner.empty()) { auto &ev = WaitToChangeOwner.front(); if (OwnerPipes.find(ev->PipeClient) != OwnerPipes.end()) { //this is not request from dead pipe @@ -751,6 +788,8 @@ void TPartition::ProcessChangeOwnerRequests(const TActorContext& ctx) { } void TPartition::CancelAllWritesOnWrite(const TActorContext& ctx, TEvKeyValue::TEvRequest* request, const TString& errorStr, const TWriteMsg& p, TSourceIdWriter& sourceIdWriter, NPersQueue::NErrorCode::EErrorCode errorCode) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::CancelAllWritesOnWrite. Partition: " << Partition); + ReplyError(ctx, p.Cookie, errorCode, errorStr); TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1); TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(p.Msg.Data.size() + p.Msg.SourceId.size()); @@ -767,6 +806,7 @@ void TPartition::CancelAllWritesOnWrite(const TActorContext& ctx, TEvKeyValue::T bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx, TSourceIdWriter& sourceIdWriter) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::AppendHeadWithNewWrites. Partition: " << Partition); ui64 curOffset = PartitionedBlob.IsInited() ? PartitionedBlob.GetOffset() : EndOffset; @@ -1136,6 +1176,8 @@ std::pair<TKey, ui32> TPartition::GetNewWriteKey(bool headCleared) { } void TPartition::AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvRequest* request, bool headCleared, const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::AddNewWriteBlob. Partition: " << Partition); + const auto& key = res.first; TString valueD; @@ -1222,8 +1264,9 @@ void TPartition::AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvReq } void TPartition::SetDeadlinesForWrites(const TActorContext& ctx) { - if (AppData(ctx)->PQConfig.GetQuotingConfig().GetQuotaWaitDurationMs() > 0 && QuotaDeadline == TInstant::Zero()) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::SetDeadlinesForWrites. Partition: " << Partition); + if (AppData(ctx)->PQConfig.GetQuotingConfig().GetQuotaWaitDurationMs() > 0 && QuotaDeadline == TInstant::Zero()) { QuotaDeadline = ctx.Now() + TDuration::MilliSeconds(AppData(ctx)->PQConfig.GetQuotingConfig().GetQuotaWaitDurationMs()); ctx.Schedule(QuotaDeadline, new TEvPQ::TEvQuotaDeadlineCheck()); @@ -1231,10 +1274,13 @@ void TPartition::SetDeadlinesForWrites(const TActorContext& ctx) { } void TPartition::Handle(TEvPQ::TEvQuotaDeadlineCheck::TPtr&, const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::Handle TEvQuotaDeadlineCheck. Partition: " << Partition); + FilterDeadlinedWrites(ctx); } bool TPartition::ProcessWrites(TEvKeyValue::TEvRequest* request, TInstant now, const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::ProcessWrites. Partition: " << Partition); FilterDeadlinedWrites(ctx); @@ -1307,6 +1353,8 @@ void TPartition::FilterDeadlinedWrites(const TActorContext& ctx) { if (QuotaDeadline == TInstant::Zero() || QuotaDeadline > ctx.Now()) return; + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::FilterDeadlinedWrites. Partition: " << Partition); + std::deque<TMessage> newRequests; for (auto& w : Requests) { if (!w.IsWrite() || w.GetWrite().Msg.IgnoreQuotaDeadline) { @@ -1331,6 +1379,8 @@ void TPartition::FilterDeadlinedWrites(const TActorContext& ctx) { void TPartition::HandleWrites(const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::HandleWrites. Partition: " << Partition); + Become(&TThis::StateWrite); THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest); @@ -1365,7 +1415,7 @@ void TPartition::HandleWrites(const TActorContext& ctx) { } WritesTotal.Inc(); - WriteBlobWithQuota(std::move(request)); + WriteBlobWithQuota(ctx, std::move(request)); } void TPartition::RequestQuotaForWriteBlobRequest(size_t dataSize, ui64 cookie) { @@ -1395,7 +1445,9 @@ bool TPartition::WaitingForSubDomainQuota(const TActorContext& ctx, const ui64 w return SubDomainOutOfSpace && AppData()->FeatureFlags.GetEnableTopicDiskSubDomainQuota() && MeteringDataSize(ctx) + withSize > ReserveSize(); } -void TPartition::WriteBlobWithQuota(THolder<TEvKeyValue::TEvRequest>&& request) { +void TPartition::WriteBlobWithQuota(const TActorContext& ctx, THolder<TEvKeyValue::TEvRequest>&& request) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::WriteBlobWithQuota. Partition: " << Partition); + // Request quota and write blob. // Mirrored topics are not quoted in local dc. const bool skip = !IsQuotingEnabled() || TopicWriteQuotaResourcePath.empty(); |