diff options
author | Nikolay Shestakov <tesseract@ydb.tech> | 2024-10-18 22:52:25 +0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-10-18 22:52:25 +0500 |
commit | 7b9578cf0d27b671e203cf4f36551b593927c25c (patch) | |
tree | f9de10414eb4a0d18ae46f1f17c57a35caa7c124 | |
parent | 2a74bac2d2d3bccb4e10120f1ead805640ec9dd0 (diff) | |
download | ydb-7b9578cf0d27b671e203cf4f36551b593927c25c.tar.gz |
More test for autopartitionin of topics (#10613)
-rw-r--r-- | ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp | 89 |
1 files changed, 58 insertions, 31 deletions
diff --git a/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp b/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp index 20f8b71ba5..6c68876bf6 100644 --- a/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp +++ b/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp @@ -929,9 +929,63 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); } + ui64 GetBalancerTabletId(TTopicSdkTestSetup& setup, const TString& topicPath) { + auto pathDescr = setup.GetServer().AnnoyingClient->Ls(topicPath)->Record.GetPathDescription().GetSelf(); + auto balancerTabletId = pathDescr.GetBalancerTabletID(); + Cerr << ">>>>> BalancerTabletID=" << balancerTabletId << Endl << Flush; + UNIT_ASSERT(balancerTabletId); + return balancerTabletId; + } + + void SplitPartition(TTopicSdkTestSetup& setup, const TString& topicPath, ui32 partitionId) { + auto balancerTabletId = GetBalancerTabletId(setup, topicPath); + auto edge = setup.GetRuntime().AllocateEdgeActor(); + setup.GetRuntime().SendToPipe(balancerTabletId, edge, new TEvPQ::TEvPartitionScaleStatusChanged(partitionId, NKikimrPQ::EScaleStatus::NEED_SPLIT)); + } + + void AssertPartitionCount(TTopicSdkTestSetup& setup, const TString& topicPath, size_t expectedCount) { + auto client = setup.MakeClient(); + auto describe = client.DescribeTopic(topicPath).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitions().size(), expectedCount); + } + + void WaitAndAssertPartitionCount(TTopicSdkTestSetup& setup, const TString& topicPath, size_t expectedCount) { + auto client = setup.MakeClient(); + size_t partitionCount = 0; + for (size_t i = 0; i < 10; ++i) { + Sleep(TDuration::Seconds(1)); + auto describe = client.DescribeTopic(topicPath).GetValueSync(); + partitionCount = describe.GetTopicDescription().GetPartitions().size(); + if (partitionCount == expectedCount) { + break; + } + } + UNIT_ASSERT_VALUES_EQUAL(partitionCount, expectedCount); + } + + Y_UNIT_TEST(WithDir_PartitionSplit_AutosplitByLoad) { + TTopicSdkTestSetup setup = CreateSetup(); + auto tableClient = setup.MakeTableClient(); + auto session = tableClient.CreateSession().GetValueSync().GetSession(); + + setup.GetServer().AnnoyingClient->MkDir("/Root", "dir"); + + ExecuteQuery(session, R"( + --!syntax_v1 + CREATE TOPIC `/Root/dir/origin` + WITH ( + AUTO_PARTITIONING_STRATEGY = 'SCALE_UP', + MAX_ACTIVE_PARTITIONS = 50 + ); + )"); + + AssertPartitionCount(setup, "/Root/dir/origin", 1); + SplitPartition(setup, "/Root/dir/origin", 0); + WaitAndAssertPartitionCount(setup, "/Root/dir/origin", 3); + } + Y_UNIT_TEST(CDC_PartitionSplit_AutosplitByLoad) { TTopicSdkTestSetup setup = CreateSetup(); - auto client = setup.MakeClient(); auto tableClient = setup.MakeTableClient(); auto session = tableClient.CreateSession().GetValueSync().GetSession(); @@ -954,36 +1008,9 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { ); )"); - { - auto describe = client.DescribeTopic("/Root/origin/feed").GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 1); - } - - ui64 balancerTabletId; - { - auto pathDescr = setup.GetServer().AnnoyingClient->Ls("/Root/origin/feed/streamImpl")->Record.GetPathDescription().GetSelf(); - balancerTabletId = pathDescr.GetBalancerTabletID(); - Cerr << ">>>>> BalancerTabletID=" << balancerTabletId << Endl << Flush; - UNIT_ASSERT(balancerTabletId); - } - - { - const auto edge = setup.GetRuntime().AllocateEdgeActor(); - setup.GetRuntime().SendToPipe(balancerTabletId, edge, new TEvPQ::TEvPartitionScaleStatusChanged(0, NKikimrPQ::EScaleStatus::NEED_SPLIT)); - } - - { - size_t partitionCount = 0; - for (size_t i = 0; i < 10; ++i) { - Sleep(TDuration::Seconds(1)); - auto describe = client.DescribeTopic("/Root/origin/feed").GetValueSync(); - partitionCount = describe.GetTopicDescription().GetPartitions().size(); - if (partitionCount == 3) { - break; - } - } - UNIT_ASSERT_VALUES_EQUAL(partitionCount, 3); - } + AssertPartitionCount(setup, "/Root/origin/feed", 1); + SplitPartition(setup, "/Root/origin/feed/streamImpl", 0); + WaitAndAssertPartitionCount(setup, "/Root/origin/feed", 3); } Y_UNIT_TEST(MidOfRange) { |