diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-09-14 12:36:09 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-09-14 12:54:39 +0300 |
commit | 1d2693bc60fd2d95c0d1f9333515bbfbba09ca10 (patch) | |
tree | fe549c0926f737d23c50b3587330065f44d6fdb7 | |
parent | 2af259c04c3a395f42998d78e1897129d66871fe (diff) | |
download | ydb-1d2693bc60fd2d95c0d1f9333515bbfbba09ca10.tar.gz |
Split & merge tests with reboots KIKIMR-19307
10 files changed, 173 insertions, 7 deletions
diff --git a/ydb/core/tx/datashard/change_exchange_split.cpp b/ydb/core/tx/datashard/change_exchange_split.cpp index 644640b45e..f889b400eb 100644 --- a/ydb/core/tx/datashard/change_exchange_split.cpp +++ b/ydb/core/tx/datashard/change_exchange_split.cpp @@ -19,17 +19,33 @@ namespace NKikimr { namespace NDataShard { class TCdcPartitionWorker: public TActorBootstrapped<TCdcPartitionWorker> { + TStringBuf GetLogPrefix() const { + if (!LogPrefix) { + LogPrefix = TStringBuilder() + << "[ChangeExchangeSplitCdcPartitionWorker]" + << "[" << SrcTabletId << "]" + << "[" << PartitionId << "]" + << SelfId() /* contains brackets */ << " "; + } + + return LogPrefix.GetRef(); + } + void Ack() { + LOG_I("Send ack"); Send(Parent, new TEvChangeExchange::TEvSplitAck()); PassAway(); } void Leave() { + LOG_I("Leave"); Send(Parent, new TEvents::TEvGone()); PassAway(); } void Handle(TEvPersQueue::TEvResponse::TPtr& ev) { + LOG_D("Handle " << ev->Get()->ToString()); + const auto& response = ev->Get()->Record; switch (response.GetStatus()) { @@ -52,12 +68,14 @@ class TCdcPartitionWorker: public TActorBootstrapped<TCdcPartitionWorker> { void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) { if (ev->Get()->TabletId == TabletId && ev->Get()->Status != NKikimrProto::OK) { + LOG_W("Pipe connection error"); Leave(); } } void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev) { if (ev->Get()->TabletId == TabletId) { + LOG_W("Pipe disconnected"); Leave(); } } @@ -131,6 +149,7 @@ private: const ui64 TabletId; const ui64 SrcTabletId; const TVector<ui64> DstTabletIds; + mutable TMaybe<TString> LogPrefix; TActorId PipeClient; @@ -375,12 +394,7 @@ class TCdcWorker: public TActorBootstrapped<TCdcWorker>, private TSchemeCacheHel } STATEFN(StateWork) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvChangeExchange::TEvSplitAck, Handle); - hFunc(TEvents::TEvGone, Handle); - default: - return StateBase(ev); - } + return StateBase(ev); } void Handle(TEvChangeExchange::TEvSplitAck::TPtr& ev) { @@ -400,7 +414,7 @@ class TCdcWorker: public TActorBootstrapped<TCdcWorker>, private TSchemeCacheHel Workers[it->second] = TActorId(); Pending.erase(it); - if (Pending.empty()) { + if (!IsResolving() && Pending.empty()) { Ack(); } } @@ -457,6 +471,8 @@ public: STATEFN(StateBase) { switch (ev->GetTypeRewrite()) { + hFunc(TEvChangeExchange::TEvSplitAck, Handle); + hFunc(TEvents::TEvGone, Handle); sFunc(TEvents::TEvPoison, PassAway); } } diff --git a/ydb/core/tx/datashard/datashard__init.cpp b/ydb/core/tx/datashard/datashard__init.cpp index aa4ec73049..bb726a3b35 100644 --- a/ydb/core/tx/datashard/datashard__init.cpp +++ b/ydb/core/tx/datashard/datashard__init.cpp @@ -117,6 +117,7 @@ void TDataShard::TTxInit::Complete(const TActorContext &ctx) { if (!Self->ChangesQueue) { if (!Self->ChangeExchangeSplitter.Done()) { + Self->KillChangeSender(ctx); Self->ChangeExchangeSplitter.DoSplit(ctx); } else { for (const auto dstTabletId : Self->ChangeSenderActivator.GetDstSet()) { diff --git a/ydb/core/tx/datashard/datashard_change_sending.cpp b/ydb/core/tx/datashard/datashard_change_sending.cpp index 0b37d1a90d..139812dc5c 100644 --- a/ydb/core/tx/datashard/datashard_change_sending.cpp +++ b/ydb/core/tx/datashard/datashard_change_sending.cpp @@ -322,6 +322,7 @@ public: } if (ChangeExchangeSplit) { + Self->KillChangeSender(ctx); Self->ChangeExchangeSplitter.DoSplit(ctx); } diff --git a/ydb/core/tx/datashard/datashard_split_src.cpp b/ydb/core/tx/datashard/datashard_split_src.cpp index 427fff74db..acb7d8ef5f 100644 --- a/ydb/core/tx/datashard/datashard_split_src.cpp +++ b/ydb/core/tx/datashard/datashard_split_src.cpp @@ -416,6 +416,7 @@ public: LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " Sending snapshots from src for split OpId " << Self->SrcSplitOpId); Self->SplitSrcSnapshotSender.DoSend(ctx); if (ChangeExchangeSplit) { + Self->KillChangeSender(ctx); Self->ChangeExchangeSplitter.DoSplit(ctx); } } diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/CMakeLists.darwin-x86_64.txt index 67e3b2aae7..cce08c1a57 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/CMakeLists.darwin-x86_64.txt @@ -21,6 +21,7 @@ target_link_libraries(ydb-core-tx-schemeshard-ut_cdc_stream_reboots PUBLIC cpp-testing-unittest_main core-tx-schemeshard tx-schemeshard-ut_helpers + core-persqueue-writer yql-sql-pg_dummy ) target_link_options(ydb-core-tx-schemeshard-ut_cdc_stream_reboots PRIVATE diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/CMakeLists.linux-aarch64.txt b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/CMakeLists.linux-aarch64.txt index cb547342de..200ad6c627 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/CMakeLists.linux-aarch64.txt @@ -21,6 +21,7 @@ target_link_libraries(ydb-core-tx-schemeshard-ut_cdc_stream_reboots PUBLIC cpp-testing-unittest_main core-tx-schemeshard tx-schemeshard-ut_helpers + core-persqueue-writer yql-sql-pg_dummy ) target_link_options(ydb-core-tx-schemeshard-ut_cdc_stream_reboots PRIVATE diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/CMakeLists.linux-x86_64.txt b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/CMakeLists.linux-x86_64.txt index d407e66ce1..d7131eba60 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/CMakeLists.linux-x86_64.txt @@ -22,6 +22,7 @@ target_link_libraries(ydb-core-tx-schemeshard-ut_cdc_stream_reboots PUBLIC cpp-testing-unittest_main core-tx-schemeshard tx-schemeshard-ut_helpers + core-persqueue-writer yql-sql-pg_dummy ) target_link_options(ydb-core-tx-schemeshard-ut_cdc_stream_reboots PRIVATE diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/CMakeLists.windows-x86_64.txt b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/CMakeLists.windows-x86_64.txt index 92671b8dc4..6ea88bea7e 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/CMakeLists.windows-x86_64.txt @@ -21,6 +21,7 @@ target_link_libraries(ydb-core-tx-schemeshard-ut_cdc_stream_reboots PUBLIC cpp-testing-unittest_main core-tx-schemeshard tx-schemeshard-ut_helpers + core-persqueue-writer yql-sql-pg_dummy ) target_sources(ydb-core-tx-schemeshard-ut_cdc_stream_reboots PRIVATE diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp index 8f6dcd2266..68fb9aa88a 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp @@ -1,4 +1,5 @@ #include <ydb/core/tx/schemeshard/ut_helpers/helpers.h> +#include <ydb/core/persqueue/writer/source_id_encoding.h> #include <contrib/libs/protobuf/src/google/protobuf/text_format.h> @@ -512,4 +513,145 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { }); } + struct TItem { + TString Path; + ui32 nPartitions; + }; + + void CheckRegistrations(TTestActorRuntime& runtime, const TItem& table, const TItem& topic) { + auto tableDesc = DescribePath(runtime, table.Path, true, true); + const auto& tablePartitions = tableDesc.GetPathDescription().GetTablePartitions(); + UNIT_ASSERT_VALUES_EQUAL(tablePartitions.size(), table.nPartitions); + + auto topicDesc = DescribePrivatePath(runtime, topic.Path); + const auto& topicPartitions = topicDesc.GetPathDescription().GetPersQueueGroup().GetPartitions(); + UNIT_ASSERT_VALUES_EQUAL(topicPartitions.size(), topic.nPartitions); + + while (true) { + runtime.SimulateSleep(TDuration::Seconds(1)); + bool done = true; + + for (ui32 i = 0; i < topic.nPartitions; ++i) { + auto request = MakeHolder<TEvPersQueue::TEvRequest>(); + { + auto& record = *request->Record.MutablePartitionRequest(); + record.SetPartition(topicPartitions[i].GetPartitionId()); + auto& cmd = *record.MutableCmdGetMaxSeqNo(); + for (const auto& tablePartition : tablePartitions) { + cmd.AddSourceId(NPQ::NSourceIdEncoding::EncodeSimple(ToString(tablePartition.GetDatashardId()))); + } + } + + const auto& sender = runtime.AllocateEdgeActor(); + ForwardToTablet(runtime, topicPartitions[i].GetTabletId(), sender, request.Release()); + + auto response = runtime.GrabEdgeEvent<TEvPersQueue::TEvResponse>(sender); + { + const auto& record = response->Get()->Record.GetPartitionResponse(); + const auto& result = record.GetCmdGetMaxSeqNoResult().GetSourceIdInfo(); + + UNIT_ASSERT_VALUES_EQUAL(result.size(), table.nPartitions); + for (const auto& item: result) { + done &= item.GetState() == NKikimrPQ::TMessageGroupInfo::STATE_REGISTERED; + if (!done) { + break; + } + } + } + + if (!done) { + break; + } + } + + if (done) { + break; + } + } + } + + Y_UNIT_TEST(SplitTable) { + TTestWithReboots t; + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + { + TInactiveZone inactive(activeZone); + TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + )"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + + TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", R"( + TableName: "Table" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + } + )"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + } + + TestSplitTable(runtime, ++t.TxId, "/MyRoot/Table", Sprintf(R"( + SourceTabletId: %lu + SplitBoundary { + KeyPrefix { + Tuple { Optional { Uint64: 2 } } + } + } + )", TTestTxConfig::FakeHiveTablets)); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + + { + TInactiveZone inactive(activeZone); + CheckRegistrations(runtime, {"/MyRoot/Table", 2}, {"/MyRoot/Table/Stream/streamImpl", 1}); + } + }); + } + + Y_UNIT_TEST(MergeTable) { + TTestWithReboots t; + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + { + TInactiveZone inactive(activeZone); + TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + UniformPartitionsCount: 2 + PartitionConfig { + PartitioningPolicy { + MinPartitionsCount: 1 + } + } + )"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + + TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", R"( + TableName: "Table" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + } + )"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + } + + TestSplitTable(runtime, ++t.TxId, "/MyRoot/Table", Sprintf(R"( + SourceTabletId: %lu + SourceTabletId: %lu + )", TTestTxConfig::FakeHiveTablets + 0, TTestTxConfig::FakeHiveTablets + 1)); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + + { + TInactiveZone inactive(activeZone); + CheckRegistrations(runtime, {"/MyRoot/Table", 1}, {"/MyRoot/Table/Stream/streamImpl", 2}); + } + }); + } + } // TCdcStreamWithRebootsTests diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ya.make b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ya.make index 40d1d54204..c9c5b955f5 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ya.make +++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ya.make @@ -15,6 +15,7 @@ ENDIF() PEERDIR( ydb/core/tx/schemeshard/ut_helpers + ydb/core/persqueue/writer ydb/library/yql/sql/pg_dummy ) |