diff options
author | tesseract <tesseract@yandex-team.com> | 2023-04-24 14:10:41 +0300 |
---|---|---|
committer | tesseract <tesseract@yandex-team.com> | 2023-04-24 14:10:41 +0300 |
commit | 84dc1e66400391c20f174febd166767c15799732 (patch) | |
tree | c18256f641e2790dd24dc12392cf31eaa7c6ab4c | |
parent | 66e4cab5936a4f6c6fdab9a96c9d49b560256a00 (diff) | |
download | ydb-84dc1e66400391c20f174febd166767c15799732.tar.gz |
Extract TopicName method
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 44 | ||||
-rw-r--r-- | ydb/core/persqueue/partition.h | 3 | ||||
-rw-r--r-- | ydb/core/persqueue/partition_init.cpp | 10 | ||||
-rw-r--r-- | ydb/core/persqueue/partition_read.cpp | 34 | ||||
-rw-r--r-- | ydb/core/persqueue/partition_write.cpp | 18 |
5 files changed, 58 insertions, 51 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index e83a9f1e55..5aa9dc3275 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -48,6 +48,10 @@ struct TMirrorerInfo { TTabletCountersBase Baseline; }; +const TString& TPartition::TopicName() const { + return TopicConverter->GetClientsideName(); +} + ui64 GetOffsetEstimate(const std::deque<TDataKey>& container, TInstant timestamp, ui64 offset) { if (container.empty()) { return offset; @@ -63,7 +67,7 @@ ui64 GetOffsetEstimate(const std::deque<TDataKey>& container, TInstant timestamp void TPartition::ReplyError(const TActorContext& ctx, const ui64 dst, NPersQueue::NErrorCode::EErrorCode errorCode, const TString& error) { ReplyPersQueueError( - dst == 0 ? ctx.SelfID : Tablet, ctx, TabletID, TopicConverter->GetClientsideName(), Partition, + dst == 0 ? ctx.SelfID : Tablet, ctx, TabletID, TopicName(), Partition, TabletCounters, NKikimrServices::PERSQUEUE, dst, errorCode, error, true ); } @@ -398,7 +402,7 @@ void TPartition::Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx) // Reply to all outstanding requests in order to destroy corresponding actors TStringBuilder ss; - ss << "Tablet is restarting, topic '" << TopicConverter->GetClientsideName() << "'"; + ss << "Tablet is restarting, topic '" << TopicName() << "'"; for (const auto& ev : WaitToChangeOwner) { ReplyError(ctx, ev->Cookie, NPersQueue::NErrorCode::INITIALIZING, ss); @@ -444,11 +448,11 @@ void TPartition::InitComplete(const TActorContext& ctx) { LOG_INFO_S( ctx, NKikimrServices::PERSQUEUE, - "init complete for topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition << " " << ctx.SelfID + "init complete for topic '" << TopicName() << "' partition " << Partition << " " << ctx.SelfID ); TStringBuilder ss; - ss << "SYNC INIT topic " << TopicConverter->GetClientsideName() << " partitition " << Partition + ss << "SYNC INIT topic " << TopicName() << " partitition " << Partition << " so " << StartOffset << " endOffset " << EndOffset << " Head " << Head << "\n"; for (const auto& s : SourceIdStorage.GetInMemorySourceIds()) { ss << "SYNC INIT sourceId " << s.first << " seqNo " << s.second.SeqNo << " offset " << s.second.Offset << "\n"; @@ -479,7 +483,7 @@ void TPartition::InitComplete(const TActorContext& ctx) { for (const auto& s : SourceIdStorage.GetInMemorySourceIds()) { LOG_DEBUG_S( ctx, NKikimrServices::PERSQUEUE, - "Init complete for topic '" << TopicConverter->GetClientsideName() << "' Partition: " << Partition + "Init complete for topic '" << TopicName() << "' Partition: " << Partition << " SourceId: " << s.first << " SeqNo: " << s.second.SeqNo << " offset: " << s.second.Offset << " MaxOffset: " << EndOffset ); @@ -729,7 +733,7 @@ void TPartition::LogAndCollectError(NKikimrServices::EServiceKikimr service, con NKikimrPQ::TStatusResponse::TErrorMessage error; error.SetTimestamp(ctx.Now().Seconds()); error.SetService(service); - error.SetMessage(TStringBuilder() << "topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition << " got error: " << msg); + error.SetMessage(TStringBuilder() << "topic '" << TopicName() << "' partition " << Partition << " got error: " << msg); LogAndCollectError(error, ctx); } @@ -927,7 +931,7 @@ void TPartition::Handle(TEvPQ::TEvError::TPtr& ev, const TActorContext& ctx) { LOG_ERROR_S( ctx, NKikimrServices::PERSQUEUE, - "Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition + "Topic '" << TopicName() << "' partition " << Partition << " user " << ReadingForUser << " readTimeStamp error: " << ev->Get()->Error ); @@ -1208,7 +1212,7 @@ void TPartition::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& if (response.GetStatus() != NMsgBusProxy::MSTATUS_OK) { LOG_ERROR_S( ctx, NKikimrServices::PERSQUEUE, - "OnWrite topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition + "OnWrite topic '" << TopicName() << "' partition " << Partition << " commands are not processed at all, reason: " << response.DebugString() ); ctx.Send(Tablet, new TEvents::TEvPoisonPill()); @@ -1220,7 +1224,7 @@ void TPartition::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& if (response.GetDeleteRangeResult(i).GetStatus() != NKikimrProto::OK) { LOG_ERROR_S( ctx, NKikimrServices::PERSQUEUE, - "OnWrite topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition + "OnWrite topic '" << TopicName() << "' partition " << Partition << " delete range error" ); //TODO: if disk is full, could this be ok? delete must be ok, of course @@ -1236,7 +1240,7 @@ void TPartition::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& if (response.GetWriteResult(i).GetStatus() != NKikimrProto::OK) { LOG_ERROR_S( ctx, NKikimrServices::PERSQUEUE, - "OnWrite topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition + "OnWrite topic '" << TopicName() << "' partition " << Partition << " write error" ); ctx.Send(Tablet, new TEvents::TEvPoisonPill()); @@ -1252,7 +1256,7 @@ void TPartition::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& if (res.GetStatus() != NKikimrProto::OK) { LOG_ERROR_S( ctx, NKikimrServices::PERSQUEUE, - "OnWrite topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition + "OnWrite topic '" << TopicName() << "' partition " << Partition << " are not processed at all, got KV error in CmdGetStatus " << res.GetStatus() ); ctx.Send(Tablet, new TEvents::TEvPoisonPill()); @@ -1768,7 +1772,7 @@ void TPartition::ProcessUserAct(TEvPQ::TEvSetClientInfo& act, if (act.Type == TEvPQ::TEvSetClientInfo::ESCI_DROP_READ_RULE) { LOG_DEBUG_S( ctx, NKikimrServices::PERSQUEUE, - "Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition + "Topic '" << TopicName() << "' partition " << Partition << " user " << user << " drop request" ); @@ -1828,7 +1832,7 @@ void TPartition::ProcessUserAct(TEvPQ::TEvSetClientInfo& act, offset = 0; LOG_DEBUG_S( ctx, NKikimrServices::PERSQUEUE, - "Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition + "Topic '" << TopicName() << "' partition " << Partition << " user " << act.ClientId << " reinit request with generation " << readRuleGeneration ); } @@ -1846,7 +1850,7 @@ void TPartition::ProcessUserAct(TEvPQ::TEvSetClientInfo& act, } LOG_WARN_S( ctx, NKikimrServices::PERSQUEUE, - "commit to future - topic " << TopicConverter->GetClientsideName() << " partition " << Partition + "commit to future - topic " << TopicName() << " partition " << Partition << " client " << act.ClientId << " EndOffset " << EndOffset << " offset " << offset ); act.Offset = EndOffset; @@ -1886,14 +1890,14 @@ void TPartition::EmulatePostProcessUserAct(const TEvPQ::TEvSetClientInfo& act, LOG_DEBUG_S( ctx, NKikimrServices::PERSQUEUE, - "Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition << " user " << user + "Topic '" << TopicName() << "' partition " << Partition << " user " << user << " drop done" ); PendingUsersInfo.erase(user); } else if (act.Type == TEvPQ::TEvSetClientInfo::ESCI_INIT_READ_RULE) { LOG_DEBUG_S( ctx, NKikimrServices::PERSQUEUE, - "Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition << " user " << user + "Topic '" << TopicName() << "' partition " << Partition << " user " << user << " reinit with generation " << readRuleGeneration << " done" ); @@ -1931,7 +1935,7 @@ void TPartition::EmulatePostProcessUserAct(const TEvPQ::TEvSetClientInfo& act, Y_VERIFY(offset <= (ui64)Max<i64>(), "Unexpected Offset: %" PRIu64, offset); LOG_DEBUG_S( ctx, NKikimrServices::PERSQUEUE, - "Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition << " user " << user + "Topic '" << TopicName() << "' partition " << Partition << " user " << user << (setSession || dropSession ? " session" : " offset") << " is set to " << offset << " (startOffset " << StartOffset << ") session " << session ); @@ -2257,7 +2261,7 @@ void TPartition::Handle(TEvQuota::TEvClearance::TPtr& ev, const TActorContext& c LOG_DEBUG_S( ctx, NKikimrServices::PERSQUEUE, "Got quota." << - " Topic: \"" << TopicConverter->GetClientsideName() << "\"." << + " Topic: \"" << TopicName() << "\"." << " Partition: " << Partition << ": " << ev->Get()->Result << "." << " Cookie: " << cookie ); @@ -2268,7 +2272,7 @@ void TPartition::Handle(TEvQuota::TEvClearance::TPtr& ev, const TActorContext& c LOG_ERROR_S( ctx, NKikimrServices::PERSQUEUE, "Got quota error." << - " Topic: \"" << TopicConverter->GetClientsideName() << "\"." << + " Topic: \"" << TopicName() << "\"." << " Partition " << Partition << ": " << ev->Get()->Result ); ctx.Send(Tablet, new TEvents::TEvPoisonPill()); @@ -2336,7 +2340,7 @@ void TPartition::Handle(TEvPQ::TEvSubDomainStatus::TPtr& ev, const TActorContext LOG_INFO_S( ctx, NKikimrServices::PERSQUEUE, "SubDomainOutOfSpace was changed." << - " Topic: \"" << TopicConverter->GetClientsideName() << "\"." << + " Topic: \"" << TopicName() << "\"." << " Partition: " << Partition << "." << " SubDomainOutOfSpace: " << SubDomainOutOfSpace ); diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index a028dbbac2..c954659227 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -77,6 +77,9 @@ class TPartition : public TActorBootstrapped<TPartition> { friend TInitDataRangeStep; friend TInitDataStep; +public: + const TString& TopicName() const; + private: static const ui32 MAX_ERRORS_COUNT_TO_STORE = 10; diff --git a/ydb/core/persqueue/partition_init.cpp b/ydb/core/persqueue/partition_init.cpp index 1c2ea6051c..cac950b082 100644 --- a/ydb/core/persqueue/partition_init.cpp +++ b/ydb/core/persqueue/partition_init.cpp @@ -55,7 +55,7 @@ void TInitializer::Next(const TActorContext& ctx) { } void TInitializer::Done(const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Initializing topic '" << Partition->TopicConverter->GetClientsideName() + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Initializing topic '" << Partition->TopicName() << "' partition " << Partition->Partition << ". Completed."); InProgress = false; @@ -77,7 +77,7 @@ void TInitializer::DoNext(const TActorContext& ctx) { } } - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Initializing topic '" << Partition->TopicConverter->GetClientsideName() + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Initializing topic '" << Partition->TopicName() << "' partition " << Partition->Partition << ". Step " << CurrentStep->Get()->Name); CurrentStep->Get()->Execute(ctx); @@ -117,7 +117,7 @@ void TInitializerStep::PoisonPill(const TActorContext& ctx) { } TString TInitializerStep::TopicName() const { - return Partition()->TopicConverter->GetClientsideName(); + return Partition()->TopicName(); } @@ -700,11 +700,11 @@ void TPartition::Initialize(const TActorContext& ctx) { } if (AppData()->PQConfig.GetTopicsAreFirstClassCitizen()) { - PartitionCountersLabeled.Reset(new TPartitionLabeledCounters(EscapeBadChars(TopicConverter->GetClientsideName()), + PartitionCountersLabeled.Reset(new TPartitionLabeledCounters(EscapeBadChars(TopicName()), Partition, Config.GetYdbDatabasePath())); } else { - PartitionCountersLabeled.Reset(new TPartitionLabeledCounters(TopicConverter->GetClientsideName(), + PartitionCountersLabeled.Reset(new TPartitionLabeledCounters(TopicName(), Partition)); } diff --git a/ydb/core/persqueue/partition_read.cpp b/ydb/core/persqueue/partition_read.cpp index fa4bb0f708..2f70e6f3c7 100644 --- a/ydb/core/persqueue/partition_read.cpp +++ b/ydb/core/persqueue/partition_read.cpp @@ -31,10 +31,10 @@ static const ui32 MAX_USER_ACTS = 1000; void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config, const TActorContext& ctx) { TSet<TString> hasReadRule; - for (auto& userInfo : UsersInfoStorage->GetAll()) { - userInfo.second.ReadFromTimestamp = TInstant::Zero(); - userInfo.second.HasReadRule = false; - hasReadRule.insert(userInfo.first); + for (auto& [consumer, userInfo] : UsersInfoStorage->GetAll()) { + userInfo.ReadFromTimestamp = TInstant::Zero(); + userInfo.HasReadRule = false; + hasReadRule.insert(consumer); } for (ui32 i = 0; i < config.ReadRulesSize(); ++i) { const auto& consumer = config.GetReadRules(i); @@ -128,15 +128,15 @@ void TPartition::UpdateAvailableSize(const TActorContext& ctx) { auto now = ctx.Now(); WriteQuota->Update(now); - for (auto &c : UsersInfoStorage->GetAll()) { + for (auto& [consumer, userInfo] : UsersInfoStorage->GetAll()) { while (true) { - if (!c.second.ReadQuota.CanExaust(now) && !c.second.ReadRequests.empty()) { + if (!userInfo.ReadQuota.CanExaust(now) && !userInfo.ReadRequests.empty()) { break; } - if (!c.second.ReadRequests.empty()) { - auto ri(std::move(c.second.ReadRequests.front().first)); - auto cookie = c.second.ReadRequests.front().second; - c.second.ReadRequests.pop_front(); + if (!userInfo.ReadRequests.empty()) { + auto ri(std::move(userInfo.ReadRequests.front().first)); + auto cookie = userInfo.ReadRequests.front().second; + userInfo.ReadRequests.pop_front(); ProcessRead(ctx, std::move(ri), cookie, false); } else break; @@ -202,13 +202,13 @@ void TPartition::InitUserInfoForImportantClients(const TActorContext& ctx) { userInfo->Offset = StartOffset; ReadTimestampForOffset(importantUser, *userInfo, ctx); } - for (auto& userInfoPair : UsersInfoStorage->GetAll()) { - if (!important.contains(userInfoPair.first) && userInfoPair.second.Important && userInfoPair.second.LabeledCounters) { + for (auto& [consumer, userInfo] : UsersInfoStorage->GetAll()) { + if (!important.contains(consumer) && userInfo.Important && userInfo.LabeledCounters) { ctx.Send( Tablet, - new TEvPQ::TEvPartitionLabeledCountersDrop(Partition, userInfoPair.second.LabeledCounters->GetGroup()) + new TEvPQ::TEvPartitionLabeledCountersDrop(Partition, userInfo.LabeledCounters->GetGroup()) ); - userInfoPair.second.SetImportant(false); + userInfo.SetImportant(false); } } } @@ -823,9 +823,9 @@ void TPartition::ReadTimestampForOffset(const TString& user, TUserInfo& userInfo } void TPartition::ProcessTimestampsForNewData(const ui64 prevEndOffset, const TActorContext& ctx) { - for (auto& userInfoPair : UsersInfoStorage->GetAll()) { - if (userInfoPair.second.Offset >= (i64)prevEndOffset && userInfoPair.second.Offset < (i64)EndOffset) { - ReadTimestampForOffset(userInfoPair.first, userInfoPair.second, ctx); + for (auto& [consumer, userInfo] : UsersInfoStorage->GetAll()) { + if (userInfo.Offset >= (i64)prevEndOffset && userInfo.Offset < (i64)EndOffset) { + ReadTimestampForOffset(consumer, userInfo, ctx); } } } diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index b74d1a6132..12ed674b7d 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -136,7 +136,7 @@ void TPartition::ProcessChangeOwnerRequest(TAutoPtr<TEvPQ::TEvChangeOwner> ev, c Y_VERIFY(ReservedSize >= it->second.ReservedSize); ReservedSize -= it->second.ReservedSize; - it->second.GenerateCookie(owner, ev->PipeClient, ev->Sender, TopicConverter->GetClientsideName(), Partition, ctx);//will change OwnerCookie + it->second.GenerateCookie(owner, ev->PipeClient, ev->Sender, TopicName(), Partition, ctx);//will change OwnerCookie //cookie is generated. but answer will be sent when all inflight writes will be done - they in the same queue 'Requests' EmplaceRequest(TOwnershipMsg{ev->Cookie, it->second.OwnerCookie}, ctx); TabletCounters.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Set(ReservedSize); @@ -301,7 +301,7 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) { ctx, NKikimrServices::PERSQUEUE, "Answering for message sourceid: '" << EscapeC(s) << - "', Topic: '" << TopicConverter->GetClientsideName() << + "', Topic: '" << TopicName() << "', Partition: " << Partition << ", SeqNo: " << seqNo << ", partNo: " << partNo << ", Offset: " << offset << " is " << (already ? "already written" : "stored on disk") @@ -849,7 +849,7 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const if (poffset >= curOffset) { LOG_DEBUG_S( ctx, NKikimrServices::PERSQUEUE, - "Already written message. Topic: '" << TopicConverter->GetClientsideName() + "Already written message. Topic: '" << TopicName() << "' Partition: " << Partition << " SourceId: '" << EscapeC(p.Msg.SourceId) << "'. Message seqNo = " << p.Msg.SeqNo << ". Committed seqNo = " << (isCommitted ? it_inMemory->second.SeqNo : 0) @@ -922,7 +922,7 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const LOG_DEBUG_S( ctx, NKikimrServices::PERSQUEUE, - "Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition + "Topic '" << TopicName() << "' partition " << Partition << " part blob processing sourceId '" << EscapeC(p.Msg.SourceId) << "' seqNo " << p.Msg.SeqNo << " partNo " << p.Msg.PartNo ); @@ -984,7 +984,7 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const LOG_DEBUG_S( ctx, NKikimrServices::PERSQUEUE, - "Topic '" << TopicConverter->GetClientsideName() << + "Topic '" << TopicName() << "' partition " << Partition << " part blob sourceId '" << EscapeC(p.Msg.SourceId) << "' seqNo " << p.Msg.SeqNo << " partNo " << p.Msg.PartNo << @@ -1021,7 +1021,7 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const } LOG_DEBUG_S( ctx, NKikimrServices::PERSQUEUE, - "writing blob: topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition + "writing blob: topic '" << TopicName() << "' partition " << Partition << " " << x.first.ToString() << " size " << x.second << " WTime " << ctx.Now().MilliSeconds() ); @@ -1060,7 +1060,7 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const LOG_DEBUG_S( ctx, NKikimrServices::PERSQUEUE, - "Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition + "Topic '" << TopicName() << "' partition " << Partition << " part blob complete sourceId '" << EscapeC(p.Msg.SourceId) << "' seqNo " << p.Msg.SeqNo << " partNo " << p.Msg.PartNo << " FormedBlobsCount " << PartitionedBlob.GetFormedBlobs().size() << " NewHead: " << NewHead @@ -1292,7 +1292,7 @@ bool TPartition::ProcessWrites(TEvKeyValue::TEvRequest* request, TInstant now, c LOG_DEBUG_S( ctx, NKikimrServices::PERSQUEUE, - "writing blob: topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition + "writing blob: topic '" << TopicName() << "' partition " << Partition << " compactOffset " << key.GetOffset() << "," << key.GetCount() << " HeadOffset " << Head.Offset << " endOffset " << EndOffset << " curOffset " << NewHead.GetNextOffset() << " " << key.ToString() @@ -1372,7 +1372,7 @@ void TPartition::RequestQuotaForWriteBlobRequest(size_t dataSize, ui64 cookie) { LOG_DEBUG_S( TActivationContext::AsActorContext(), NKikimrServices::PERSQUEUE, "Send write quota request." << - " Topic: \"" << TopicConverter->GetClientsideName() << "\"." << + " Topic: \"" << TopicName() << "\"." << " Partition: " << Partition << "." << " Amount: " << dataSize << "." << " Cookie: " << cookie |