diff options
author | tesseract <tesseract@yandex-team.com> | 2023-02-13 13:44:29 +0300 |
---|---|---|
committer | tesseract <tesseract@yandex-team.com> | 2023-02-13 13:44:29 +0300 |
commit | 98391f41335ba5fea7d080215bd3f0c6905248b4 (patch) | |
tree | d2185deb505c5e6d06ef7432b80a58593ff7fcd2 | |
parent | af02c23f8bb7a05fbf49235d327d8aa958507448 (diff) | |
download | ydb-98391f41335ba5fea7d080215bd3f0c6905248b4.tar.gz |
Переименовать TPersQueueGroupInfo to TTopicInfo, TPQShardInfo to TTopicTabletInfo и TPersQueueInfo to TTopicPartitionInfo
rename PQInfos to Partitions
rename PersQueueGroups to Topics
rename TPersQueueGroupInfo to TTopicInfo
rename TPQShardInfo to TTopicTabletInfo
rename TPersQueueInfo to TTopicPartitionInfo
13 files changed, 109 insertions, 103 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index d2972a87cbd..4554fc8ec78 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -2346,7 +2346,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { TLocalPathId localPathId = rowset.GetValue<Schema::PersQueueGroups::PathId>(); TPathId pathId(selfId, localPathId); - TPersQueueGroupInfo::TPtr pqGroup = new TPersQueueGroupInfo(); + TTopicInfo::TPtr pqGroup = new TTopicInfo(); pqGroup->TabletConfig = rowset.GetValue<Schema::PersQueueGroups::TabletConfig>(); pqGroup->MaxPartsPerTablet = rowset.GetValue<Schema::PersQueueGroups::MaxPQPerShard>(); pqGroup->AlterVersion = rowset.GetValue<Schema::PersQueueGroups::AlterVersion>(); @@ -2356,7 +2356,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { const bool ok = pqGroup->FillKeySchema(pqGroup->TabletConfig); Y_VERIFY(ok); - Self->PersQueueGroups[pathId] = pqGroup; + Self->Topics[pathId] = pqGroup; Self->IncrementPathDbRefCount(pathId); auto it = pqBalancers.find(pathId); @@ -2379,7 +2379,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { if (!rowset.IsReady()) return false; while (!rowset.EndOfSet()) { - TPQShardInfo::TPersQueueInfo pqInfo; + TTopicTabletInfo::TTopicPartitionInfo pqInfo; TLocalPathId localPathId = rowset.GetValue<Schema::PersQueues::PathId>(); TPathId pathId(selfId, localPathId); pqInfo.PqId = rowset.GetValue<Schema::PersQueues::PqId>(); @@ -2404,10 +2404,10 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { pqInfo.KeyRange->ToBound = rowset.GetValue<Schema::PersQueues::RangeEnd>(); } - auto it = Self->PersQueueGroups.find(pathId); - Y_VERIFY(it != Self->PersQueueGroups.end()); + auto it = Self->Topics.find(pathId); + Y_VERIFY(it != Self->Topics.end()); Y_VERIFY(it->second); - TPersQueueGroupInfo::TPtr pqGroup = it->second; + TTopicInfo::TPtr pqGroup = it->second; if (pqInfo.AlterVersion <= pqGroup->AlterVersion) ++pqGroup->TotalPartitionCount; if (pqInfo.PqId >= pqGroup->NextPartitionId) { @@ -2415,11 +2415,11 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { pqGroup->TotalGroupCount = pqInfo.PqId + 1; } - TPQShardInfo::TPtr& pqShard = pqGroup->Shards[shardIdx]; + TTopicTabletInfo::TPtr& pqShard = pqGroup->Shards[shardIdx]; if (!pqShard) { - pqShard.Reset(new TPQShardInfo()); + pqShard.Reset(new TTopicTabletInfo()); } - pqShard->PQInfos.push_back(pqInfo); + pqShard->Partitions.push_back(pqInfo); if (!rowset.Next()) return false; @@ -2435,7 +2435,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { TLocalPathId localPathId = rowset.GetValue<Schema::PersQueueGroupAlters::PathId>(); TPathId pathId(selfId, localPathId); - TPersQueueGroupInfo::TPtr alterData = new TPersQueueGroupInfo(); + TTopicInfo::TPtr alterData = new TTopicInfo(); alterData->TabletConfig = rowset.GetValue<Schema::PersQueueGroupAlters::TabletConfig>(); alterData->MaxPartsPerTablet = rowset.GetValue<Schema::PersQueueGroupAlters::MaxPQPerShard>(); alterData->AlterVersion = rowset.GetValue<Schema::PersQueueGroupAlters::AlterVersion>(); @@ -2446,8 +2446,8 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { const bool ok = alterData->FillKeySchema(alterData->TabletConfig); Y_VERIFY(ok); - auto it = Self->PersQueueGroups.find(pathId); - Y_VERIFY(it != Self->PersQueueGroups.end()); + auto it = Self->Topics.find(pathId); + Y_VERIFY(it != Self->Topics.end()); alterData->TotalPartitionCount = it->second->GetTotalPartitionCountWithAlter(); alterData->BalancerTabletID = it->second->BalancerTabletID; @@ -3936,7 +3936,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { Self->TabletCounters->Simple()[COUNTER_USER_ATTRIBUTES_COUNT].Add(path->UserAttrs->Size()); if (path->IsPQGroup()) { - auto pqGroup = Self->PersQueueGroups.at(path->PathId); + auto pqGroup = Self->Topics.at(path->PathId); auto delta = pqGroup->AlterData ? pqGroup->AlterData->TotalPartitionCount : pqGroup->TotalPartitionCount; auto tabletConfig = pqGroup->AlterData ? (pqGroup->AlterData->TabletConfig.empty() ? pqGroup->TabletConfig : pqGroup->AlterData->TabletConfig) : pqGroup->TabletConfig; diff --git a/ydb/core/tx/schemeshard/schemeshard__init_root.cpp b/ydb/core/tx/schemeshard/schemeshard__init_root.cpp index 117c6bd91f9..47247d1f955 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init_root.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init_root.cpp @@ -734,7 +734,7 @@ struct TSchemeShard::TTxMigrate : public TSchemeShard::TRwTxBase { NIceDb::TUpdate<Schema::MigratedKesusInfos::Version>(kesusDescr.GetVersion())); } -// PersQueueGroups, +// Topics, // PersQueues, // RtmrVolumes, // RTMRPartitions, diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_allocate_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_allocate_pq.cpp index 26636e2aeb6..c898dd3c8e9 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_allocate_pq.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_allocate_pq.cpp @@ -135,7 +135,7 @@ public: return result; } - TPersQueueGroupInfo::TPtr pqGroupInfo = new TPersQueueGroupInfo; + TTopicInfo::TPtr pqGroupInfo = new TTopicInfo; pqGroupInfo->TotalGroupCount = allocateDesc.GetTotalGroupCount(); if (pqGroupInfo->TotalGroupCount == 0 || pqGroupInfo->TotalGroupCount > TSchemeShard::MaxPQGroupPartitionsCount) { @@ -355,8 +355,8 @@ public: context.SS->TabletIdToShardIdx[tabletId] = idx; - TPQShardInfo::TPtr pqShard = new TPQShardInfo(); - pqShard->PQInfos.reserve(pqGroupInfo->MaxPartsPerTablet); + TTopicTabletInfo::TPtr pqShard = new TTopicTabletInfo(); + pqShard->Partitions.reserve(pqGroupInfo->MaxPartsPerTablet); pqGroupInfo->Shards[idx] = pqShard; } @@ -366,13 +366,13 @@ public: auto tabletId = item.second.second; auto idx = context.SS->TabletIdToShardIdx.at(tabletId); - TPQShardInfo::TPtr pqShard = pqGroupInfo->Shards.at(idx); + TTopicTabletInfo::TPtr pqShard = pqGroupInfo->Shards.at(idx); - TPQShardInfo::TPersQueueInfo pqInfo; + TTopicTabletInfo::TTopicPartitionInfo pqInfo; pqInfo.PqId = partId; pqInfo.GroupId = groupId; pqInfo.AlterVersion = pqGroupInfo->AlterVersion; - pqShard->PQInfos.push_back(pqInfo); + pqShard->Partitions.push_back(pqInfo); } { @@ -393,17 +393,17 @@ public: for (auto& shard : pqGroupInfo->Shards) { auto shardIdx = shard.first; - for (const auto& pqInfo : shard.second->PQInfos) { + for (const auto& pqInfo : shard.second->Partitions) { context.SS->PersistPersQueue(db, pathId, shardIdx, pqInfo); } } - TPersQueueGroupInfo::TPtr emptyGroup = new TPersQueueGroupInfo; + TTopicInfo::TPtr emptyGroup = new TTopicInfo; emptyGroup->Shards.swap(pqGroupInfo->Shards); - context.SS->PersQueueGroups[pathId] = emptyGroup; - context.SS->PersQueueGroups[pathId]->AlterData = pqGroupInfo; - context.SS->PersQueueGroups[pathId]->AlterVersion = pqGroupInfo->AlterVersion; + context.SS->Topics[pathId] = emptyGroup; + context.SS->Topics[pathId]->AlterData = pqGroupInfo; + context.SS->Topics[pathId]->AlterVersion = pqGroupInfo->AlterVersion; context.SS->IncrementPathDbRefCount(pathId); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp index d5d272e2e7d..729e50e4dc0 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp @@ -49,13 +49,13 @@ class TAlterPQ: public TSubOperation { public: using TSubOperation::TSubOperation; - TPersQueueGroupInfo::TPtr ParseParams( + TTopicInfo::TPtr ParseParams( TOperationContext& context, NKikimrPQ::TPQTabletConfig* tabletConfig, const NKikimrSchemeOp::TPersQueueGroupDescription& alter, TString& errStr) { - TPersQueueGroupInfo::TPtr params = new TPersQueueGroupInfo(); + TTopicInfo::TPtr params = new TTopicInfo(); const bool hasKeySchema = tabletConfig->PartitionKeySchemaSize(); if (alter.HasTotalGroupCount()) { @@ -182,7 +182,7 @@ public: TTxState& PrepareChanges( TOperationId operationId, const TPath& path, - TPersQueueGroupInfo::TPtr pqGroup, + TTopicInfo::TPtr pqGroup, ui64 shardsToCreate, const TChannelsBindings& rbChannelsBinding, const TChannelsBindings& pqChannelsBinding, @@ -204,7 +204,7 @@ public: for (auto& shard : pqGroup->Shards) { auto shardIdx = shard.first; - for (const auto& pqInfo : shard.second->PQInfos) { + for (const auto& pqInfo : shard.second->Partitions) { context.SS->PersistPersQueue(db, item->PathId, shardIdx, pqInfo); } } @@ -262,7 +262,7 @@ public: bool ApplySharding( TTxId txId, const TPathId& pathId, - TPersQueueGroupInfo::TPtr pqGroup, + TTopicInfo::TPtr pqGroup, TTxState& txState, const TChannelsBindings& rbBindedChannels, const TChannelsBindings& pqBindedChannels, @@ -318,7 +318,7 @@ public: txState.Shards.emplace_back(idx, ETabletType::PersQueue, TTxState::CreateParts); context.SS->RegisterShardInfo(idx, defaultShardInfo); - pqGroup->Shards[idx] = new TPQShardInfo(); + pqGroup->Shards[idx] = new TTopicTabletInfo(); } if (!hasBalancer) { @@ -344,7 +344,7 @@ public: return shardsToCreate > 0; } - void ReassignIds(TPersQueueGroupInfo::TPtr pqGroup) { + void ReassignIds(TTopicInfo::TPtr pqGroup) { Y_VERIFY(pqGroup->TotalPartitionCount >= pqGroup->TotalGroupCount); ui32 numOld = pqGroup->TotalPartitionCount; ui32 numNew = pqGroup->AlterData->PartitionsToAdd.size() + numOld; @@ -357,15 +357,15 @@ public: auto it = pqGroup->Shards.begin(); for (const auto& p : pqGroup->AlterData->PartitionsToAdd) { - TPQShardInfo::TPersQueueInfo pqInfo; + TTopicTabletInfo::TTopicPartitionInfo pqInfo; pqInfo.PqId = p.PartitionId; pqInfo.GroupId = p.GroupId; pqInfo.KeyRange = p.KeyRange; pqInfo.AlterVersion = alterVersion; - while (it->second->PQInfos.size() >= average) { + while (it->second->Partitions.size() >= average) { ++it; } - it->second->PQInfos.push_back(pqInfo); + it->second->Partitions.push_back(pqInfo); } } @@ -421,7 +421,7 @@ public: } } - TPersQueueGroupInfo::TPtr pqGroup = context.SS->PersQueueGroups.at(path.Base()->PathId); + TTopicInfo::TPtr pqGroup = context.SS->Topics.at(path.Base()->PathId); Y_VERIFY(pqGroup); if (pqGroup->AlterVersion == 0) { @@ -440,7 +440,7 @@ public: } newTabletConfig = tabletConfig; - TPersQueueGroupInfo::TPtr alterData = ParseParams(context, &newTabletConfig, alter, errStr); + TTopicInfo::TPtr alterData = ParseParams(context, &newTabletConfig, alter, errStr); if (!alterData) { result->SetError(NKikimrScheme::StatusInvalidParameter, errStr); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.h b/ydb/core/tx/schemeshard/schemeshard__operation_common.h index 9ea35987a4a..3eb7ed8a4c7 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.h @@ -634,7 +634,7 @@ public: "topicName is empty" <<", pathId: " << txState->TargetPathId); - TPersQueueGroupInfo::TPtr pqGroup = context.SS->PersQueueGroups[txState->TargetPathId]; + TTopicInfo::TPtr pqGroup = context.SS->Topics[txState->TargetPathId]; Y_VERIFY_S(pqGroup, "pqGroup is null" << ", pathId " << txState->TargetPathId); @@ -666,14 +666,14 @@ public: TTabletId tabletId = context.SS->ShardInfos.at(idx).TabletID; if (shard.TabletType == ETabletType::PersQueue) { - TPQShardInfo::TPtr pqShard = pqGroup->Shards.at(idx); + TTopicTabletInfo::TPtr pqShard = pqGroup->Shards.at(idx); Y_VERIFY_S(pqShard, "pqShard is null, idx is " << idx << " has was "<< THash<TShardIdx>()(idx)); LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Propose configure PersQueue" << ", opId: " << OperationId << ", tabletId: " << tabletId - << ", PQInfos size: " << pqShard->PQInfos.size() + << ", Partitions size: " << pqShard->Partitions.size() << ", at schemeshard: " << ssId); TAutoPtr<TEvPersQueue::TEvUpdateConfig> event(new TEvPersQueue::TEvUpdateConfig()); @@ -693,7 +693,7 @@ public: event->Record.MutableTabletConfig()->SetVersion(pqGroup->AlterData->AlterVersion); - for (const auto& pq : pqShard->PQInfos) { + for (const auto& pq : pqShard->Partitions) { event->Record.MutableTabletConfig()->AddPartitionIds(pq.PqId); auto& partition = *event->Record.MutableTabletConfig()->AddPartitions(); @@ -763,7 +763,7 @@ public: tablet->SetTabletId(ui64(tabletId)); tablet->SetOwner(context.SS->TabletID()); tablet->SetIdx(ui64(p.first.GetLocalId())); - for (const auto& pq : pqShard->PQInfos) { + for (const auto& pq : pqShard->Partitions) { auto info = event->Record.AddPartitions(); info->SetPartition(pq.PqId); info->SetTabletId(ui64(tabletId)); @@ -841,7 +841,7 @@ public: context.SS->ClearDescribePathCaches(path); context.OnComplete.PublishToSchemeBoard(OperationId, pathId); - TPersQueueGroupInfo::TPtr pqGroup = context.SS->PersQueueGroups[pathId]; + TTopicInfo::TPtr pqGroup = context.SS->Topics[pathId]; pqGroup->FinishAlter(); context.SS->PersistPersQueueGroup(db, pathId, pqGroup); context.SS->PersistRemovePersQueueGroupAlter(db, pathId); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp index 7edabf405fb..907516c4006 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp @@ -12,11 +12,11 @@ namespace { using namespace NKikimr; using namespace NSchemeShard; -TPersQueueGroupInfo::TPtr CreatePersQueueGroup(TOperationContext& context, +TTopicInfo::TPtr CreatePersQueueGroup(TOperationContext& context, const NKikimrSchemeOp::TPersQueueGroupDescription& op, TEvSchemeShard::EStatus& status, TString& errStr) { - TPersQueueGroupInfo::TPtr pqGroupInfo = new TPersQueueGroupInfo; + TTopicInfo::TPtr pqGroupInfo = new TTopicInfo; ui32 partitionCount = 0; if (op.HasTotalGroupCount()) { @@ -93,7 +93,7 @@ TPersQueueGroupInfo::TPtr CreatePersQueueGroup(TOperationContext& context, TString prevBound; for (ui32 i = 0; i < partitionCount; ++i) { - using TKeyRange = TPQShardInfo::TKeyRange; + using TKeyRange = TTopicTabletInfo::TKeyRange; TMaybe<TKeyRange> keyRange; if (op.PartitionBoundariesSize()) { @@ -176,7 +176,7 @@ TPersQueueGroupInfo::TPtr CreatePersQueueGroup(TOperationContext& context, void ApplySharding(TTxId txId, TPathId pathId, - TPersQueueGroupInfo::TPtr pqGroup, + TTopicInfo::TPtr pqGroup, TTxState& txState, const TChannelsBindings& rbBindedChannels, const TChannelsBindings& pqBindedChannels, @@ -193,8 +193,8 @@ void ApplySharding(TTxId txId, ss->RegisterShardInfo(idx, shardInfo); txState.Shards.emplace_back(idx, ETabletType::PersQueue, TTxState::CreateParts); - TPQShardInfo::TPtr pqShard = new TPQShardInfo(); - pqShard->PQInfos.reserve(pqGroup->MaxPartsPerTablet); + TTopicTabletInfo::TPtr pqShard = new TTopicTabletInfo(); + pqShard->Partitions.reserve(pqGroup->MaxPartsPerTablet); pqGroup->Shards[idx] = pqShard; } @@ -208,14 +208,14 @@ void ApplySharding(TTxId txId, auto it = pqGroup->PartitionsToAdd.begin(); for (ui32 pqId = 0; pqId < pqGroup->TotalGroupCount; ++pqId, ++it) { auto idx = ss->NextShardIdx(startShardIdx, pqId / pqGroup->MaxPartsPerTablet); - TPQShardInfo::TPtr pqShard = pqGroup->Shards[idx]; + TTopicTabletInfo::TPtr pqShard = pqGroup->Shards[idx]; - TPQShardInfo::TPersQueueInfo pqInfo; + TTopicTabletInfo::TTopicPartitionInfo pqInfo; pqInfo.PqId = it->PartitionId; pqInfo.GroupId = it->GroupId; pqInfo.KeyRange = it->KeyRange; pqInfo.AlterVersion = 1; - pqShard->PQInfos.push_back(pqInfo); + pqShard->Partitions.push_back(pqInfo); } } @@ -347,7 +347,7 @@ public: return result; } - TPersQueueGroupInfo::TPtr pqGroup = CreatePersQueueGroup( + TTopicInfo::TPtr pqGroup = CreatePersQueueGroup( context, createDEscription, status, errStr); if (!pqGroup.Get()) { @@ -447,16 +447,16 @@ public: for (auto& shard : pqGroup->Shards) { auto shardIdx = shard.first; - for (const auto& pqInfo : shard.second->PQInfos) { + for (const auto& pqInfo : shard.second->Partitions) { context.SS->PersistPersQueue(db, pathId, shardIdx, pqInfo); } } - TPersQueueGroupInfo::TPtr emptyGroup = new TPersQueueGroupInfo; + TTopicInfo::TPtr emptyGroup = new TTopicInfo; emptyGroup->Shards.swap(pqGroup->Shards); - context.SS->PersQueueGroups[pathId] = emptyGroup; - context.SS->PersQueueGroups[pathId]->AlterData = pqGroup; + context.SS->Topics[pathId] = emptyGroup; + context.SS->Topics[pathId]->AlterData = pqGroup; context.SS->IncrementPathDbRefCount(pathId); context.SS->PersistPersQueueGroup(db, pathId, emptyGroup); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_deallocate_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_deallocate_pq.cpp index 324cf07429d..df47fff4cf2 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_deallocate_pq.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_deallocate_pq.cpp @@ -88,7 +88,7 @@ public: } auto pathId = path.Base()->PathId; - TPersQueueGroupInfo::TPtr pqGroup = context.SS->PersQueueGroups.at(pathId); + TTopicInfo::TPtr pqGroup = context.SS->Topics.at(pathId); Y_VERIFY(pqGroup); if (pqGroup->AlterData) { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp index 766fac5b249..d3c8b633bfe 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp @@ -85,7 +85,7 @@ public: TString topicName = context.SS->PathsById.at(txState->TargetPathId)->Name; Y_VERIFY_S(topicName.size(), "topicName is empty. PathId: " << txState->TargetPathId); - TPersQueueGroupInfo::TPtr pqGroup = context.SS->PersQueueGroups.at(txState->TargetPathId); + TTopicInfo::TPtr pqGroup = context.SS->Topics.at(txState->TargetPathId); Y_VERIFY_S(pqGroup, "pqGroup is null. PathId: " << txState->TargetPathId); bool haveWork = false; @@ -167,7 +167,7 @@ public: Y_VERIFY(!path->Dropped()); path->SetDropped(step, OperationId.GetTxId()); context.SS->PersistDropStep(db, pathId, step, OperationId); - TPersQueueGroupInfo::TPtr pqGroup = context.SS->PersQueueGroups.at(pathId); + TTopicInfo::TPtr pqGroup = context.SS->Topics.at(pathId); Y_VERIFY(pqGroup); // KIKIMR-13173 @@ -274,7 +274,7 @@ class TDropPQ: public TSubOperation { public: using TSubOperation::TSubOperation; - void SetPQBalancer(TPersQueueGroupInfo::TPtr pqGroup, TTxState& txState, TOperationContext& context) { + void SetPQBalancer(TTopicInfo::TPtr pqGroup, TTxState& txState, TOperationContext& context) { auto shardId = pqGroup->BalancerShardIdx; auto tabletId = pqGroup->BalancerTabletID; @@ -290,11 +290,11 @@ public: } } - void SetPQShards(TPersQueueGroupInfo::TPtr pqGroup, TTxState& txState, TOperationContext& context) { + void SetPQShards(TTopicInfo::TPtr pqGroup, TTxState& txState, TOperationContext& context) { ui32 drops = 0; for (auto shard : pqGroup->Shards) { auto shardIdx = shard.first; - TPQShardInfo::TPtr info = shard.second; + TTopicTabletInfo::TPtr info = shard.second; auto tabletId = context.SS->ShardInfos[shardIdx].TabletID; @@ -399,7 +399,7 @@ public: return result; } - TPersQueueGroupInfo::TPtr pqGroup = context.SS->PersQueueGroups.at(path.Base()->PathId); + TTopicInfo::TPtr pqGroup = context.SS->Topics.at(path.Base()->PathId); Y_VERIFY(pqGroup); if (pqGroup->AlterData) { diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index aebd5be3062..7d02c3cdd4f 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -339,7 +339,7 @@ void TSchemeShard::Clear() { ShardsWithBorrowed.clear(); ShardsWithLoaned.clear(); - PersQueueGroups.clear(); + Topics.clear(); RtmrVolumes.clear(); SubDomains.clear(); BlockStoreVolumes.clear(); @@ -505,8 +505,8 @@ void TSchemeShard::ClearDescribePathCaches(const TPathElement::TPtr node, bool f node->PreSerializedChildrenListing.clear(); if (node->PathType == NKikimrSchemeOp::EPathType::EPathTypePersQueueGroup) { - Y_VERIFY(PersQueueGroups.contains(node->PathId)); - TPersQueueGroupInfo::TPtr pqGroup = PersQueueGroups.at(node->PathId); + Y_VERIFY(Topics.contains(node->PathId)); + TTopicInfo::TPtr pqGroup = Topics.at(node->PathId); pqGroup->PreSerializedPathDescription.clear(); pqGroup->PreSerializedPartitionsDescription.clear(); } else if (node->PathType == NKikimrSchemeOp::EPathType::EPathTypeTable) { @@ -2466,7 +2466,7 @@ void TSchemeShard::PersistAddAlterTable(NIceDb::TNiceDb& db, TPathId pathId, con } } -void TSchemeShard::PersistPersQueueGroup(NIceDb::TNiceDb& db, TPathId pathId, const TPersQueueGroupInfo::TPtr pqGroup) { +void TSchemeShard::PersistPersQueueGroup(NIceDb::TNiceDb& db, TPathId pathId, const TTopicInfo::TPtr pqGroup) { Y_VERIFY(IsLocalId(pathId)); db.Table<Schema::PersQueueGroups>().Key(pathId.LocalPathId).Update( @@ -2480,28 +2480,28 @@ void TSchemeShard::PersistPersQueueGroup(NIceDb::TNiceDb& db, TPathId pathId, co void TSchemeShard::PersistRemovePersQueueGroup(NIceDb::TNiceDb& db, TPathId pathId) { Y_VERIFY(IsLocalId(pathId)); - auto it = PersQueueGroups.find(pathId); - if (it != PersQueueGroups.end()) { - TPersQueueGroupInfo::TPtr pqGroup = it->second; + auto it = Topics.find(pathId); + if (it != Topics.end()) { + TTopicInfo::TPtr pqGroup = it->second; if (pqGroup->AlterData) { PersistRemovePersQueueGroupAlter(db, pathId); } for (const auto& shard : pqGroup->Shards) { - for (const auto& pqInfo : shard.second->PQInfos) { + for (const auto& pqInfo : shard.second->Partitions) { PersistRemovePersQueue(db, pathId, pqInfo.PqId); } } - PersQueueGroups.erase(it); + Topics.erase(it); DecrementPathDbRefCount(pathId); } db.Table<Schema::PersQueueGroups>().Key(pathId.LocalPathId).Delete(); } -void TSchemeShard::PersistAddPersQueueGroupAlter(NIceDb::TNiceDb& db, TPathId pathId, const TPersQueueGroupInfo::TPtr alterData) { +void TSchemeShard::PersistAddPersQueueGroupAlter(NIceDb::TNiceDb& db, TPathId pathId, const TTopicInfo::TPtr alterData) { Y_VERIFY(IsLocalId(pathId)); db.Table<Schema::PersQueueGroupAlters>().Key(pathId.LocalPathId).Update( @@ -2519,7 +2519,7 @@ void TSchemeShard::PersistRemovePersQueueGroupAlter(NIceDb::TNiceDb& db, TPathId db.Table<Schema::PersQueueGroupAlters>().Key(pathId.LocalPathId).Delete(); } -void TSchemeShard::PersistPersQueue(NIceDb::TNiceDb &db, TPathId pathId, TShardIdx shardIdx, const TPQShardInfo::TPersQueueInfo& pqInfo) { +void TSchemeShard::PersistPersQueue(NIceDb::TNiceDb &db, TPathId pathId, TShardIdx shardIdx, const TTopicTabletInfo::TTopicPartitionInfo& pqInfo) { Y_VERIFY(IsLocalId(pathId)); db.Table<Schema::PersQueues>().Key(pathId.LocalPathId, pqInfo.PqId).Update( @@ -3744,8 +3744,8 @@ NKikimrSchemeOp::TPathVersion TSchemeShard::GetPathVersion(const TPath& path) co generalVersion += result.GetTablePartitionVersion(); break; case NKikimrSchemeOp::EPathType::EPathTypePersQueueGroup: - Y_VERIFY(PersQueueGroups.contains(pathId)); - result.SetPQVersion(PersQueueGroups.at(pathId)->AlterVersion); + Y_VERIFY(Topics.contains(pathId)); + result.SetPQVersion(Topics.at(pathId)->AlterVersion); generalVersion += result.GetPQVersion(); break; case NKikimrSchemeOp::EPathType::EPathTypeBlockStoreVolume: diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index 0d46ef9a7aa..4b98be5fdef 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -201,7 +201,7 @@ public: THashMap<TPathId, TTxId> LockedPaths; - THashMap<TPathId, TPersQueueGroupInfo::TPtr> PersQueueGroups; + THashMap<TPathId, TTopicInfo::TPtr> Topics; THashMap<TPathId, TRtmrVolumeInfo::TPtr> RtmrVolumes; THashMap<TPathId, TSolomonVolumeInfo::TPtr> SolomonVolumes; THashMap<TPathId, TSubDomainInfo::TPtr> SubDomains; @@ -621,11 +621,11 @@ public: void PersistTableAlterVersion(NIceDb::TNiceDb &db, const TPathId pathId, const TTableInfo::TPtr tableInfo); void PersistTableAltered(NIceDb::TNiceDb &db, const TPathId pathId, const TTableInfo::TPtr tableInfo); void PersistAddAlterTable(NIceDb::TNiceDb& db, TPathId pathId, const TTableInfo::TAlterDataPtr alter); - void PersistPersQueueGroup(NIceDb::TNiceDb &db, TPathId pathId, const TPersQueueGroupInfo::TPtr); + void PersistPersQueueGroup(NIceDb::TNiceDb &db, TPathId pathId, const TTopicInfo::TPtr); void PersistRemovePersQueueGroup(NIceDb::TNiceDb &db, TPathId pathId); - void PersistAddPersQueueGroupAlter(NIceDb::TNiceDb &db, TPathId pathId, const TPersQueueGroupInfo::TPtr); + void PersistAddPersQueueGroupAlter(NIceDb::TNiceDb &db, TPathId pathId, const TTopicInfo::TPtr); void PersistRemovePersQueueGroupAlter(NIceDb::TNiceDb &db, TPathId pathId); - void PersistPersQueue(NIceDb::TNiceDb &db, TPathId pathId, TShardIdx shardIdx, const TPQShardInfo::TPersQueueInfo& pqInfo); + void PersistPersQueue(NIceDb::TNiceDb &db, TPathId pathId, TShardIdx shardIdx, const TTopicTabletInfo::TTopicPartitionInfo& pqInfo); void PersistRemovePersQueue(NIceDb::TNiceDb &db, TPathId pathId, ui32 pqId); void PersistRtmrVolume(NIceDb::TNiceDb &db, TPathId pathId, const TRtmrVolumeInfo::TPtr rtmrVol); void PersistRemoveRtmrVolume(NIceDb::TNiceDb &db, TPathId pathId); diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp index e5c75e9e907..578110f292b 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp @@ -1862,7 +1862,7 @@ const TString &TColumnFamiliesMerger::CanonizeName(const TString &familyName) { return familyName; } -void TPQShardInfo::TKeyRange::SerializeToProto(NKikimrPQ::TPartitionKeyRange& proto) const { +void TTopicTabletInfo::TKeyRange::SerializeToProto(NKikimrPQ::TPartitionKeyRange& proto) const { if (FromBound) { proto.SetFromBound(*FromBound); } @@ -1872,7 +1872,7 @@ void TPQShardInfo::TKeyRange::SerializeToProto(NKikimrPQ::TPartitionKeyRange& pr } } -bool TPersQueueGroupInfo::FillKeySchema(const NKikimrPQ::TPQTabletConfig& tabletConfig, TString& error) { +bool TTopicInfo::FillKeySchema(const NKikimrPQ::TPQTabletConfig& tabletConfig, TString& error) { KeySchema.clear(); KeySchema.reserve(tabletConfig.PartitionKeySchemaSize()); @@ -1891,7 +1891,7 @@ bool TPersQueueGroupInfo::FillKeySchema(const NKikimrPQ::TPQTabletConfig& tablet return true; } -bool TPersQueueGroupInfo::FillKeySchema(const TString& tabletConfig) { +bool TTopicInfo::FillKeySchema(const TString& tabletConfig) { NKikimrPQ::TPQTabletConfig proto; if (!proto.ParseFromString(tabletConfig)) { return false; diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 9dbf42fa513..30daf3e98f6 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -985,8 +985,8 @@ struct TColumnTableInfo : TSimpleRefCount<TColumnTableInfo> { } }; -struct TPQShardInfo : TSimpleRefCount<TPQShardInfo> { - using TPtr = TIntrusivePtr<TPQShardInfo>; +struct TTopicTabletInfo : TSimpleRefCount<TTopicTabletInfo> { + using TPtr = TIntrusivePtr<TTopicTabletInfo>; using TKeySchema = TVector<NScheme::TTypeInfo>; struct TKeyRange { @@ -996,17 +996,17 @@ struct TPQShardInfo : TSimpleRefCount<TPQShardInfo> { void SerializeToProto(NKikimrPQ::TPartitionKeyRange& proto) const; }; - struct TPersQueueInfo { + struct TTopicPartitionInfo { ui32 PqId = 0; ui32 GroupId = 0; ui64 AlterVersion = 0; TMaybe<TKeyRange> KeyRange; }; - TVector<TPersQueueInfo> PQInfos; + TVector<TTopicPartitionInfo> Partitions; size_t PartsCount() const { - return PQInfos.size(); + return Partitions.size(); } }; @@ -1122,12 +1122,18 @@ struct TShardInfo { } }; -struct TPersQueueGroupInfo : TSimpleRefCount<TPersQueueGroupInfo> { - using TPtr = TIntrusivePtr<TPersQueueGroupInfo>; - using TKeySchema = TPQShardInfo::TKeySchema; +/** + * TTopicInfo -> TTopicTabletInfo -> TTopicPartitionInfo + * + * Each topic may contains many tablets. + * Each tablet may serve many partitions. + */ +struct TTopicInfo : TSimpleRefCount<TTopicInfo> { + using TPtr = TIntrusivePtr<TTopicInfo>; + using TKeySchema = TTopicTabletInfo::TKeySchema; struct TPartitionToAdd { - using TKeyRange = TPQShardInfo::TKeyRange; + using TKeyRange = TTopicTabletInfo::TKeyRange; ui32 PartitionId; ui32 GroupId; @@ -1162,9 +1168,9 @@ struct TPersQueueGroupInfo : TSimpleRefCount<TPersQueueGroupInfo> { ui64 AlterVersion = 0; TString TabletConfig; TString BootstrapConfig; - THashMap<TShardIdx, TPQShardInfo::TPtr> Shards; // key - shardIdx + THashMap<TShardIdx, TTopicTabletInfo::TPtr> Shards; // key - shardIdx TKeySchema KeySchema; - TPersQueueGroupInfo::TPtr AlterData; // changes to be applied + TTopicInfo::TPtr AlterData; // changes to be applied TTabletId BalancerTabletID = InvalidTabletId; TShardIdx BalancerShardIdx = InvalidShardIdx; @@ -1201,7 +1207,7 @@ struct TPersQueueGroupInfo : TSimpleRefCount<TPersQueueGroupInfo> { return Shards.size(); } - void PrepareAlter(TPersQueueGroupInfo::TPtr alterData) { + void PrepareAlter(TTopicInfo::TPtr alterData) { Y_VERIFY(alterData, "No alter data at Alter prepare"); alterData->AlterVersion = AlterVersion + 1; Y_VERIFY(alterData->TotalGroupCount); diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp index f98fab70c6b..1c96351be54 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp @@ -77,10 +77,10 @@ void TPathDescriber::FillChildDescr(NKikimrSchemeOp::TDirEntry* descr, TPathElem } if (pathEl->PathType == NKikimrSchemeOp::EPathTypePersQueueGroup) { - auto it = Self->PersQueueGroups.FindPtr(pathEl->PathId); + auto it = Self->Topics.FindPtr(pathEl->PathId); Y_VERIFY(it, "PersQueueGroup is not found"); - TPersQueueGroupInfo::TPtr pqGroupInfo = *it; + TTopicInfo::TPtr pqGroupInfo = *it; if (pqGroupInfo->HasBalancer()) { descr->SetBalancerTabletID(ui64(pqGroupInfo->BalancerTabletID)); } @@ -411,9 +411,9 @@ void TPathDescriber::DescribeColumnTable(TPathId pathId, TPathElement::TPtr path } void TPathDescriber::DescribePersQueueGroup(TPathId pathId, TPathElement::TPtr pathEl) { - auto it = Self->PersQueueGroups.FindPtr(pathId); + auto it = Self->Topics.FindPtr(pathId); Y_VERIFY(it, "PersQueueGroup is not found"); - TPersQueueGroupInfo::TPtr pqGroupInfo = *it; + TTopicInfo::TPtr pqGroupInfo = *it; if (pqGroupInfo->PreSerializedPathDescription.empty()) { NKikimrScheme::TEvDescribeSchemeResult preSerializedResult; @@ -444,7 +444,7 @@ void TPathDescriber::DescribePersQueueGroup(TPathId pathId, TPathElement::TPtr p struct TPartitionDesc { TTabletId TabletId = InvalidTabletId; - const TPQShardInfo::TPersQueueInfo* Info = nullptr; + const TTopicTabletInfo::TTopicPartitionInfo* Info = nullptr; }; TVector<TPartitionDesc> descriptions; // index is pqId @@ -454,7 +454,7 @@ void TPathDescriber::DescribePersQueueGroup(TPathId pathId, TPathElement::TPtr p auto it = Self->ShardInfos.find(shardIdx); Y_VERIFY_S(it != Self->ShardInfos.end(), "No shard with shardIdx: " << shardIdx); - for (const auto& pq : pqShard->PQInfos) { + for (const auto& pq : pqShard->Partitions) { if (pq.AlterVersion <= pqGroupInfo->AlterVersion) { Y_VERIFY_S(pq.PqId < pqGroupInfo->NextPartitionId, "Wrong pqId: " << pq.PqId << ", nextPqId: " << pqGroupInfo->NextPartitionId); @@ -498,7 +498,7 @@ void TPathDescriber::DescribePersQueueGroup(TPathId pathId, TPathElement::TPtr p for (const auto& [shardIdx, pqShard] : pqGroupInfo->Shards) { const auto& shardInfo = Self->ShardInfos.at(shardIdx); - for (const auto& pq : pqShard->PQInfos) { + for (const auto& pq : pqShard->Partitions) { if (pq.AlterVersion <= pqGroupInfo->AlterVersion) { auto partition = allocate->MutablePartitions()->Add(); partition->SetPartitionId(pq.PqId); |