diff options
author | abcdef <akotov@ydb.tech> | 2023-12-05 11:46:53 +0300 |
---|---|---|
committer | abcdef <akotov@ydb.tech> | 2023-12-05 13:02:03 +0300 |
commit | 4ff6a4d9aec2ce6fe4297433c90df34fd86a6b12 (patch) | |
tree | 0517ed0be14e49f26e9a8bb762503738de1e1aad | |
parent | 7b2103d4721f993008eef0733e217ed1f5a427a2 (diff) | |
download | ydb-4ff6a4d9aec2ce6fe4297433c90df34fd86a6b12.tar.gz |
references instead of pointers
- заменил указатели на ссылки
- убрал копирование
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 2 | ||||
-rw-r--r-- | ydb/core/persqueue/partition_sourcemanager.cpp | 56 | ||||
-rw-r--r-- | ydb/core/persqueue/partition_sourcemanager.h | 14 | ||||
-rw-r--r-- | ydb/core/persqueue/sourceid.cpp | 4 | ||||
-rw-r--r-- | ydb/core/persqueue/sourceid.h | 2 |
5 files changed, 39 insertions, 39 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 5d3d8d4c97..bdea4fea1a 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -151,7 +151,7 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co , IsLocalDC(TabletConfig.GetLocalDC()) , DCId(std::move(dcId)) , PartitionGraph() - , SourceManager(this) + , SourceManager(*this) , StartOffset(0) , EndOffset(0) , WriteInflightSize(0) diff --git a/ydb/core/persqueue/partition_sourcemanager.cpp b/ydb/core/persqueue/partition_sourcemanager.cpp index 62598305ec..d6aabef3e8 100644 --- a/ydb/core/persqueue/partition_sourcemanager.cpp +++ b/ydb/core/persqueue/partition_sourcemanager.cpp @@ -14,7 +14,7 @@ bool IsResearchRequires(std::optional<const TPartitionGraph::Node*> node); // TPartitionSourceManager // -TPartitionSourceManager::TPartitionSourceManager(TPartition* partition) +TPartitionSourceManager::TPartitionSourceManager(TPartition& partition) : Partition(partition) { } @@ -41,7 +41,7 @@ void TPartitionSourceManager::ScheduleBatch() { PendingCookies.insert(++Cookie); TActorId actorId = PartitionRequester(parent->Id, parent->TabletId); - Partition->Send(actorId, CreateRequest(parent->Id).release(), 0, Cookie); + Partition.Send(actorId, CreateRequest(parent->Id).release(), 0, Cookie); } } @@ -106,7 +106,7 @@ TPartitionSourceManager::TModificationBatch TPartitionSourceManager::CreateModif const auto format = AppData(ctx)->PQConfig.GetEnableProtoSourceIdInfo() ? ESourceIdFormat::Proto : ESourceIdFormat::Raw; - return TModificationBatch(this, format); + return TModificationBatch(*this, format); } void TPartitionSourceManager::Handle(TEvPQ::TEvSourceIdResponse::TPtr& ev, const TActorContext& ctx) { @@ -142,7 +142,7 @@ void TPartitionSourceManager::Handle(TEvPQ::TEvSourceIdResponse::TPtr& ev, const } TPartitionSourceManager::TPartitionNode TPartitionSourceManager::GetPartitionNode() const { - return Partition->PartitionGraph.GetPartition(Partition->Partition); + return Partition.PartitionGraph.GetPartition(Partition.Partition); } void TPartitionSourceManager::FinishBatch(const TActorContext& ctx) { @@ -167,10 +167,10 @@ void TPartitionSourceManager::FinishBatch(const TActorContext& ctx) { Responses.clear(); PendingSourceIds.clear(); - if (Partition->CurrentStateFunc() == &TPartition::StateIdle) { - Partition->HandleWrites(ctx); + if (Partition.CurrentStateFunc() == &TPartition::StateIdle) { + Partition.HandleWrites(ctx); } - Partition->ProcessMaxSeqNoRequest(ctx); + Partition.ProcessMaxSeqNoRequest(ctx); } bool TPartitionSourceManager::RequireEnqueue(const TString& sourceId) { @@ -180,11 +180,11 @@ bool TPartitionSourceManager::RequireEnqueue(const TString& sourceId) { } TSourceIdStorage& TPartitionSourceManager::GetSourceIdStorage() const { - return Partition->SourceIdStorage; + return Partition.SourceIdStorage; } bool TPartitionSourceManager::HasParents() const { - auto node = Partition->PartitionGraph.GetPartition(Partition->Partition); + auto node = Partition.PartitionGraph.GetPartition(Partition.Partition); return node && !node.value()->Parents.empty(); } @@ -194,7 +194,7 @@ TActorId TPartitionSourceManager::PartitionRequester(TPartitionId id, ui64 table return it->second; } - TActorId actorId = Partition->RegisterWithSameMailbox(CreateRequester(Partition->SelfId(), + TActorId actorId = Partition.RegisterWithSameMailbox(CreateRequester(Partition.SelfId(), id, tabletId)); RequesterActors[id] = actorId; @@ -213,7 +213,7 @@ std::unique_ptr<TEvPQ::TEvSourceIdRequest> TPartitionSourceManager::CreateReques void TPartitionSourceManager::PassAway() { for(const auto [_, actorId] : RequesterActors) { - Partition->Send(actorId, new TEvents::TEvPoison()); + Partition.Send(actorId, new TEvents::TEvPoison()); } RequesterActors.clear(); } @@ -223,16 +223,16 @@ void TPartitionSourceManager::PassAway() { // TPartitionSourceManager::TModificationBatch // -TPartitionSourceManager::TModificationBatch::TModificationBatch(TPartitionSourceManager* manager, ESourceIdFormat format) +TPartitionSourceManager::TModificationBatch::TModificationBatch(TPartitionSourceManager& manager, ESourceIdFormat format) : Manager(manager) - , Node(manager->GetPartitionNode()) + , Node(Manager.GetPartitionNode()) , SourceIdWriter(format) - , HeartbeatEmitter(manager->Partition->SourceIdStorage) { + , HeartbeatEmitter(Manager.Partition.SourceIdStorage) { } TPartitionSourceManager::TModificationBatch::~TModificationBatch() { for(auto& [k, _] : SourceIdWriter.GetSourceIdsToWrite()) { - Manager->Sources.erase(k); + Manager.Sources.erase(k); } } @@ -241,7 +241,7 @@ TMaybe<THeartbeat> TPartitionSourceManager::TModificationBatch::CanEmit() const } TPartitionSourceManager::TSourceManager TPartitionSourceManager::TModificationBatch::GetSource(const TString& id) { - return TPartitionSourceManager::TSourceManager(this, id); + return TPartitionSourceManager::TSourceManager(*this, id); } void TPartitionSourceManager::TModificationBatch::Cancel() { @@ -253,14 +253,14 @@ bool TPartitionSourceManager::TModificationBatch::HasModifications() const { } void TPartitionSourceManager::TModificationBatch::FillRequest(TEvKeyValue::TEvRequest* request) { - SourceIdWriter.FillRequest(request, Manager->Partition->Partition); + SourceIdWriter.FillRequest(request, Manager.Partition.Partition); } void TPartitionSourceManager::TModificationBatch::DeregisterSourceId(const TString& sourceId) { SourceIdWriter.DeregisterSourceId(sourceId); } -TPartitionSourceManager* TPartitionSourceManager::TModificationBatch::GetManager() const { +TPartitionSourceManager& TPartitionSourceManager::TModificationBatch::GetManager() const { return Manager; } @@ -278,12 +278,12 @@ TPartitionSourceManager::TSourceInfo Convert(TSourceIdInfo value) { return result; } -TPartitionSourceManager::TSourceManager::TSourceManager(TModificationBatch* batch, const TString& id) +TPartitionSourceManager::TSourceManager::TSourceManager(TModificationBatch& batch, const TString& id) : Batch(batch) , SourceId(id) { auto& memory = MemoryStorage(); auto& writer = WriteStorage(); - auto& sources = batch->GetManager()->Sources; + auto& sources = Batch.GetManager().Sources; InMemory = memory.find(id); InWriter = writer.find(id); @@ -304,7 +304,7 @@ TPartitionSourceManager::TSourceManager::TSourceManager(TModificationBatch* batc return; } - Info.Pending = IsResearchRequires(batch->Node); + Info.Pending = IsResearchRequires(Batch.Node); } std::optional<ui64> TPartitionSourceManager::TSourceManager::SeqNo() const { @@ -325,18 +325,18 @@ std::optional<ui64> TPartitionSourceManager::TSourceManager::UpdatedSeqNo() cons void TPartitionSourceManager::TSourceManager::Update(ui64 seqNo, ui64 offset, TInstant timestamp) { if (InMemory == MemoryStorage().end()) { - Batch->SourceIdWriter.RegisterSourceId(SourceId, seqNo, offset, timestamp); + Batch.SourceIdWriter.RegisterSourceId(SourceId, seqNo, offset, timestamp); } else { - Batch->SourceIdWriter.RegisterSourceId(SourceId, InMemory->second.Updated(seqNo, offset, timestamp)); + Batch.SourceIdWriter.RegisterSourceId(SourceId, InMemory->second.Updated(seqNo, offset, timestamp)); } } void TPartitionSourceManager::TSourceManager::Update(ui64 seqNo, ui64 offset, TInstant timestamp, THeartbeat&& heartbeat) { - Batch->HeartbeatEmitter.Process(SourceId, heartbeat); + Batch.HeartbeatEmitter.Process(SourceId, heartbeat); if (InMemory == MemoryStorage().end()) { - Batch->SourceIdWriter.RegisterSourceId(SourceId, seqNo, offset, timestamp, heartbeat); + Batch.SourceIdWriter.RegisterSourceId(SourceId, seqNo, offset, timestamp, std::move(heartbeat)); } else { - Batch->SourceIdWriter.RegisterSourceId(SourceId, InMemory->second.Updated(seqNo, offset, timestamp, std::move(heartbeat))); + Batch.SourceIdWriter.RegisterSourceId(SourceId, InMemory->second.Updated(seqNo, offset, timestamp, std::move(heartbeat))); } } @@ -345,11 +345,11 @@ TPartitionSourceManager::TSourceManager::operator bool() const { } const TSourceIdMap& TPartitionSourceManager::TSourceManager::MemoryStorage() const { - return Batch->GetManager()->GetSourceIdStorage().GetInMemorySourceIds(); + return Batch.GetManager().GetSourceIdStorage().GetInMemorySourceIds(); } const TSourceIdMap& TPartitionSourceManager::TSourceManager::WriteStorage() const { - return Batch->SourceIdWriter.GetSourceIdsToWrite(); + return Batch.SourceIdWriter.GetSourceIdsToWrite(); } diff --git a/ydb/core/persqueue/partition_sourcemanager.h b/ydb/core/persqueue/partition_sourcemanager.h index 9fe09d958d..4899e1598a 100644 --- a/ydb/core/persqueue/partition_sourcemanager.h +++ b/ydb/core/persqueue/partition_sourcemanager.h @@ -36,7 +36,7 @@ public: class TSourceManager { public: - TSourceManager(TModificationBatch* batch, const TString& id); + TSourceManager(TModificationBatch& batch, const TString& id); // Checks whether a message with the specified Sourceid can be processed. // The message can be processed if it is not required to receive information @@ -60,7 +60,7 @@ public: private: - TModificationBatch* Batch; + TModificationBatch& Batch; const TString SourceId; TSourceInfo Info; @@ -74,7 +74,7 @@ public: class TModificationBatch { friend TSourceManager; public: - TModificationBatch(TPartitionSourceManager* manager, ESourceIdFormat format); + TModificationBatch(TPartitionSourceManager& manager, ESourceIdFormat format); ~TModificationBatch(); TMaybe<THeartbeat> CanEmit() const; @@ -91,17 +91,17 @@ public: void DeregisterSourceId(const TString& sourceId); private: - TPartitionSourceManager* GetManager() const; + TPartitionSourceManager& GetManager() const; private: - TPartitionSourceManager* Manager; + TPartitionSourceManager& Manager; TPartitionNode Node; TSourceIdWriter SourceIdWriter; THeartbeatEmitter HeartbeatEmitter; }; - TPartitionSourceManager(TPartition* partition); + explicit TPartitionSourceManager(TPartition& partition); // For a partition obtained as a result of a merge or split, it requests // information about the consumer's parameters from the parent partitions. @@ -134,7 +134,7 @@ private: private: - TPartition* Partition; + TPartition& Partition; TSourceIds UnknownSourceIds; TSourceIds PendingSourceIds; diff --git a/ydb/core/persqueue/sourceid.cpp b/ydb/core/persqueue/sourceid.cpp index 69216b487e..4d124f7eda 100644 --- a/ydb/core/persqueue/sourceid.cpp +++ b/ydb/core/persqueue/sourceid.cpp @@ -116,12 +116,12 @@ TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs) { } -TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, THeartbeat& heartbeat) +TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, THeartbeat&& heartbeat) : SeqNo(seqNo) , Offset(offset) , WriteTimestamp(createTs) , CreateTimestamp(createTs) - , LastHeartbeat(heartbeat) + , LastHeartbeat(std::move(heartbeat)) { } diff --git a/ydb/core/persqueue/sourceid.h b/ydb/core/persqueue/sourceid.h index c5ac29cd81..9ddf494f38 100644 --- a/ydb/core/persqueue/sourceid.h +++ b/ydb/core/persqueue/sourceid.h @@ -34,7 +34,7 @@ struct TSourceIdInfo { TSourceIdInfo() = default; TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs); - TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, THeartbeat& heartbeat); + TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, THeartbeat&& heartbeat); TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, TMaybe<TPartitionKeyRange>&& keyRange, bool isInSplit = false); TSourceIdInfo Updated(ui64 seqNo, ui64 offset, TInstant writeTs) const; |