aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-09-14 12:36:09 +0300
committerilnaz <ilnaz@ydb.tech>2023-09-14 12:54:39 +0300
commit1d2693bc60fd2d95c0d1f9333515bbfbba09ca10 (patch)
treefe549c0926f737d23c50b3587330065f44d6fdb7
parent2af259c04c3a395f42998d78e1897129d66871fe (diff)
downloadydb-1d2693bc60fd2d95c0d1f9333515bbfbba09ca10.tar.gz
Split & merge tests with reboots KIKIMR-19307
-rw-r--r--ydb/core/tx/datashard/change_exchange_split.cpp30
-rw-r--r--ydb/core/tx/datashard/datashard__init.cpp1
-rw-r--r--ydb/core/tx/datashard/datashard_change_sending.cpp1
-rw-r--r--ydb/core/tx/datashard/datashard_split_src.cpp1
-rw-r--r--ydb/core/tx/schemeshard/ut_cdc_stream_reboots/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/tx/schemeshard/ut_cdc_stream_reboots/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/schemeshard/ut_cdc_stream_reboots/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/tx/schemeshard/ut_cdc_stream_reboots/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp142
-rw-r--r--ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ya.make1
10 files changed, 173 insertions, 7 deletions
diff --git a/ydb/core/tx/datashard/change_exchange_split.cpp b/ydb/core/tx/datashard/change_exchange_split.cpp
index 644640b45e..f889b400eb 100644
--- a/ydb/core/tx/datashard/change_exchange_split.cpp
+++ b/ydb/core/tx/datashard/change_exchange_split.cpp
@@ -19,17 +19,33 @@ namespace NKikimr {
namespace NDataShard {
class TCdcPartitionWorker: public TActorBootstrapped<TCdcPartitionWorker> {
+ TStringBuf GetLogPrefix() const {
+ if (!LogPrefix) {
+ LogPrefix = TStringBuilder()
+ << "[ChangeExchangeSplitCdcPartitionWorker]"
+ << "[" << SrcTabletId << "]"
+ << "[" << PartitionId << "]"
+ << SelfId() /* contains brackets */ << " ";
+ }
+
+ return LogPrefix.GetRef();
+ }
+
void Ack() {
+ LOG_I("Send ack");
Send(Parent, new TEvChangeExchange::TEvSplitAck());
PassAway();
}
void Leave() {
+ LOG_I("Leave");
Send(Parent, new TEvents::TEvGone());
PassAway();
}
void Handle(TEvPersQueue::TEvResponse::TPtr& ev) {
+ LOG_D("Handle " << ev->Get()->ToString());
+
const auto& response = ev->Get()->Record;
switch (response.GetStatus()) {
@@ -52,12 +68,14 @@ class TCdcPartitionWorker: public TActorBootstrapped<TCdcPartitionWorker> {
void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) {
if (ev->Get()->TabletId == TabletId && ev->Get()->Status != NKikimrProto::OK) {
+ LOG_W("Pipe connection error");
Leave();
}
}
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev) {
if (ev->Get()->TabletId == TabletId) {
+ LOG_W("Pipe disconnected");
Leave();
}
}
@@ -131,6 +149,7 @@ private:
const ui64 TabletId;
const ui64 SrcTabletId;
const TVector<ui64> DstTabletIds;
+ mutable TMaybe<TString> LogPrefix;
TActorId PipeClient;
@@ -375,12 +394,7 @@ class TCdcWorker: public TActorBootstrapped<TCdcWorker>, private TSchemeCacheHel
}
STATEFN(StateWork) {
- switch (ev->GetTypeRewrite()) {
- hFunc(TEvChangeExchange::TEvSplitAck, Handle);
- hFunc(TEvents::TEvGone, Handle);
- default:
- return StateBase(ev);
- }
+ return StateBase(ev);
}
void Handle(TEvChangeExchange::TEvSplitAck::TPtr& ev) {
@@ -400,7 +414,7 @@ class TCdcWorker: public TActorBootstrapped<TCdcWorker>, private TSchemeCacheHel
Workers[it->second] = TActorId();
Pending.erase(it);
- if (Pending.empty()) {
+ if (!IsResolving() && Pending.empty()) {
Ack();
}
}
@@ -457,6 +471,8 @@ public:
STATEFN(StateBase) {
switch (ev->GetTypeRewrite()) {
+ hFunc(TEvChangeExchange::TEvSplitAck, Handle);
+ hFunc(TEvents::TEvGone, Handle);
sFunc(TEvents::TEvPoison, PassAway);
}
}
diff --git a/ydb/core/tx/datashard/datashard__init.cpp b/ydb/core/tx/datashard/datashard__init.cpp
index aa4ec73049..bb726a3b35 100644
--- a/ydb/core/tx/datashard/datashard__init.cpp
+++ b/ydb/core/tx/datashard/datashard__init.cpp
@@ -117,6 +117,7 @@ void TDataShard::TTxInit::Complete(const TActorContext &ctx) {
if (!Self->ChangesQueue) {
if (!Self->ChangeExchangeSplitter.Done()) {
+ Self->KillChangeSender(ctx);
Self->ChangeExchangeSplitter.DoSplit(ctx);
} else {
for (const auto dstTabletId : Self->ChangeSenderActivator.GetDstSet()) {
diff --git a/ydb/core/tx/datashard/datashard_change_sending.cpp b/ydb/core/tx/datashard/datashard_change_sending.cpp
index 0b37d1a90d..139812dc5c 100644
--- a/ydb/core/tx/datashard/datashard_change_sending.cpp
+++ b/ydb/core/tx/datashard/datashard_change_sending.cpp
@@ -322,6 +322,7 @@ public:
}
if (ChangeExchangeSplit) {
+ Self->KillChangeSender(ctx);
Self->ChangeExchangeSplitter.DoSplit(ctx);
}
diff --git a/ydb/core/tx/datashard/datashard_split_src.cpp b/ydb/core/tx/datashard/datashard_split_src.cpp
index 427fff74db..acb7d8ef5f 100644
--- a/ydb/core/tx/datashard/datashard_split_src.cpp
+++ b/ydb/core/tx/datashard/datashard_split_src.cpp
@@ -416,6 +416,7 @@ public:
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " Sending snapshots from src for split OpId " << Self->SrcSplitOpId);
Self->SplitSrcSnapshotSender.DoSend(ctx);
if (ChangeExchangeSplit) {
+ Self->KillChangeSender(ctx);
Self->ChangeExchangeSplitter.DoSplit(ctx);
}
}
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/CMakeLists.darwin-x86_64.txt
index 67e3b2aae7..cce08c1a57 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/CMakeLists.darwin-x86_64.txt
@@ -21,6 +21,7 @@ target_link_libraries(ydb-core-tx-schemeshard-ut_cdc_stream_reboots PUBLIC
cpp-testing-unittest_main
core-tx-schemeshard
tx-schemeshard-ut_helpers
+ core-persqueue-writer
yql-sql-pg_dummy
)
target_link_options(ydb-core-tx-schemeshard-ut_cdc_stream_reboots PRIVATE
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/CMakeLists.linux-aarch64.txt b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/CMakeLists.linux-aarch64.txt
index cb547342de..200ad6c627 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/CMakeLists.linux-aarch64.txt
@@ -21,6 +21,7 @@ target_link_libraries(ydb-core-tx-schemeshard-ut_cdc_stream_reboots PUBLIC
cpp-testing-unittest_main
core-tx-schemeshard
tx-schemeshard-ut_helpers
+ core-persqueue-writer
yql-sql-pg_dummy
)
target_link_options(ydb-core-tx-schemeshard-ut_cdc_stream_reboots PRIVATE
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/CMakeLists.linux-x86_64.txt b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/CMakeLists.linux-x86_64.txt
index d407e66ce1..d7131eba60 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/CMakeLists.linux-x86_64.txt
@@ -22,6 +22,7 @@ target_link_libraries(ydb-core-tx-schemeshard-ut_cdc_stream_reboots PUBLIC
cpp-testing-unittest_main
core-tx-schemeshard
tx-schemeshard-ut_helpers
+ core-persqueue-writer
yql-sql-pg_dummy
)
target_link_options(ydb-core-tx-schemeshard-ut_cdc_stream_reboots PRIVATE
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/CMakeLists.windows-x86_64.txt b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/CMakeLists.windows-x86_64.txt
index 92671b8dc4..6ea88bea7e 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/CMakeLists.windows-x86_64.txt
@@ -21,6 +21,7 @@ target_link_libraries(ydb-core-tx-schemeshard-ut_cdc_stream_reboots PUBLIC
cpp-testing-unittest_main
core-tx-schemeshard
tx-schemeshard-ut_helpers
+ core-persqueue-writer
yql-sql-pg_dummy
)
target_sources(ydb-core-tx-schemeshard-ut_cdc_stream_reboots PRIVATE
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp
index 8f6dcd2266..68fb9aa88a 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp
@@ -1,4 +1,5 @@
#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>
+#include <ydb/core/persqueue/writer/source_id_encoding.h>
#include <contrib/libs/protobuf/src/google/protobuf/text_format.h>
@@ -512,4 +513,145 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
});
}
+ struct TItem {
+ TString Path;
+ ui32 nPartitions;
+ };
+
+ void CheckRegistrations(TTestActorRuntime& runtime, const TItem& table, const TItem& topic) {
+ auto tableDesc = DescribePath(runtime, table.Path, true, true);
+ const auto& tablePartitions = tableDesc.GetPathDescription().GetTablePartitions();
+ UNIT_ASSERT_VALUES_EQUAL(tablePartitions.size(), table.nPartitions);
+
+ auto topicDesc = DescribePrivatePath(runtime, topic.Path);
+ const auto& topicPartitions = topicDesc.GetPathDescription().GetPersQueueGroup().GetPartitions();
+ UNIT_ASSERT_VALUES_EQUAL(topicPartitions.size(), topic.nPartitions);
+
+ while (true) {
+ runtime.SimulateSleep(TDuration::Seconds(1));
+ bool done = true;
+
+ for (ui32 i = 0; i < topic.nPartitions; ++i) {
+ auto request = MakeHolder<TEvPersQueue::TEvRequest>();
+ {
+ auto& record = *request->Record.MutablePartitionRequest();
+ record.SetPartition(topicPartitions[i].GetPartitionId());
+ auto& cmd = *record.MutableCmdGetMaxSeqNo();
+ for (const auto& tablePartition : tablePartitions) {
+ cmd.AddSourceId(NPQ::NSourceIdEncoding::EncodeSimple(ToString(tablePartition.GetDatashardId())));
+ }
+ }
+
+ const auto& sender = runtime.AllocateEdgeActor();
+ ForwardToTablet(runtime, topicPartitions[i].GetTabletId(), sender, request.Release());
+
+ auto response = runtime.GrabEdgeEvent<TEvPersQueue::TEvResponse>(sender);
+ {
+ const auto& record = response->Get()->Record.GetPartitionResponse();
+ const auto& result = record.GetCmdGetMaxSeqNoResult().GetSourceIdInfo();
+
+ UNIT_ASSERT_VALUES_EQUAL(result.size(), table.nPartitions);
+ for (const auto& item: result) {
+ done &= item.GetState() == NKikimrPQ::TMessageGroupInfo::STATE_REGISTERED;
+ if (!done) {
+ break;
+ }
+ }
+ }
+
+ if (!done) {
+ break;
+ }
+ }
+
+ if (done) {
+ break;
+ }
+ }
+ }
+
+ Y_UNIT_TEST(SplitTable) {
+ TTestWithReboots t;
+ t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
+ {
+ TInactiveZone inactive(activeZone);
+ TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value" Type: "Uint64" }
+ KeyColumnNames: ["key"]
+ )");
+ t.TestEnv->TestWaitNotification(runtime, t.TxId);
+
+ TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", R"(
+ TableName: "Table"
+ StreamDescription {
+ Name: "Stream"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
+ }
+ )");
+ t.TestEnv->TestWaitNotification(runtime, t.TxId);
+ }
+
+ TestSplitTable(runtime, ++t.TxId, "/MyRoot/Table", Sprintf(R"(
+ SourceTabletId: %lu
+ SplitBoundary {
+ KeyPrefix {
+ Tuple { Optional { Uint64: 2 } }
+ }
+ }
+ )", TTestTxConfig::FakeHiveTablets));
+ t.TestEnv->TestWaitNotification(runtime, t.TxId);
+
+ {
+ TInactiveZone inactive(activeZone);
+ CheckRegistrations(runtime, {"/MyRoot/Table", 2}, {"/MyRoot/Table/Stream/streamImpl", 1});
+ }
+ });
+ }
+
+ Y_UNIT_TEST(MergeTable) {
+ TTestWithReboots t;
+ t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
+ {
+ TInactiveZone inactive(activeZone);
+ TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value" Type: "Uint64" }
+ KeyColumnNames: ["key"]
+ UniformPartitionsCount: 2
+ PartitionConfig {
+ PartitioningPolicy {
+ MinPartitionsCount: 1
+ }
+ }
+ )");
+ t.TestEnv->TestWaitNotification(runtime, t.TxId);
+
+ TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", R"(
+ TableName: "Table"
+ StreamDescription {
+ Name: "Stream"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
+ }
+ )");
+ t.TestEnv->TestWaitNotification(runtime, t.TxId);
+ }
+
+ TestSplitTable(runtime, ++t.TxId, "/MyRoot/Table", Sprintf(R"(
+ SourceTabletId: %lu
+ SourceTabletId: %lu
+ )", TTestTxConfig::FakeHiveTablets + 0, TTestTxConfig::FakeHiveTablets + 1));
+ t.TestEnv->TestWaitNotification(runtime, t.TxId);
+
+ {
+ TInactiveZone inactive(activeZone);
+ CheckRegistrations(runtime, {"/MyRoot/Table", 1}, {"/MyRoot/Table/Stream/streamImpl", 2});
+ }
+ });
+ }
+
} // TCdcStreamWithRebootsTests
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ya.make b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ya.make
index 40d1d54204..c9c5b955f5 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ya.make
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ya.make
@@ -15,6 +15,7 @@ ENDIF()
PEERDIR(
ydb/core/tx/schemeshard/ut_helpers
+ ydb/core/persqueue/writer
ydb/library/yql/sql/pg_dummy
)