summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <[email protected]>2023-10-07 16:59:50 +0300
committerilnaz <[email protected]>2023-10-07 17:22:23 +0300
commitfc0878d1e3ab56a34ddd53269ed1299e54358fed (patch)
tree7164a743631abc3d9123dcab5dd7c0f8db395199
parent00502eb58844fa0201623d8e5386dcfbf1408e86 (diff)
Fill bootstrap config just before sending KIKIMR-19307
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_common.cpp16
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_common.h56
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp15
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp12
-rw-r--r--ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp41
5 files changed, 105 insertions, 35 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp
index 1d6b4428e17..2fd931b943e 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp
@@ -695,6 +695,7 @@ THolder<TEvPersQueue::TEvProposeTransaction> TConfigureParts::MakeEvProposeTrans
const TTopicTabletInfo& pqShard,
const TString& topicName,
const TString& topicPath,
+ const std::optional<NKikimrPQ::TBootstrapConfig>& bootstrapConfig,
const TString& cloudId,
const TString& folderId,
const TString& databaseId,
@@ -715,9 +716,10 @@ THolder<TEvPersQueue::TEvProposeTransaction> TConfigureParts::MakeEvProposeTrans
folderId,
databaseId,
databasePath);
- MakeBootstrapConfig(*event->Record.MutableConfig()->MutableBootstrapConfig(),
- pqGroup,
- txType);
+ if (bootstrapConfig) {
+ Y_VERIFY(txType == TTxState::TxCreatePQGroup);
+ event->Record.MutableConfig()->MutableBootstrapConfig()->CopyFrom(*bootstrapConfig);
+ }
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"Propose configure PersQueue" <<
@@ -731,6 +733,7 @@ THolder<TEvPersQueue::TEvUpdateConfig> TConfigureParts::MakeEvUpdateConfig(TTxId
const TTopicTabletInfo& pqShard,
const TString& topicName,
const TString& topicPath,
+ const std::optional<NKikimrPQ::TBootstrapConfig>& bootstrapConfig,
const TString& cloudId,
const TString& folderId,
const TString& databaseId,
@@ -750,9 +753,10 @@ THolder<TEvPersQueue::TEvUpdateConfig> TConfigureParts::MakeEvUpdateConfig(TTxId
folderId,
databaseId,
databasePath);
- MakeBootstrapConfig(*event->Record.MutableBootstrapConfig(),
- pqGroup,
- txType);
+ if (bootstrapConfig) {
+ Y_VERIFY(txType == TTxState::TxCreatePQGroup);
+ event->Record.MutableBootstrapConfig()->CopyFrom(*bootstrapConfig);
+ }
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"Propose configure PersQueue" <<
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.h b/ydb/core/tx/schemeshard/schemeshard__operation_common.h
index 735cc89563e..47c80bde218 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_common.h
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.h
@@ -2,9 +2,11 @@
#include "schemeshard__operation_part.h"
#include "schemeshard_impl.h"
+
+#include <ydb/core/base/subdomain.h>
+#include <ydb/core/persqueue/writer/source_id_encoding.h>
#include <ydb/core/tx/columnshard/columnshard.h>
#include <ydb/core/tx/tx_processing.h>
-#include <ydb/core/base/subdomain.h>
namespace NKikimr {
namespace NSchemeShard {
@@ -657,7 +659,39 @@ public:
}
TString databasePath = TPath::Init(context.SS->RootPathId(), context.SS).PathString();
- TString topicPath = TPath::Init(txState->TargetPathId, context.SS).PathString();
+ auto topicPath = TPath::Init(txState->TargetPathId, context.SS);
+
+ std::optional<NKikimrPQ::TBootstrapConfig> bootstrapConfig;
+ if (txState->TxType == TTxState::TxCreatePQGroup && topicPath.Parent().IsCdcStream()) {
+ bootstrapConfig.emplace();
+
+ auto tablePath = topicPath.Parent().Parent(); // table/cdc_stream/topic
+ Y_VERIFY(tablePath.IsResolved());
+
+ Y_VERIFY(context.SS->Tables.contains(tablePath.Base()->PathId));
+ auto table = context.SS->Tables.at(tablePath.Base()->PathId);
+
+ const auto& partitions = table->GetPartitions();
+
+ for (ui32 i = 0; i < partitions.size(); ++i) {
+ const auto& cur = partitions.at(i);
+
+ Y_VERIFY(context.SS->ShardInfos.contains(cur.ShardIdx));
+ const auto& shard = context.SS->ShardInfos.at(cur.ShardIdx);
+
+ auto& mg = *bootstrapConfig->AddExplicitMessageGroups();
+ mg.SetId(NPQ::NSourceIdEncoding::EncodeSimple(ToString(shard.TabletID)));
+
+ if (i != partitions.size() - 1) {
+ mg.MutableKeyRange()->SetToBound(cur.EndOfRange);
+ }
+
+ if (i) {
+ const auto& prev = partitions.at(i - 1);
+ mg.MutableKeyRange()->SetFromBound(prev.EndOfRange);
+ }
+ }
+ }
for (auto shard : txState->Shards) {
TShardIdx idx = shard.Idx;
@@ -680,7 +714,8 @@ public:
*pqGroup,
*pqShard,
topicName,
- topicPath,
+ topicPath.PathString(),
+ bootstrapConfig,
cloudId,
folderId,
databaseId,
@@ -692,7 +727,8 @@ public:
*pqGroup,
*pqShard,
topicName,
- topicPath,
+ topicPath.PathString(),
+ bootstrapConfig,
cloudId,
folderId,
databaseId,
@@ -839,16 +875,6 @@ private:
}
}
- static void MakeBootstrapConfig(NKikimrPQ::TBootstrapConfig& config,
- const TTopicInfo& pqGroup,
- TTxState::ETxType txType)
- {
- if (pqGroup.AlterData && pqGroup.AlterData->BootstrapConfig) {
- Y_VERIFY(txType == TTxState::TxCreatePQGroup);
- Y_VERIFY(ParseFromStringNoSizeLimit(config, pqGroup.AlterData->BootstrapConfig));
- }
- }
-
static void ParsePQTabletConfig(NKikimrPQ::TPQTabletConfig& config,
const TTopicInfo& pqGroup)
{
@@ -869,6 +895,7 @@ private:
const TTopicTabletInfo& pqShard,
const TString& topicName,
const TString& topicPath,
+ const std::optional<NKikimrPQ::TBootstrapConfig>& bootstrapConfig,
const TString& cloudId,
const TString& folderId,
const TString& databaseId,
@@ -881,6 +908,7 @@ private:
const TTopicTabletInfo& pqShard,
const TString& topicName,
const TString& topicPath,
+ const std::optional<NKikimrPQ::TBootstrapConfig>& bootstrapConfig,
const TString& cloudId,
const TString& folderId,
const TString& databaseId,
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
index 55b6af0e5e9..5aa384ab1e9 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
@@ -3,7 +3,6 @@
#include "schemeshard_impl.h"
#include <ydb/core/engine/mkql_proto.h>
-#include <ydb/core/persqueue/writer/source_id_encoding.h>
#include <ydb/core/scheme/scheme_types_proto.h>
#define LOG_D(stream) LOG_DEBUG_S (context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[" << context.SS->TabletID() << "] " << stream)
@@ -796,16 +795,9 @@ TVector<ISubOperation::TPtr> CreateNewCdcStream(TOperationId opId, const TTxTran
}
}
- auto& bootstrapConfig = *desc.MutableBootstrapConfig();
for (ui32 i = 0; i < partitions.size(); ++i) {
const auto& cur = partitions.at(i);
- Y_VERIFY(context.SS->ShardInfos.contains(cur.ShardIdx));
- const auto& shard = context.SS->ShardInfos.at(cur.ShardIdx);
-
- auto& mg = *bootstrapConfig.AddExplicitMessageGroups();
- mg.SetId(NPQ::NSourceIdEncoding::EncodeSimple(ToString(shard.TabletID)));
-
if (i != partitions.size() - 1) {
TSerializedCellVec endKey(cur.EndOfRange);
Y_VERIFY(endKey.GetCells().size() <= table->KeyColumnIds.size());
@@ -820,13 +812,6 @@ TVector<ISubOperation::TPtr> CreateNewCdcStream(TOperationId opId, const TTxTran
const bool ok = NMiniKQL::CellToValue(typeId, cell, *boundary.AddTuple(), errStr);
Y_VERIFY(ok, "Failed to build key tuple at position %" PRIu32 " error: %s", ki, errStr.data());
}
-
- mg.MutableKeyRange()->SetToBound(cur.EndOfRange);
- }
-
- if (i) {
- const auto& prev = partitions.at(i - 1);
- mg.MutableKeyRange()->SetFromBound(prev.EndOfRange);
}
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp
index 15dc6e0aee6..d3d235a54f3 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp
@@ -504,6 +504,18 @@ public:
context.OnComplete.Dependence(parentTxId, OperationId.GetTxId());
}
+ if (parentPath.Base()->IsCdcStream()) {
+ auto tablePath = parentPath.Parent();
+ Y_VERIFY(tablePath.IsResolved());
+
+ Y_VERIFY(context.SS->Tables.contains(tablePath.Base()->PathId));
+ auto table = context.SS->Tables.at(tablePath.Base()->PathId);
+
+ for (const auto& splitOpId : table->GetSplitOpsInFlight()) {
+ context.OnComplete.Dependence(splitOpId.GetTxId(), OperationId.GetTxId());
+ }
+ }
+
context.SS->ChangeTxState(db, OperationId, TTxState::CreateParts);
context.OnComplete.ActivateTx(OperationId);
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp
index 68fb9aa88af..baa0cae9d03 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp
@@ -654,4 +654,45 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
});
}
+ Y_UNIT_TEST(RacySplitTableAndCreateStream) {
+ TTestWithReboots t;
+ t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
+ {
+ TInactiveZone inactive(activeZone);
+ TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value" Type: "Uint64" }
+ KeyColumnNames: ["key"]
+ )");
+ t.TestEnv->TestWaitNotification(runtime, t.TxId);
+ }
+
+ AsyncSplitTable(runtime, ++t.TxId, "/MyRoot/Table", Sprintf(R"(
+ SourceTabletId: %lu
+ SplitBoundary {
+ KeyPrefix {
+ Tuple { Optional { Uint64: 2 } }
+ }
+ }
+ )", TTestTxConfig::FakeHiveTablets));
+
+ AsyncCreateCdcStream(runtime, ++t.TxId, "/MyRoot", R"(
+ TableName: "Table"
+ StreamDescription {
+ Name: "Stream"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
+ }
+ )");
+
+ t.TestEnv->TestWaitNotification(runtime, {t.TxId - 1, t.TxId});
+
+ {
+ TInactiveZone inactive(activeZone);
+ CheckRegistrations(runtime, {"/MyRoot/Table", 2}, {"/MyRoot/Table/Stream/streamImpl", 1});
+ }
+ });
+ }
+
} // TCdcStreamWithRebootsTests