aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-10-19 21:46:49 +0300
committerilnaz <ilnaz@ydb.tech>2023-10-19 22:09:04 +0300
commitf5cc0cec5a39c851a989c36271226edae8a76ac4 (patch)
tree0697a3d510c5819bcf1cd3e28453eb19778061c8
parent551e835dcfdbb3255de090d52f01532d0949919b (diff)
downloadydb-f5cc0cec5a39c851a989c36271226edae8a76ac4.tar.gz
Create partition actors lazily KIKIMR-19779
-rw-r--r--ydb/core/tx/datashard/change_sender_common_ops.cpp41
-rw-r--r--ydb/core/tx/datashard/change_sender_common_ops.h3
-rw-r--r--ydb/core/tx/datashard/datashard_ut_change_exchange.cpp48
-rw-r--r--ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp45
4 files changed, 118 insertions, 19 deletions
diff --git a/ydb/core/tx/datashard/change_sender_common_ops.cpp b/ydb/core/tx/datashard/change_sender_common_ops.cpp
index 5d197b32b7f..d89dcf4fd34 100644
--- a/ydb/core/tx/datashard/change_sender_common_ops.cpp
+++ b/ydb/core/tx/datashard/change_sender_common_ops.cpp
@@ -9,10 +9,9 @@
namespace NKikimr::NDataShard {
-void TBaseChangeSender::RegisterSender(THashMap<ui64, TSender>& senders, ui64 partitionId) {
- Y_ABORT_UNLESS(!senders.contains(partitionId));
- auto& sender = senders[partitionId];
- sender.ActorId = ActorOps->Register(CreateSender(partitionId));
+void TBaseChangeSender::LazyCreateSender(THashMap<ui64, TSender>& senders, ui64 partitionId) {
+ auto res = senders.emplace(partitionId, TSender{});
+ Y_ABORT_UNLESS(res.second);
for (const auto& [order, broadcast] : Broadcasting) {
if (AddBroadcastPartition(order, partitionId)) {
@@ -22,6 +21,14 @@ void TBaseChangeSender::RegisterSender(THashMap<ui64, TSender>& senders, ui64 pa
}
}
+void TBaseChangeSender::RegisterSender(ui64 partitionId) {
+ Y_ABORT_UNLESS(Senders.contains(partitionId));
+ auto& sender = Senders.at(partitionId);
+
+ Y_ABORT_UNLESS(!sender.ActorId);
+ sender.ActorId = ActorOps->RegisterWithSameMailbox(CreateSender(partitionId));
+}
+
void TBaseChangeSender::CreateMissingSenders(const TVector<ui64>& partitionIds) {
THashMap<ui64, TSender> senders;
@@ -31,7 +38,7 @@ void TBaseChangeSender::CreateMissingSenders(const TVector<ui64>& partitionIds)
senders.emplace(partitionId, std::move(it->second));
Senders.erase(it);
} else {
- RegisterSender(senders, partitionId);
+ LazyCreateSender(senders, partitionId);
}
}
@@ -39,7 +46,9 @@ void TBaseChangeSender::CreateMissingSenders(const TVector<ui64>& partitionIds)
ReEnqueueRecords(sender);
ProcessBroadcasting(&TBaseChangeSender::RemoveBroadcastPartition,
partitionId, sender.Broadcasting);
- ActorOps->Send(sender.ActorId, new TEvents::TEvPoisonPill());
+ if (sender.ActorId) {
+ ActorOps->Send(sender.ActorId, new TEvents::TEvPoisonPill());
+ }
}
Senders = std::move(senders);
@@ -47,7 +56,7 @@ void TBaseChangeSender::CreateMissingSenders(const TVector<ui64>& partitionIds)
void TBaseChangeSender::RecreateSenders(const TVector<ui64>& partitionIds) {
for (const auto& partitionId : partitionIds) {
- RegisterSender(Senders, partitionId);
+ LazyCreateSender(Senders, partitionId);
}
}
@@ -67,7 +76,9 @@ void TBaseChangeSender::CreateSenders(const TVector<ui64>& partitionIds, bool pa
void TBaseChangeSender::KillSenders() {
for (const auto& [_, sender] : std::exchange(Senders, {})) {
- ActorOps->Send(sender.ActorId, new TEvents::TEvPoisonPill());
+ if (sender.ActorId) {
+ ActorOps->Send(sender.ActorId, new TEvents::TEvPoisonPill());
+ }
}
}
@@ -145,6 +156,7 @@ void TBaseChangeSender::SendRecords() {
auto it = PendingSent.begin();
THashSet<ui64> sendTo;
+ THashSet<ui64> registrations;
bool needToResolve = false;
while (it != PendingSent.end()) {
@@ -158,6 +170,10 @@ void TBaseChangeSender::SendRecords() {
auto& sender = Senders.at(partitionId);
sender.Prepared.push_back(std::move(it->second));
+ if (!sender.ActorId) {
+ Y_ABORT_UNLESS(!sender.Ready);
+ registrations.insert(partitionId);
+ }
if (sender.Ready) {
sendTo.insert(partitionId);
}
@@ -167,6 +183,10 @@ void TBaseChangeSender::SendRecords() {
if (Senders.contains(partitionId)) {
auto& sender = Senders.at(partitionId);
sender.Prepared.push_back(std::move(it->second));
+ if (!sender.ActorId) {
+ Y_ABORT_UNLESS(!sender.Ready);
+ registrations.insert(partitionId);
+ }
if (sender.Ready) {
sendTo.insert(partitionId);
}
@@ -181,6 +201,10 @@ void TBaseChangeSender::SendRecords() {
it = PendingSent.erase(it);
}
+ for (const auto partitionId : registrations) {
+ RegisterSender(partitionId);
+ }
+
for (const auto partitionId : sendTo) {
SendPreparedRecords(partitionId);
}
@@ -265,6 +289,7 @@ void TBaseChangeSender::SendPreparedRecords(ui64 partitionId) {
}
}
+ Y_ABORT_UNLESS(sender.ActorId);
ActorOps->Send(sender.ActorId, new TEvChangeExchange::TEvRecords(std::exchange(sender.Prepared, {})));
}
diff --git a/ydb/core/tx/datashard/change_sender_common_ops.h b/ydb/core/tx/datashard/change_sender_common_ops.h
index 09856d460bf..f269cc17bd4 100644
--- a/ydb/core/tx/datashard/change_sender_common_ops.h
+++ b/ydb/core/tx/datashard/change_sender_common_ops.h
@@ -98,7 +98,8 @@ class TBaseChangeSender: public IChangeSender {
THashSet<ui64> CompletedPartitions;
};
- void RegisterSender(THashMap<ui64, TSender>& senders, ui64 partitionId);
+ void LazyCreateSender(THashMap<ui64, TSender>& senders, ui64 partitionId);
+ void RegisterSender(ui64 partitionId);
void CreateMissingSenders(const TVector<ui64>& partitionIds);
void RecreateSenders(const TVector<ui64>& partitionIds);
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
index fa8a1caad53..7d7a47f509a 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
@@ -112,7 +112,7 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) {
}
void SenderShouldShakeHands(const TString& path, ui32 times, const TShardedTableOptions& opts,
- TMaybe<TShardedTableOptions::TIndex> addIndex = Nothing())
+ TMaybe<TShardedTableOptions::TIndex> addIndex, const TString& query)
{
const auto pathParts = SplitPath(path);
UNIT_ASSERT(pathParts.size() > 1);
@@ -147,9 +147,12 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) {
CreateShardedTable(server, sender, workingDir, tableName, opts);
if (addIndex) {
- AsyncAlterAddIndex(server, domainName, path, *addIndex);
+ WaitTxNotification(server, sender, AsyncAlterAddIndex(server, domainName, path, *addIndex));
}
+ // trigger initialization
+ ExecSQL(server, sender, query);
+
if (counter != times) {
TDispatchOptions opts;
opts.FinalEvents.emplace_back([&](IEventHandle&) {
@@ -160,7 +163,8 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) {
}
Y_UNIT_TEST(SenderShouldShakeHandsOnce) {
- SenderShouldShakeHands("/Root/Table", 1, TableWithIndex(SimpleAsyncIndex()));
+ SenderShouldShakeHands("/Root/Table", 1, TableWithIndex(SimpleAsyncIndex()), {},
+ "UPSERT INTO `/Root/Table` (pkey, ikey) VALUES (1, 10);");
}
Y_UNIT_TEST(SenderShouldShakeHandsTwice) {
@@ -173,12 +177,14 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) {
.Indexes({
{"by_i1key", {"i1key"}, {}, NKikimrSchemeOp::EIndexTypeGlobalAsync},
{"by_i2key", {"i2key"}, {}, NKikimrSchemeOp::EIndexTypeGlobalAsync},
- })
+ }), {},
+ "UPSERT INTO `/Root/Table` (pkey, i1key, i2key) VALUES (1, 10, 100);"
);
}
Y_UNIT_TEST(SenderShouldShakeHandsAfterAddingIndex) {
- SenderShouldShakeHands("/Root/Table", 1, TableWoIndexes(), SimpleAsyncIndex());
+ SenderShouldShakeHands("/Root/Table", 1, TableWoIndexes(), SimpleAsyncIndex(),
+ "UPSERT INTO `/Root/Table` (pkey, ikey) VALUES (1, 10);");
}
void ShouldDeliverChanges(const TString& path, const TShardedTableOptions& opts,
@@ -3179,6 +3185,38 @@ Y_UNIT_TEST_SUITE(Cdc) {
});
}
+ Y_UNIT_TEST(SequentialSplitMerge) {
+ TTestPqEnv env(SimpleTable(), Updates(NKikimrSchemeOp::ECdcStreamFormatJson), false);
+ SetSplitMergePartCountLimit(env.GetServer()->GetRuntime(), -1);
+
+ // split
+ auto tabletIds = GetTableShards(env.GetServer(), env.GetEdgeActor(), "/Root/Table");
+ UNIT_ASSERT_VALUES_EQUAL(tabletIds.size(), 1);
+
+ WaitTxNotification(env.GetServer(), env.GetEdgeActor(),
+ AsyncSplitTable(env.GetServer(), env.GetEdgeActor(), "/Root/Table", tabletIds.at(0), 4));
+
+ // merge
+ tabletIds = GetTableShards(env.GetServer(), env.GetEdgeActor(), "/Root/Table");
+ UNIT_ASSERT_VALUES_EQUAL(tabletIds.size(), 2);
+
+ WaitTxNotification(env.GetServer(), env.GetEdgeActor(),
+ AsyncMergeTable(env.GetServer(), env.GetEdgeActor(), "/Root/Table", tabletIds));
+
+ ExecSQL(env.GetServer(), env.GetEdgeActor(), R"(
+ UPSERT INTO `/Root/Table` (key, value) VALUES
+ (1, 10),
+ (2, 20),
+ (3, 30);
+ )");
+
+ WaitForContent(env.GetServer(), env.GetEdgeActor(), "/Root/Table/Stream", {
+ R"({"update":{"value":10},"key":[1]})",
+ R"({"update":{"value":20},"key":[2]})",
+ R"({"update":{"value":30},"key":[3]})",
+ });
+ }
+
} // Cdc
} // NKikimr
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp
index baa0cae9d03..2dbdca17ffb 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp
@@ -570,6 +570,38 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
}
}
+ void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, int partitionIdx,
+ const TVector<ui32>& keyTags, const TVector<ui32>& valueTags, const TVector<ui32>& recordIds)
+ {
+ auto tableDesc = DescribePath(runtime, tablePath, true, true);
+ const auto& tablePartitions = tableDesc.GetPathDescription().GetTablePartitions();
+ UNIT_ASSERT(partitionIdx < tablePartitions.size());
+
+ auto ev = MakeHolder<TEvDataShard::TEvUploadRowsRequest>();
+ ev->Record.SetTableId(tableDesc.GetPathId());
+
+ auto& scheme = *ev->Record.MutableRowScheme();
+ for (ui32 tag : keyTags) {
+ scheme.AddKeyColumnIds(tag);
+ }
+ for (ui32 tag : valueTags) {
+ scheme.AddValueColumnIds(tag);
+ }
+
+ for (ui32 i : recordIds) {
+ auto key = TVector<TCell>{TCell::Make(i)};
+ auto value = TVector<TCell>{TCell::Make(i)};
+
+ auto& row = *ev->Record.AddRows();
+ row.SetKeyColumns(TSerializedCellVec::Serialize(key));
+ row.SetValueColumns(TSerializedCellVec::Serialize(value));
+ }
+
+ const auto& sender = runtime.AllocateEdgeActor();
+ ForwardToTablet(runtime, tablePartitions[partitionIdx].GetDatashardId(), sender, ev.Release());
+ runtime.GrabEdgeEvent<TEvDataShard::TEvUploadRowsResponse>(sender);
+ }
+
Y_UNIT_TEST(SplitTable) {
TTestWithReboots t;
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
@@ -577,8 +609,8 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
TInactiveZone inactive(activeZone);
TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"(
Name: "Table"
- Columns { Name: "key" Type: "Uint64" }
- Columns { Name: "value" Type: "Uint64" }
+ Columns { Name: "key" Type: "Uint32" }
+ Columns { Name: "value" Type: "Uint32" }
KeyColumnNames: ["key"]
)");
t.TestEnv->TestWaitNotification(runtime, t.TxId);
@@ -598,7 +630,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
SourceTabletId: %lu
SplitBoundary {
KeyPrefix {
- Tuple { Optional { Uint64: 2 } }
+ Tuple { Optional { Uint32: 2 } }
}
}
)", TTestTxConfig::FakeHiveTablets));
@@ -606,6 +638,8 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
{
TInactiveZone inactive(activeZone);
+ UploadRows(runtime, "/MyRoot/Table", 0, {1}, {2}, {1});
+ UploadRows(runtime, "/MyRoot/Table", 1, {1}, {2}, {Max<ui32>()});
CheckRegistrations(runtime, {"/MyRoot/Table", 2}, {"/MyRoot/Table/Stream/streamImpl", 1});
}
});
@@ -618,8 +652,8 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
TInactiveZone inactive(activeZone);
TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"(
Name: "Table"
- Columns { Name: "key" Type: "Uint64" }
- Columns { Name: "value" Type: "Uint64" }
+ Columns { Name: "key" Type: "Uint32" }
+ Columns { Name: "value" Type: "Uint32" }
KeyColumnNames: ["key"]
UniformPartitionsCount: 2
PartitionConfig {
@@ -649,6 +683,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
{
TInactiveZone inactive(activeZone);
+ UploadRows(runtime, "/MyRoot/Table", 0, {1}, {2}, {1, Max<ui32>()});
CheckRegistrations(runtime, {"/MyRoot/Table", 1}, {"/MyRoot/Table/Stream/streamImpl", 2});
}
});