aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-04-07 15:42:48 +0300
committertesseract <tesseract@yandex-team.com>2023-04-07 15:42:48 +0300
commit200196d273767acf35f66bdb1be01bb80121179f (patch)
tree2017b3ceb73b621857e0bbd62a0e86683ca3de3b
parent13c3a8f6f9254bba5cadb36f1f1a6bf770bc64da (diff)
downloadydb-200196d273767acf35f66bdb1be01bb80121179f.tar.gz
Extract CreatePartitionActor method
-rw-r--r--ydb/core/persqueue/pq_impl.cpp40
-rw-r--r--ydb/core/persqueue/pq_impl.h5
2 files changed, 27 insertions, 18 deletions
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index 78f8b6e927e..f8f8559d99e 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -623,9 +623,7 @@ void TPersQueue::ApplyNewConfigAndReply(const TActorContext& ctx)
const auto partitionId = partition.GetPartitionId();
if (Partitions.find(partitionId) == Partitions.end()) {
Partitions.emplace(partitionId, TPartitionInfo(
- ctx.Register(new TPartition(TabletID(), partitionId, ctx.SelfID, CacheActor, TopicConverter,
- IsLocalDC, DCId, IsServerless, Config, *Counters, SubDomainOutOfSpace,
- true)),
+ ctx.Register(CreatePartitionActor(partitionId, Config, true, ctx)),
GetPartitionKeyRange(partition),
true,
*Counters
@@ -787,9 +785,7 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult&
for (const auto& partition : Config.GetPartitions()) { // no partitions will be created with empty config
const auto partitionId = partition.GetPartitionId();
Partitions.emplace(partitionId, TPartitionInfo(
- ctx.Register(new TPartition(TabletID(), partitionId, ctx.SelfID, CacheActor, TopicConverter,
- IsLocalDC, DCId, IsServerless, Config, *Counters, SubDomainOutOfSpace,
- false)),
+ ctx.Register(CreatePartitionActor(partitionId, Config, false, ctx)),
GetPartitionKeyRange(partition),
false,
*Counters
@@ -3061,6 +3057,25 @@ void TPersQueue::SendEvProposePartitionConfig(const TActorContext& ctx,
tx.PartitionRepliesExpected = Partitions.size();
}
+TPartition* TPersQueue::CreatePartitionActor(ui32 partitionId,
+ const NKikimrPQ::TPQTabletConfig& config,
+ bool newPartition,
+ const TActorContext& ctx)
+{
+ return new TPartition(TabletID(),
+ partitionId,
+ ctx.SelfID,
+ CacheActor,
+ TopicConverter,
+ IsLocalDC,
+ DCId,
+ IsServerless,
+ config,
+ *Counters,
+ SubDomainOutOfSpace,
+ newPartition);
+}
+
void TPersQueue::CreateNewPartitions(NKikimrPQ::TPQTabletConfig& config,
const TActorContext& ctx)
{
@@ -3080,18 +3095,7 @@ void TPersQueue::CreateNewPartitions(NKikimrPQ::TPQTabletConfig& config,
continue;
}
- TActorId actorId = ctx.Register(new TPartition(TabletID(),
- partitionId,
- ctx.SelfID,
- CacheActor,
- TopicConverter,
- IsLocalDC,
- DCId,
- IsServerless,
- config,
- *Counters,
- SubDomainOutOfSpace,
- true));
+ TActorId actorId = ctx.Register(CreatePartitionActor(partitionId, config, true, ctx));
Partitions.emplace(std::piecewise_construct,
std::forward_as_tuple(partitionId),
diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h
index 487101ef892..569121c7f90 100644
--- a/ydb/core/persqueue/pq_impl.h
+++ b/ydb/core/persqueue/pq_impl.h
@@ -20,6 +20,7 @@ struct TPartitionInfo;
struct TChangeNotification;
class TResponseBuilder;
+class TPartition;
//USES MAIN chanel for big blobs, INLINE or EXTRA for ZK-like load, EXTRA2 for small blob for logging (VDISK of type LOG is ok with EXTRA2)
@@ -294,6 +295,10 @@ private:
void SendEvProposePartitionConfig(const TActorContext& ctx,
TDistributedTransaction& tx);
+ TPartition* CreatePartitionActor(ui32 partitionId,
+ const NKikimrPQ::TPQTabletConfig& config,
+ bool newPartition,
+ const TActorContext& ctx);
void CreateNewPartitions(NKikimrPQ::TPQTabletConfig& config,
const TActorContext& ctx);
void EnsurePartitionsAreNotDeleted(const NKikimrPQ::TPQTabletConfig& config) const;