aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorabcdef <akotov@ydb.tech>2023-03-28 15:03:21 +0300
committerabcdef <akotov@ydb.tech>2023-03-28 15:03:21 +0300
commite0596228fcbe46ce0c5933242294c0228e9bd03d (patch)
tree8ff23ee8b34815ddf9e4571b973e891e52ba1b1d
parent33421d638103cc382ba851d2491740e2db576307 (diff)
downloadydb-e0596228fcbe46ce0c5933242294c0228e9bd03d.tar.gz
functions for creating PQ configs
Выделил функции для создания конфигов PQ. Они понадобятся позже для события `TEvPersQueue::TEvProposeTransaction`
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_common.h114
1 files changed, 75 insertions, 39 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.h b/ydb/core/tx/schemeshard/schemeshard__operation_common.h
index df3d99dccf..21c651c5da 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_common.h
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.h
@@ -639,12 +639,6 @@ public:
"pqGroup is null"
<< ", pathId " << txState->TargetPathId);
- TString* tabletConfig = &pqGroup->TabletConfig;
- if (pqGroup->AlterData) {
- if (!pqGroup->AlterData->TabletConfig.empty())
- tabletConfig = &pqGroup->AlterData->TabletConfig;
- }
-
const TPathElement::TPtr dbRootEl = context.SS->PathsById.at(context.SS->RootPathId());
TString cloudId;
if (dbRootEl->UserAttrs->Attrs.contains("cloud_id")) {
@@ -660,6 +654,7 @@ public:
}
TString databasePath = TPath::Init(context.SS->RootPathId(), context.SS).PathString();
+ TString topicPath = TPath::Init(txState->TargetPathId, context.SS).PathString();
for (auto shard : txState->Shards) {
TShardIdx idx = shard.Idx;
@@ -678,36 +673,19 @@ public:
TAutoPtr<TEvPersQueue::TEvUpdateConfig> event(new TEvPersQueue::TEvUpdateConfig());
event->Record.SetTxId(ui64(OperationId.GetTxId()));
- if (!tabletConfig->empty()) {
- bool parseOk = ParseFromStringNoSizeLimit(*event->Record.MutableTabletConfig(), *tabletConfig);
- Y_VERIFY(parseOk);
- }
- event->Record.MutableTabletConfig()->SetTopicName(topicName);
- event->Record.MutableTabletConfig()->SetTopicPath(TPath::Init(txState->TargetPathId, context.SS).PathString());
- event->Record.MutableTabletConfig()->MutablePartitionConfig()->SetTotalPartitions(pqGroup->AlterData ? pqGroup->AlterData->TotalGroupCount : pqGroup->TotalGroupCount);
-
- event->Record.MutableTabletConfig()->SetYdbDatabaseId(databaseId);
- event->Record.MutableTabletConfig()->SetYcCloudId(cloudId);
- event->Record.MutableTabletConfig()->SetYcFolderId(folderId);
- event->Record.MutableTabletConfig()->SetYdbDatabasePath(databasePath);
-
- event->Record.MutableTabletConfig()->SetVersion(pqGroup->AlterData->AlterVersion);
- for (const auto& pq : pqShard->Partitions) {
- event->Record.MutableTabletConfig()->AddPartitionIds(pq.PqId);
-
- auto& partition = *event->Record.MutableTabletConfig()->AddPartitions();
- partition.SetPartitionId(pq.PqId);
- if (pq.KeyRange) {
- pq.KeyRange->SerializeToProto(*partition.MutableKeyRange());
- }
- }
-
- if (pqGroup->AlterData && pqGroup->AlterData->BootstrapConfig) {
- Y_VERIFY(txState->TxType == TTxState::TxCreatePQGroup);
- const bool ok = ParseFromStringNoSizeLimit(*event->Record.MutableBootstrapConfig(), pqGroup->AlterData->BootstrapConfig);
- Y_VERIFY(ok);
- }
+ MakePQTabletConfig(*event->Record.MutableTabletConfig(),
+ *pqGroup,
+ *pqShard,
+ topicName,
+ topicPath,
+ cloudId,
+ folderId,
+ databaseId,
+ databasePath);
+ MakeBootstrapConfig(*event->Record.MutableBootstrapConfig(),
+ *pqGroup,
+ txState->TxType);
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"Propose configure PersQueue"
@@ -738,10 +716,7 @@ public:
TAutoPtr<TEvPersQueue::TEvUpdateBalancerConfig> event(new TEvPersQueue::TEvUpdateBalancerConfig());
event->Record.SetTxId(ui64(OperationId.GetTxId()));
- if (!tabletConfig->empty()) {
- bool parseOk = ParseFromStringNoSizeLimit(*event->Record.MutableTabletConfig(), *tabletConfig);
- Y_VERIFY(parseOk);
- }
+ ParsePQTabletConfig(*event->Record.MutableTabletConfig(), *pqGroup);
Y_VERIFY(pqGroup->AlterData);
@@ -789,6 +764,67 @@ public:
txState->UpdateShardsInProgress();
return false;
}
+
+private:
+ static void MakePQTabletConfig(NKikimrPQ::TPQTabletConfig& config,
+ const TTopicInfo& pqGroup,
+ const TTopicTabletInfo& pqShard,
+ const TString& topicName,
+ const TString& topicPath,
+ const TString& cloudId,
+ const TString& folderId,
+ const TString& databaseId,
+ const TString& databasePath)
+ {
+ ParsePQTabletConfig(config, pqGroup);
+
+ config.SetTopicName(topicName);
+ config.SetTopicPath(topicPath);
+ config.MutablePartitionConfig()->SetTotalPartitions(pqGroup.AlterData ? pqGroup.AlterData->TotalGroupCount : pqGroup.TotalGroupCount);
+
+ config.SetYcCloudId(cloudId);
+ config.SetYcFolderId(folderId);
+ config.SetYdbDatabaseId(databaseId);
+ config.SetYdbDatabasePath(databasePath);
+
+ if (pqGroup.AlterData) {
+ config.SetVersion(pqGroup.AlterData->AlterVersion);
+ }
+
+ for (const auto& pq : pqShard.Partitions) {
+ config.AddPartitionIds(pq.PqId);
+
+ auto& partition = *config.AddPartitions();
+ partition.SetPartitionId(pq.PqId);
+ if (pq.KeyRange) {
+ pq.KeyRange->SerializeToProto(*partition.MutableKeyRange());
+ }
+ }
+ }
+
+ static void MakeBootstrapConfig(NKikimrPQ::TBootstrapConfig& config,
+ const TTopicInfo& pqGroup,
+ TTxState::ETxType txType)
+ {
+ if (pqGroup.AlterData && pqGroup.AlterData->BootstrapConfig) {
+ Y_VERIFY(txType == TTxState::TxCreatePQGroup);
+ Y_VERIFY(ParseFromStringNoSizeLimit(config, pqGroup.AlterData->BootstrapConfig));
+ }
+ }
+
+ static void ParsePQTabletConfig(NKikimrPQ::TPQTabletConfig& config,
+ const TTopicInfo& pqGroup)
+ {
+ const TString* source = &pqGroup.TabletConfig;
+ if (pqGroup.AlterData) {
+ if (!pqGroup.AlterData->TabletConfig.empty())
+ source = &pqGroup.AlterData->TabletConfig;
+ }
+
+ if (!source->empty()) {
+ Y_VERIFY(ParseFromStringNoSizeLimit(config, *source));
+ }
+ }
};
class TPropose: public TSubOperationState {