aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-02-13 13:44:29 +0300
committertesseract <tesseract@yandex-team.com>2023-02-13 13:44:29 +0300
commit98391f41335ba5fea7d080215bd3f0c6905248b4 (patch)
treed2185deb505c5e6d06ef7432b80a58593ff7fcd2
parentaf02c23f8bb7a05fbf49235d327d8aa958507448 (diff)
downloadydb-98391f41335ba5fea7d080215bd3f0c6905248b4.tar.gz
Переименовать TPersQueueGroupInfo to TTopicInfo, TPQShardInfo to TTopicTabletInfo и TPersQueueInfo to TTopicPartitionInfo
rename PQInfos to Partitions rename PersQueueGroups to Topics rename TPersQueueGroupInfo to TTopicInfo rename TPQShardInfo to TTopicTabletInfo rename TPersQueueInfo to TTopicPartitionInfo
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__init.cpp26
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__init_root.cpp2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_allocate_pq.cpp22
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp24
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_common.h12
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp28
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_deallocate_pq.cpp2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp12
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.cpp26
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.h8
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_info_types.cpp6
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_info_types.h30
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_path_describer.cpp14
13 files changed, 109 insertions, 103 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp
index d2972a87cbd..4554fc8ec78 100644
--- a/ydb/core/tx/schemeshard/schemeshard__init.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp
@@ -2346,7 +2346,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
TLocalPathId localPathId = rowset.GetValue<Schema::PersQueueGroups::PathId>();
TPathId pathId(selfId, localPathId);
- TPersQueueGroupInfo::TPtr pqGroup = new TPersQueueGroupInfo();
+ TTopicInfo::TPtr pqGroup = new TTopicInfo();
pqGroup->TabletConfig = rowset.GetValue<Schema::PersQueueGroups::TabletConfig>();
pqGroup->MaxPartsPerTablet = rowset.GetValue<Schema::PersQueueGroups::MaxPQPerShard>();
pqGroup->AlterVersion = rowset.GetValue<Schema::PersQueueGroups::AlterVersion>();
@@ -2356,7 +2356,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
const bool ok = pqGroup->FillKeySchema(pqGroup->TabletConfig);
Y_VERIFY(ok);
- Self->PersQueueGroups[pathId] = pqGroup;
+ Self->Topics[pathId] = pqGroup;
Self->IncrementPathDbRefCount(pathId);
auto it = pqBalancers.find(pathId);
@@ -2379,7 +2379,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
if (!rowset.IsReady())
return false;
while (!rowset.EndOfSet()) {
- TPQShardInfo::TPersQueueInfo pqInfo;
+ TTopicTabletInfo::TTopicPartitionInfo pqInfo;
TLocalPathId localPathId = rowset.GetValue<Schema::PersQueues::PathId>();
TPathId pathId(selfId, localPathId);
pqInfo.PqId = rowset.GetValue<Schema::PersQueues::PqId>();
@@ -2404,10 +2404,10 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
pqInfo.KeyRange->ToBound = rowset.GetValue<Schema::PersQueues::RangeEnd>();
}
- auto it = Self->PersQueueGroups.find(pathId);
- Y_VERIFY(it != Self->PersQueueGroups.end());
+ auto it = Self->Topics.find(pathId);
+ Y_VERIFY(it != Self->Topics.end());
Y_VERIFY(it->second);
- TPersQueueGroupInfo::TPtr pqGroup = it->second;
+ TTopicInfo::TPtr pqGroup = it->second;
if (pqInfo.AlterVersion <= pqGroup->AlterVersion)
++pqGroup->TotalPartitionCount;
if (pqInfo.PqId >= pqGroup->NextPartitionId) {
@@ -2415,11 +2415,11 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
pqGroup->TotalGroupCount = pqInfo.PqId + 1;
}
- TPQShardInfo::TPtr& pqShard = pqGroup->Shards[shardIdx];
+ TTopicTabletInfo::TPtr& pqShard = pqGroup->Shards[shardIdx];
if (!pqShard) {
- pqShard.Reset(new TPQShardInfo());
+ pqShard.Reset(new TTopicTabletInfo());
}
- pqShard->PQInfos.push_back(pqInfo);
+ pqShard->Partitions.push_back(pqInfo);
if (!rowset.Next())
return false;
@@ -2435,7 +2435,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
TLocalPathId localPathId = rowset.GetValue<Schema::PersQueueGroupAlters::PathId>();
TPathId pathId(selfId, localPathId);
- TPersQueueGroupInfo::TPtr alterData = new TPersQueueGroupInfo();
+ TTopicInfo::TPtr alterData = new TTopicInfo();
alterData->TabletConfig = rowset.GetValue<Schema::PersQueueGroupAlters::TabletConfig>();
alterData->MaxPartsPerTablet = rowset.GetValue<Schema::PersQueueGroupAlters::MaxPQPerShard>();
alterData->AlterVersion = rowset.GetValue<Schema::PersQueueGroupAlters::AlterVersion>();
@@ -2446,8 +2446,8 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
const bool ok = alterData->FillKeySchema(alterData->TabletConfig);
Y_VERIFY(ok);
- auto it = Self->PersQueueGroups.find(pathId);
- Y_VERIFY(it != Self->PersQueueGroups.end());
+ auto it = Self->Topics.find(pathId);
+ Y_VERIFY(it != Self->Topics.end());
alterData->TotalPartitionCount = it->second->GetTotalPartitionCountWithAlter();
alterData->BalancerTabletID = it->second->BalancerTabletID;
@@ -3936,7 +3936,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
Self->TabletCounters->Simple()[COUNTER_USER_ATTRIBUTES_COUNT].Add(path->UserAttrs->Size());
if (path->IsPQGroup()) {
- auto pqGroup = Self->PersQueueGroups.at(path->PathId);
+ auto pqGroup = Self->Topics.at(path->PathId);
auto delta = pqGroup->AlterData ? pqGroup->AlterData->TotalPartitionCount : pqGroup->TotalPartitionCount;
auto tabletConfig = pqGroup->AlterData ? (pqGroup->AlterData->TabletConfig.empty() ? pqGroup->TabletConfig : pqGroup->AlterData->TabletConfig)
: pqGroup->TabletConfig;
diff --git a/ydb/core/tx/schemeshard/schemeshard__init_root.cpp b/ydb/core/tx/schemeshard/schemeshard__init_root.cpp
index 117c6bd91f9..47247d1f955 100644
--- a/ydb/core/tx/schemeshard/schemeshard__init_root.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__init_root.cpp
@@ -734,7 +734,7 @@ struct TSchemeShard::TTxMigrate : public TSchemeShard::TRwTxBase {
NIceDb::TUpdate<Schema::MigratedKesusInfos::Version>(kesusDescr.GetVersion()));
}
-// PersQueueGroups,
+// Topics,
// PersQueues,
// RtmrVolumes,
// RTMRPartitions,
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_allocate_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_allocate_pq.cpp
index 26636e2aeb6..c898dd3c8e9 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_allocate_pq.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_allocate_pq.cpp
@@ -135,7 +135,7 @@ public:
return result;
}
- TPersQueueGroupInfo::TPtr pqGroupInfo = new TPersQueueGroupInfo;
+ TTopicInfo::TPtr pqGroupInfo = new TTopicInfo;
pqGroupInfo->TotalGroupCount = allocateDesc.GetTotalGroupCount();
if (pqGroupInfo->TotalGroupCount == 0 || pqGroupInfo->TotalGroupCount > TSchemeShard::MaxPQGroupPartitionsCount) {
@@ -355,8 +355,8 @@ public:
context.SS->TabletIdToShardIdx[tabletId] = idx;
- TPQShardInfo::TPtr pqShard = new TPQShardInfo();
- pqShard->PQInfos.reserve(pqGroupInfo->MaxPartsPerTablet);
+ TTopicTabletInfo::TPtr pqShard = new TTopicTabletInfo();
+ pqShard->Partitions.reserve(pqGroupInfo->MaxPartsPerTablet);
pqGroupInfo->Shards[idx] = pqShard;
}
@@ -366,13 +366,13 @@ public:
auto tabletId = item.second.second;
auto idx = context.SS->TabletIdToShardIdx.at(tabletId);
- TPQShardInfo::TPtr pqShard = pqGroupInfo->Shards.at(idx);
+ TTopicTabletInfo::TPtr pqShard = pqGroupInfo->Shards.at(idx);
- TPQShardInfo::TPersQueueInfo pqInfo;
+ TTopicTabletInfo::TTopicPartitionInfo pqInfo;
pqInfo.PqId = partId;
pqInfo.GroupId = groupId;
pqInfo.AlterVersion = pqGroupInfo->AlterVersion;
- pqShard->PQInfos.push_back(pqInfo);
+ pqShard->Partitions.push_back(pqInfo);
}
{
@@ -393,17 +393,17 @@ public:
for (auto& shard : pqGroupInfo->Shards) {
auto shardIdx = shard.first;
- for (const auto& pqInfo : shard.second->PQInfos) {
+ for (const auto& pqInfo : shard.second->Partitions) {
context.SS->PersistPersQueue(db, pathId, shardIdx, pqInfo);
}
}
- TPersQueueGroupInfo::TPtr emptyGroup = new TPersQueueGroupInfo;
+ TTopicInfo::TPtr emptyGroup = new TTopicInfo;
emptyGroup->Shards.swap(pqGroupInfo->Shards);
- context.SS->PersQueueGroups[pathId] = emptyGroup;
- context.SS->PersQueueGroups[pathId]->AlterData = pqGroupInfo;
- context.SS->PersQueueGroups[pathId]->AlterVersion = pqGroupInfo->AlterVersion;
+ context.SS->Topics[pathId] = emptyGroup;
+ context.SS->Topics[pathId]->AlterData = pqGroupInfo;
+ context.SS->Topics[pathId]->AlterVersion = pqGroupInfo->AlterVersion;
context.SS->IncrementPathDbRefCount(pathId);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp
index d5d272e2e7d..729e50e4dc0 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp
@@ -49,13 +49,13 @@ class TAlterPQ: public TSubOperation {
public:
using TSubOperation::TSubOperation;
- TPersQueueGroupInfo::TPtr ParseParams(
+ TTopicInfo::TPtr ParseParams(
TOperationContext& context,
NKikimrPQ::TPQTabletConfig* tabletConfig,
const NKikimrSchemeOp::TPersQueueGroupDescription& alter,
TString& errStr)
{
- TPersQueueGroupInfo::TPtr params = new TPersQueueGroupInfo();
+ TTopicInfo::TPtr params = new TTopicInfo();
const bool hasKeySchema = tabletConfig->PartitionKeySchemaSize();
if (alter.HasTotalGroupCount()) {
@@ -182,7 +182,7 @@ public:
TTxState& PrepareChanges(
TOperationId operationId,
const TPath& path,
- TPersQueueGroupInfo::TPtr pqGroup,
+ TTopicInfo::TPtr pqGroup,
ui64 shardsToCreate,
const TChannelsBindings& rbChannelsBinding,
const TChannelsBindings& pqChannelsBinding,
@@ -204,7 +204,7 @@ public:
for (auto& shard : pqGroup->Shards) {
auto shardIdx = shard.first;
- for (const auto& pqInfo : shard.second->PQInfos) {
+ for (const auto& pqInfo : shard.second->Partitions) {
context.SS->PersistPersQueue(db, item->PathId, shardIdx, pqInfo);
}
}
@@ -262,7 +262,7 @@ public:
bool ApplySharding(
TTxId txId,
const TPathId& pathId,
- TPersQueueGroupInfo::TPtr pqGroup,
+ TTopicInfo::TPtr pqGroup,
TTxState& txState,
const TChannelsBindings& rbBindedChannels,
const TChannelsBindings& pqBindedChannels,
@@ -318,7 +318,7 @@ public:
txState.Shards.emplace_back(idx, ETabletType::PersQueue, TTxState::CreateParts);
context.SS->RegisterShardInfo(idx, defaultShardInfo);
- pqGroup->Shards[idx] = new TPQShardInfo();
+ pqGroup->Shards[idx] = new TTopicTabletInfo();
}
if (!hasBalancer) {
@@ -344,7 +344,7 @@ public:
return shardsToCreate > 0;
}
- void ReassignIds(TPersQueueGroupInfo::TPtr pqGroup) {
+ void ReassignIds(TTopicInfo::TPtr pqGroup) {
Y_VERIFY(pqGroup->TotalPartitionCount >= pqGroup->TotalGroupCount);
ui32 numOld = pqGroup->TotalPartitionCount;
ui32 numNew = pqGroup->AlterData->PartitionsToAdd.size() + numOld;
@@ -357,15 +357,15 @@ public:
auto it = pqGroup->Shards.begin();
for (const auto& p : pqGroup->AlterData->PartitionsToAdd) {
- TPQShardInfo::TPersQueueInfo pqInfo;
+ TTopicTabletInfo::TTopicPartitionInfo pqInfo;
pqInfo.PqId = p.PartitionId;
pqInfo.GroupId = p.GroupId;
pqInfo.KeyRange = p.KeyRange;
pqInfo.AlterVersion = alterVersion;
- while (it->second->PQInfos.size() >= average) {
+ while (it->second->Partitions.size() >= average) {
++it;
}
- it->second->PQInfos.push_back(pqInfo);
+ it->second->Partitions.push_back(pqInfo);
}
}
@@ -421,7 +421,7 @@ public:
}
}
- TPersQueueGroupInfo::TPtr pqGroup = context.SS->PersQueueGroups.at(path.Base()->PathId);
+ TTopicInfo::TPtr pqGroup = context.SS->Topics.at(path.Base()->PathId);
Y_VERIFY(pqGroup);
if (pqGroup->AlterVersion == 0) {
@@ -440,7 +440,7 @@ public:
}
newTabletConfig = tabletConfig;
- TPersQueueGroupInfo::TPtr alterData = ParseParams(context, &newTabletConfig, alter, errStr);
+ TTopicInfo::TPtr alterData = ParseParams(context, &newTabletConfig, alter, errStr);
if (!alterData) {
result->SetError(NKikimrScheme::StatusInvalidParameter, errStr);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.h b/ydb/core/tx/schemeshard/schemeshard__operation_common.h
index 9ea35987a4a..3eb7ed8a4c7 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_common.h
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.h
@@ -634,7 +634,7 @@ public:
"topicName is empty"
<<", pathId: " << txState->TargetPathId);
- TPersQueueGroupInfo::TPtr pqGroup = context.SS->PersQueueGroups[txState->TargetPathId];
+ TTopicInfo::TPtr pqGroup = context.SS->Topics[txState->TargetPathId];
Y_VERIFY_S(pqGroup,
"pqGroup is null"
<< ", pathId " << txState->TargetPathId);
@@ -666,14 +666,14 @@ public:
TTabletId tabletId = context.SS->ShardInfos.at(idx).TabletID;
if (shard.TabletType == ETabletType::PersQueue) {
- TPQShardInfo::TPtr pqShard = pqGroup->Shards.at(idx);
+ TTopicTabletInfo::TPtr pqShard = pqGroup->Shards.at(idx);
Y_VERIFY_S(pqShard, "pqShard is null, idx is " << idx << " has was "<< THash<TShardIdx>()(idx));
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"Propose configure PersQueue"
<< ", opId: " << OperationId
<< ", tabletId: " << tabletId
- << ", PQInfos size: " << pqShard->PQInfos.size()
+ << ", Partitions size: " << pqShard->Partitions.size()
<< ", at schemeshard: " << ssId);
TAutoPtr<TEvPersQueue::TEvUpdateConfig> event(new TEvPersQueue::TEvUpdateConfig());
@@ -693,7 +693,7 @@ public:
event->Record.MutableTabletConfig()->SetVersion(pqGroup->AlterData->AlterVersion);
- for (const auto& pq : pqShard->PQInfos) {
+ for (const auto& pq : pqShard->Partitions) {
event->Record.MutableTabletConfig()->AddPartitionIds(pq.PqId);
auto& partition = *event->Record.MutableTabletConfig()->AddPartitions();
@@ -763,7 +763,7 @@ public:
tablet->SetTabletId(ui64(tabletId));
tablet->SetOwner(context.SS->TabletID());
tablet->SetIdx(ui64(p.first.GetLocalId()));
- for (const auto& pq : pqShard->PQInfos) {
+ for (const auto& pq : pqShard->Partitions) {
auto info = event->Record.AddPartitions();
info->SetPartition(pq.PqId);
info->SetTabletId(ui64(tabletId));
@@ -841,7 +841,7 @@ public:
context.SS->ClearDescribePathCaches(path);
context.OnComplete.PublishToSchemeBoard(OperationId, pathId);
- TPersQueueGroupInfo::TPtr pqGroup = context.SS->PersQueueGroups[pathId];
+ TTopicInfo::TPtr pqGroup = context.SS->Topics[pathId];
pqGroup->FinishAlter();
context.SS->PersistPersQueueGroup(db, pathId, pqGroup);
context.SS->PersistRemovePersQueueGroupAlter(db, pathId);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp
index 7edabf405fb..907516c4006 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp
@@ -12,11 +12,11 @@ namespace {
using namespace NKikimr;
using namespace NSchemeShard;
-TPersQueueGroupInfo::TPtr CreatePersQueueGroup(TOperationContext& context,
+TTopicInfo::TPtr CreatePersQueueGroup(TOperationContext& context,
const NKikimrSchemeOp::TPersQueueGroupDescription& op,
TEvSchemeShard::EStatus& status, TString& errStr)
{
- TPersQueueGroupInfo::TPtr pqGroupInfo = new TPersQueueGroupInfo;
+ TTopicInfo::TPtr pqGroupInfo = new TTopicInfo;
ui32 partitionCount = 0;
if (op.HasTotalGroupCount()) {
@@ -93,7 +93,7 @@ TPersQueueGroupInfo::TPtr CreatePersQueueGroup(TOperationContext& context,
TString prevBound;
for (ui32 i = 0; i < partitionCount; ++i) {
- using TKeyRange = TPQShardInfo::TKeyRange;
+ using TKeyRange = TTopicTabletInfo::TKeyRange;
TMaybe<TKeyRange> keyRange;
if (op.PartitionBoundariesSize()) {
@@ -176,7 +176,7 @@ TPersQueueGroupInfo::TPtr CreatePersQueueGroup(TOperationContext& context,
void ApplySharding(TTxId txId,
TPathId pathId,
- TPersQueueGroupInfo::TPtr pqGroup,
+ TTopicInfo::TPtr pqGroup,
TTxState& txState,
const TChannelsBindings& rbBindedChannels,
const TChannelsBindings& pqBindedChannels,
@@ -193,8 +193,8 @@ void ApplySharding(TTxId txId,
ss->RegisterShardInfo(idx, shardInfo);
txState.Shards.emplace_back(idx, ETabletType::PersQueue, TTxState::CreateParts);
- TPQShardInfo::TPtr pqShard = new TPQShardInfo();
- pqShard->PQInfos.reserve(pqGroup->MaxPartsPerTablet);
+ TTopicTabletInfo::TPtr pqShard = new TTopicTabletInfo();
+ pqShard->Partitions.reserve(pqGroup->MaxPartsPerTablet);
pqGroup->Shards[idx] = pqShard;
}
@@ -208,14 +208,14 @@ void ApplySharding(TTxId txId,
auto it = pqGroup->PartitionsToAdd.begin();
for (ui32 pqId = 0; pqId < pqGroup->TotalGroupCount; ++pqId, ++it) {
auto idx = ss->NextShardIdx(startShardIdx, pqId / pqGroup->MaxPartsPerTablet);
- TPQShardInfo::TPtr pqShard = pqGroup->Shards[idx];
+ TTopicTabletInfo::TPtr pqShard = pqGroup->Shards[idx];
- TPQShardInfo::TPersQueueInfo pqInfo;
+ TTopicTabletInfo::TTopicPartitionInfo pqInfo;
pqInfo.PqId = it->PartitionId;
pqInfo.GroupId = it->GroupId;
pqInfo.KeyRange = it->KeyRange;
pqInfo.AlterVersion = 1;
- pqShard->PQInfos.push_back(pqInfo);
+ pqShard->Partitions.push_back(pqInfo);
}
}
@@ -347,7 +347,7 @@ public:
return result;
}
- TPersQueueGroupInfo::TPtr pqGroup = CreatePersQueueGroup(
+ TTopicInfo::TPtr pqGroup = CreatePersQueueGroup(
context, createDEscription, status, errStr);
if (!pqGroup.Get()) {
@@ -447,16 +447,16 @@ public:
for (auto& shard : pqGroup->Shards) {
auto shardIdx = shard.first;
- for (const auto& pqInfo : shard.second->PQInfos) {
+ for (const auto& pqInfo : shard.second->Partitions) {
context.SS->PersistPersQueue(db, pathId, shardIdx, pqInfo);
}
}
- TPersQueueGroupInfo::TPtr emptyGroup = new TPersQueueGroupInfo;
+ TTopicInfo::TPtr emptyGroup = new TTopicInfo;
emptyGroup->Shards.swap(pqGroup->Shards);
- context.SS->PersQueueGroups[pathId] = emptyGroup;
- context.SS->PersQueueGroups[pathId]->AlterData = pqGroup;
+ context.SS->Topics[pathId] = emptyGroup;
+ context.SS->Topics[pathId]->AlterData = pqGroup;
context.SS->IncrementPathDbRefCount(pathId);
context.SS->PersistPersQueueGroup(db, pathId, emptyGroup);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_deallocate_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_deallocate_pq.cpp
index 324cf07429d..df47fff4cf2 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_deallocate_pq.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_deallocate_pq.cpp
@@ -88,7 +88,7 @@ public:
}
auto pathId = path.Base()->PathId;
- TPersQueueGroupInfo::TPtr pqGroup = context.SS->PersQueueGroups.at(pathId);
+ TTopicInfo::TPtr pqGroup = context.SS->Topics.at(pathId);
Y_VERIFY(pqGroup);
if (pqGroup->AlterData) {
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp
index 766fac5b249..d3c8b633bfe 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp
@@ -85,7 +85,7 @@ public:
TString topicName = context.SS->PathsById.at(txState->TargetPathId)->Name;
Y_VERIFY_S(topicName.size(), "topicName is empty. PathId: " << txState->TargetPathId);
- TPersQueueGroupInfo::TPtr pqGroup = context.SS->PersQueueGroups.at(txState->TargetPathId);
+ TTopicInfo::TPtr pqGroup = context.SS->Topics.at(txState->TargetPathId);
Y_VERIFY_S(pqGroup, "pqGroup is null. PathId: " << txState->TargetPathId);
bool haveWork = false;
@@ -167,7 +167,7 @@ public:
Y_VERIFY(!path->Dropped());
path->SetDropped(step, OperationId.GetTxId());
context.SS->PersistDropStep(db, pathId, step, OperationId);
- TPersQueueGroupInfo::TPtr pqGroup = context.SS->PersQueueGroups.at(pathId);
+ TTopicInfo::TPtr pqGroup = context.SS->Topics.at(pathId);
Y_VERIFY(pqGroup);
// KIKIMR-13173
@@ -274,7 +274,7 @@ class TDropPQ: public TSubOperation {
public:
using TSubOperation::TSubOperation;
- void SetPQBalancer(TPersQueueGroupInfo::TPtr pqGroup, TTxState& txState, TOperationContext& context) {
+ void SetPQBalancer(TTopicInfo::TPtr pqGroup, TTxState& txState, TOperationContext& context) {
auto shardId = pqGroup->BalancerShardIdx;
auto tabletId = pqGroup->BalancerTabletID;
@@ -290,11 +290,11 @@ public:
}
}
- void SetPQShards(TPersQueueGroupInfo::TPtr pqGroup, TTxState& txState, TOperationContext& context) {
+ void SetPQShards(TTopicInfo::TPtr pqGroup, TTxState& txState, TOperationContext& context) {
ui32 drops = 0;
for (auto shard : pqGroup->Shards) {
auto shardIdx = shard.first;
- TPQShardInfo::TPtr info = shard.second;
+ TTopicTabletInfo::TPtr info = shard.second;
auto tabletId = context.SS->ShardInfos[shardIdx].TabletID;
@@ -399,7 +399,7 @@ public:
return result;
}
- TPersQueueGroupInfo::TPtr pqGroup = context.SS->PersQueueGroups.at(path.Base()->PathId);
+ TTopicInfo::TPtr pqGroup = context.SS->Topics.at(path.Base()->PathId);
Y_VERIFY(pqGroup);
if (pqGroup->AlterData) {
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index aebd5be3062..7d02c3cdd4f 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -339,7 +339,7 @@ void TSchemeShard::Clear() {
ShardsWithBorrowed.clear();
ShardsWithLoaned.clear();
- PersQueueGroups.clear();
+ Topics.clear();
RtmrVolumes.clear();
SubDomains.clear();
BlockStoreVolumes.clear();
@@ -505,8 +505,8 @@ void TSchemeShard::ClearDescribePathCaches(const TPathElement::TPtr node, bool f
node->PreSerializedChildrenListing.clear();
if (node->PathType == NKikimrSchemeOp::EPathType::EPathTypePersQueueGroup) {
- Y_VERIFY(PersQueueGroups.contains(node->PathId));
- TPersQueueGroupInfo::TPtr pqGroup = PersQueueGroups.at(node->PathId);
+ Y_VERIFY(Topics.contains(node->PathId));
+ TTopicInfo::TPtr pqGroup = Topics.at(node->PathId);
pqGroup->PreSerializedPathDescription.clear();
pqGroup->PreSerializedPartitionsDescription.clear();
} else if (node->PathType == NKikimrSchemeOp::EPathType::EPathTypeTable) {
@@ -2466,7 +2466,7 @@ void TSchemeShard::PersistAddAlterTable(NIceDb::TNiceDb& db, TPathId pathId, con
}
}
-void TSchemeShard::PersistPersQueueGroup(NIceDb::TNiceDb& db, TPathId pathId, const TPersQueueGroupInfo::TPtr pqGroup) {
+void TSchemeShard::PersistPersQueueGroup(NIceDb::TNiceDb& db, TPathId pathId, const TTopicInfo::TPtr pqGroup) {
Y_VERIFY(IsLocalId(pathId));
db.Table<Schema::PersQueueGroups>().Key(pathId.LocalPathId).Update(
@@ -2480,28 +2480,28 @@ void TSchemeShard::PersistPersQueueGroup(NIceDb::TNiceDb& db, TPathId pathId, co
void TSchemeShard::PersistRemovePersQueueGroup(NIceDb::TNiceDb& db, TPathId pathId) {
Y_VERIFY(IsLocalId(pathId));
- auto it = PersQueueGroups.find(pathId);
- if (it != PersQueueGroups.end()) {
- TPersQueueGroupInfo::TPtr pqGroup = it->second;
+ auto it = Topics.find(pathId);
+ if (it != Topics.end()) {
+ TTopicInfo::TPtr pqGroup = it->second;
if (pqGroup->AlterData) {
PersistRemovePersQueueGroupAlter(db, pathId);
}
for (const auto& shard : pqGroup->Shards) {
- for (const auto& pqInfo : shard.second->PQInfos) {
+ for (const auto& pqInfo : shard.second->Partitions) {
PersistRemovePersQueue(db, pathId, pqInfo.PqId);
}
}
- PersQueueGroups.erase(it);
+ Topics.erase(it);
DecrementPathDbRefCount(pathId);
}
db.Table<Schema::PersQueueGroups>().Key(pathId.LocalPathId).Delete();
}
-void TSchemeShard::PersistAddPersQueueGroupAlter(NIceDb::TNiceDb& db, TPathId pathId, const TPersQueueGroupInfo::TPtr alterData) {
+void TSchemeShard::PersistAddPersQueueGroupAlter(NIceDb::TNiceDb& db, TPathId pathId, const TTopicInfo::TPtr alterData) {
Y_VERIFY(IsLocalId(pathId));
db.Table<Schema::PersQueueGroupAlters>().Key(pathId.LocalPathId).Update(
@@ -2519,7 +2519,7 @@ void TSchemeShard::PersistRemovePersQueueGroupAlter(NIceDb::TNiceDb& db, TPathId
db.Table<Schema::PersQueueGroupAlters>().Key(pathId.LocalPathId).Delete();
}
-void TSchemeShard::PersistPersQueue(NIceDb::TNiceDb &db, TPathId pathId, TShardIdx shardIdx, const TPQShardInfo::TPersQueueInfo& pqInfo) {
+void TSchemeShard::PersistPersQueue(NIceDb::TNiceDb &db, TPathId pathId, TShardIdx shardIdx, const TTopicTabletInfo::TTopicPartitionInfo& pqInfo) {
Y_VERIFY(IsLocalId(pathId));
db.Table<Schema::PersQueues>().Key(pathId.LocalPathId, pqInfo.PqId).Update(
@@ -3744,8 +3744,8 @@ NKikimrSchemeOp::TPathVersion TSchemeShard::GetPathVersion(const TPath& path) co
generalVersion += result.GetTablePartitionVersion();
break;
case NKikimrSchemeOp::EPathType::EPathTypePersQueueGroup:
- Y_VERIFY(PersQueueGroups.contains(pathId));
- result.SetPQVersion(PersQueueGroups.at(pathId)->AlterVersion);
+ Y_VERIFY(Topics.contains(pathId));
+ result.SetPQVersion(Topics.at(pathId)->AlterVersion);
generalVersion += result.GetPQVersion();
break;
case NKikimrSchemeOp::EPathType::EPathTypeBlockStoreVolume:
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h
index 0d46ef9a7aa..4b98be5fdef 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.h
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.h
@@ -201,7 +201,7 @@ public:
THashMap<TPathId, TTxId> LockedPaths;
- THashMap<TPathId, TPersQueueGroupInfo::TPtr> PersQueueGroups;
+ THashMap<TPathId, TTopicInfo::TPtr> Topics;
THashMap<TPathId, TRtmrVolumeInfo::TPtr> RtmrVolumes;
THashMap<TPathId, TSolomonVolumeInfo::TPtr> SolomonVolumes;
THashMap<TPathId, TSubDomainInfo::TPtr> SubDomains;
@@ -621,11 +621,11 @@ public:
void PersistTableAlterVersion(NIceDb::TNiceDb &db, const TPathId pathId, const TTableInfo::TPtr tableInfo);
void PersistTableAltered(NIceDb::TNiceDb &db, const TPathId pathId, const TTableInfo::TPtr tableInfo);
void PersistAddAlterTable(NIceDb::TNiceDb& db, TPathId pathId, const TTableInfo::TAlterDataPtr alter);
- void PersistPersQueueGroup(NIceDb::TNiceDb &db, TPathId pathId, const TPersQueueGroupInfo::TPtr);
+ void PersistPersQueueGroup(NIceDb::TNiceDb &db, TPathId pathId, const TTopicInfo::TPtr);
void PersistRemovePersQueueGroup(NIceDb::TNiceDb &db, TPathId pathId);
- void PersistAddPersQueueGroupAlter(NIceDb::TNiceDb &db, TPathId pathId, const TPersQueueGroupInfo::TPtr);
+ void PersistAddPersQueueGroupAlter(NIceDb::TNiceDb &db, TPathId pathId, const TTopicInfo::TPtr);
void PersistRemovePersQueueGroupAlter(NIceDb::TNiceDb &db, TPathId pathId);
- void PersistPersQueue(NIceDb::TNiceDb &db, TPathId pathId, TShardIdx shardIdx, const TPQShardInfo::TPersQueueInfo& pqInfo);
+ void PersistPersQueue(NIceDb::TNiceDb &db, TPathId pathId, TShardIdx shardIdx, const TTopicTabletInfo::TTopicPartitionInfo& pqInfo);
void PersistRemovePersQueue(NIceDb::TNiceDb &db, TPathId pathId, ui32 pqId);
void PersistRtmrVolume(NIceDb::TNiceDb &db, TPathId pathId, const TRtmrVolumeInfo::TPtr rtmrVol);
void PersistRemoveRtmrVolume(NIceDb::TNiceDb &db, TPathId pathId);
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
index e5c75e9e907..578110f292b 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
@@ -1862,7 +1862,7 @@ const TString &TColumnFamiliesMerger::CanonizeName(const TString &familyName) {
return familyName;
}
-void TPQShardInfo::TKeyRange::SerializeToProto(NKikimrPQ::TPartitionKeyRange& proto) const {
+void TTopicTabletInfo::TKeyRange::SerializeToProto(NKikimrPQ::TPartitionKeyRange& proto) const {
if (FromBound) {
proto.SetFromBound(*FromBound);
}
@@ -1872,7 +1872,7 @@ void TPQShardInfo::TKeyRange::SerializeToProto(NKikimrPQ::TPartitionKeyRange& pr
}
}
-bool TPersQueueGroupInfo::FillKeySchema(const NKikimrPQ::TPQTabletConfig& tabletConfig, TString& error) {
+bool TTopicInfo::FillKeySchema(const NKikimrPQ::TPQTabletConfig& tabletConfig, TString& error) {
KeySchema.clear();
KeySchema.reserve(tabletConfig.PartitionKeySchemaSize());
@@ -1891,7 +1891,7 @@ bool TPersQueueGroupInfo::FillKeySchema(const NKikimrPQ::TPQTabletConfig& tablet
return true;
}
-bool TPersQueueGroupInfo::FillKeySchema(const TString& tabletConfig) {
+bool TTopicInfo::FillKeySchema(const TString& tabletConfig) {
NKikimrPQ::TPQTabletConfig proto;
if (!proto.ParseFromString(tabletConfig)) {
return false;
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h
index 9dbf42fa513..30daf3e98f6 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.h
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h
@@ -985,8 +985,8 @@ struct TColumnTableInfo : TSimpleRefCount<TColumnTableInfo> {
}
};
-struct TPQShardInfo : TSimpleRefCount<TPQShardInfo> {
- using TPtr = TIntrusivePtr<TPQShardInfo>;
+struct TTopicTabletInfo : TSimpleRefCount<TTopicTabletInfo> {
+ using TPtr = TIntrusivePtr<TTopicTabletInfo>;
using TKeySchema = TVector<NScheme::TTypeInfo>;
struct TKeyRange {
@@ -996,17 +996,17 @@ struct TPQShardInfo : TSimpleRefCount<TPQShardInfo> {
void SerializeToProto(NKikimrPQ::TPartitionKeyRange& proto) const;
};
- struct TPersQueueInfo {
+ struct TTopicPartitionInfo {
ui32 PqId = 0;
ui32 GroupId = 0;
ui64 AlterVersion = 0;
TMaybe<TKeyRange> KeyRange;
};
- TVector<TPersQueueInfo> PQInfos;
+ TVector<TTopicPartitionInfo> Partitions;
size_t PartsCount() const {
- return PQInfos.size();
+ return Partitions.size();
}
};
@@ -1122,12 +1122,18 @@ struct TShardInfo {
}
};
-struct TPersQueueGroupInfo : TSimpleRefCount<TPersQueueGroupInfo> {
- using TPtr = TIntrusivePtr<TPersQueueGroupInfo>;
- using TKeySchema = TPQShardInfo::TKeySchema;
+/**
+ * TTopicInfo -> TTopicTabletInfo -> TTopicPartitionInfo
+ *
+ * Each topic may contains many tablets.
+ * Each tablet may serve many partitions.
+ */
+struct TTopicInfo : TSimpleRefCount<TTopicInfo> {
+ using TPtr = TIntrusivePtr<TTopicInfo>;
+ using TKeySchema = TTopicTabletInfo::TKeySchema;
struct TPartitionToAdd {
- using TKeyRange = TPQShardInfo::TKeyRange;
+ using TKeyRange = TTopicTabletInfo::TKeyRange;
ui32 PartitionId;
ui32 GroupId;
@@ -1162,9 +1168,9 @@ struct TPersQueueGroupInfo : TSimpleRefCount<TPersQueueGroupInfo> {
ui64 AlterVersion = 0;
TString TabletConfig;
TString BootstrapConfig;
- THashMap<TShardIdx, TPQShardInfo::TPtr> Shards; // key - shardIdx
+ THashMap<TShardIdx, TTopicTabletInfo::TPtr> Shards; // key - shardIdx
TKeySchema KeySchema;
- TPersQueueGroupInfo::TPtr AlterData; // changes to be applied
+ TTopicInfo::TPtr AlterData; // changes to be applied
TTabletId BalancerTabletID = InvalidTabletId;
TShardIdx BalancerShardIdx = InvalidShardIdx;
@@ -1201,7 +1207,7 @@ struct TPersQueueGroupInfo : TSimpleRefCount<TPersQueueGroupInfo> {
return Shards.size();
}
- void PrepareAlter(TPersQueueGroupInfo::TPtr alterData) {
+ void PrepareAlter(TTopicInfo::TPtr alterData) {
Y_VERIFY(alterData, "No alter data at Alter prepare");
alterData->AlterVersion = AlterVersion + 1;
Y_VERIFY(alterData->TotalGroupCount);
diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
index f98fab70c6b..1c96351be54 100644
--- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
@@ -77,10 +77,10 @@ void TPathDescriber::FillChildDescr(NKikimrSchemeOp::TDirEntry* descr, TPathElem
}
if (pathEl->PathType == NKikimrSchemeOp::EPathTypePersQueueGroup) {
- auto it = Self->PersQueueGroups.FindPtr(pathEl->PathId);
+ auto it = Self->Topics.FindPtr(pathEl->PathId);
Y_VERIFY(it, "PersQueueGroup is not found");
- TPersQueueGroupInfo::TPtr pqGroupInfo = *it;
+ TTopicInfo::TPtr pqGroupInfo = *it;
if (pqGroupInfo->HasBalancer()) {
descr->SetBalancerTabletID(ui64(pqGroupInfo->BalancerTabletID));
}
@@ -411,9 +411,9 @@ void TPathDescriber::DescribeColumnTable(TPathId pathId, TPathElement::TPtr path
}
void TPathDescriber::DescribePersQueueGroup(TPathId pathId, TPathElement::TPtr pathEl) {
- auto it = Self->PersQueueGroups.FindPtr(pathId);
+ auto it = Self->Topics.FindPtr(pathId);
Y_VERIFY(it, "PersQueueGroup is not found");
- TPersQueueGroupInfo::TPtr pqGroupInfo = *it;
+ TTopicInfo::TPtr pqGroupInfo = *it;
if (pqGroupInfo->PreSerializedPathDescription.empty()) {
NKikimrScheme::TEvDescribeSchemeResult preSerializedResult;
@@ -444,7 +444,7 @@ void TPathDescriber::DescribePersQueueGroup(TPathId pathId, TPathElement::TPtr p
struct TPartitionDesc {
TTabletId TabletId = InvalidTabletId;
- const TPQShardInfo::TPersQueueInfo* Info = nullptr;
+ const TTopicTabletInfo::TTopicPartitionInfo* Info = nullptr;
};
TVector<TPartitionDesc> descriptions; // index is pqId
@@ -454,7 +454,7 @@ void TPathDescriber::DescribePersQueueGroup(TPathId pathId, TPathElement::TPtr p
auto it = Self->ShardInfos.find(shardIdx);
Y_VERIFY_S(it != Self->ShardInfos.end(), "No shard with shardIdx: " << shardIdx);
- for (const auto& pq : pqShard->PQInfos) {
+ for (const auto& pq : pqShard->Partitions) {
if (pq.AlterVersion <= pqGroupInfo->AlterVersion) {
Y_VERIFY_S(pq.PqId < pqGroupInfo->NextPartitionId,
"Wrong pqId: " << pq.PqId << ", nextPqId: " << pqGroupInfo->NextPartitionId);
@@ -498,7 +498,7 @@ void TPathDescriber::DescribePersQueueGroup(TPathId pathId, TPathElement::TPtr p
for (const auto& [shardIdx, pqShard] : pqGroupInfo->Shards) {
const auto& shardInfo = Self->ShardInfos.at(shardIdx);
- for (const auto& pq : pqShard->PQInfos) {
+ for (const auto& pq : pqShard->Partitions) {
if (pq.AlterVersion <= pqGroupInfo->AlterVersion) {
auto partition = allocate->MutablePartitions()->Add();
partition->SetPartitionId(pq.PqId);