diff options
author | abcdef <akotov@ydb.tech> | 2023-03-28 15:03:21 +0300 |
---|---|---|
committer | abcdef <akotov@ydb.tech> | 2023-03-28 15:03:21 +0300 |
commit | e0596228fcbe46ce0c5933242294c0228e9bd03d (patch) | |
tree | 8ff23ee8b34815ddf9e4571b973e891e52ba1b1d | |
parent | 33421d638103cc382ba851d2491740e2db576307 (diff) | |
download | ydb-e0596228fcbe46ce0c5933242294c0228e9bd03d.tar.gz |
functions for creating PQ configs
Выделил функции для создания конфигов PQ. Они понадобятся позже для события `TEvPersQueue::TEvProposeTransaction`
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_common.h | 114 |
1 files changed, 75 insertions, 39 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.h b/ydb/core/tx/schemeshard/schemeshard__operation_common.h index df3d99dccf..21c651c5da 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.h @@ -639,12 +639,6 @@ public: "pqGroup is null" << ", pathId " << txState->TargetPathId); - TString* tabletConfig = &pqGroup->TabletConfig; - if (pqGroup->AlterData) { - if (!pqGroup->AlterData->TabletConfig.empty()) - tabletConfig = &pqGroup->AlterData->TabletConfig; - } - const TPathElement::TPtr dbRootEl = context.SS->PathsById.at(context.SS->RootPathId()); TString cloudId; if (dbRootEl->UserAttrs->Attrs.contains("cloud_id")) { @@ -660,6 +654,7 @@ public: } TString databasePath = TPath::Init(context.SS->RootPathId(), context.SS).PathString(); + TString topicPath = TPath::Init(txState->TargetPathId, context.SS).PathString(); for (auto shard : txState->Shards) { TShardIdx idx = shard.Idx; @@ -678,36 +673,19 @@ public: TAutoPtr<TEvPersQueue::TEvUpdateConfig> event(new TEvPersQueue::TEvUpdateConfig()); event->Record.SetTxId(ui64(OperationId.GetTxId())); - if (!tabletConfig->empty()) { - bool parseOk = ParseFromStringNoSizeLimit(*event->Record.MutableTabletConfig(), *tabletConfig); - Y_VERIFY(parseOk); - } - event->Record.MutableTabletConfig()->SetTopicName(topicName); - event->Record.MutableTabletConfig()->SetTopicPath(TPath::Init(txState->TargetPathId, context.SS).PathString()); - event->Record.MutableTabletConfig()->MutablePartitionConfig()->SetTotalPartitions(pqGroup->AlterData ? pqGroup->AlterData->TotalGroupCount : pqGroup->TotalGroupCount); - - event->Record.MutableTabletConfig()->SetYdbDatabaseId(databaseId); - event->Record.MutableTabletConfig()->SetYcCloudId(cloudId); - event->Record.MutableTabletConfig()->SetYcFolderId(folderId); - event->Record.MutableTabletConfig()->SetYdbDatabasePath(databasePath); - - event->Record.MutableTabletConfig()->SetVersion(pqGroup->AlterData->AlterVersion); - for (const auto& pq : pqShard->Partitions) { - event->Record.MutableTabletConfig()->AddPartitionIds(pq.PqId); - - auto& partition = *event->Record.MutableTabletConfig()->AddPartitions(); - partition.SetPartitionId(pq.PqId); - if (pq.KeyRange) { - pq.KeyRange->SerializeToProto(*partition.MutableKeyRange()); - } - } - - if (pqGroup->AlterData && pqGroup->AlterData->BootstrapConfig) { - Y_VERIFY(txState->TxType == TTxState::TxCreatePQGroup); - const bool ok = ParseFromStringNoSizeLimit(*event->Record.MutableBootstrapConfig(), pqGroup->AlterData->BootstrapConfig); - Y_VERIFY(ok); - } + MakePQTabletConfig(*event->Record.MutableTabletConfig(), + *pqGroup, + *pqShard, + topicName, + topicPath, + cloudId, + folderId, + databaseId, + databasePath); + MakeBootstrapConfig(*event->Record.MutableBootstrapConfig(), + *pqGroup, + txState->TxType); LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Propose configure PersQueue" @@ -738,10 +716,7 @@ public: TAutoPtr<TEvPersQueue::TEvUpdateBalancerConfig> event(new TEvPersQueue::TEvUpdateBalancerConfig()); event->Record.SetTxId(ui64(OperationId.GetTxId())); - if (!tabletConfig->empty()) { - bool parseOk = ParseFromStringNoSizeLimit(*event->Record.MutableTabletConfig(), *tabletConfig); - Y_VERIFY(parseOk); - } + ParsePQTabletConfig(*event->Record.MutableTabletConfig(), *pqGroup); Y_VERIFY(pqGroup->AlterData); @@ -789,6 +764,67 @@ public: txState->UpdateShardsInProgress(); return false; } + +private: + static void MakePQTabletConfig(NKikimrPQ::TPQTabletConfig& config, + const TTopicInfo& pqGroup, + const TTopicTabletInfo& pqShard, + const TString& topicName, + const TString& topicPath, + const TString& cloudId, + const TString& folderId, + const TString& databaseId, + const TString& databasePath) + { + ParsePQTabletConfig(config, pqGroup); + + config.SetTopicName(topicName); + config.SetTopicPath(topicPath); + config.MutablePartitionConfig()->SetTotalPartitions(pqGroup.AlterData ? pqGroup.AlterData->TotalGroupCount : pqGroup.TotalGroupCount); + + config.SetYcCloudId(cloudId); + config.SetYcFolderId(folderId); + config.SetYdbDatabaseId(databaseId); + config.SetYdbDatabasePath(databasePath); + + if (pqGroup.AlterData) { + config.SetVersion(pqGroup.AlterData->AlterVersion); + } + + for (const auto& pq : pqShard.Partitions) { + config.AddPartitionIds(pq.PqId); + + auto& partition = *config.AddPartitions(); + partition.SetPartitionId(pq.PqId); + if (pq.KeyRange) { + pq.KeyRange->SerializeToProto(*partition.MutableKeyRange()); + } + } + } + + static void MakeBootstrapConfig(NKikimrPQ::TBootstrapConfig& config, + const TTopicInfo& pqGroup, + TTxState::ETxType txType) + { + if (pqGroup.AlterData && pqGroup.AlterData->BootstrapConfig) { + Y_VERIFY(txType == TTxState::TxCreatePQGroup); + Y_VERIFY(ParseFromStringNoSizeLimit(config, pqGroup.AlterData->BootstrapConfig)); + } + } + + static void ParsePQTabletConfig(NKikimrPQ::TPQTabletConfig& config, + const TTopicInfo& pqGroup) + { + const TString* source = &pqGroup.TabletConfig; + if (pqGroup.AlterData) { + if (!pqGroup.AlterData->TabletConfig.empty()) + source = &pqGroup.AlterData->TabletConfig; + } + + if (!source->empty()) { + Y_VERIFY(ParseFromStringNoSizeLimit(config, *source)); + } + } }; class TPropose: public TSubOperationState { |