summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <[email protected]>2023-11-07 20:50:48 +0300
committertesseract <[email protected]>2023-11-07 21:14:14 +0300
commitcac0310b68aefaabee0c522967ad7103dd635f6a (patch)
treee8f32cc31cdfdcc49b7f908dd88d2e090d7b343c
parentf26b6b9e264d069c25367d223450acd4235417df (diff)
Fix partition initialization error
-rw-r--r--ydb/core/persqueue/partition.cpp5
-rw-r--r--ydb/core/persqueue/pq_impl.cpp15
-rw-r--r--ydb/core/persqueue/pq_impl.h1
3 files changed, 7 insertions, 14 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index f7a039bb392..fa25484de79 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -473,9 +473,8 @@ void TPartition::InitComplete(const TActorContext& ctx) {
ProcessTxsAndUserActs(ctx);
ctx.Send(ctx.SelfID, new TEvents::TEvWakeup());
- if (!NewPartition) {
- ctx.Send(Tablet, new TEvPQ::TEvInitComplete(Partition));
- }
+ ctx.Send(Tablet, new TEvPQ::TEvInitComplete(Partition));
+
for (const auto& s : SourceIdStorage.GetInMemorySourceIds()) {
LOG_DEBUG_S(
ctx, NKikimrServices::PERSQUEUE,
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index 8ef8502a661..b8f76ed2a43 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -34,10 +34,10 @@ static constexpr ui32 MAX_HEARTBEAT_SIZE = 2_KB;
struct TPartitionInfo {
TPartitionInfo(const TActorId& actor, TMaybe<TPartitionKeyRange>&& keyRange,
- const bool initDone, const TTabletCountersBase& baseline)
+ const TTabletCountersBase& baseline)
: Actor(actor)
, KeyRange(std::move(keyRange))
- , InitDone(initDone)
+ , InitDone(false)
{
Baseline.Populate(baseline);
}
@@ -642,12 +642,8 @@ void TPersQueue::ApplyNewConfigAndReply(const TActorContext& ctx)
Partitions.emplace(partitionId,
TPartitionInfo(ctx.Register(CreatePartitionActor(partitionId, TopicConverter, Config, true, ctx)),
GetPartitionKeyRange(Config, partition),
- true,
*Counters
));
-
- // InitCompleted is true because this partition is empty
- ++PartitionsInited; //newly created partition is empty and ready to work
}
}
@@ -861,7 +857,6 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult&
Partitions.emplace(partitionId, TPartitionInfo(
ctx.Register(CreatePartitionActor(partitionId, TopicConverter, Config, false, ctx)),
GetPartitionKeyRange(Config, partition),
- false,
*Counters
));
}
@@ -1180,7 +1175,7 @@ void TPersQueue::Handle(TEvPQ::TEvInitComplete::TPtr& ev, const TActorContext& c
++PartitionsInited;
Y_ABORT_UNLESS(ConfigInited);//partitions are inited only after config
- if (PartitionsInited == Partitions.size()) {
+ if (!InitCompleted && PartitionsInited == Partitions.size()) {
OnInitComplete(ctx);
}
@@ -3493,10 +3488,7 @@ void TPersQueue::CreateNewPartitions(NKikimrPQ::TPQTabletConfig& config,
std::forward_as_tuple(partitionId),
std::forward_as_tuple(actorId,
GetPartitionKeyRange(config, partition),
- true,
*Counters));
-
- ++PartitionsInited;
}
}
@@ -3566,6 +3558,7 @@ void TPersQueue::OnInitComplete(const TActorContext& ctx)
{
SignalTabletActive(ctx);
TryStartTransaction(ctx);
+ InitCompleted = true;
}
ui64 TPersQueue::GetAllowedStep() const
diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h
index 589403c64cd..f30645995fa 100644
--- a/ydb/core/persqueue/pq_impl.h
+++ b/ydb/core/persqueue/pq_impl.h
@@ -173,6 +173,7 @@ public:
private:
bool ConfigInited;
ui32 PartitionsInited;
+ bool InitCompleted = false;
THashMap<ui32, TPartitionInfo> Partitions;
THashMap<TString, TIntrusivePtr<TEvTabletCounters::TInFlightCookie>> CounterEventsInflight;