diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-10-19 21:46:49 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-10-19 22:09:04 +0300 |
commit | f5cc0cec5a39c851a989c36271226edae8a76ac4 (patch) | |
tree | 0697a3d510c5819bcf1cd3e28453eb19778061c8 | |
parent | 551e835dcfdbb3255de090d52f01532d0949919b (diff) | |
download | ydb-f5cc0cec5a39c851a989c36271226edae8a76ac4.tar.gz |
Create partition actors lazily KIKIMR-19779
4 files changed, 118 insertions, 19 deletions
diff --git a/ydb/core/tx/datashard/change_sender_common_ops.cpp b/ydb/core/tx/datashard/change_sender_common_ops.cpp index 5d197b32b7f..d89dcf4fd34 100644 --- a/ydb/core/tx/datashard/change_sender_common_ops.cpp +++ b/ydb/core/tx/datashard/change_sender_common_ops.cpp @@ -9,10 +9,9 @@ namespace NKikimr::NDataShard { -void TBaseChangeSender::RegisterSender(THashMap<ui64, TSender>& senders, ui64 partitionId) { - Y_ABORT_UNLESS(!senders.contains(partitionId)); - auto& sender = senders[partitionId]; - sender.ActorId = ActorOps->Register(CreateSender(partitionId)); +void TBaseChangeSender::LazyCreateSender(THashMap<ui64, TSender>& senders, ui64 partitionId) { + auto res = senders.emplace(partitionId, TSender{}); + Y_ABORT_UNLESS(res.second); for (const auto& [order, broadcast] : Broadcasting) { if (AddBroadcastPartition(order, partitionId)) { @@ -22,6 +21,14 @@ void TBaseChangeSender::RegisterSender(THashMap<ui64, TSender>& senders, ui64 pa } } +void TBaseChangeSender::RegisterSender(ui64 partitionId) { + Y_ABORT_UNLESS(Senders.contains(partitionId)); + auto& sender = Senders.at(partitionId); + + Y_ABORT_UNLESS(!sender.ActorId); + sender.ActorId = ActorOps->RegisterWithSameMailbox(CreateSender(partitionId)); +} + void TBaseChangeSender::CreateMissingSenders(const TVector<ui64>& partitionIds) { THashMap<ui64, TSender> senders; @@ -31,7 +38,7 @@ void TBaseChangeSender::CreateMissingSenders(const TVector<ui64>& partitionIds) senders.emplace(partitionId, std::move(it->second)); Senders.erase(it); } else { - RegisterSender(senders, partitionId); + LazyCreateSender(senders, partitionId); } } @@ -39,7 +46,9 @@ void TBaseChangeSender::CreateMissingSenders(const TVector<ui64>& partitionIds) ReEnqueueRecords(sender); ProcessBroadcasting(&TBaseChangeSender::RemoveBroadcastPartition, partitionId, sender.Broadcasting); - ActorOps->Send(sender.ActorId, new TEvents::TEvPoisonPill()); + if (sender.ActorId) { + ActorOps->Send(sender.ActorId, new TEvents::TEvPoisonPill()); + } } Senders = std::move(senders); @@ -47,7 +56,7 @@ void TBaseChangeSender::CreateMissingSenders(const TVector<ui64>& partitionIds) void TBaseChangeSender::RecreateSenders(const TVector<ui64>& partitionIds) { for (const auto& partitionId : partitionIds) { - RegisterSender(Senders, partitionId); + LazyCreateSender(Senders, partitionId); } } @@ -67,7 +76,9 @@ void TBaseChangeSender::CreateSenders(const TVector<ui64>& partitionIds, bool pa void TBaseChangeSender::KillSenders() { for (const auto& [_, sender] : std::exchange(Senders, {})) { - ActorOps->Send(sender.ActorId, new TEvents::TEvPoisonPill()); + if (sender.ActorId) { + ActorOps->Send(sender.ActorId, new TEvents::TEvPoisonPill()); + } } } @@ -145,6 +156,7 @@ void TBaseChangeSender::SendRecords() { auto it = PendingSent.begin(); THashSet<ui64> sendTo; + THashSet<ui64> registrations; bool needToResolve = false; while (it != PendingSent.end()) { @@ -158,6 +170,10 @@ void TBaseChangeSender::SendRecords() { auto& sender = Senders.at(partitionId); sender.Prepared.push_back(std::move(it->second)); + if (!sender.ActorId) { + Y_ABORT_UNLESS(!sender.Ready); + registrations.insert(partitionId); + } if (sender.Ready) { sendTo.insert(partitionId); } @@ -167,6 +183,10 @@ void TBaseChangeSender::SendRecords() { if (Senders.contains(partitionId)) { auto& sender = Senders.at(partitionId); sender.Prepared.push_back(std::move(it->second)); + if (!sender.ActorId) { + Y_ABORT_UNLESS(!sender.Ready); + registrations.insert(partitionId); + } if (sender.Ready) { sendTo.insert(partitionId); } @@ -181,6 +201,10 @@ void TBaseChangeSender::SendRecords() { it = PendingSent.erase(it); } + for (const auto partitionId : registrations) { + RegisterSender(partitionId); + } + for (const auto partitionId : sendTo) { SendPreparedRecords(partitionId); } @@ -265,6 +289,7 @@ void TBaseChangeSender::SendPreparedRecords(ui64 partitionId) { } } + Y_ABORT_UNLESS(sender.ActorId); ActorOps->Send(sender.ActorId, new TEvChangeExchange::TEvRecords(std::exchange(sender.Prepared, {}))); } diff --git a/ydb/core/tx/datashard/change_sender_common_ops.h b/ydb/core/tx/datashard/change_sender_common_ops.h index 09856d460bf..f269cc17bd4 100644 --- a/ydb/core/tx/datashard/change_sender_common_ops.h +++ b/ydb/core/tx/datashard/change_sender_common_ops.h @@ -98,7 +98,8 @@ class TBaseChangeSender: public IChangeSender { THashSet<ui64> CompletedPartitions; }; - void RegisterSender(THashMap<ui64, TSender>& senders, ui64 partitionId); + void LazyCreateSender(THashMap<ui64, TSender>& senders, ui64 partitionId); + void RegisterSender(ui64 partitionId); void CreateMissingSenders(const TVector<ui64>& partitionIds); void RecreateSenders(const TVector<ui64>& partitionIds); diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index fa8a1caad53..7d7a47f509a 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -112,7 +112,7 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) { } void SenderShouldShakeHands(const TString& path, ui32 times, const TShardedTableOptions& opts, - TMaybe<TShardedTableOptions::TIndex> addIndex = Nothing()) + TMaybe<TShardedTableOptions::TIndex> addIndex, const TString& query) { const auto pathParts = SplitPath(path); UNIT_ASSERT(pathParts.size() > 1); @@ -147,9 +147,12 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) { CreateShardedTable(server, sender, workingDir, tableName, opts); if (addIndex) { - AsyncAlterAddIndex(server, domainName, path, *addIndex); + WaitTxNotification(server, sender, AsyncAlterAddIndex(server, domainName, path, *addIndex)); } + // trigger initialization + ExecSQL(server, sender, query); + if (counter != times) { TDispatchOptions opts; opts.FinalEvents.emplace_back([&](IEventHandle&) { @@ -160,7 +163,8 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) { } Y_UNIT_TEST(SenderShouldShakeHandsOnce) { - SenderShouldShakeHands("/Root/Table", 1, TableWithIndex(SimpleAsyncIndex())); + SenderShouldShakeHands("/Root/Table", 1, TableWithIndex(SimpleAsyncIndex()), {}, + "UPSERT INTO `/Root/Table` (pkey, ikey) VALUES (1, 10);"); } Y_UNIT_TEST(SenderShouldShakeHandsTwice) { @@ -173,12 +177,14 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) { .Indexes({ {"by_i1key", {"i1key"}, {}, NKikimrSchemeOp::EIndexTypeGlobalAsync}, {"by_i2key", {"i2key"}, {}, NKikimrSchemeOp::EIndexTypeGlobalAsync}, - }) + }), {}, + "UPSERT INTO `/Root/Table` (pkey, i1key, i2key) VALUES (1, 10, 100);" ); } Y_UNIT_TEST(SenderShouldShakeHandsAfterAddingIndex) { - SenderShouldShakeHands("/Root/Table", 1, TableWoIndexes(), SimpleAsyncIndex()); + SenderShouldShakeHands("/Root/Table", 1, TableWoIndexes(), SimpleAsyncIndex(), + "UPSERT INTO `/Root/Table` (pkey, ikey) VALUES (1, 10);"); } void ShouldDeliverChanges(const TString& path, const TShardedTableOptions& opts, @@ -3179,6 +3185,38 @@ Y_UNIT_TEST_SUITE(Cdc) { }); } + Y_UNIT_TEST(SequentialSplitMerge) { + TTestPqEnv env(SimpleTable(), Updates(NKikimrSchemeOp::ECdcStreamFormatJson), false); + SetSplitMergePartCountLimit(env.GetServer()->GetRuntime(), -1); + + // split + auto tabletIds = GetTableShards(env.GetServer(), env.GetEdgeActor(), "/Root/Table"); + UNIT_ASSERT_VALUES_EQUAL(tabletIds.size(), 1); + + WaitTxNotification(env.GetServer(), env.GetEdgeActor(), + AsyncSplitTable(env.GetServer(), env.GetEdgeActor(), "/Root/Table", tabletIds.at(0), 4)); + + // merge + tabletIds = GetTableShards(env.GetServer(), env.GetEdgeActor(), "/Root/Table"); + UNIT_ASSERT_VALUES_EQUAL(tabletIds.size(), 2); + + WaitTxNotification(env.GetServer(), env.GetEdgeActor(), + AsyncMergeTable(env.GetServer(), env.GetEdgeActor(), "/Root/Table", tabletIds)); + + ExecSQL(env.GetServer(), env.GetEdgeActor(), R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, 10), + (2, 20), + (3, 30); + )"); + + WaitForContent(env.GetServer(), env.GetEdgeActor(), "/Root/Table/Stream", { + R"({"update":{"value":10},"key":[1]})", + R"({"update":{"value":20},"key":[2]})", + R"({"update":{"value":30},"key":[3]})", + }); + } + } // Cdc } // NKikimr diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp index baa0cae9d03..2dbdca17ffb 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp @@ -570,6 +570,38 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { } } + void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, int partitionIdx, + const TVector<ui32>& keyTags, const TVector<ui32>& valueTags, const TVector<ui32>& recordIds) + { + auto tableDesc = DescribePath(runtime, tablePath, true, true); + const auto& tablePartitions = tableDesc.GetPathDescription().GetTablePartitions(); + UNIT_ASSERT(partitionIdx < tablePartitions.size()); + + auto ev = MakeHolder<TEvDataShard::TEvUploadRowsRequest>(); + ev->Record.SetTableId(tableDesc.GetPathId()); + + auto& scheme = *ev->Record.MutableRowScheme(); + for (ui32 tag : keyTags) { + scheme.AddKeyColumnIds(tag); + } + for (ui32 tag : valueTags) { + scheme.AddValueColumnIds(tag); + } + + for (ui32 i : recordIds) { + auto key = TVector<TCell>{TCell::Make(i)}; + auto value = TVector<TCell>{TCell::Make(i)}; + + auto& row = *ev->Record.AddRows(); + row.SetKeyColumns(TSerializedCellVec::Serialize(key)); + row.SetValueColumns(TSerializedCellVec::Serialize(value)); + } + + const auto& sender = runtime.AllocateEdgeActor(); + ForwardToTablet(runtime, tablePartitions[partitionIdx].GetDatashardId(), sender, ev.Release()); + runtime.GrabEdgeEvent<TEvDataShard::TEvUploadRowsResponse>(sender); + } + Y_UNIT_TEST(SplitTable) { TTestWithReboots t; t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { @@ -577,8 +609,8 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { TInactiveZone inactive(activeZone); TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"( Name: "Table" - Columns { Name: "key" Type: "Uint64" } - Columns { Name: "value" Type: "Uint64" } + Columns { Name: "key" Type: "Uint32" } + Columns { Name: "value" Type: "Uint32" } KeyColumnNames: ["key"] )"); t.TestEnv->TestWaitNotification(runtime, t.TxId); @@ -598,7 +630,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { SourceTabletId: %lu SplitBoundary { KeyPrefix { - Tuple { Optional { Uint64: 2 } } + Tuple { Optional { Uint32: 2 } } } } )", TTestTxConfig::FakeHiveTablets)); @@ -606,6 +638,8 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { { TInactiveZone inactive(activeZone); + UploadRows(runtime, "/MyRoot/Table", 0, {1}, {2}, {1}); + UploadRows(runtime, "/MyRoot/Table", 1, {1}, {2}, {Max<ui32>()}); CheckRegistrations(runtime, {"/MyRoot/Table", 2}, {"/MyRoot/Table/Stream/streamImpl", 1}); } }); @@ -618,8 +652,8 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { TInactiveZone inactive(activeZone); TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"( Name: "Table" - Columns { Name: "key" Type: "Uint64" } - Columns { Name: "value" Type: "Uint64" } + Columns { Name: "key" Type: "Uint32" } + Columns { Name: "value" Type: "Uint32" } KeyColumnNames: ["key"] UniformPartitionsCount: 2 PartitionConfig { @@ -649,6 +683,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { { TInactiveZone inactive(activeZone); + UploadRows(runtime, "/MyRoot/Table", 0, {1}, {2}, {1, Max<ui32>()}); CheckRegistrations(runtime, {"/MyRoot/Table", 1}, {"/MyRoot/Table/Stream/streamImpl", 2}); } }); |