summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikolay Shestakov <[email protected]>2024-04-09 14:14:18 +0500
committerGitHub <[email protected]>2024-04-09 14:14:18 +0500
commita6b5943f86c89ed27edecff76a6282a9b3bcf357 (patch)
tree4bd189a6ed6ec8ff1f27a20ec90817f30e282536
parentee5f09616821a1e008ff0864238bede4da4788cd (diff)
fix partition counter value (#3579)
-rw-r--r--ydb/core/persqueue/partition.cpp2
-rw-r--r--ydb/core/persqueue/pq_impl.cpp43
-rw-r--r--ydb/core/persqueue/transaction.cpp18
-rw-r--r--ydb/core/persqueue/utils.cpp13
4 files changed, 20 insertions, 56 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 24cee3ffe2f..5738c413d77 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -607,7 +607,7 @@ void TPartition::InitComplete(const TActorContext& ctx) {
PartitionCountersLabeled->GetCounters()[METRIC_INIT_TIME] = InitDuration.MilliSeconds();
PartitionCountersLabeled->GetCounters()[METRIC_LIFE_TIME] = CreationTime.MilliSeconds();
PartitionCountersLabeled->GetCounters()[METRIC_PARTITIONS] = 1;
- PartitionCountersLabeled->GetCounters()[METRIC_PARTITIONS_TOTAL] = Config.PartitionIdsSize();
+ PartitionCountersLabeled->GetCounters()[METRIC_PARTITIONS_TOTAL] = Config.PartitionsSize();
ctx.Send(Tablet, new TEvPQ::TEvPartitionLabeledCounters(Partition, *PartitionCountersLabeled));
}
UpdateUserInfoEndOffset(ctx.Now());
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index aeced7ea3dc..1309f6b41fa 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -682,19 +682,7 @@ void TPersQueue::ReplyError(const TActorContext& ctx, const ui64 responseCookie,
void TPersQueue::ApplyNewConfigAndReply(const TActorContext& ctx)
{
- THashSet<ui32> was;
- if (NewConfig.PartitionsSize()) {
- for (const auto& partition : NewConfig.GetPartitions()) {
- was.insert(partition.GetPartitionId());
- }
- } else {
- for (const auto partitionId : NewConfig.GetPartitionIds()) {
- was.insert(partitionId);
- }
- }
- for (const auto& partition : Config.GetPartitions()) {
- Y_VERIFY_S(was.contains(partition.GetPartitionId()), "New config is bad, missing partition " << partition.GetPartitionId());
- }
+ EnsurePartitionsAreNotDeleted(NewConfig);
// in order to answer only after all parts are ready to work
Y_ABORT_UNLESS(ConfigInited && AllOriginalPartitionsInited());
@@ -727,12 +715,6 @@ void TPersQueue::ApplyNewConfig(const NKikimrPQ::TPQTabletConfig& newConfig,
{
Config = newConfig;
- if (!Config.PartitionsSize()) {
- for (const auto partitionId : Config.GetPartitionIds()) {
- Config.AddPartitions()->SetPartitionId(partitionId);
- }
- }
-
ui32 cacheSize = CACHE_SIZE;
if (Config.HasCacheSize()) {
cacheSize = Config.GetCacheSize();
@@ -1017,12 +999,6 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult&
Migrate(Config);
- if (!Config.PartitionsSize()) {
- for (const auto partitionId : Config.GetPartitionIds()) {
- Config.AddPartitions()->SetPartitionId(partitionId);
- }
- }
-
TopicName = Config.GetTopicName();
TopicPath = Config.GetTopicPath();
IsLocalDC = Config.GetLocalDC();
@@ -4250,12 +4226,6 @@ void TPersQueue::CreateNewPartitions(NKikimrPQ::TPQTabletConfig& config,
Y_ABORT_UNLESS(ConfigInited && AllOriginalPartitionsInited());
- if (!config.PartitionsSize()) {
- for (const auto partitionId : config.GetPartitionIds()) {
- config.AddPartitions()->SetPartitionId(partitionId);
- }
- }
-
for (const auto& partition : config.GetPartitions()) {
const TPartitionId partitionId(partition.GetPartitionId());
if (Partitions.contains(partitionId)) {
@@ -4274,15 +4244,8 @@ void TPersQueue::CreateNewPartitions(NKikimrPQ::TPQTabletConfig& config,
void TPersQueue::EnsurePartitionsAreNotDeleted(const NKikimrPQ::TPQTabletConfig& config) const
{
THashSet<ui32> was;
-
- if (config.PartitionsSize()) {
- for (const auto& partition : config.GetPartitions()) {
- was.insert(partition.GetPartitionId());
- }
- } else {
- for (const auto partitionId : config.GetPartitionIds()) {
- was.insert(partitionId);
- }
+ for (const auto& partition : config.GetPartitions()) {
+ was.insert(partition.GetPartitionId());
}
for (const auto& partition : Config.GetPartitions()) {
diff --git a/ydb/core/persqueue/transaction.cpp b/ydb/core/persqueue/transaction.cpp
index ba9c9ba1fd5..5361d12b616 100644
--- a/ydb/core/persqueue/transaction.cpp
+++ b/ydb/core/persqueue/transaction.cpp
@@ -32,7 +32,7 @@ TDistributedTransaction::TDistributedTransaction(const NKikimrPQ::TTransaction&
case NKikimrPQ::TTransaction::KIND_UNKNOWN:
Y_FAIL_S("unknown transaction type");
}
-
+
if (tx.HasSelfPredicate()) {
SelfDecision =
tx.GetSelfPredicate() ? NKikimrTx::TReadSetData::DECISION_COMMIT : NKikimrTx::TReadSetData::DECISION_ABORT;
@@ -79,14 +79,8 @@ void TDistributedTransaction::InitPartitions()
{
Partitions.clear();
- if (TabletConfig.PartitionsSize()) {
- for (const auto& partition : TabletConfig.GetPartitions()) {
- Partitions.emplace(partition.GetPartitionId());
- }
- } else {
- for (auto partitionId : TabletConfig.GetPartitionIds()) {
- Partitions.emplace(partitionId);
- }
+ for (const auto& partition : TabletConfig.GetPartitions()) {
+ Partitions.emplace(partition.GetPartitionId());
}
}
@@ -186,7 +180,7 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TConfigTrans
}
}
}
-
+
InitPartitions();
PartitionRepliesCount = 0;
@@ -380,7 +374,7 @@ TString TDistributedTransaction::GetKey() const
{
return GetTxKey(TxId);
}
-
+
void TDistributedTransaction::BindMsgToPipe(ui64 tabletId, const IEventBase& event)
{
Y_ABORT_UNLESS(event.IsSerializable());
@@ -396,7 +390,7 @@ void TDistributedTransaction::UnbindMsgsFromPipe(ui64 tabletId)
OutputMsgs.erase(tabletId);
}
-auto TDistributedTransaction::GetBindedMsgs(ui64 tabletId) -> const TVector<TSerializedMessage>&
+auto TDistributedTransaction::GetBindedMsgs(ui64 tabletId) -> const TVector<TSerializedMessage>&
{
if (auto p = OutputMsgs.find(tabletId); p != OutputMsgs.end()) {
return p->second;
diff --git a/ydb/core/persqueue/utils.cpp b/ydb/core/persqueue/utils.cpp
index f9589c40416..d8b0a8e1b8f 100644
--- a/ydb/core/persqueue/utils.cpp
+++ b/ydb/core/persqueue/utils.cpp
@@ -40,9 +40,10 @@ static constexpr ui64 PUT_UNIT_SIZE = 40960u; // 40Kb
ui64 PutUnitsSize(const ui64 size) {
ui64 putUnitsCount = size / PUT_UNIT_SIZE;
- if (size % PUT_UNIT_SIZE != 0)
- ++putUnitsCount;
- return putUnitsCount;
+ if (size % PUT_UNIT_SIZE != 0) {
+ ++putUnitsCount;
+ }
+ return putUnitsCount;
}
bool IsImportantClient(const NKikimrPQ::TPQTabletConfig& config, const TString& consumerName) {
@@ -88,6 +89,12 @@ void Migrate(NKikimrPQ::TPQTabletConfig& config) {
consumer->SetImportant(IsImportantClient(config, consumer->GetName()));
}
}
+
+ if (!config.PartitionsSize()) {
+ for (const auto partitionId : config.GetPartitionIds()) {
+ config.AddPartitions()->SetPartitionId(partitionId);
+ }
+ }
}
bool HasConsumer(const NKikimrPQ::TPQTabletConfig& config, const TString& consumerName) {