aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorabcdef <akotov@ydb.tech>2023-12-05 11:46:53 +0300
committerabcdef <akotov@ydb.tech>2023-12-05 13:02:03 +0300
commit4ff6a4d9aec2ce6fe4297433c90df34fd86a6b12 (patch)
tree0517ed0be14e49f26e9a8bb762503738de1e1aad
parent7b2103d4721f993008eef0733e217ed1f5a427a2 (diff)
downloadydb-4ff6a4d9aec2ce6fe4297433c90df34fd86a6b12.tar.gz
references instead of pointers
- заменил указатели на ссылки - убрал копирование
-rw-r--r--ydb/core/persqueue/partition.cpp2
-rw-r--r--ydb/core/persqueue/partition_sourcemanager.cpp56
-rw-r--r--ydb/core/persqueue/partition_sourcemanager.h14
-rw-r--r--ydb/core/persqueue/sourceid.cpp4
-rw-r--r--ydb/core/persqueue/sourceid.h2
5 files changed, 39 insertions, 39 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 5d3d8d4c97..bdea4fea1a 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -151,7 +151,7 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co
, IsLocalDC(TabletConfig.GetLocalDC())
, DCId(std::move(dcId))
, PartitionGraph()
- , SourceManager(this)
+ , SourceManager(*this)
, StartOffset(0)
, EndOffset(0)
, WriteInflightSize(0)
diff --git a/ydb/core/persqueue/partition_sourcemanager.cpp b/ydb/core/persqueue/partition_sourcemanager.cpp
index 62598305ec..d6aabef3e8 100644
--- a/ydb/core/persqueue/partition_sourcemanager.cpp
+++ b/ydb/core/persqueue/partition_sourcemanager.cpp
@@ -14,7 +14,7 @@ bool IsResearchRequires(std::optional<const TPartitionGraph::Node*> node);
// TPartitionSourceManager
//
-TPartitionSourceManager::TPartitionSourceManager(TPartition* partition)
+TPartitionSourceManager::TPartitionSourceManager(TPartition& partition)
: Partition(partition) {
}
@@ -41,7 +41,7 @@ void TPartitionSourceManager::ScheduleBatch() {
PendingCookies.insert(++Cookie);
TActorId actorId = PartitionRequester(parent->Id, parent->TabletId);
- Partition->Send(actorId, CreateRequest(parent->Id).release(), 0, Cookie);
+ Partition.Send(actorId, CreateRequest(parent->Id).release(), 0, Cookie);
}
}
@@ -106,7 +106,7 @@ TPartitionSourceManager::TModificationBatch TPartitionSourceManager::CreateModif
const auto format = AppData(ctx)->PQConfig.GetEnableProtoSourceIdInfo()
? ESourceIdFormat::Proto
: ESourceIdFormat::Raw;
- return TModificationBatch(this, format);
+ return TModificationBatch(*this, format);
}
void TPartitionSourceManager::Handle(TEvPQ::TEvSourceIdResponse::TPtr& ev, const TActorContext& ctx) {
@@ -142,7 +142,7 @@ void TPartitionSourceManager::Handle(TEvPQ::TEvSourceIdResponse::TPtr& ev, const
}
TPartitionSourceManager::TPartitionNode TPartitionSourceManager::GetPartitionNode() const {
- return Partition->PartitionGraph.GetPartition(Partition->Partition);
+ return Partition.PartitionGraph.GetPartition(Partition.Partition);
}
void TPartitionSourceManager::FinishBatch(const TActorContext& ctx) {
@@ -167,10 +167,10 @@ void TPartitionSourceManager::FinishBatch(const TActorContext& ctx) {
Responses.clear();
PendingSourceIds.clear();
- if (Partition->CurrentStateFunc() == &TPartition::StateIdle) {
- Partition->HandleWrites(ctx);
+ if (Partition.CurrentStateFunc() == &TPartition::StateIdle) {
+ Partition.HandleWrites(ctx);
}
- Partition->ProcessMaxSeqNoRequest(ctx);
+ Partition.ProcessMaxSeqNoRequest(ctx);
}
bool TPartitionSourceManager::RequireEnqueue(const TString& sourceId) {
@@ -180,11 +180,11 @@ bool TPartitionSourceManager::RequireEnqueue(const TString& sourceId) {
}
TSourceIdStorage& TPartitionSourceManager::GetSourceIdStorage() const {
- return Partition->SourceIdStorage;
+ return Partition.SourceIdStorage;
}
bool TPartitionSourceManager::HasParents() const {
- auto node = Partition->PartitionGraph.GetPartition(Partition->Partition);
+ auto node = Partition.PartitionGraph.GetPartition(Partition.Partition);
return node && !node.value()->Parents.empty();
}
@@ -194,7 +194,7 @@ TActorId TPartitionSourceManager::PartitionRequester(TPartitionId id, ui64 table
return it->second;
}
- TActorId actorId = Partition->RegisterWithSameMailbox(CreateRequester(Partition->SelfId(),
+ TActorId actorId = Partition.RegisterWithSameMailbox(CreateRequester(Partition.SelfId(),
id,
tabletId));
RequesterActors[id] = actorId;
@@ -213,7 +213,7 @@ std::unique_ptr<TEvPQ::TEvSourceIdRequest> TPartitionSourceManager::CreateReques
void TPartitionSourceManager::PassAway() {
for(const auto [_, actorId] : RequesterActors) {
- Partition->Send(actorId, new TEvents::TEvPoison());
+ Partition.Send(actorId, new TEvents::TEvPoison());
}
RequesterActors.clear();
}
@@ -223,16 +223,16 @@ void TPartitionSourceManager::PassAway() {
// TPartitionSourceManager::TModificationBatch
//
-TPartitionSourceManager::TModificationBatch::TModificationBatch(TPartitionSourceManager* manager, ESourceIdFormat format)
+TPartitionSourceManager::TModificationBatch::TModificationBatch(TPartitionSourceManager& manager, ESourceIdFormat format)
: Manager(manager)
- , Node(manager->GetPartitionNode())
+ , Node(Manager.GetPartitionNode())
, SourceIdWriter(format)
- , HeartbeatEmitter(manager->Partition->SourceIdStorage) {
+ , HeartbeatEmitter(Manager.Partition.SourceIdStorage) {
}
TPartitionSourceManager::TModificationBatch::~TModificationBatch() {
for(auto& [k, _] : SourceIdWriter.GetSourceIdsToWrite()) {
- Manager->Sources.erase(k);
+ Manager.Sources.erase(k);
}
}
@@ -241,7 +241,7 @@ TMaybe<THeartbeat> TPartitionSourceManager::TModificationBatch::CanEmit() const
}
TPartitionSourceManager::TSourceManager TPartitionSourceManager::TModificationBatch::GetSource(const TString& id) {
- return TPartitionSourceManager::TSourceManager(this, id);
+ return TPartitionSourceManager::TSourceManager(*this, id);
}
void TPartitionSourceManager::TModificationBatch::Cancel() {
@@ -253,14 +253,14 @@ bool TPartitionSourceManager::TModificationBatch::HasModifications() const {
}
void TPartitionSourceManager::TModificationBatch::FillRequest(TEvKeyValue::TEvRequest* request) {
- SourceIdWriter.FillRequest(request, Manager->Partition->Partition);
+ SourceIdWriter.FillRequest(request, Manager.Partition.Partition);
}
void TPartitionSourceManager::TModificationBatch::DeregisterSourceId(const TString& sourceId) {
SourceIdWriter.DeregisterSourceId(sourceId);
}
-TPartitionSourceManager* TPartitionSourceManager::TModificationBatch::GetManager() const {
+TPartitionSourceManager& TPartitionSourceManager::TModificationBatch::GetManager() const {
return Manager;
}
@@ -278,12 +278,12 @@ TPartitionSourceManager::TSourceInfo Convert(TSourceIdInfo value) {
return result;
}
-TPartitionSourceManager::TSourceManager::TSourceManager(TModificationBatch* batch, const TString& id)
+TPartitionSourceManager::TSourceManager::TSourceManager(TModificationBatch& batch, const TString& id)
: Batch(batch)
, SourceId(id) {
auto& memory = MemoryStorage();
auto& writer = WriteStorage();
- auto& sources = batch->GetManager()->Sources;
+ auto& sources = Batch.GetManager().Sources;
InMemory = memory.find(id);
InWriter = writer.find(id);
@@ -304,7 +304,7 @@ TPartitionSourceManager::TSourceManager::TSourceManager(TModificationBatch* batc
return;
}
- Info.Pending = IsResearchRequires(batch->Node);
+ Info.Pending = IsResearchRequires(Batch.Node);
}
std::optional<ui64> TPartitionSourceManager::TSourceManager::SeqNo() const {
@@ -325,18 +325,18 @@ std::optional<ui64> TPartitionSourceManager::TSourceManager::UpdatedSeqNo() cons
void TPartitionSourceManager::TSourceManager::Update(ui64 seqNo, ui64 offset, TInstant timestamp) {
if (InMemory == MemoryStorage().end()) {
- Batch->SourceIdWriter.RegisterSourceId(SourceId, seqNo, offset, timestamp);
+ Batch.SourceIdWriter.RegisterSourceId(SourceId, seqNo, offset, timestamp);
} else {
- Batch->SourceIdWriter.RegisterSourceId(SourceId, InMemory->second.Updated(seqNo, offset, timestamp));
+ Batch.SourceIdWriter.RegisterSourceId(SourceId, InMemory->second.Updated(seqNo, offset, timestamp));
}
}
void TPartitionSourceManager::TSourceManager::Update(ui64 seqNo, ui64 offset, TInstant timestamp, THeartbeat&& heartbeat) {
- Batch->HeartbeatEmitter.Process(SourceId, heartbeat);
+ Batch.HeartbeatEmitter.Process(SourceId, heartbeat);
if (InMemory == MemoryStorage().end()) {
- Batch->SourceIdWriter.RegisterSourceId(SourceId, seqNo, offset, timestamp, heartbeat);
+ Batch.SourceIdWriter.RegisterSourceId(SourceId, seqNo, offset, timestamp, std::move(heartbeat));
} else {
- Batch->SourceIdWriter.RegisterSourceId(SourceId, InMemory->second.Updated(seqNo, offset, timestamp, std::move(heartbeat)));
+ Batch.SourceIdWriter.RegisterSourceId(SourceId, InMemory->second.Updated(seqNo, offset, timestamp, std::move(heartbeat)));
}
}
@@ -345,11 +345,11 @@ TPartitionSourceManager::TSourceManager::operator bool() const {
}
const TSourceIdMap& TPartitionSourceManager::TSourceManager::MemoryStorage() const {
- return Batch->GetManager()->GetSourceIdStorage().GetInMemorySourceIds();
+ return Batch.GetManager().GetSourceIdStorage().GetInMemorySourceIds();
}
const TSourceIdMap& TPartitionSourceManager::TSourceManager::WriteStorage() const {
- return Batch->SourceIdWriter.GetSourceIdsToWrite();
+ return Batch.SourceIdWriter.GetSourceIdsToWrite();
}
diff --git a/ydb/core/persqueue/partition_sourcemanager.h b/ydb/core/persqueue/partition_sourcemanager.h
index 9fe09d958d..4899e1598a 100644
--- a/ydb/core/persqueue/partition_sourcemanager.h
+++ b/ydb/core/persqueue/partition_sourcemanager.h
@@ -36,7 +36,7 @@ public:
class TSourceManager {
public:
- TSourceManager(TModificationBatch* batch, const TString& id);
+ TSourceManager(TModificationBatch& batch, const TString& id);
// Checks whether a message with the specified Sourceid can be processed.
// The message can be processed if it is not required to receive information
@@ -60,7 +60,7 @@ public:
private:
- TModificationBatch* Batch;
+ TModificationBatch& Batch;
const TString SourceId;
TSourceInfo Info;
@@ -74,7 +74,7 @@ public:
class TModificationBatch {
friend TSourceManager;
public:
- TModificationBatch(TPartitionSourceManager* manager, ESourceIdFormat format);
+ TModificationBatch(TPartitionSourceManager& manager, ESourceIdFormat format);
~TModificationBatch();
TMaybe<THeartbeat> CanEmit() const;
@@ -91,17 +91,17 @@ public:
void DeregisterSourceId(const TString& sourceId);
private:
- TPartitionSourceManager* GetManager() const;
+ TPartitionSourceManager& GetManager() const;
private:
- TPartitionSourceManager* Manager;
+ TPartitionSourceManager& Manager;
TPartitionNode Node;
TSourceIdWriter SourceIdWriter;
THeartbeatEmitter HeartbeatEmitter;
};
- TPartitionSourceManager(TPartition* partition);
+ explicit TPartitionSourceManager(TPartition& partition);
// For a partition obtained as a result of a merge or split, it requests
// information about the consumer's parameters from the parent partitions.
@@ -134,7 +134,7 @@ private:
private:
- TPartition* Partition;
+ TPartition& Partition;
TSourceIds UnknownSourceIds;
TSourceIds PendingSourceIds;
diff --git a/ydb/core/persqueue/sourceid.cpp b/ydb/core/persqueue/sourceid.cpp
index 69216b487e..4d124f7eda 100644
--- a/ydb/core/persqueue/sourceid.cpp
+++ b/ydb/core/persqueue/sourceid.cpp
@@ -116,12 +116,12 @@ TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs)
{
}
-TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, THeartbeat& heartbeat)
+TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, THeartbeat&& heartbeat)
: SeqNo(seqNo)
, Offset(offset)
, WriteTimestamp(createTs)
, CreateTimestamp(createTs)
- , LastHeartbeat(heartbeat)
+ , LastHeartbeat(std::move(heartbeat))
{
}
diff --git a/ydb/core/persqueue/sourceid.h b/ydb/core/persqueue/sourceid.h
index c5ac29cd81..9ddf494f38 100644
--- a/ydb/core/persqueue/sourceid.h
+++ b/ydb/core/persqueue/sourceid.h
@@ -34,7 +34,7 @@ struct TSourceIdInfo {
TSourceIdInfo() = default;
TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs);
- TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, THeartbeat& heartbeat);
+ TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, THeartbeat&& heartbeat);
TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, TMaybe<TPartitionKeyRange>&& keyRange, bool isInSplit = false);
TSourceIdInfo Updated(ui64 seqNo, ui64 offset, TInstant writeTs) const;