diff options
| author | tesseract <[email protected]> | 2023-11-07 20:50:48 +0300 |
|---|---|---|
| committer | tesseract <[email protected]> | 2023-11-07 21:14:14 +0300 |
| commit | cac0310b68aefaabee0c522967ad7103dd635f6a (patch) | |
| tree | e8f32cc31cdfdcc49b7f908dd88d2e090d7b343c | |
| parent | f26b6b9e264d069c25367d223450acd4235417df (diff) | |
Fix partition initialization error
| -rw-r--r-- | ydb/core/persqueue/partition.cpp | 5 | ||||
| -rw-r--r-- | ydb/core/persqueue/pq_impl.cpp | 15 | ||||
| -rw-r--r-- | ydb/core/persqueue/pq_impl.h | 1 |
3 files changed, 7 insertions, 14 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index f7a039bb392..fa25484de79 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -473,9 +473,8 @@ void TPartition::InitComplete(const TActorContext& ctx) { ProcessTxsAndUserActs(ctx); ctx.Send(ctx.SelfID, new TEvents::TEvWakeup()); - if (!NewPartition) { - ctx.Send(Tablet, new TEvPQ::TEvInitComplete(Partition)); - } + ctx.Send(Tablet, new TEvPQ::TEvInitComplete(Partition)); + for (const auto& s : SourceIdStorage.GetInMemorySourceIds()) { LOG_DEBUG_S( ctx, NKikimrServices::PERSQUEUE, diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 8ef8502a661..b8f76ed2a43 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -34,10 +34,10 @@ static constexpr ui32 MAX_HEARTBEAT_SIZE = 2_KB; struct TPartitionInfo { TPartitionInfo(const TActorId& actor, TMaybe<TPartitionKeyRange>&& keyRange, - const bool initDone, const TTabletCountersBase& baseline) + const TTabletCountersBase& baseline) : Actor(actor) , KeyRange(std::move(keyRange)) - , InitDone(initDone) + , InitDone(false) { Baseline.Populate(baseline); } @@ -642,12 +642,8 @@ void TPersQueue::ApplyNewConfigAndReply(const TActorContext& ctx) Partitions.emplace(partitionId, TPartitionInfo(ctx.Register(CreatePartitionActor(partitionId, TopicConverter, Config, true, ctx)), GetPartitionKeyRange(Config, partition), - true, *Counters )); - - // InitCompleted is true because this partition is empty - ++PartitionsInited; //newly created partition is empty and ready to work } } @@ -861,7 +857,6 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& Partitions.emplace(partitionId, TPartitionInfo( ctx.Register(CreatePartitionActor(partitionId, TopicConverter, Config, false, ctx)), GetPartitionKeyRange(Config, partition), - false, *Counters )); } @@ -1180,7 +1175,7 @@ void TPersQueue::Handle(TEvPQ::TEvInitComplete::TPtr& ev, const TActorContext& c ++PartitionsInited; Y_ABORT_UNLESS(ConfigInited);//partitions are inited only after config - if (PartitionsInited == Partitions.size()) { + if (!InitCompleted && PartitionsInited == Partitions.size()) { OnInitComplete(ctx); } @@ -3493,10 +3488,7 @@ void TPersQueue::CreateNewPartitions(NKikimrPQ::TPQTabletConfig& config, std::forward_as_tuple(partitionId), std::forward_as_tuple(actorId, GetPartitionKeyRange(config, partition), - true, *Counters)); - - ++PartitionsInited; } } @@ -3566,6 +3558,7 @@ void TPersQueue::OnInitComplete(const TActorContext& ctx) { SignalTabletActive(ctx); TryStartTransaction(ctx); + InitCompleted = true; } ui64 TPersQueue::GetAllowedStep() const diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h index 589403c64cd..f30645995fa 100644 --- a/ydb/core/persqueue/pq_impl.h +++ b/ydb/core/persqueue/pq_impl.h @@ -173,6 +173,7 @@ public: private: bool ConfigInited; ui32 PartitionsInited; + bool InitCompleted = false; THashMap<ui32, TPartitionInfo> Partitions; THashMap<TString, TIntrusivePtr<TEvTabletCounters::TInFlightCookie>> CounterEventsInflight; |
