diff options
| author | Nikolay Shestakov <[email protected]> | 2024-04-09 14:14:18 +0500 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-04-09 14:14:18 +0500 |
| commit | a6b5943f86c89ed27edecff76a6282a9b3bcf357 (patch) | |
| tree | 4bd189a6ed6ec8ff1f27a20ec90817f30e282536 | |
| parent | ee5f09616821a1e008ff0864238bede4da4788cd (diff) | |
fix partition counter value (#3579)
| -rw-r--r-- | ydb/core/persqueue/partition.cpp | 2 | ||||
| -rw-r--r-- | ydb/core/persqueue/pq_impl.cpp | 43 | ||||
| -rw-r--r-- | ydb/core/persqueue/transaction.cpp | 18 | ||||
| -rw-r--r-- | ydb/core/persqueue/utils.cpp | 13 |
4 files changed, 20 insertions, 56 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 24cee3ffe2f..5738c413d77 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -607,7 +607,7 @@ void TPartition::InitComplete(const TActorContext& ctx) { PartitionCountersLabeled->GetCounters()[METRIC_INIT_TIME] = InitDuration.MilliSeconds(); PartitionCountersLabeled->GetCounters()[METRIC_LIFE_TIME] = CreationTime.MilliSeconds(); PartitionCountersLabeled->GetCounters()[METRIC_PARTITIONS] = 1; - PartitionCountersLabeled->GetCounters()[METRIC_PARTITIONS_TOTAL] = Config.PartitionIdsSize(); + PartitionCountersLabeled->GetCounters()[METRIC_PARTITIONS_TOTAL] = Config.PartitionsSize(); ctx.Send(Tablet, new TEvPQ::TEvPartitionLabeledCounters(Partition, *PartitionCountersLabeled)); } UpdateUserInfoEndOffset(ctx.Now()); diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index aeced7ea3dc..1309f6b41fa 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -682,19 +682,7 @@ void TPersQueue::ReplyError(const TActorContext& ctx, const ui64 responseCookie, void TPersQueue::ApplyNewConfigAndReply(const TActorContext& ctx) { - THashSet<ui32> was; - if (NewConfig.PartitionsSize()) { - for (const auto& partition : NewConfig.GetPartitions()) { - was.insert(partition.GetPartitionId()); - } - } else { - for (const auto partitionId : NewConfig.GetPartitionIds()) { - was.insert(partitionId); - } - } - for (const auto& partition : Config.GetPartitions()) { - Y_VERIFY_S(was.contains(partition.GetPartitionId()), "New config is bad, missing partition " << partition.GetPartitionId()); - } + EnsurePartitionsAreNotDeleted(NewConfig); // in order to answer only after all parts are ready to work Y_ABORT_UNLESS(ConfigInited && AllOriginalPartitionsInited()); @@ -727,12 +715,6 @@ void TPersQueue::ApplyNewConfig(const NKikimrPQ::TPQTabletConfig& newConfig, { Config = newConfig; - if (!Config.PartitionsSize()) { - for (const auto partitionId : Config.GetPartitionIds()) { - Config.AddPartitions()->SetPartitionId(partitionId); - } - } - ui32 cacheSize = CACHE_SIZE; if (Config.HasCacheSize()) { cacheSize = Config.GetCacheSize(); @@ -1017,12 +999,6 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& Migrate(Config); - if (!Config.PartitionsSize()) { - for (const auto partitionId : Config.GetPartitionIds()) { - Config.AddPartitions()->SetPartitionId(partitionId); - } - } - TopicName = Config.GetTopicName(); TopicPath = Config.GetTopicPath(); IsLocalDC = Config.GetLocalDC(); @@ -4250,12 +4226,6 @@ void TPersQueue::CreateNewPartitions(NKikimrPQ::TPQTabletConfig& config, Y_ABORT_UNLESS(ConfigInited && AllOriginalPartitionsInited()); - if (!config.PartitionsSize()) { - for (const auto partitionId : config.GetPartitionIds()) { - config.AddPartitions()->SetPartitionId(partitionId); - } - } - for (const auto& partition : config.GetPartitions()) { const TPartitionId partitionId(partition.GetPartitionId()); if (Partitions.contains(partitionId)) { @@ -4274,15 +4244,8 @@ void TPersQueue::CreateNewPartitions(NKikimrPQ::TPQTabletConfig& config, void TPersQueue::EnsurePartitionsAreNotDeleted(const NKikimrPQ::TPQTabletConfig& config) const { THashSet<ui32> was; - - if (config.PartitionsSize()) { - for (const auto& partition : config.GetPartitions()) { - was.insert(partition.GetPartitionId()); - } - } else { - for (const auto partitionId : config.GetPartitionIds()) { - was.insert(partitionId); - } + for (const auto& partition : config.GetPartitions()) { + was.insert(partition.GetPartitionId()); } for (const auto& partition : Config.GetPartitions()) { diff --git a/ydb/core/persqueue/transaction.cpp b/ydb/core/persqueue/transaction.cpp index ba9c9ba1fd5..5361d12b616 100644 --- a/ydb/core/persqueue/transaction.cpp +++ b/ydb/core/persqueue/transaction.cpp @@ -32,7 +32,7 @@ TDistributedTransaction::TDistributedTransaction(const NKikimrPQ::TTransaction& case NKikimrPQ::TTransaction::KIND_UNKNOWN: Y_FAIL_S("unknown transaction type"); } - + if (tx.HasSelfPredicate()) { SelfDecision = tx.GetSelfPredicate() ? NKikimrTx::TReadSetData::DECISION_COMMIT : NKikimrTx::TReadSetData::DECISION_ABORT; @@ -79,14 +79,8 @@ void TDistributedTransaction::InitPartitions() { Partitions.clear(); - if (TabletConfig.PartitionsSize()) { - for (const auto& partition : TabletConfig.GetPartitions()) { - Partitions.emplace(partition.GetPartitionId()); - } - } else { - for (auto partitionId : TabletConfig.GetPartitionIds()) { - Partitions.emplace(partitionId); - } + for (const auto& partition : TabletConfig.GetPartitions()) { + Partitions.emplace(partition.GetPartitionId()); } } @@ -186,7 +180,7 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TConfigTrans } } } - + InitPartitions(); PartitionRepliesCount = 0; @@ -380,7 +374,7 @@ TString TDistributedTransaction::GetKey() const { return GetTxKey(TxId); } - + void TDistributedTransaction::BindMsgToPipe(ui64 tabletId, const IEventBase& event) { Y_ABORT_UNLESS(event.IsSerializable()); @@ -396,7 +390,7 @@ void TDistributedTransaction::UnbindMsgsFromPipe(ui64 tabletId) OutputMsgs.erase(tabletId); } -auto TDistributedTransaction::GetBindedMsgs(ui64 tabletId) -> const TVector<TSerializedMessage>& +auto TDistributedTransaction::GetBindedMsgs(ui64 tabletId) -> const TVector<TSerializedMessage>& { if (auto p = OutputMsgs.find(tabletId); p != OutputMsgs.end()) { return p->second; diff --git a/ydb/core/persqueue/utils.cpp b/ydb/core/persqueue/utils.cpp index f9589c40416..d8b0a8e1b8f 100644 --- a/ydb/core/persqueue/utils.cpp +++ b/ydb/core/persqueue/utils.cpp @@ -40,9 +40,10 @@ static constexpr ui64 PUT_UNIT_SIZE = 40960u; // 40Kb ui64 PutUnitsSize(const ui64 size) { ui64 putUnitsCount = size / PUT_UNIT_SIZE; - if (size % PUT_UNIT_SIZE != 0) - ++putUnitsCount; - return putUnitsCount; + if (size % PUT_UNIT_SIZE != 0) { + ++putUnitsCount; + } + return putUnitsCount; } bool IsImportantClient(const NKikimrPQ::TPQTabletConfig& config, const TString& consumerName) { @@ -88,6 +89,12 @@ void Migrate(NKikimrPQ::TPQTabletConfig& config) { consumer->SetImportant(IsImportantClient(config, consumer->GetName())); } } + + if (!config.PartitionsSize()) { + for (const auto partitionId : config.GetPartitionIds()) { + config.AddPartitions()->SetPartitionId(partitionId); + } + } } bool HasConsumer(const NKikimrPQ::TPQTabletConfig& config, const TString& consumerName) { |
