diff options
author | ilnaz <[email protected]> | 2023-10-07 16:59:50 +0300 |
---|---|---|
committer | ilnaz <[email protected]> | 2023-10-07 17:22:23 +0300 |
commit | fc0878d1e3ab56a34ddd53269ed1299e54358fed (patch) | |
tree | 7164a743631abc3d9123dcab5dd7c0f8db395199 | |
parent | 00502eb58844fa0201623d8e5386dcfbf1408e86 (diff) |
Fill bootstrap config just before sending KIKIMR-19307
5 files changed, 105 insertions, 35 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp index 1d6b4428e17..2fd931b943e 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp @@ -695,6 +695,7 @@ THolder<TEvPersQueue::TEvProposeTransaction> TConfigureParts::MakeEvProposeTrans const TTopicTabletInfo& pqShard, const TString& topicName, const TString& topicPath, + const std::optional<NKikimrPQ::TBootstrapConfig>& bootstrapConfig, const TString& cloudId, const TString& folderId, const TString& databaseId, @@ -715,9 +716,10 @@ THolder<TEvPersQueue::TEvProposeTransaction> TConfigureParts::MakeEvProposeTrans folderId, databaseId, databasePath); - MakeBootstrapConfig(*event->Record.MutableConfig()->MutableBootstrapConfig(), - pqGroup, - txType); + if (bootstrapConfig) { + Y_VERIFY(txType == TTxState::TxCreatePQGroup); + event->Record.MutableConfig()->MutableBootstrapConfig()->CopyFrom(*bootstrapConfig); + } LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Propose configure PersQueue" << @@ -731,6 +733,7 @@ THolder<TEvPersQueue::TEvUpdateConfig> TConfigureParts::MakeEvUpdateConfig(TTxId const TTopicTabletInfo& pqShard, const TString& topicName, const TString& topicPath, + const std::optional<NKikimrPQ::TBootstrapConfig>& bootstrapConfig, const TString& cloudId, const TString& folderId, const TString& databaseId, @@ -750,9 +753,10 @@ THolder<TEvPersQueue::TEvUpdateConfig> TConfigureParts::MakeEvUpdateConfig(TTxId folderId, databaseId, databasePath); - MakeBootstrapConfig(*event->Record.MutableBootstrapConfig(), - pqGroup, - txType); + if (bootstrapConfig) { + Y_VERIFY(txType == TTxState::TxCreatePQGroup); + event->Record.MutableBootstrapConfig()->CopyFrom(*bootstrapConfig); + } LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Propose configure PersQueue" << diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.h b/ydb/core/tx/schemeshard/schemeshard__operation_common.h index 735cc89563e..47c80bde218 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.h @@ -2,9 +2,11 @@ #include "schemeshard__operation_part.h" #include "schemeshard_impl.h" + +#include <ydb/core/base/subdomain.h> +#include <ydb/core/persqueue/writer/source_id_encoding.h> #include <ydb/core/tx/columnshard/columnshard.h> #include <ydb/core/tx/tx_processing.h> -#include <ydb/core/base/subdomain.h> namespace NKikimr { namespace NSchemeShard { @@ -657,7 +659,39 @@ public: } TString databasePath = TPath::Init(context.SS->RootPathId(), context.SS).PathString(); - TString topicPath = TPath::Init(txState->TargetPathId, context.SS).PathString(); + auto topicPath = TPath::Init(txState->TargetPathId, context.SS); + + std::optional<NKikimrPQ::TBootstrapConfig> bootstrapConfig; + if (txState->TxType == TTxState::TxCreatePQGroup && topicPath.Parent().IsCdcStream()) { + bootstrapConfig.emplace(); + + auto tablePath = topicPath.Parent().Parent(); // table/cdc_stream/topic + Y_VERIFY(tablePath.IsResolved()); + + Y_VERIFY(context.SS->Tables.contains(tablePath.Base()->PathId)); + auto table = context.SS->Tables.at(tablePath.Base()->PathId); + + const auto& partitions = table->GetPartitions(); + + for (ui32 i = 0; i < partitions.size(); ++i) { + const auto& cur = partitions.at(i); + + Y_VERIFY(context.SS->ShardInfos.contains(cur.ShardIdx)); + const auto& shard = context.SS->ShardInfos.at(cur.ShardIdx); + + auto& mg = *bootstrapConfig->AddExplicitMessageGroups(); + mg.SetId(NPQ::NSourceIdEncoding::EncodeSimple(ToString(shard.TabletID))); + + if (i != partitions.size() - 1) { + mg.MutableKeyRange()->SetToBound(cur.EndOfRange); + } + + if (i) { + const auto& prev = partitions.at(i - 1); + mg.MutableKeyRange()->SetFromBound(prev.EndOfRange); + } + } + } for (auto shard : txState->Shards) { TShardIdx idx = shard.Idx; @@ -680,7 +714,8 @@ public: *pqGroup, *pqShard, topicName, - topicPath, + topicPath.PathString(), + bootstrapConfig, cloudId, folderId, databaseId, @@ -692,7 +727,8 @@ public: *pqGroup, *pqShard, topicName, - topicPath, + topicPath.PathString(), + bootstrapConfig, cloudId, folderId, databaseId, @@ -839,16 +875,6 @@ private: } } - static void MakeBootstrapConfig(NKikimrPQ::TBootstrapConfig& config, - const TTopicInfo& pqGroup, - TTxState::ETxType txType) - { - if (pqGroup.AlterData && pqGroup.AlterData->BootstrapConfig) { - Y_VERIFY(txType == TTxState::TxCreatePQGroup); - Y_VERIFY(ParseFromStringNoSizeLimit(config, pqGroup.AlterData->BootstrapConfig)); - } - } - static void ParsePQTabletConfig(NKikimrPQ::TPQTabletConfig& config, const TTopicInfo& pqGroup) { @@ -869,6 +895,7 @@ private: const TTopicTabletInfo& pqShard, const TString& topicName, const TString& topicPath, + const std::optional<NKikimrPQ::TBootstrapConfig>& bootstrapConfig, const TString& cloudId, const TString& folderId, const TString& databaseId, @@ -881,6 +908,7 @@ private: const TTopicTabletInfo& pqShard, const TString& topicName, const TString& topicPath, + const std::optional<NKikimrPQ::TBootstrapConfig>& bootstrapConfig, const TString& cloudId, const TString& folderId, const TString& databaseId, diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp index 55b6af0e5e9..5aa384ab1e9 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp @@ -3,7 +3,6 @@ #include "schemeshard_impl.h" #include <ydb/core/engine/mkql_proto.h> -#include <ydb/core/persqueue/writer/source_id_encoding.h> #include <ydb/core/scheme/scheme_types_proto.h> #define LOG_D(stream) LOG_DEBUG_S (context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[" << context.SS->TabletID() << "] " << stream) @@ -796,16 +795,9 @@ TVector<ISubOperation::TPtr> CreateNewCdcStream(TOperationId opId, const TTxTran } } - auto& bootstrapConfig = *desc.MutableBootstrapConfig(); for (ui32 i = 0; i < partitions.size(); ++i) { const auto& cur = partitions.at(i); - Y_VERIFY(context.SS->ShardInfos.contains(cur.ShardIdx)); - const auto& shard = context.SS->ShardInfos.at(cur.ShardIdx); - - auto& mg = *bootstrapConfig.AddExplicitMessageGroups(); - mg.SetId(NPQ::NSourceIdEncoding::EncodeSimple(ToString(shard.TabletID))); - if (i != partitions.size() - 1) { TSerializedCellVec endKey(cur.EndOfRange); Y_VERIFY(endKey.GetCells().size() <= table->KeyColumnIds.size()); @@ -820,13 +812,6 @@ TVector<ISubOperation::TPtr> CreateNewCdcStream(TOperationId opId, const TTxTran const bool ok = NMiniKQL::CellToValue(typeId, cell, *boundary.AddTuple(), errStr); Y_VERIFY(ok, "Failed to build key tuple at position %" PRIu32 " error: %s", ki, errStr.data()); } - - mg.MutableKeyRange()->SetToBound(cur.EndOfRange); - } - - if (i) { - const auto& prev = partitions.at(i - 1); - mg.MutableKeyRange()->SetFromBound(prev.EndOfRange); } } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp index 15dc6e0aee6..d3d235a54f3 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp @@ -504,6 +504,18 @@ public: context.OnComplete.Dependence(parentTxId, OperationId.GetTxId()); } + if (parentPath.Base()->IsCdcStream()) { + auto tablePath = parentPath.Parent(); + Y_VERIFY(tablePath.IsResolved()); + + Y_VERIFY(context.SS->Tables.contains(tablePath.Base()->PathId)); + auto table = context.SS->Tables.at(tablePath.Base()->PathId); + + for (const auto& splitOpId : table->GetSplitOpsInFlight()) { + context.OnComplete.Dependence(splitOpId.GetTxId(), OperationId.GetTxId()); + } + } + context.SS->ChangeTxState(db, OperationId, TTxState::CreateParts); context.OnComplete.ActivateTx(OperationId); diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp index 68fb9aa88af..baa0cae9d03 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp @@ -654,4 +654,45 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { }); } + Y_UNIT_TEST(RacySplitTableAndCreateStream) { + TTestWithReboots t; + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + { + TInactiveZone inactive(activeZone); + TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + )"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + } + + AsyncSplitTable(runtime, ++t.TxId, "/MyRoot/Table", Sprintf(R"( + SourceTabletId: %lu + SplitBoundary { + KeyPrefix { + Tuple { Optional { Uint64: 2 } } + } + } + )", TTestTxConfig::FakeHiveTablets)); + + AsyncCreateCdcStream(runtime, ++t.TxId, "/MyRoot", R"( + TableName: "Table" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + } + )"); + + t.TestEnv->TestWaitNotification(runtime, {t.TxId - 1, t.TxId}); + + { + TInactiveZone inactive(activeZone); + CheckRegistrations(runtime, {"/MyRoot/Table", 2}, {"/MyRoot/Table/Stream/streamImpl", 1}); + } + }); + } + } // TCdcStreamWithRebootsTests |